Electroneum
levin_protocol_handler_async.h
Go to the documentation of this file.
1 // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 // * Redistributions of source code must retain the above copyright
7 // notice, this list of conditions and the following disclaimer.
8 // * Redistributions in binary form must reproduce the above copyright
9 // notice, this list of conditions and the following disclaimer in the
10 // documentation and/or other materials provided with the distribution.
11 // * Neither the name of the Andrey N. Sabelnikov nor the
12 // names of its contributors may be used to endorse or promote products
13 // derived from this software without specific prior written permission.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
19 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 //
26 
27 #pragma once
28 #include <boost/asio/deadline_timer.hpp>
29 #include <boost/uuid/uuid_generators.hpp>
30 #include <boost/unordered_map.hpp>
31 #include <boost/interprocess/detail/atomic.hpp>
32 #include <boost/smart_ptr/make_shared.hpp>
33 
34 #include <atomic>
35 
36 #include "levin_base.h"
37 #include "buffer.h"
38 #include "misc_language.h"
39 #include "syncobj.h"
40 #include "misc_os_dependent.h"
41 #include "int-util.h"
42 
43 #include <random>
44 #include <chrono>
45 
46 #undef ELECTRONEUM_DEFAULT_LOG_CATEGORY
47 #define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net"
48 
49 #ifndef MIN_BYTES_WANTED
50 #define MIN_BYTES_WANTED 512
51 #endif
52 
53 namespace epee
54 {
55 namespace levin
56 {
57 
58 /************************************************************************/
59 /* */
60 /************************************************************************/
61 template<class t_connection_context>
63 
64 template<class t_connection_context>
66 {
67  typedef boost::unordered_map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* > connections_map;
68  critical_section m_connects_lock;
69  connections_map m_connects;
70 
71  void add_connection(async_protocol_handler<t_connection_context>* pc);
72  void del_connection(async_protocol_handler<t_connection_context>* pc);
73 
74  async_protocol_handler<t_connection_context>* find_connection(boost::uuids::uuid connection_id) const;
75  int find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph);
76 
77  friend class async_protocol_handler<t_connection_context>;
78 
80  void (*m_pcommands_handler_destroy)(levin_commands_handler<t_connection_context>*);
81 
82  void delete_connections (size_t count, bool incoming);
83 
84 public:
85  typedef t_connection_context connection_context;
88 
89  int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id);
90  template<class callback_t>
91  int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
92 
93  int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id);
94  bool close(boost::uuids::uuid connection_id);
95  bool update_connection_context(const t_connection_context& contxt);
96  bool request_callback(boost::uuids::uuid connection_id);
97  template<class callback_t>
98  bool foreach_connection(const callback_t &cb);
99  template<class callback_t>
100  bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb);
101  size_t get_connections_count();
103 
105  {}
107  void del_out_connections(size_t count);
108  void del_in_connections(size_t count);
109 };
110 
111 
112 /************************************************************************/
113 /* */
114 /************************************************************************/
115 template<class t_connection_context = net_utils::connection_context_base>
116 class async_protocol_handler
117 {
118 public:
119  typedef t_connection_context connection_context;
121 
123  {
126  };
127 
128  std::atomic<bool> m_deletion_initiated;
129  std::atomic<bool> m_protocol_released;
131 
132  volatile int m_invoke_result_code;
133 
136 
139 
145  t_connection_context& m_connection_context;
146 
149 
152 
154  {
155  virtual bool handle(int res, const epee::span<const uint8_t> buff, connection_context& context)=0;
156  virtual bool is_timer_started() const=0;
157  virtual void cancel()=0;
158  virtual bool cancel_timer()=0;
159  virtual void reset_timer()=0;
160  };
161  template <class callback_t>
163  {
164  anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
165  :m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
167  {
168  if(m_con.start_outer_call())
169  {
170  MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
171  m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
172  m_timer.async_wait([&con, command, cb, timeout](const boost::system::error_code& ec)
173  {
174  if(ec == boost::asio::error::operation_aborted)
175  return;
176  MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
178  cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
179  con.close();
180  con.finish_outer_call();
181  });
182  m_timer_started = true;
183  }
184  }
185  virtual ~anvoke_handler()
186  {}
187  callback_t m_cb;
189  boost::asio::deadline_timer m_timer;
196  {
197  if(!cancel_timer())
198  return false;
199  m_cb(res, buff, context);
201  return true;
202  }
203  virtual bool is_timer_started() const
204  {
205  return m_timer_started;
206  }
207  virtual void cancel()
208  {
209  if(cancel_timer())
210  {
214  }
215  }
216  virtual bool cancel_timer()
217  {
219  {
220  m_cancel_timer_called = true;
221  boost::system::error_code ignored_ec;
222  m_timer_cancelled = 1 == m_timer.cancel(ignored_ec);
223  }
224  return m_timer_cancelled;
225  }
226  virtual void reset_timer()
227  {
228  boost::system::error_code ignored_ec;
229  if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
230  {
231  callback_t& cb = m_cb;
232  uint64_t timeout = m_timeout;
234  int command = m_command;
235  m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
236  m_timer.async_wait([&con, cb, command, timeout](const boost::system::error_code& ec)
237  {
238  if(ec == boost::asio::error::operation_aborted)
239  return;
240  MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
242  cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
243  con.close();
244  con.finish_outer_call();
245  });
246  }
247  }
248  };
250  std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
251 
252  template<class callback_t>
253  bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler& con, int command)
254  {
256  boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<anvoke_handler<callback_t>>(cb, timeout, con, command));
257  m_invoke_response_handlers.push_back(handler);
258  return handler->is_timer_started();
259  }
260  template<class callback_t> friend struct anvoke_handler;
261 public:
264  t_connection_context& conn_context):
266  m_pservice_endpoint(psnd_hndlr),
267  m_config(config),
268  m_connection_context(conn_context),
269  m_cache_in_buffer(4 * 1024),
271  {
272  m_close_called = 0;
273  m_deletion_initiated = false;
274  m_protocol_released = false;
275  m_wait_count = 0;
277  m_connection_initialized = false;
278  m_invoke_buf_ready = 0;
280  }
282  {
283  try
284  {
285 
286  m_deletion_initiated = true;
288  {
289  m_config.del_connection(this);
290  }
291 
292  for (size_t i = 0; i < 60 * 1000 / 100 && 0 != boost::interprocess::ipcdetail::atomic_read32(&m_wait_count); ++i)
293  {
295  }
296  CHECK_AND_ASSERT_MES_NO_RET(0 == boost::interprocess::ipcdetail::atomic_read32(&m_wait_count), "Failed to wait for operation completion. m_wait_count = " << m_wait_count);
297 
298  MTRACE(m_connection_context << "~async_protocol_handler()");
299 
300  }
301  catch (...) { /* ignore */ }
302  }
303 
305  {
306  MTRACE(m_connection_context << "[levin_protocol] -->> start_outer_call");
308  {
309  MERROR(m_connection_context << "[levin_protocol] -->> start_outer_call failed");
310  return false;
311  }
312  boost::interprocess::ipcdetail::atomic_inc32(&m_wait_count);
313  return true;
314  }
316  {
317  MTRACE(m_connection_context << "[levin_protocol] <<-- finish_outer_call");
318  boost::interprocess::ipcdetail::atomic_dec32(&m_wait_count);
320  return true;
321  }
322 
324  {
325  decltype(m_invoke_response_handlers) local_invoke_response_handlers;
327  local_invoke_response_handlers.swap(m_invoke_response_handlers);
328  m_protocol_released = true;
330 
331  // Never call callback inside critical section, that can cause deadlock. Callback can be called when
332  // invoke_response_handler_base is cancelled
333  std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) {
334  pinv_resp_hndlr->cancel();
335  });
336 
337  return true;
338  }
339 
340  bool close()
341  {
342  boost::interprocess::ipcdetail::atomic_inc32(&m_close_called);
343 
345  return true;
346  }
347 
349  {
350  m_connection_context = contxt;
351  }
352 
354  {
356  boost::bind(&async_protocol_handler::finish_outer_call, this));
357 
359  }
360 
362  {
363  m_config.m_pcommands_handler->callback(m_connection_context);
364  }
365 
366  virtual bool handle_recv(const void* ptr, size_t cb)
367  {
368  if(boost::interprocess::ipcdetail::atomic_read32(&m_close_called))
369  return false; //closing connections
370 
371  if(!m_config.m_pcommands_handler)
372  {
373  MERROR(m_connection_context << "Commands handler not set!");
374  return false;
375  }
376 
378  {
379  MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size
380  << ", packet received " << m_cache_in_buffer.size() + cb
381  << ", connection will be closed.");
382  return false;
383  }
384 
385  m_cache_in_buffer.append((const char*)ptr, cb);
386 
387  bool is_continue = true;
388  while(is_continue)
389  {
390  switch(m_state)
391  {
392  case stream_state_body:
394  {
395  is_continue = false;
396  if(cb >= MIN_BYTES_WANTED)
397  {
399  if (!m_invoke_response_handlers.empty())
400  {
401  //async call scenario
402  boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
403  response_handler->reset_timer();
404  MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb);
405  }
406  }
407  break;
408  }
409  {
410  epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb);
411 
413 
414  MDEBUG(m_connection_context << "LEVIN_PACKET_RECEIVED. [len=" << m_current_head.m_cb
415  << ", flags" << m_current_head.m_flags
417  <<", cmd = " << m_current_head.m_command
418  << ", v=" << m_current_head.m_protocol_version);
419 
420  if(is_response)
421  {//response to some invoke
422 
424  if(!m_invoke_response_handlers.empty())
425  {//async call scenario
426  boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
427  bool timer_cancelled = response_handler->cancel_timer();
428  // Don't pop handler, to avoid destroying it
429  if(timer_cancelled)
430  m_invoke_response_handlers.pop_front();
431  invoke_response_handlers_guard.unlock();
432 
433  if(timer_cancelled)
434  response_handler->handle(m_current_head.m_return_code, buff_to_invoke, m_connection_context);
435  }
436  else
437  {
438  invoke_response_handlers_guard.unlock();
439  //use sync call scenario
440  if(!boost::interprocess::ipcdetail::atomic_read32(&m_wait_count) && !boost::interprocess::ipcdetail::atomic_read32(&m_close_called))
441  {
442  MERROR(m_connection_context << "no active invoke when response came, wtf?");
443  return false;
444  }else
445  {
447  m_local_inv_buff = std::string((const char*)buff_to_invoke.data(), buff_to_invoke.size());
448  buff_to_invoke = epee::span<const uint8_t>((const uint8_t*)NULL, 0);
451  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 1);
452  }
453  }
454  }else
455  {
457  {
458  std::string return_buff;
459  m_current_head.m_return_code = m_config.m_pcommands_handler->invoke(
461  buff_to_invoke,
462  return_buff,
464  m_current_head.m_cb = return_buff.size();
468 #if BYTE_ORDER == LITTLE_ENDIAN
469  std::string send_buff((const char*)&m_current_head, sizeof(m_current_head));
470 #else
472  head.m_signature = SWAP64LE(head.m_signature);
473  head.m_cb = SWAP64LE(head.m_cb);
474  head.m_command = SWAP32LE(head.m_command);
475  head.m_return_code = SWAP32LE(head.m_return_code);
476  head.m_flags = SWAP32LE(head.m_flags);
477  head.m_protocol_version = SWAP32LE(head.m_protocol_version);
478  std::string send_buff((const char*)&head, sizeof(head));
479 #endif
480  send_buff += return_buff;
482  if(!m_pservice_endpoint->do_send(send_buff.data(), send_buff.size()))
483  return false;
485  MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << m_current_head.m_cb
486  << ", flags" << m_current_head.m_flags
488  <<", cmd = " << m_current_head.m_command
489  << ", ver=" << m_current_head.m_protocol_version);
490  }
491  else
492  m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context);
493  }
494  }
496  break;
497  case stream_state_head:
498  {
499  if(m_cache_in_buffer.size() < sizeof(bucket_head2))
500  {
502  {
503  MWARNING(m_connection_context << "Signature mismatch, connection will be closed");
504  return false;
505  }
506  is_continue = false;
507  break;
508  }
509 
510 #if BYTE_ORDER == LITTLE_ENDIAN
511  bucket_head2& phead = *(bucket_head2*)m_cache_in_buffer.span(sizeof(bucket_head2)).data();
512 #else
513  bucket_head2 phead = *(bucket_head2*)m_cache_in_buffer.span(sizeof(bucket_head2)).data();
514  phead.m_signature = SWAP64LE(phead.m_signature);
515  phead.m_cb = SWAP64LE(phead.m_cb);
516  phead.m_command = SWAP32LE(phead.m_command);
517  phead.m_return_code = SWAP32LE(phead.m_return_code);
518  phead.m_flags = SWAP32LE(phead.m_flags);
520 #endif
521  if(LEVIN_SIGNATURE != phead.m_signature)
522  {
523  LOG_ERROR_CC(m_connection_context, "Signature mismatch, connection will be closed");
524  return false;
525  }
526  m_current_head = phead;
527 
532  {
533  LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size
534  << ", packet header received " << m_current_head.m_cb
535  << ", connection will be closed.");
536  return false;
537  }
538  }
539  break;
540  default:
541  LOG_ERROR_CC(m_connection_context, "Undefined state in levin_server_impl::connection_handler, m_state=" << m_state);
542  return false;
543  }
544  }
545 
546  return true;
547  }
548 
550  {
552  {
554  m_config.add_connection(this);
555  }
556  return true;
557  }
558 
559  template<class callback_t>
560  bool async_invoke(int command, const epee::span<const uint8_t> in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
561  {
563  boost::bind(&async_protocol_handler::finish_outer_call, this));
564 
566  timeout = m_config.m_invoke_timeout;
567 
568  int err_code = LEVIN_OK;
569  do
570  {
572  {
574  break;
575  }
576 
578 
580  {
582  break;
583  }
584 
585  bucket_head2 head = {0};
586  head.m_signature = SWAP64LE(LEVIN_SIGNATURE);
587  head.m_cb = SWAP64LE(in_buff.size());
588  head.m_have_to_return_data = true;
589 
591  head.m_command = SWAP32LE(command);
592  head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1);
593 
594  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0);
597  if(!m_pservice_endpoint->do_send(&head, sizeof(head)))
598  {
599  LOG_ERROR_CC(m_connection_context, "Failed to do_send");
600  err_code = LEVIN_ERROR_CONNECTION;
601  break;
602  }
603 
604  if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size()))
605  {
606  LOG_ERROR_CC(m_connection_context, "Failed to do_send");
607  err_code = LEVIN_ERROR_CONNECTION;
608  break;
609  }
610 
611  if(!add_invoke_response_handler(cb, timeout, *this, command))
612  {
614  break;
615  }
617  } while (false);
618 
619  if (LEVIN_OK != err_code)
620  {
621  epee::span<const uint8_t> stub_buff{(const uint8_t*)"", 0};
622  // Never call callback inside critical section, that can cause deadlock
623  cb(err_code, stub_buff, m_connection_context);
624  return false;
625  }
626 
627  return true;
628  }
629 
630  int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out)
631  {
633  boost::bind(&async_protocol_handler::finish_outer_call, this));
634 
637 
639 
642 
643  bucket_head2 head = {0};
644  head.m_signature = SWAP64LE(LEVIN_SIGNATURE);
645  head.m_cb = SWAP64LE(in_buff.size());
646  head.m_have_to_return_data = true;
647 
649  head.m_command = SWAP32LE(command);
650  head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1);
651 
652  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0);
654  if(!m_pservice_endpoint->do_send(&head, sizeof(head)))
655  {
656  LOG_ERROR_CC(m_connection_context, "Failed to do_send");
657  return LEVIN_ERROR_CONNECTION;
658  }
659 
660  if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size()))
661  {
662  LOG_ERROR_CC(m_connection_context, "Failed to do_send");
663  return LEVIN_ERROR_CONNECTION;
664  }
666 
667  MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
668  << ", f=" << head.m_flags
669  << ", r?=" << head.m_have_to_return_data
670  << ", cmd = " << head.m_command
671  << ", ver=" << head.m_protocol_version);
672 
673  uint64_t ticks_start = misc_utils::get_tick_count();
674  size_t prev_size = 0;
675 
676  while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_buf_ready) && !m_deletion_initiated && !m_protocol_released)
677  {
678  if(m_cache_in_buffer.size() - prev_size >= MIN_BYTES_WANTED)
679  {
680  prev_size = m_cache_in_buffer.size();
681  ticks_start = misc_utils::get_tick_count();
682  }
684  {
685  MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection ");
686  close();
688  }
691  }
692 
695 
697  buff_out.swap(m_local_inv_buff);
698  m_local_inv_buff.clear();
700 
701  return m_invoke_result_code;
702  }
703 
704  int notify(int command, const epee::span<const uint8_t> in_buff)
705  {
707  boost::bind(&async_protocol_handler::finish_outer_call, this));
708 
711 
713 
716 
717  bucket_head2 head = {0};
718  head.m_signature = SWAP64LE(LEVIN_SIGNATURE);
719  head.m_have_to_return_data = false;
720  head.m_cb = SWAP64LE(in_buff.size());
721 
722  head.m_command = SWAP32LE(command);
723  head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1);
726  if(!m_pservice_endpoint->do_send(&head, sizeof(head)))
727  {
728  LOG_ERROR_CC(m_connection_context, "Failed to do_send()");
729  return -1;
730  }
731 
732  if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size()))
733  {
734  LOG_ERROR_CC(m_connection_context, "Failed to do_send()");
735  return -1;
736  }
738  LOG_DEBUG_CC(m_connection_context, "LEVIN_PACKET_SENT. [len=" << head.m_cb <<
739  ", f=" << head.m_flags <<
740  ", r?=" << head.m_have_to_return_data <<
741  ", cmd = " << head.m_command <<
742  ", ver=" << head.m_protocol_version);
743 
744  return 1;
745  }
746  //------------------------------------------------------------------------------------------
748  //------------------------------------------------------------------------------------------
749  t_connection_context& get_context_ref() {return m_connection_context;}
750 };
751 //------------------------------------------------------------------------------------------
752 template<class t_connection_context>
753 void async_protocol_handler_config<t_connection_context>::del_connection(async_protocol_handler<t_connection_context>* pconn)
754 {
755  CRITICAL_REGION_BEGIN(m_connects_lock);
756  m_connects.erase(pconn->get_connection_id());
758  m_pcommands_handler->on_connection_close(pconn->m_connection_context);
759 }
760 //------------------------------------------------------------------------------------------
761 template<class t_connection_context>
762 void async_protocol_handler_config<t_connection_context>::delete_connections(size_t count, bool incoming)
763 {
764  std::vector <boost::uuids::uuid> connections;
765  CRITICAL_REGION_BEGIN(m_connects_lock);
766  for (auto& c: m_connects)
767  {
768  if (c.second->m_connection_context.m_is_income == incoming)
769  connections.push_back(c.first);
770  }
771 
772  // close random connections from the provided set
773  // TODO or better just keep removing random elements (performance)
774  unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
775  shuffle(connections.begin(), connections.end(), std::default_random_engine(seed));
776  while (count > 0 && connections.size() > 0)
777  {
778  try
779  {
780  auto i = connections.end() - 1;
781  async_protocol_handler<t_connection_context> *conn = m_connects.at(*i);
782  del_connection(conn);
783  close(*i);
784  connections.erase(i);
785  }
786  catch (const std::out_of_range &e)
787  {
788  MWARNING("Connection not found in m_connects, continuing");
789  }
790  --count;
791  }
792 
794 }
795 //------------------------------------------------------------------------------------------
796 template<class t_connection_context>
798 {
799  delete_connections(count, false);
800 }
801 //------------------------------------------------------------------------------------------
802 template<class t_connection_context>
804 {
805  delete_connections(count, true);
806 }
807 //------------------------------------------------------------------------------------------
808 template<class t_connection_context>
810 {
811  CRITICAL_REGION_BEGIN(m_connects_lock);
812  m_connects[pconn->get_connection_id()] = pconn;
814  m_pcommands_handler->on_connection_new(pconn->m_connection_context);
815 }
816 //------------------------------------------------------------------------------------------
817 template<class t_connection_context>
818 async_protocol_handler<t_connection_context>* async_protocol_handler_config<t_connection_context>::find_connection(boost::uuids::uuid connection_id) const
819 {
820  auto it = m_connects.find(connection_id);
821  return it == m_connects.end() ? 0 : it->second;
822 }
823 //------------------------------------------------------------------------------------------
824 template<class t_connection_context>
825 int async_protocol_handler_config<t_connection_context>::find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph)
826 {
827  CRITICAL_REGION_LOCAL(m_connects_lock);
828  aph = find_connection(connection_id);
829  if(0 == aph)
831  if(!aph->start_outer_call())
833  return LEVIN_OK;
834 }
835 //------------------------------------------------------------------------------------------
836 template<class t_connection_context>
838 {
840  int r = find_and_lock_connection(connection_id, aph);
841  return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r;
842 }
843 //------------------------------------------------------------------------------------------
844 template<class t_connection_context> template<class callback_t>
845 int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
846 {
848  int r = find_and_lock_connection(connection_id, aph);
849  return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r;
850 }
851 //------------------------------------------------------------------------------------------
852 template<class t_connection_context> template<class callback_t>
854 {
855  CRITICAL_REGION_LOCAL(m_connects_lock);
856  for(auto& c: m_connects)
857  {
859  if(!cb(aph->get_context_ref()))
860  return false;
861  }
862  return true;
863 }
864 //------------------------------------------------------------------------------------------
865 template<class t_connection_context> template<class callback_t>
867 {
868  CRITICAL_REGION_LOCAL(m_connects_lock);
869  async_protocol_handler<t_connection_context>* aph = find_connection(connection_id);
870  if (!aph)
871  return false;
872  if(!cb(aph->get_context_ref()))
873  return false;
874  return true;
875 }
876 //------------------------------------------------------------------------------------------
877 template<class t_connection_context>
879 {
880  CRITICAL_REGION_LOCAL(m_connects_lock);
881  return m_connects.size();
882 }
883 //------------------------------------------------------------------------------------------
884 template<class t_connection_context>
886 {
887  if (m_pcommands_handler && m_pcommands_handler_destroy)
888  (*m_pcommands_handler_destroy)(m_pcommands_handler);
889  m_pcommands_handler = handler;
890  m_pcommands_handler_destroy = destroy;
891 }
892 //------------------------------------------------------------------------------------------
893 template<class t_connection_context>
895 {
897  int r = find_and_lock_connection(connection_id, aph);
898  return LEVIN_OK == r ? aph->notify(command, in_buff) : r;
899 }
900 //------------------------------------------------------------------------------------------
901 template<class t_connection_context>
903 {
904  CRITICAL_REGION_LOCAL(m_connects_lock);
905  async_protocol_handler<t_connection_context>* aph = find_connection(connection_id);
906  return 0 != aph ? aph->close() : false;
907 }
908 //------------------------------------------------------------------------------------------
909 template<class t_connection_context>
911 {
912  CRITICAL_REGION_LOCAL(m_connects_lock);
913  async_protocol_handler<t_connection_context>* aph = find_connection(contxt.m_connection_id);
914  if(0 == aph)
915  return false;
916  aph->update_connection_context(contxt);
917  return true;
918 }
919 //------------------------------------------------------------------------------------------
920 template<class t_connection_context>
922 {
924  int r = find_and_lock_connection(connection_id, aph);
925  if(LEVIN_OK == r)
926  {
927  aph->request_callback();
928  return true;
929  }
930  else
931  {
932  return false;
933  }
934 }
935 }
936 }
const char * res
Definition: hmac_keccak.cpp:41
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
Definition: levin_base.h:97
#define CRITICAL_REGION_LOCAL1(x)
Definition: syncobj.h:230
#define MERROR(x)
Definition: misc_log_ex.h:73
void append(const void *data, size_t sz)
Definition: buffer.cpp:40
#define LEVIN_SIGNATURE
Definition: levin_base.h:34
net_utils::i_service_endpoint * m_pservice_endpoint
async_protocol_handler_config< t_connection_context > config_type
uint64_t get_tick_count()
#define MTRACE(x)
Definition: misc_log_ex.h:77
#define MINFO(x)
Definition: misc_log_ex.h:75
#define LOG_DEBUG_CC(ct, message)
virtual bool handle(int res, const epee::span< const uint8_t > buff, connection_context &context)=0
boost::uuids::uuid uuid
::std::string string
Definition: gtest-port.h:1097
anvoke_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
#define LEVIN_PACKET_REQUEST
Definition: levin_base.h:73
auto_scope_leave_caller create_scope_leave_handler(t_scope_leave_handler f)
int notify(int command, const epee::span< const uint8_t > in_buff)
#define LEVIN_PACKET_RESPONSE
Definition: levin_base.h:74
#define MIN_BYTES_WANTED
void erase(size_t sz)
Definition: buffer.h:51
Non-owning sequence of data. Does not deep copy.
Definition: span.h:56
bool async_invoke(int command, const epee::span< const uint8_t > in_buff, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
unsigned char uint8_t
Definition: stdint.h:124
int invoke(int command, const epee::span< const uint8_t > in_buff, std::string &buff_out, boost::uuids::uuid connection_id)
#define CHECK_AND_ASSERT_MES_NO_RET(expr, message)
Definition: misc_log_ex.h:198
async_protocol_handler(net_utils::i_service_endpoint *psnd_hndlr, config_type &config, t_connection_context &conn_context)
#define CRITICAL_REGION_END()
Definition: syncobj.h:233
#define LEVIN_DEFAULT_MAX_PACKET_SIZE
Definition: levin_base.h:71
#define MDEBUG(x)
Definition: misc_log_ex.h:76
constexpr std::size_t size() const noexcept
Definition: span.h:111
mdb_size_t count(MDB_cursor *cur)
size_t size() const
Definition: buffer.h:55
std::list< boost::shared_ptr< invoke_response_handler_base > > m_invoke_response_handlers
virtual bool handle(int res, const epee::span< const uint8_t > buff, typename async_protocol_handler::connection_context &context)
bool request_callback(boost::uuids::uuid connection_id)
unsigned int uint32_t
Definition: stdint.h:126
#define CRITICAL_REGION_BEGIN(x)
Definition: syncobj.h:229
boost::shared_ptr< call_befor_die_base > auto_scope_leave_caller
bool close(boost::uuids::uuid connection_id)
epee::span< const uint8_t > carve(size_t sz)
Definition: buffer.h:54
unsigned __int64 uint64_t
Definition: stdint.h:136
#define CRITICAL_REGION_LOCAL(x)
Definition: syncobj.h:228
int invoke(int command, const epee::span< const uint8_t > in_buff, std::string &buff_out)
std::unique_ptr< void, terminate > context
Unique ZMQ context handle, calls zmq_term on destruction.
Definition: zmq.h:98
int invoke_async(int command, const epee::span< const uint8_t > in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
#define false
Definition: stdbool.h:38
bool sleep_no_w(long ms)
#define MWARNING(x)
Definition: misc_log_ex.h:74
#define LOG_ERROR_CC(ct, message)
virtual bool handle_recv(const void *ptr, size_t cb)
#define LEVIN_ERROR_CONNECTION
Definition: levin_base.h:94
#define SWAP64LE
Definition: int-util.h:252
virtual bool call_run_once_service_io()=0
epee::span< const uint8_t > span(size_t sz) const
Definition: buffer.h:52
void update_connection_context(const connection_context &contxt)
#define LEVIN_PROTOCOL_VER_1
Definition: levin_base.h:78
struct rule_list head
bool update_connection_context(const t_connection_context &contxt)
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition: levin_base.h:96
void set_handler(levin_commands_handler< t_connection_context > *handler, void(*destroy)(levin_commands_handler< t_connection_context > *)=NULL)
virtual bool do_send(const void *ptr, size_t cb)=0
signed int int32_t
Definition: stdint.h:123
int notify(int command, const epee::span< const uint8_t > in_buff, boost::uuids::uuid connection_id)
#define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED
Definition: levin_base.h:70
bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
#define SWAP32LE
Definition: int-util.h:244
#define LEVIN_OK
Definition: levin_base.h:93
#define LEVIN_ERROR_CONNECTION_NOT_FOUND
Definition: levin_base.h:95
constexpr pointer data() const noexcept
Definition: span.h:110