28 #include <boost/asio/deadline_timer.hpp> 29 #include <boost/uuid/uuid_generators.hpp> 30 #include <boost/unordered_map.hpp> 31 #include <boost/interprocess/detail/atomic.hpp> 32 #include <boost/smart_ptr/make_shared.hpp> 46 #undef ELECTRONEUM_DEFAULT_LOG_CATEGORY 47 #define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net" 49 #ifndef MIN_BYTES_WANTED 50 #define MIN_BYTES_WANTED 512 61 template<
class t_connection_context>
64 template<
class t_connection_context>
67 typedef boost::unordered_map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* > connections_map;
69 connections_map m_connects;
82 void delete_connections (
size_t count,
bool incoming);
90 template<
class callback_t>
97 template<
class callback_t>
99 template<
class callback_t>
115 template<
class t_connection_context = net_utils::connection_context_base>
116 class async_protocol_handler
161 template <
class callback_t>
170 MDEBUG(con.get_context_ref() <<
"anvoke_handler, timeout: " << timeout);
171 m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
172 m_timer.async_wait([&con, command, cb, timeout](
const boost::system::error_code& ec)
174 if(ec == boost::asio::error::operation_aborted)
176 MINFO(con.get_context_ref() <<
"Timeout on invoke operation happened, command: " << command <<
" timeout: " << timeout);
180 con.finish_outer_call();
221 boost::system::error_code ignored_ec;
228 boost::system::error_code ignored_ec;
231 callback_t& cb =
m_cb;
236 m_timer.async_wait([&con, cb, command, timeout](
const boost::system::error_code& ec)
238 if(ec == boost::asio::error::operation_aborted)
240 MINFO(con.get_context_ref() <<
"Timeout on invoke operation happened, command: " << command <<
" timeout: " << timeout);
244 con.finish_outer_call();
252 template<
class callback_t>
256 boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<
anvoke_handler<callback_t>>(cb, timeout, con, command));
258 return handler->is_timer_started();
264 t_connection_context& conn_context):
292 for (
size_t i = 0; i < 60 * 1000 / 100 && 0 != boost::interprocess::ipcdetail::atomic_read32(&
m_wait_count); ++i)
312 boost::interprocess::ipcdetail::atomic_inc32(&
m_wait_count);
318 boost::interprocess::ipcdetail::atomic_dec32(&
m_wait_count);
333 std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](
const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) {
334 pinv_resp_hndlr->cancel();
368 if(boost::interprocess::ipcdetail::atomic_read32(&
m_close_called))
381 <<
", connection will be closed.");
387 bool is_continue =
true;
403 response_handler->reset_timer();
427 bool timer_cancelled = response_handler->cancel_timer();
431 invoke_response_handlers_guard.
unlock();
438 invoke_response_handlers_guard.
unlock();
440 if(!boost::interprocess::ipcdetail::atomic_read32(&
m_wait_count) && !boost::interprocess::ipcdetail::atomic_read32(&
m_close_called))
468 #if BYTE_ORDER == LITTLE_ENDIAN 480 send_buff += return_buff;
510 #if BYTE_ORDER == LITTLE_ENDIAN 535 <<
", connection will be closed.");
559 template<
class callback_t>
588 head.m_have_to_return_data =
true;
646 head.m_have_to_return_data =
true;
668 <<
", f=" <<
head.m_flags
669 <<
", r?=" <<
head.m_have_to_return_data
670 <<
", cmd = " <<
head.m_command
671 <<
", ver=" <<
head.m_protocol_version);
674 size_t prev_size = 0;
719 head.m_have_to_return_data =
false;
739 ", f=" <<
head.m_flags <<
740 ", r?=" <<
head.m_have_to_return_data <<
741 ", cmd = " <<
head.m_command <<
742 ", ver=" <<
head.m_protocol_version);
752 template<
class t_connection_context>
753 void async_protocol_handler_config<t_connection_context>::del_connection(async_protocol_handler<t_connection_context>* pconn)
756 m_connects.erase(pconn->get_connection_id());
758 m_pcommands_handler->on_connection_close(pconn->m_connection_context);
761 template<
class t_connection_context>
762 void async_protocol_handler_config<t_connection_context>::delete_connections(
size_t count,
bool incoming)
764 std::vector <boost::uuids::uuid> connections;
766 for (
auto& c: m_connects)
768 if (c.second->m_connection_context.m_is_income == incoming)
769 connections.push_back(c.first);
774 unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
775 shuffle(connections.begin(), connections.end(), std::default_random_engine(seed));
776 while (
count > 0 && connections.size() > 0)
780 auto i = connections.end() - 1;
781 async_protocol_handler<t_connection_context> *conn = m_connects.at(*i);
782 del_connection(conn);
784 connections.erase(i);
786 catch (
const std::out_of_range &e)
788 MWARNING(
"Connection not found in m_connects, continuing");
796 template<
class t_connection_context>
799 delete_connections(
count,
false);
802 template<
class t_connection_context>
805 delete_connections(
count,
true);
808 template<
class t_connection_context>
812 m_connects[pconn->get_connection_id()] = pconn;
814 m_pcommands_handler->on_connection_new(pconn->m_connection_context);
817 template<
class t_connection_context>
818 async_protocol_handler<t_connection_context>* async_protocol_handler_config<t_connection_context>::find_connection(
boost::uuids::uuid connection_id)
const 820 auto it = m_connects.find(connection_id);
821 return it == m_connects.end() ? 0 : it->second;
824 template<
class t_connection_context>
825 int async_protocol_handler_config<t_connection_context>::find_and_lock_connection(
boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph)
828 aph = find_connection(connection_id);
831 if(!aph->start_outer_call())
836 template<
class t_connection_context>
840 int r = find_and_lock_connection(connection_id, aph);
841 return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r;
844 template<
class t_connection_context>
template<
class callback_t>
848 int r = find_and_lock_connection(connection_id, aph);
849 return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r;
852 template<
class t_connection_context>
template<
class callback_t>
856 for(
auto& c: m_connects)
859 if(!cb(aph->get_context_ref()))
865 template<
class t_connection_context>
template<
class callback_t>
872 if(!cb(aph->get_context_ref()))
877 template<
class t_connection_context>
881 return m_connects.size();
884 template<
class t_connection_context>
887 if (m_pcommands_handler && m_pcommands_handler_destroy)
888 (*m_pcommands_handler_destroy)(m_pcommands_handler);
889 m_pcommands_handler = handler;
890 m_pcommands_handler_destroy = destroy;
893 template<
class t_connection_context>
897 int r = find_and_lock_connection(connection_id, aph);
898 return LEVIN_OK == r ? aph->notify(command, in_buff) : r;
901 template<
class t_connection_context>
906 return 0 != aph ? aph->close() :
false;
909 template<
class t_connection_context>
916 aph->update_connection_context(contxt);
920 template<
class t_connection_context>
924 int r = find_and_lock_connection(connection_id, aph);
927 aph->request_callback();
#define LEVIN_ERROR_CONNECTION_TIMEDOUT
#define CRITICAL_REGION_LOCAL1(x)
void append(const void *data, size_t sz)
net_utils::i_service_endpoint * m_pservice_endpoint
uint32_t m_protocol_version
std::atomic< bool > m_deletion_initiated
async_protocol_handler_config< t_connection_context > config_type
uint64_t get_tick_count()
#define LOG_DEBUG_CC(ct, message)
critical_section m_invoke_response_handlers_lock
virtual bool handle(int res, const epee::span< const uint8_t > buff, connection_context &context)=0
bool m_connection_initialized
t_connection_context connection_context
anvoke_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
#define LEVIN_PACKET_REQUEST
auto_scope_leave_caller create_scope_leave_handler(t_scope_leave_handler f)
int notify(int command, const epee::span< const uint8_t > in_buff)
#define LEVIN_PACKET_RESPONSE
uint64_t m_max_packet_size
std::string m_local_inv_buff
critical_section m_local_inv_buff_lock
Non-owning sequence of data. Does not deep copy.
bool async_invoke(int command, const epee::span< const uint8_t > in_buff, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool add_invoke_response_handler(const callback_t &cb, uint64_t timeout, async_protocol_handler &con, int command)
std::atomic< bool > m_protocol_released
~async_protocol_handler_config()
size_t get_connections_count()
int invoke(int command, const epee::span< const uint8_t > in_buff, std::string &buff_out, boost::uuids::uuid connection_id)
bool after_init_connection()
#define CHECK_AND_ASSERT_MES_NO_RET(expr, message)
async_protocol_handler(net_utils::i_service_endpoint *psnd_hndlr, config_type &config, t_connection_context &conn_context)
net_utils::buffer m_cache_in_buffer
#define CRITICAL_REGION_END()
#define LEVIN_DEFAULT_MAX_PACKET_SIZE
t_connection_context & m_connection_context
virtual bool is_timer_started() const
constexpr std::size_t size() const noexcept
mdb_size_t count(MDB_cursor *cur)
std::list< boost::shared_ptr< invoke_response_handler_base > > m_invoke_response_handlers
virtual bool handle(int res, const epee::span< const uint8_t > buff, typename async_protocol_handler::connection_context &context)
bool request_callback(boost::uuids::uuid connection_id)
critical_section m_send_lock
#define CRITICAL_REGION_BEGIN(x)
volatile uint32_t m_close_called
virtual void reset_timer()=0
t_connection_context & get_context_ref()
boost::shared_ptr< call_befor_die_base > auto_scope_leave_caller
uint64_t m_invoke_timeout
bool close(boost::uuids::uuid connection_id)
bool foreach_connection(const callback_t &cb)
void del_in_connections(size_t count)
epee::span< const uint8_t > carve(size_t sz)
unsigned __int64 uint64_t
#define CRITICAL_REGION_LOCAL(x)
int invoke(int command, const epee::span< const uint8_t > in_buff, std::string &buff_out)
std::unique_ptr< void, terminate > context
Unique ZMQ context handle, calls zmq_term on destruction.
void del_out_connections(size_t count)
int invoke_async(int command, const epee::span< const uint8_t > in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout=LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
async_protocol_handler_config()
bool m_have_to_return_data
virtual ~async_protocol_handler()
#define LOG_ERROR_CC(ct, message)
boost::uuids::uuid get_connection_id()
virtual bool handle_recv(const void *ptr, size_t cb)
virtual bool is_timer_started() const =0
#define LEVIN_ERROR_CONNECTION
virtual bool cancel_timer()=0
bucket_head2 m_current_head
volatile int m_invoke_result_code
virtual bool call_run_once_service_io()=0
volatile uint32_t m_invoke_buf_ready
epee::span< const uint8_t > span(size_t sz) const
void update_connection_context(const connection_context &contxt)
#define LEVIN_PROTOCOL_VER_1
bool update_connection_context(const t_connection_context &contxt)
#define LEVIN_ERROR_CONNECTION_DESTROYED
void set_handler(levin_commands_handler< t_connection_context > *handler, void(*destroy)(levin_commands_handler< t_connection_context > *)=NULL)
volatile uint32_t m_wait_count
virtual bool request_callback()=0
virtual bool do_send(const void *ptr, size_t cb)=0
void handle_qued_callback()
int notify(int command, const epee::span< const uint8_t > in_buff, boost::uuids::uuid connection_id)
virtual bool cancel_timer()
#define LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED
int32_t m_oponent_protocol_ver
critical_section m_call_lock
async_protocol_handler & m_con
bool for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
virtual ~anvoke_handler()
bool m_cancel_timer_called
virtual void reset_timer()
boost::asio::deadline_timer m_timer
t_connection_context connection_context
#define LEVIN_ERROR_CONNECTION_NOT_FOUND
constexpr pointer data() const noexcept