Electroneum
levin_client_async.h
Go to the documentation of this file.
1 // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 // * Redistributions of source code must retain the above copyright
7 // notice, this list of conditions and the following disclaimer.
8 // * Redistributions in binary form must reproduce the above copyright
9 // notice, this list of conditions and the following disclaimer in the
10 // documentation and/or other materials provided with the distribution.
11 // * Neither the name of the Andrey N. Sabelnikov nor the
12 // names of its contributors may be used to endorse or promote products
13 // derived from this software without specific prior written permission.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
19 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 //
26 
27 
28 #pragma once
29 
30 #include ""
31 #include "net_helper.h"
32 #include "levin_base.h"
33 
34 #undef ELECTRONEUM_DEFAULT_LOG_CATEGORY
35 #define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net"
36 
37 
38 namespace epee
39 {
40 namespace levin
41 {
42 
43  /************************************************************************
44  * levin_client_async - probably not a particularly fast implementation:
45  * each handler thread can add up to 30 ms of latency.
46  * However, handling events in the reader thread would cause deadlocks in
47  * the case of a recursive call (calling invoke() on the same connection
48  * from the reader thread, inside a remote invoke() handler).
49  ***********************************************************************/
50 
51 
52  class levin_client_async
53  {
54  levin_commands_handler* m_pcommands_handler;
55  void (*commands_handler_destroy)(levin_commands_handler*);
56  volatile uint32_t m_is_stop;
57  volatile uint32_t m_threads_count;
58  ::critical_section m_send_lock;
59 
60  std::string m_local_invoke_buff;
61  ::critical_section m_local_invoke_buff_lock;
62  volatile int m_invoke_res;
63 
64  volatile uint32_t m_invoke_data_ready;
65  volatile uint32_t m_invoke_is_active;
66 
67  boost::mutex m_invoke_event;
68  boost::condition_variable m_invoke_cond;
69  size_t m_timeout;
70 
71  ::critical_section m_recieved_packets_lock;
72  struct packet_entry
73  {
74  bucket_head m_hd;
75  std::string m_body;
76  uint32_t m_connection_index;
77  };
78  std::list<packet_entry> m_recieved_packets;
79  /*
80  m_current_connection_index needed when some connection was broken and reconnected - in this
81  case we could have some received packets in que, which shoud not be handled
82  */
83  volatile uint32_t m_current_connection_index;
84  ::critical_section m_invoke_lock;
85  ::critical_section m_reciev_packet_lock;
86  ::critical_section m_connection_lock;
87  net_utils::blocked_mode_client m_transport;
88  public:
89  levin_client_async():m_pcommands_handler(NULL), commands_handler_destroy(NULL), m_is_stop(0), m_threads_count(0), m_invoke_data_ready(0), m_invoke_is_active(0)
90  {}
91  levin_client_async(const levin_client_async& /*v*/):m_pcommands_handler(NULL), commands_handler_destroy(NULL), m_is_stop(0), m_threads_count(0), m_invoke_data_ready(0), m_invoke_is_active(0)
92  {}
93  ~levin_client_async()
94  {
95  boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 1);
96  disconnect();
97 
98 
99  while(boost::interprocess::ipcdetail::atomic_read32(&m_threads_count))
100  ::Sleep(100);
101 
102  set_handler(NULL);
103  }
104 
105  void set_handler(levin_commands_handler* phandler, void (*destroy)(levin_commands_handler*) = NULL)
106  {
107  if (commands_handler_destroy && m_pcommands_handler)
108  (*commands_handler_destroy)(m_pcommands_handler);
109  m_pcommands_handler = phandler;
110  m_pcommands_handler_destroy = destroy;
111  }
112 
113  bool connect(uint32_t ip, uint32_t port, uint32_t timeout)
114  {
115  loop_call_guard();
116  critical_region cr(m_connection_lock);
117 
118  m_timeout = timeout;
119  bool res = false;
120  CRITICAL_REGION_BEGIN(m_reciev_packet_lock);
121  CRITICAL_REGION_BEGIN(m_send_lock);
122  res = levin_client_impl::connect(ip, port, timeout);
123  boost::interprocess::ipcdetail::atomic_inc32(&m_current_connection_index);
124  CRITICAL_REGION_END();
125  CRITICAL_REGION_END();
126  if(res && !boost::interprocess::ipcdetail::atomic_read32(&m_threads_count) )
127  {
128  //boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 0);//m_is_stop = false;
129  boost::thread( boost::bind(&levin_duplex_client::reciever_thread, this) );
130  boost::thread( boost::bind(&levin_duplex_client::handler_thread, this) );
131  boost::thread( boost::bind(&levin_duplex_client::handler_thread, this) );
132  }
133 
134  return res;
135  }
136  bool is_connected()
137  {
138  loop_call_guard();
139  critical_region cr(m_cs);
140  return levin_client_impl::is_connected();
141  }
142 
143  inline
144  bool check_connection()
145  {
146  loop_call_guard();
147  critical_region cr(m_cs);
148 
149  if(!is_connected())
150  {
151  if( !reconnect() )
152  {
153  LOG_ERROR("Reconnect Failed. Failed to invoke() because not connected!");
154  return false;
155  }
156  }
157  return true;
158  }
159 
160  //------------------------------------------------------------------------------
161  inline
162  bool recv_n(SOCKET s, char* pbuff, size_t cb)
163  {
164  while(cb)
165  {
166  int res = ::recv(m_socket, pbuff, (int)cb, 0);
167 
168  if(SOCKET_ERROR == res)
169  {
170  if(!m_connected)
171  return false;
172 
173  int err = ::WSAGetLastError();
174  LOG_ERROR("Failed to recv(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
175  disconnect();
176  //reconnect();
177  return false;
178  }else if(res == 0)
179  {
180  disconnect();
181  //reconnect();
182  return false;
183  }
184  LOG_PRINT_L4("[" << m_socket <<"] RECV " << res);
185  cb -= res;
186  pbuff += res;
187  }
188 
189  return true;
190  }
191 
192  //------------------------------------------------------------------------------
193  inline
194  bool recv_n(SOCKET s, std::string& buff)
195  {
196  size_t cb_remain = buff.size();
197  char* m_current_ptr = (char*)buff.data();
198  return recv_n(s, m_current_ptr, cb_remain);
199  }
200 
201  bool disconnect()
202  {
203  //boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 1);//m_is_stop = true;
204  loop_call_guard();
205  critical_region cr(m_cs);
206  levin_client_impl::disconnect();
207 
208  CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
209  m_local_invoke_buff.clear();
210  m_invoke_res = LEVIN_ERROR_CONNECTION_DESTROYED;
212  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 1); //m_invoke_data_ready = true;
213  m_invoke_cond.notify_all();
214  return true;
215  }
216 
218  {
219 
220  }
221 
223  {
224  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_is_active, 0);
225  }
226 
227  int invoke(const GUID& target, int command, const std::string& in_buff, std::string& buff_out)
228  {
229 
230  critical_region cr_invoke(m_invoke_lock);
231 
232  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_is_active, 1);
233  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 0);
234  misc_utils::destr_ptr hdlr = misc_utils::add_exit_scope_handler(boost::bind(&levin_duplex_client::on_leave_invoke, this));
235 
236  loop_call_guard();
237 
238  if(!check_connection())
240 
241 
242  bucket_head head = {0};
243  head.m_signature = LEVIN_SIGNATURE;
244  head.m_cb = in_buff.size();
245  head.m_have_to_return_data = true;
246  head.m_id = target;
247 #ifdef TRACE_LEVIN_PACKETS_BY_GUIDS
248  ::UuidCreate(&head.m_id);
249 #endif
250  head.m_command = command;
251  head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
252  head.m_flags = LEVIN_PACKET_REQUEST;
253  LOG_PRINT("[" << m_socket <<"] Sending invoke data", LOG_LEVEL_4);
254 
255  CRITICAL_REGION_BEGIN(m_send_lock);
256  LOG_PRINT_L4("[" << m_socket <<"] SEND " << sizeof(head));
257  int res = ::send(m_socket, (const char*)&head, sizeof(head), 0);
258  if(SOCKET_ERROR == res)
259  {
260  int err = ::WSAGetLastError();
261  LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
262  disconnect();
264  }
265  LOG_PRINT_L4("[" << m_socket <<"] SEND " << (int)in_buff.size());
266  res = ::send(m_socket, in_buff.data(), (int)in_buff.size(), 0);
267  if(SOCKET_ERROR == res)
268  {
269  int err = ::WSAGetLastError();
270  LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
271  disconnect();
273  }
275  LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
276 
277  //hard coded timeout in 10 minutes for maximum invoke period. if it happens, it could mean only some real troubles.
278  boost::system_time timeout = boost::get_system_time()+ boost::posix_time::milliseconds(100);
279  size_t timeout_count = 0;
280  boost::unique_lock<boost::mutex> lock(m_invoke_event);
281 
282  while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_data_ready))
283  {
284  if(!m_invoke_cond.timed_wait(lock, timeout))
285  {
286  if(timeout_count < 10)
287  {
288  //workaround to avoid freezing at timed_wait called after notify_all.
289  timeout = boost::get_system_time()+ boost::posix_time::milliseconds(100);
290  ++timeout_count;
291  continue;
292  }else if(timeout_count == 10)
293  {
294  //workaround to avoid freezing at timed_wait called after notify_all.
295  timeout = boost::get_system_time()+ boost::posix_time::minutes(10);
296  ++timeout_count;
297  continue;
298  }else
299  {
300  LOG_PRINT("[" << m_socket <<"] Timeout on waiting invoke result. ", LOG_LEVEL_0);
301  //disconnect();
303  }
304  }
305  }
306 
307 
308  CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
309  buff_out.swap(m_local_invoke_buff);
310  m_local_invoke_buff.clear();
312  return m_invoke_res;
313  }
314 
315  int notify(const GUID& target, int command, const std::string& in_buff)
316  {
317  if(!check_connection())
319 
320  bucket_head head = {0};
321  head.m_signature = LEVIN_SIGNATURE;
322  head.m_cb = in_buff.size();
323  head.m_have_to_return_data = false;
324  head.m_id = target;
325 #ifdef TRACE_LEVIN_PACKETS_BY_GUIDS
326  ::UuidCreate(&head.m_id);
327 #endif
328  head.m_command = command;
329  head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
330  head.m_flags = LEVIN_PACKET_REQUEST;
331  CRITICAL_REGION_BEGIN(m_send_lock);
332  LOG_PRINT_L4("[" << m_socket <<"] SEND " << sizeof(head));
333  int res = ::send(m_socket, (const char*)&head, sizeof(head), 0);
334  if(SOCKET_ERROR == res)
335  {
336  int err = ::WSAGetLastError();
337  LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
338  disconnect();
340  }
341  LOG_PRINT_L4("[" << m_socket <<"] SEND " << (int)in_buff.size());
342  res = ::send(m_socket, in_buff.data(), (int)in_buff.size(), 0);
343  if(SOCKET_ERROR == res)
344  {
345  int err = ::WSAGetLastError();
346  LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
347  disconnect();
349  }
351  LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
352 
353  return 1;
354  }
355 
356 
357  private:
358  bool have_some_data(SOCKET sock, int interval = 1)
359  {
360  fd_set fds;
361  FD_ZERO(&fds);
362  FD_SET(sock, &fds);
363 
364  fd_set fdse;
365  FD_ZERO(&fdse);
366  FD_SET(sock, &fdse);
367 
368 
369  timeval tv;
370  tv.tv_sec = interval;
371  tv.tv_usec = 0;
372 
373  int sel_res = select(0, &fds, 0, &fdse, &tv);
374  if(0 == sel_res)
375  return false;
376  else if(sel_res == SOCKET_ERROR)
377  {
378  if(m_is_stop)
379  return false;
380  int err_code = ::WSAGetLastError();
381  LOG_ERROR("Filed to call select, err code = " << err_code);
382  disconnect();
383  }else
384  {
385  if(fds.fd_array[0])
386  {//some read operations was performed
387  return true;
388  }else if(fdse.fd_array[0])
389  {//some error was at the socket
390  return true;
391  }
392  }
393  return false;
394  }
395 
396 
397  bool reciev_and_process_incoming_data()
398  {
399  bucket_head head = {0};
400  uint32_t conn_index = 0;
401  bool is_request = false;
402  std::string local_buff;
403  CRITICAL_REGION_BEGIN(m_reciev_packet_lock);//to protect from socket reconnect between head and body
404 
405  if(!recv_n(m_socket, (char*)&head, sizeof(head)))
406  {
407  if(m_is_stop)
408  return false;
409  LOG_ERROR("Failed to recv_n");
410  return false;
411  }
412 
413  conn_index = boost::interprocess::ipcdetail::atomic_read32(&m_current_connection_index);
414 
415  if(head.m_signature!=LEVIN_SIGNATURE)
416  {
417  LOG_ERROR("Signature mismatch in response");
418  return false;
419  }
420 
421  is_request = (head.m_protocol_version == LEVIN_PROTOCOL_VER_1 && head.m_flags&LEVIN_PACKET_REQUEST);
422 
423 
424  local_buff.resize((size_t)head.m_cb);
425  if(!recv_n(m_socket, local_buff))
426  {
427  if(m_is_stop)
428  return false;
429  LOG_ERROR("Filed to reciev");
430  return false;
431  }
433 
434  LOG_PRINT_L4("LEVIN_PACKET_RECEIVED. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
435 
436  if(is_request)
437  {
438  CRITICAL_REGION_BEGIN(m_recieved_packets_lock);
439  m_recieved_packets.resize(m_recieved_packets.size() + 1);
440  m_recieved_packets.back().m_hd = head;
441  m_recieved_packets.back().m_body.swap(local_buff);
442  m_recieved_packets.back().m_connection_index = conn_index;
444  /*
445 
446  */
447  }else
448  {//this is some response
449 
450  CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
451  m_local_invoke_buff.swap(local_buff);
452  m_invoke_res = head.m_return_code;
454  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 1); //m_invoke_data_ready = true;
455  m_invoke_cond.notify_all();
456 
457  }
458  return true;
459  }
460 
461  bool reciever_thread()
462  {
463  LOG_PRINT_L3("[" << m_socket <<"] Socket reciever thread started.[m_threads_count=" << m_threads_count << "]");
464  log_space::log_singletone::set_thread_log_prefix("RECIEVER_WORKER");
465  boost::interprocess::ipcdetail::atomic_inc32(&m_threads_count);
466 
467  while(!m_is_stop)
468  {
469  if(!m_connected)
470  {
471  Sleep(100);
472  continue;
473  }
474 
475  if(have_some_data(m_socket, 1))
476  {
477  if(!reciev_and_process_incoming_data())
478  {
479  if(m_is_stop)
480  {
481  break;//boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
482  //return true;
483  }
484  LOG_ERROR("Failed to reciev_and_process_incoming_data. shutting down");
485  //boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
486  //disconnect_no_wait();
487  //break;
488  }
489  }
490  }
491 
492  boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
493  LOG_PRINT_L3("[" << m_socket <<"] Socket reciever thread stopped.[m_threads_count=" << m_threads_count << "]");
494  return true;
495  }
496 
497  bool process_recieved_packet(bucket_head& head, const std::string& local_buff, uint32_t conn_index)
498  {
499 
500  net_utils::connection_context_base conn_context;
501  conn_context.m_remote_address = m_address;
502  if(head.m_have_to_return_data)
503  {
504  std::string return_buff;
505  if(m_pcommands_handler)
506  head.m_return_code = m_pcommands_handler->invoke(head.m_id, head.m_command, local_buff, return_buff, conn_context);
507  else
509 
510 
511 
512  head.m_cb = return_buff.size();
513  head.m_have_to_return_data = false;
514  head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
515  head.m_flags = LEVIN_PACKET_RESPONSE;
516 
517  std::string send_buff((const char*)&head, sizeof(head));
518  send_buff += return_buff;
519  CRITICAL_REGION_BEGIN(m_send_lock);
520  if(conn_index != boost::interprocess::ipcdetail::atomic_read32(&m_current_connection_index))
521  {//there was reconnect, send response back is not allowed
522  return true;
523  }
524  int res = ::send(m_socket, (const char*)send_buff.data(), send_buff.size(), 0);
525  if(res == SOCKET_ERROR)
526  {
527  int err_code = ::WSAGetLastError();
528  LOG_ERROR("Failed to send, err = " << err_code);
529  return false;
530  }
532  LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
533 
534  }
535  else
536  {
537  if(m_pcommands_handler)
538  m_pcommands_handler->notify(head.m_id, head.m_command, local_buff, conn_context);
539  }
540 
541  return true;
542  }
543 
544  bool handler_thread()
545  {
546  LOG_PRINT_L3("[" << m_socket <<"] Socket handler thread started.[m_threads_count=" << m_threads_count << "]");
547  log_space::log_singletone::set_thread_log_prefix("HANDLER_WORKER");
548  boost::interprocess::ipcdetail::atomic_inc32(&m_threads_count);
549 
550  while(!m_is_stop)
551  {
552  bool have_some_work = false;
553  std::string local_buff;
554  bucket_head bh = {0};
555  uint32_t conn_index = 0;
556 
557  CRITICAL_REGION_BEGIN(m_recieved_packets_lock);
558  if(m_recieved_packets.size())
559  {
560  bh = m_recieved_packets.begin()->m_hd;
561  conn_index = m_recieved_packets.begin()->m_connection_index;
562  local_buff.swap(m_recieved_packets.begin()->m_body);
563  have_some_work = true;
564  m_recieved_packets.pop_front();
565  }
567 
568  if(have_some_work)
569  {
570  process_recieved_packet(bh, local_buff, conn_index);
571  }else
572  {
573  //Idle when no work
574  Sleep(30);
575  }
576  }
577 
578  boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
579  LOG_PRINT_L3("[" << m_socket <<"] Socket handler thread stopped.[m_threads_count=" << m_threads_count << "]");
580  return true;
581  }
582  };
583 
584 }
585 }
const char * res
Definition: hmac_keccak.cpp:41
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
Definition: levin_base.h:97
int notify(const GUID &target, int command, const std::string &in_buff)
#define LEVIN_SIGNATURE
Definition: levin_base.h:34
::std::string string
Definition: gtest-port.h:1097
#define LEVIN_PACKET_REQUEST
Definition: levin_base.h:73
#define LEVIN_ERROR_CONNECTION_HANDLER_NOT_DEFINED
Definition: levin_base.h:99
#define LEVIN_PACKET_RESPONSE
Definition: levin_base.h:74
#define CRITICAL_REGION_END()
Definition: syncobj.h:233
return true
unsigned int uint32_t
Definition: stdint.h:126
#define CRITICAL_REGION_BEGIN(x)
Definition: syncobj.h:229
#define SOCKET
#define false
Definition: stdbool.h:38
int invoke(const GUID &target, int command, const std::string &in_buff, std::string &buff_out)
#define LOG_PRINT_L3(x)
Definition: misc_log_ex.h:102
#define LOG_ERROR(x)
Definition: misc_log_ex.h:98
#define LEVIN_PROTOCOL_VER_1
Definition: levin_base.h:78
struct rule_list head
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition: levin_base.h:96
#define LOG_PRINT_L4(x)
Definition: misc_log_ex.h:103
expect< void > send(const epee::span< const std::uint8_t > payload, void *const socket, const int flags) noexcept
Definition: zmq.cpp:182
bool recv_n(SOCKET s, std::string &buff)
#define inline
Definition: inline_c.h:35