31 #include "net_helper.h
" 32 #include "levin_base.h
" 34 #undef ELECTRONEUM_DEFAULT_LOG_CATEGORY 35 #define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net" 43 /************************************************************************ 44 * levin_client_async - probably it is not really fast implementation, 45 * each handler thread could make up to 30 ms latency. 46 * But, handling events in reader thread will cause dead locks in 47 * case of recursive call (call invoke() to the same connection 48 * on reader thread on remote invoke() handler) 49 ***********************************************************************/ 52 class levin_client_async 54 levin_commands_handler* m_pcommands_handler; 55 void (*commands_handler_destroy)(levin_commands_handler*); 56 volatile uint32_t m_is_stop; 57 volatile uint32_t m_threads_count; 58 ::critical_section m_send_lock; 60 std::string m_local_invoke_buff; 61 ::critical_section m_local_invoke_buff_lock; 62 volatile int m_invoke_res; 64 volatile uint32_t m_invoke_data_ready; 65 volatile uint32_t m_invoke_is_active; 67 boost::mutex m_invoke_event; 68 boost::condition_variable m_invoke_cond; 71 ::critical_section m_recieved_packets_lock; 76 uint32_t m_connection_index; 78 std::list<packet_entry> m_recieved_packets; 80 m_current_connection_index needed when some connection was broken and reconnected - in this 81 case we could have some received packets in que, which shoud not be handled 83 volatile uint32_t m_current_connection_index; 84 ::critical_section m_invoke_lock; 85 ::critical_section m_reciev_packet_lock; 86 ::critical_section m_connection_lock; 87 net_utils::blocked_mode_client m_transport; 89 levin_client_async():m_pcommands_handler(NULL), commands_handler_destroy(NULL), m_is_stop(0), m_threads_count(0), m_invoke_data_ready(0), m_invoke_is_active(0) 91 levin_client_async(const levin_client_async& /*v*/):m_pcommands_handler(NULL), commands_handler_destroy(NULL), m_is_stop(0), m_threads_count(0), m_invoke_data_ready(0), m_invoke_is_active(0) 95 boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 1); 99 while(boost::interprocess::ipcdetail::atomic_read32(&m_threads_count)) 105 void set_handler(levin_commands_handler* phandler, void (*destroy)(levin_commands_handler*) = NULL) 107 if (commands_handler_destroy && m_pcommands_handler) 108 (*commands_handler_destroy)(m_pcommands_handler); 109 m_pcommands_handler = phandler; 110 m_pcommands_handler_destroy = destroy; 113 bool connect(uint32_t ip, uint32_t port, uint32_t timeout) 116 critical_region cr(m_connection_lock); 120 CRITICAL_REGION_BEGIN(m_reciev_packet_lock); 121 CRITICAL_REGION_BEGIN(m_send_lock); 122 res = levin_client_impl::connect(ip, port, timeout); 123 boost::interprocess::ipcdetail::atomic_inc32(&m_current_connection_index); 124 CRITICAL_REGION_END(); 125 CRITICAL_REGION_END(); 126 if(res && !boost::interprocess::ipcdetail::atomic_read32(&m_threads_count) ) 128 //boost::interprocess::ipcdetail::atomic_write32(&m_is_stop, 0);//m_is_stop = false; 129 boost::thread( boost::bind(&levin_duplex_client::reciever_thread, this) ); 130 boost::thread( boost::bind(&levin_duplex_client::handler_thread, this) ); 131 boost::thread( boost::bind(&levin_duplex_client::handler_thread, this) ); 139 critical_region cr(m_cs); 140 return levin_client_impl::is_connected(); 144 bool check_connection() 147 critical_region cr(m_cs); 153 LOG_ERROR("Reconnect Failed. Failed to invoke() because not connected!");
162 bool recv_n(
SOCKET s,
char* pbuff,
size_t cb)
166 int res = ::recv(m_socket, pbuff, (
int)cb, 0);
168 if(SOCKET_ERROR ==
res)
173 int err = ::WSAGetLastError();
174 LOG_ERROR(
"Failed to recv(), err = " << err <<
" \"" << socket_errors::get_socket_error_text(err) <<
"\"");
196 size_t cb_remain = buff.size();
197 char* m_current_ptr = (
char*)buff.data();
198 return recv_n(s, m_current_ptr, cb_remain);
206 levin_client_impl::disconnect();
209 m_local_invoke_buff.clear();
212 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 1);
213 m_invoke_cond.notify_all();
224 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_is_active, 0);
232 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_is_active, 1);
233 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 0);
234 misc_utils::destr_ptr hdlr = misc_utils::add_exit_scope_handler(boost::bind(&levin_duplex_client::on_leave_invoke,
this));
238 if(!check_connection())
244 head.m_cb = in_buff.size();
245 head.m_have_to_return_data =
true;
247 #ifdef TRACE_LEVIN_PACKETS_BY_GUIDS 248 ::UuidCreate(&
head.m_id);
250 head.m_command = command;
253 LOG_PRINT(
"[" << m_socket <<
"] Sending invoke data", LOG_LEVEL_4);
258 if(SOCKET_ERROR ==
res)
260 int err = ::WSAGetLastError();
261 LOG_ERROR(
"Failed to send(), err = " << err <<
" \"" << socket_errors::get_socket_error_text(err) <<
"\"");
265 LOG_PRINT_L4(
"[" << m_socket <<
"] SEND " << (
int)in_buff.size());
266 res =
::send(m_socket, in_buff.data(), (int)in_buff.size(), 0);
267 if(SOCKET_ERROR ==
res)
269 int err = ::WSAGetLastError();
270 LOG_ERROR(
"Failed to send(), err = " << err <<
" \"" << socket_errors::get_socket_error_text(err) <<
"\"");
275 LOG_PRINT_L4(
"LEVIN_PACKET_SENT. [len=" <<
head.m_cb <<
", flags=" <<
head.m_flags <<
", is_cmd=" <<
head.m_have_to_return_data <<
", cmd_id = " <<
head.m_command <<
", pr_v=" <<
head.m_protocol_version <<
", uid=" << string_tools::get_str_from_guid_a(
head.m_id) <<
"]");
278 boost::system_time timeout = boost::get_system_time()+ boost::posix_time::milliseconds(100);
279 size_t timeout_count = 0;
280 boost::unique_lock<boost::mutex> lock(m_invoke_event);
282 while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_data_ready))
284 if(!m_invoke_cond.timed_wait(lock, timeout))
286 if(timeout_count < 10)
289 timeout = boost::get_system_time()+ boost::posix_time::milliseconds(100);
292 }
else if(timeout_count == 10)
295 timeout = boost::get_system_time()+ boost::posix_time::minutes(10);
300 LOG_PRINT(
"[" << m_socket <<
"] Timeout on waiting invoke result. ", LOG_LEVEL_0);
309 buff_out.swap(m_local_invoke_buff);
310 m_local_invoke_buff.clear();
317 if(!check_connection())
322 head.m_cb = in_buff.size();
323 head.m_have_to_return_data =
false;
325 #ifdef TRACE_LEVIN_PACKETS_BY_GUIDS 326 ::UuidCreate(&
head.m_id);
328 head.m_command = command;
334 if(SOCKET_ERROR ==
res)
336 int err = ::WSAGetLastError();
337 LOG_ERROR(
"Failed to send(), err = " << err <<
" \"" << socket_errors::get_socket_error_text(err) <<
"\"");
341 LOG_PRINT_L4(
"[" << m_socket <<
"] SEND " << (
int)in_buff.size());
342 res =
::send(m_socket, in_buff.data(), (int)in_buff.size(), 0);
343 if(SOCKET_ERROR ==
res)
345 int err = ::WSAGetLastError();
346 LOG_ERROR(
"Failed to send(), err = " << err <<
" \"" << socket_errors::get_socket_error_text(err) <<
"\"");
351 LOG_PRINT_L4(
"LEVIN_PACKET_SENT. [len=" <<
head.m_cb <<
", flags=" <<
head.m_flags <<
", is_cmd=" <<
head.m_have_to_return_data <<
", cmd_id = " <<
head.m_command <<
", pr_v=" <<
head.m_protocol_version <<
", uid=" << string_tools::get_str_from_guid_a(
head.m_id) <<
"]");
358 bool have_some_data(
SOCKET sock,
int interval = 1)
370 tv.tv_sec = interval;
373 int sel_res = select(0, &fds, 0, &fdse, &tv);
376 else if(sel_res == SOCKET_ERROR)
380 int err_code = ::WSAGetLastError();
381 LOG_ERROR(
"Filed to call select, err code = " << err_code);
388 }
else if(fdse.fd_array[0])
397 bool reciev_and_process_incoming_data()
399 bucket_head
head = {0};
401 bool is_request =
false;
405 if(!recv_n(m_socket, (
char*)&
head,
sizeof(
head)))
413 conn_index = boost::interprocess::ipcdetail::atomic_read32(&m_current_connection_index);
417 LOG_ERROR(
"Signature mismatch in response");
424 local_buff.resize((
size_t)
head.m_cb);
425 if(!recv_n(m_socket, local_buff))
434 LOG_PRINT_L4(
"LEVIN_PACKET_RECEIVED. [len=" <<
head.m_cb <<
", flags=" <<
head.m_flags <<
", is_cmd=" <<
head.m_have_to_return_data <<
", cmd_id = " <<
head.m_command <<
", pr_v=" <<
head.m_protocol_version <<
", uid=" << string_tools::get_str_from_guid_a(
head.m_id) <<
"]");
439 m_recieved_packets.resize(m_recieved_packets.size() + 1);
440 m_recieved_packets.back().m_hd =
head;
441 m_recieved_packets.back().m_body.swap(local_buff);
442 m_recieved_packets.back().m_connection_index = conn_index;
451 m_local_invoke_buff.swap(local_buff);
452 m_invoke_res =
head.m_return_code;
454 boost::interprocess::ipcdetail::atomic_write32(&m_invoke_data_ready, 1);
455 m_invoke_cond.notify_all();
461 bool reciever_thread()
463 LOG_PRINT_L3(
"[" << m_socket <<
"] Socket reciever thread started.[m_threads_count=" << m_threads_count <<
"]");
464 log_space::log_singletone::set_thread_log_prefix(
"RECIEVER_WORKER");
465 boost::interprocess::ipcdetail::atomic_inc32(&m_threads_count);
475 if(have_some_data(m_socket, 1))
477 if(!reciev_and_process_incoming_data())
484 LOG_ERROR(
"Failed to reciev_and_process_incoming_data. shutting down");
492 boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
493 LOG_PRINT_L3(
"[" << m_socket <<
"] Socket reciever thread stopped.[m_threads_count=" << m_threads_count <<
"]");
500 net_utils::connection_context_base conn_context;
501 conn_context.m_remote_address = m_address;
502 if(
head.m_have_to_return_data)
505 if(m_pcommands_handler)
506 head.m_return_code = m_pcommands_handler->invoke(
head.m_id,
head.m_command, local_buff, return_buff, conn_context);
512 head.m_cb = return_buff.size();
513 head.m_have_to_return_data =
false;
518 send_buff += return_buff;
520 if(conn_index != boost::interprocess::ipcdetail::atomic_read32(&m_current_connection_index))
524 int res =
::send(m_socket, (
const char*)send_buff.data(), send_buff.size(), 0);
525 if(
res == SOCKET_ERROR)
527 int err_code = ::WSAGetLastError();
528 LOG_ERROR(
"Failed to send, err = " << err_code);
532 LOG_PRINT_L4(
"LEVIN_PACKET_SENT. [len=" <<
head.m_cb <<
", flags=" <<
head.m_flags <<
", is_cmd=" <<
head.m_have_to_return_data <<
", cmd_id = " <<
head.m_command <<
", pr_v=" <<
head.m_protocol_version <<
", uid=" << string_tools::get_str_from_guid_a(
head.m_id) <<
"]");
537 if(m_pcommands_handler)
538 m_pcommands_handler->notify(
head.m_id,
head.m_command, local_buff, conn_context);
544 bool handler_thread()
546 LOG_PRINT_L3(
"[" << m_socket <<
"] Socket handler thread started.[m_threads_count=" << m_threads_count <<
"]");
547 log_space::log_singletone::set_thread_log_prefix(
"HANDLER_WORKER");
548 boost::interprocess::ipcdetail::atomic_inc32(&m_threads_count);
552 bool have_some_work =
false;
554 bucket_head bh = {0};
558 if(m_recieved_packets.size())
560 bh = m_recieved_packets.begin()->m_hd;
561 conn_index = m_recieved_packets.begin()->m_connection_index;
562 local_buff.swap(m_recieved_packets.begin()->m_body);
563 have_some_work =
true;
564 m_recieved_packets.pop_front();
570 process_recieved_packet(bh, local_buff, conn_index);
578 boost::interprocess::ipcdetail::atomic_dec32(&m_threads_count);
579 LOG_PRINT_L3(
"[" << m_socket <<
"] Socket handler thread stopped.[m_threads_count=" << m_threads_count <<
"]");
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
int notify(const GUID &target, int command, const std::string &in_buff)
#define LEVIN_PACKET_REQUEST
#define LEVIN_ERROR_CONNECTION_HANDLER_NOT_DEFINED
#define LEVIN_PACKET_RESPONSE
#define CRITICAL_REGION_END()
#define CRITICAL_REGION_BEGIN(x)
int invoke(const GUID &target, int command, const std::string &in_buff, std::string &buff_out)
#define LEVIN_PROTOCOL_VER_1
#define LEVIN_ERROR_CONNECTION_DESTROYED
expect< void > send(const epee::span< const std::uint8_t > payload, void *const socket, const int flags) noexcept
bool recv_n(SOCKET s, std::string &buff)