Electroneum
epee::levin::levin_client_async Class Reference

#include <levin_client_async.h>

Public Member Functions

 levin_client_async ()
 
 levin_client_async (const levin_client_async &)
 
 ~levin_client_async ()
 
void set_handler (levin_commands_handler *phandler, void(*destroy)(levin_commands_handler *)=NULL)
 
bool connect (uint32_t ip, uint32_t port, uint32_t timeout)
 
bool is_connected ()
 
bool check_connection ()
 
bool recv_n (SOCKET s, char *pbuff, size_t cb)
 
bool recv_n (SOCKET s, std::string &buff)
 
bool disconnect ()
 
void loop_call_guard ()
 
void on_leave_invoke ()
 
int invoke (const GUID &target, int command, const std::string &in_buff, std::string &buff_out)
 
int notify (const GUID &target, int command, const std::string &in_buff)
 

Detailed Description

Definition at line 52 of file levin_client_async.h.

Constructor & Destructor Documentation

◆ levin_client_async() [1/2]

epee::levin::levin_client_async::levin_client_async ( )
inline

Definition at line 89 of file levin_client_async.h.

89  :m_pcommands_handler(NULL), commands_handler_destroy(NULL), m_is_stop(0), m_threads_count(0), m_invoke_data_ready(0), m_invoke_is_active(0)
90  {}

◆ levin_client_async() [2/2]

epee::levin::levin_client_async::levin_client_async ( const levin_client_async & )
inline

Definition at line 91 of file levin_client_async.h.

91  :m_pcommands_handler(NULL), commands_handler_destroy(NULL), m_is_stop(0), m_threads_count(0), m_invoke_data_ready(0), m_invoke_is_active(0)
92  {}

◆ ~levin_client_async()

epee::levin::levin_client_async::~levin_client_async ( )
inline

Definition at line 93 of file levin_client_async.h.

94  {
95  boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 1);
96  disconnect();
97 
98 
99  while(boost::interprocess::ipcdetail::atomic_read32(&m_threads_count))
100  ::Sleep(100);
101 
102  set_handler(NULL);
103  }
void set_handler(levin_commands_handler *phandler, void(*destroy)(levin_commands_handler *)=NULL)

Member Function Documentation

◆ check_connection()

bool epee::levin::levin_client_async::check_connection ( )
inline

Definition at line 144 of file levin_client_async.h.

145  {
146  loop_call_guard();
147  critical_region cr(m_cs);
148 
149  if(!is_connected())
150  {
151  if( !reconnect() )
152  {
153  LOG_ERROR("Reconnect Failed. Failed to invoke() because not connected!");
154  return false;
155  }
156  }
157  return true;
158  }
#define LOG_ERROR(x)
Definition: misc_log_ex.h:98

◆ connect()

bool epee::levin::levin_client_async::connect ( uint32_t  ip,
uint32_t  port,
uint32_t  timeout 
)
inline

Definition at line 113 of file levin_client_async.h.

114  {
115  loop_call_guard();
116  critical_region cr(m_connection_lock);
117 
118  m_timeout = timeout;
119  bool res = false;
120  CRITICAL_REGION_BEGIN(m_reciev_packet_lock);
121  CRITICAL_REGION_BEGIN(m_send_lock);
122  res = levin_client_impl::connect(ip, port, timeout);
123  boost::interprocess::ipcdetail::atomic_inc32(&m_current_connection_index);
126  if(res && !boost::interprocess::ipcdetail::atomic_read32(&m_threads_count) )
127  {
128  //boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 0);//m_is_stop = false;
129  boost::thread( boost::bind(&levin_duplex_client::reciever_thread, this) );
130  boost::thread( boost::bind(&levin_duplex_client::handler_thread, this) );
131  boost::thread( boost::bind(&levin_duplex_client::handler_thread, this) );
132  }
133 
134  return res;
135  }
const char * res
Definition: hmac_keccak.cpp:41
bool connect(u_long ip, int port, unsigned int timeout, const std::string &bind_ip="0.0.0.0")
#define CRITICAL_REGION_END()
Definition: syncobj.h:233
#define CRITICAL_REGION_BEGIN(x)
Definition: syncobj.h:229
boost::endian::big_uint32_t ip
Definition: socks.cpp:61
boost::endian::big_uint16_t port
Definition: socks.cpp:60

◆ disconnect()

bool epee::levin::levin_client_async::disconnect ( )
inline

Definition at line 201 of file levin_client_async.h.

202  {
203  //boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 1);//m_is_stop = true;
204  loop_call_guard();
205  critical_region cr(m_cs);
207 
208  CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
209  m_local_invoke_buff.clear();
210  m_invoke_res = LEVIN_ERROR_CONNECTION_DESTROYED;
212  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 1); //m_invoke_data_ready = true;
213  m_invoke_cond.notify_all();
214  return true;
215  }
#define CRITICAL_REGION_END()
Definition: syncobj.h:233
#define CRITICAL_REGION_BEGIN(x)
Definition: syncobj.h:229
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition: levin_base.h:96

◆ invoke()

int epee::levin::levin_client_async::invoke ( const GUID &  target,
int  command,
const std::string &  in_buff,
std::string &  buff_out 
)
inline

Definition at line 227 of file levin_client_async.h.

228  {
229 
230  critical_region cr_invoke(m_invoke_lock);
231 
232  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_is_active, 1);
233  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 0);
234  misc_utils::destr_ptr hdlr = misc_utils::add_exit_scope_handler(boost::bind(&levin_duplex_client::on_leave_invoke, this));
235 
236  loop_call_guard();
237 
238  if(!check_connection())
240 
241 
242  bucket_head head = {0};
243  head.m_signature = LEVIN_SIGNATURE;
244  head.m_cb = in_buff.size();
245  head.m_have_to_return_data = true;
246  head.m_id = target;
247 #ifdef TRACE_LEVIN_PACKETS_BY_GUIDS
248  ::UuidCreate(&head.m_id);
249 #endif
250  head.m_command = command;
251  head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
252  head.m_flags = LEVIN_PACKET_REQUEST;
253  LOG_PRINT("[" << m_socket <<"] Sending invoke data", LOG_LEVEL_4);
254 
255  CRITICAL_REGION_BEGIN(m_send_lock);
256  LOG_PRINT_L4("[" << m_socket <<"] SEND " << sizeof(head));
257  int res = ::send(m_socket, (const char*)&head, sizeof(head), 0);
258  if(SOCKET_ERROR == res)
259  {
260  int err = ::WSAGetLastError();
261  LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
262  disconnect();
264  }
265  LOG_PRINT_L4("[" << m_socket <<"] SEND " << (int)in_buff.size());
266  res = ::send(m_socket, in_buff.data(), (int)in_buff.size(), 0);
267  if(SOCKET_ERROR == res)
268  {
269  int err = ::WSAGetLastError();
270  LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
271  disconnect();
273  }
275  LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
276 
277  //hard coded timeout in 10 minutes for maximum invoke period. if it happens, it could mean only some real troubles.
278  boost::system_time timeout = boost::get_system_time()+ boost::posix_time::milliseconds(100);
279  size_t timeout_count = 0;
280  boost::unique_lock<boost::mutex> lock(m_invoke_event);
281 
282  while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_data_ready))
283  {
284  if(!m_invoke_cond.timed_wait(lock, timeout))
285  {
286  if(timeout_count < 10)
287  {
288  //workaround to avoid freezing at timed_wait called after notify_all.
289  timeout = boost::get_system_time()+ boost::posix_time::milliseconds(100);
290  ++timeout_count;
291  continue;
292  }else if(timeout_count == 10)
293  {
294  //workaround to avoid freezing at timed_wait called after notify_all.
295  timeout = boost::get_system_time()+ boost::posix_time::minutes(10);
296  ++timeout_count;
297  continue;
298  }else
299  {
300  LOG_PRINT("[" << m_socket <<"] Timeout on waiting invoke result. ", LOG_LEVEL_0);
301  //disconnect();
303  }
304  }
305  }
306 
307 
308  CRITICAL_REGION_BEGIN(m_local_invoke_buff_lock);
309  buff_out.swap(m_local_invoke_buff);
310  m_local_invoke_buff.clear();
312  return m_invoke_res;
313  }
const char * res
Definition: hmac_keccak.cpp:41
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
Definition: levin_base.h:97
#define LEVIN_SIGNATURE
Definition: levin_base.h:34
#define LEVIN_PACKET_REQUEST
Definition: levin_base.h:73
#define CRITICAL_REGION_END()
Definition: syncobj.h:233
#define CRITICAL_REGION_BEGIN(x)
Definition: syncobj.h:229
#define LOG_ERROR(x)
Definition: misc_log_ex.h:98
#define LEVIN_PROTOCOL_VER_1
Definition: levin_base.h:78
struct rule_list head
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition: levin_base.h:96
#define LOG_PRINT_L4(x)
Definition: misc_log_ex.h:103
expect< void > send(const epee::span< const std::uint8_t > payload, void *const socket, const int flags) noexcept
Definition: zmq.cpp:182
Here is the call graph for this function:

◆ is_connected()

bool epee::levin::levin_client_async::is_connected ( )
inline

Definition at line 136 of file levin_client_async.h.

137  {
138  loop_call_guard();
139  critical_region cr(m_cs);
141  }

◆ loop_call_guard()

void epee::levin::levin_client_async::loop_call_guard ( )
inline

Definition at line 217 of file levin_client_async.h.

218  {
219 
220  }

◆ notify()

int epee::levin::levin_client_async::notify ( const GUID &  target,
int  command,
const std::string &  in_buff 
)
inline

Definition at line 315 of file levin_client_async.h.

316  {
317  if(!check_connection())
319 
320  bucket_head head = {0};
321  head.m_signature = LEVIN_SIGNATURE;
322  head.m_cb = in_buff.size();
323  head.m_have_to_return_data = false;
324  head.m_id = target;
325 #ifdef TRACE_LEVIN_PACKETS_BY_GUIDS
326  ::UuidCreate(&head.m_id);
327 #endif
328  head.m_command = command;
329  head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
330  head.m_flags = LEVIN_PACKET_REQUEST;
331  CRITICAL_REGION_BEGIN(m_send_lock);
332  LOG_PRINT_L4("[" << m_socket <<"] SEND " << sizeof(head));
333  int res = ::send(m_socket, (const char*)&head, sizeof(head), 0);
334  if(SOCKET_ERROR == res)
335  {
336  int err = ::WSAGetLastError();
337  LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
338  disconnect();
340  }
341  LOG_PRINT_L4("[" << m_socket <<"] SEND " << (int)in_buff.size());
342  res = ::send(m_socket, in_buff.data(), (int)in_buff.size(), 0);
343  if(SOCKET_ERROR == res)
344  {
345  int err = ::WSAGetLastError();
346  LOG_ERROR("Failed to send(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
347  disconnect();
349  }
351  LOG_PRINT_L4("LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags=" << head.m_flags << ", is_cmd=" << head.m_have_to_return_data <<", cmd_id = " << head.m_command << ", pr_v=" << head.m_protocol_version << ", uid=" << string_tools::get_str_from_guid_a(head.m_id) << "]");
352 
353  return 1;
354  }
const char * res
Definition: hmac_keccak.cpp:41
#define LEVIN_SIGNATURE
Definition: levin_base.h:34
#define LEVIN_PACKET_REQUEST
Definition: levin_base.h:73
#define CRITICAL_REGION_END()
Definition: syncobj.h:233
#define CRITICAL_REGION_BEGIN(x)
Definition: syncobj.h:229
#define LOG_ERROR(x)
Definition: misc_log_ex.h:98
#define LEVIN_PROTOCOL_VER_1
Definition: levin_base.h:78
struct rule_list head
#define LEVIN_ERROR_CONNECTION_DESTROYED
Definition: levin_base.h:96
#define LOG_PRINT_L4(x)
Definition: misc_log_ex.h:103
expect< void > send(const epee::span< const std::uint8_t > payload, void *const socket, const int flags) noexcept
Definition: zmq.cpp:182
Here is the call graph for this function:

◆ on_leave_invoke()

void epee::levin::levin_client_async::on_leave_invoke ( )
inline

Definition at line 222 of file levin_client_async.h.

223  {
224  boost::interprocess::ipcdetail::atomic_write32(&m_invoke_is_active, 0);
225  }

◆ recv_n() [1/2]

bool epee::levin::levin_client_async::recv_n ( SOCKET  s,
char *  pbuff,
size_t  cb 
)
inline

Definition at line 162 of file levin_client_async.h.

163  {
164  while(cb)
165  {
166  int res = ::recv(m_socket, pbuff, (int)cb, 0);
167 
168  if(SOCKET_ERROR == res)
169  {
170  if(!m_connected)
171  return false;
172 
173  int err = ::WSAGetLastError();
174  LOG_ERROR("Failed to recv(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"");
175  disconnect();
176  //reconnect();
177  return false;
178  }else if(res == 0)
179  {
180  disconnect();
181  //reconnect();
182  return false;
183  }
184  LOG_PRINT_L4("[" << m_socket <<"] RECV " << res);
185  cb -= res;
186  pbuff += res;
187  }
188 
189  return true;
190  }
const char * res
Definition: hmac_keccak.cpp:41
#define LOG_ERROR(x)
Definition: misc_log_ex.h:98
#define LOG_PRINT_L4(x)
Definition: misc_log_ex.h:103

◆ recv_n() [2/2]

bool epee::levin::levin_client_async::recv_n ( SOCKET  s,
std::string &  buff 
)
inline

Definition at line 194 of file levin_client_async.h.

195  {
196  size_t cb_remain = buff.size();
197  char* m_current_ptr = (char*)buff.data();
198  return recv_n(s, m_current_ptr, cb_remain);
199  }
bool recv_n(SOCKET s, char *pbuff, size_t cb)

◆ set_handler()

void epee::levin::levin_client_async::set_handler ( levin_commands_handler *  phandler,
void(*)(levin_commands_handler *)  destroy = NULL 
)
inline

Definition at line 105 of file levin_client_async.h.

106  {
107  if (commands_handler_destroy && m_pcommands_handler)
108  (*commands_handler_destroy)(m_pcommands_handler);
109  m_pcommands_handler = phandler;
110  m_pcommands_handler_destroy = destroy;
111  }

The documentation for this class was generated from the following file: