35 #include <boost/version.hpp> 36 #include <boost/asio/io_service.hpp> 37 #include <boost/asio/ip/tcp.hpp> 38 #include <boost/asio/read.hpp> 39 #include <boost/asio/ssl.hpp> 40 #include <boost/asio/steady_timer.hpp> 41 #include <boost/thread/future.hpp> 42 #include <boost/lambda/bind.hpp> 43 #include <boost/lambda/lambda.hpp> 44 #include <boost/interprocess/detail/atomic.hpp> 45 #include <boost/system/error_code.hpp> 51 #undef ELECTRONEUM_DEFAULT_LOG_CATEGORY 52 #define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net" 55 #define MAKE_IP( a1, a2, a3, a4 ) (a1|(a2<<8)|(a3<<16)|(a4<<24)) 65 boost::unique_future<boost::asio::ip::tcp::socket>
// Outcome codes for a connection attempt. The enumerators are not part of this
// excerpt; CONNECT_SUCCESS, CONNECT_FAILURE and CONNECT_NO_SSL are the values
// used by the code that follows.
72 enum try_connect_result_t
// Completion handler passed to async_read(): it reports the operation's result
// through references to variables owned by the caller.
// Binds the handler to the caller's error code and byte counter.
83 handler_obj(boost::system::error_code&
error,
size_t& bytes_transferred):ref_error(
error), ref_bytes_transferred(bytes_transferred)
// Copy constructor: the copy refers to the same caller-owned variables.
85 handler_obj(
const handler_obj& other_obj):ref_error(other_obj.ref_error), ref_bytes_transferred(other_obj.ref_bytes_transferred)
// Reference members: the referenced variables must outlive every copy of the handler.
88 boost::system::error_code& ref_error;
89 size_t& ref_bytes_transferred;
// Call operator invoked with the operation's error code and transferred byte count.
91 void operator()(
const boost::system::error_code&
error,
92 std::size_t bytes_transferred
// Publish the byte count to the caller's variable.
// NOTE(review): the matching store into ref_error is not visible in this excerpt.
96 ref_bytes_transferred = bytes_transferred;
// Fragment of a connection attempt that reports its outcome as CONNECT_SUCCESS,
// CONNECT_FAILURE or CONNECT_NO_SSL.
// NOTE(review): presumably the body of try_connect(); the signature and the
// conditions selecting each return are outside this excerpt.
// Move the deadline to the maximum representable time point.
175 m_deadline.expires_at(std::chrono::steady_clock::time_point::max());
// Shut down both directions of the underlying TCP socket; the error code is
// captured in ignored_ec and not inspected on the visible lines.
183 boost::system::error_code ignored_ec;
184 m_ssl_socket->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec);
187 return CONNECT_NO_SSL;
191 MWARNING(
"Failed to establish SSL connection");
193 return CONNECT_FAILURE;
197 return CONNECT_SUCCESS;
200 MWARNING(
"Some problems at connect, expected open socket");
201 return CONNECT_FAILURE;
// Fragment that reacts to the result of a connection attempt.
// NOTE(review): presumably inside connect(); the enclosing signature and the
// bodies of these branches are not visible here.
221 if (try_connect_result == CONNECT_FAILURE)
225 if (try_connect_result == CONNECT_NO_SSL)
// Per the message, this path covers an SSL autodetect connection whose
// handshake failed; the retry itself is not part of this excerpt.
227 MERROR(
"SSL handshake failed on an autodetect connection, reconnecting without SSL");
// Boost system errors are reported at debug level with the exception text.
234 catch(
const boost::system::system_error& er)
236 MDEBUG(
"Some problems at connect, message: " << er.what());
241 MDEBUG(
"Some fatal problems.");
// Shut down both directions of the underlying TCP socket. No error_code is
// passed here, so a failure is reported by exception rather than by value.
// NOTE(review): the enclosing function is not visible in this excerpt.
263 m_ssl_socket->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both);
// The exception object is unnamed, so its details are not used by the handler.
266 catch(
const boost::system::system_error& )
// Fragment of an operation that waits for completion (enclosing signature not
// visible in this excerpt).
// ec starts as would_block, which is the condition the wait loop tests.
293 boost::system::error_code ec = boost::asio::error::would_block;
// Keep waiting while ec still holds would_block (loop body not visible here).
302 while (ec == boost::asio::error::would_block)
// Move the deadline to the maximum representable time point.
315 m_deadline.expires_at(std::chrono::steady_clock::time_point::max());
320 catch(
const boost::system::system_error& er)
// NOTE(review): the message says "connect"; confirm it matches the enclosing
// function, since the text may have been copied from the connect path.
322 LOG_ERROR(
"Some problems at connect, message: " << er.what());
// Sends sz bytes starting at data by calling write().
// Returns bool; the return statements are not part of this excerpt.
335 bool send(
const void* data,
size_t sz)
// Error code filled in by write().
361 boost::system::error_code ec;
// writen receives the value returned by write().
363 size_t writen =
write(data, sz, ec);
// Move the deadline to the maximum representable time point.
372 m_deadline.expires_at(std::chrono::steady_clock::time_point::max());
// Boost system errors are logged with the exception text.
377 catch(
const boost::system::system_error& er)
379 LOG_ERROR(
"Some problems at send, message: " << er.what());
// Fragment of a read that accepts up to max_size bytes and asks for at least one.
// NOTE(review): presumably the body of recv(); the signature is not visible here.
// ec starts as would_block; hndlr holds references to ec and bytes_transfered.
424 boost::system::error_code ec = boost::asio::error::would_block;
425 size_t bytes_transfered = 0;
427 handler_obj hndlr(ec, bytes_transfered);
// Size the buffer to the 16384-byte maximum before reading into it.
429 static const size_t max_size = 16384;
430 buff.resize(max_size);
// transfer_at_least(1): the read may complete as soon as one byte is available.
432 async_read(&buff[0], max_size, boost::asio::transfer_at_least(1), hndlr);
// Wait while ec is still would_block and m_shutdowned reads as zero.
435 while (ec == boost::asio::error::would_block && !boost::interprocess::ipcdetail::atomic_read32(&
m_shutdowned))
444 MTRACE(
"READ ENDS: Connection err_code " << ec.value());
// End-of-file is traced separately from other read problems.
445 if(ec == boost::asio::error::eof)
447 MTRACE(
"Connection err_code eof.");
453 MDEBUG(
"Problems at read: " << ec.message());
458 MTRACE(
"READ ENDS: Success. bytes_tr: " << bytes_transfered);
// Move the deadline to the maximum representable time point.
459 m_deadline.expires_at(std::chrono::steady_clock::time_point::max());
// Shrink the buffer to the number of bytes reported by the handler.
466 buff.resize(bytes_transfered);
470 catch(
const boost::system::system_error& er)
472 LOG_ERROR(
"Some problems at read, message: " << er.what());
478 LOG_ERROR(
"Some fatal problems at read.");
// Fragment of a read that requests exactly sz bytes into buff.
// NOTE(review): presumably the body of recv_n(); the signature is not visible here.
// Size the buffer to the requested byte count.
510 buff.resize(static_cast<size_t>(sz));
// ec starts as would_block; hndlr holds references to ec and bytes_transfered.
511 boost::system::error_code ec = boost::asio::error::would_block;
512 size_t bytes_transfered = 0;
515 handler_obj hndlr(ec, bytes_transfered);
// The buffer length and the transfer_at_least threshold are both buff.size().
516 async_read((
char*)buff.data(), buff.size(), boost::asio::transfer_at_least(buff.size()), hndlr);
// Wait while ec is still would_block and m_shutdowned reads as zero.
519 while (ec == boost::asio::error::would_block && !boost::interprocess::ipcdetail::atomic_read32(&
m_shutdowned))
// Move the deadline to the maximum representable time point.
531 m_deadline.expires_at(std::chrono::steady_clock::time_point::max());
// A byte count different from the requested size is logged as an error.
535 if(bytes_transfered != buff.size())
537 LOG_ERROR(
"Transferred mismatch with transfer_at_least value: m_bytes_transferred=" << bytes_transfered <<
" at_least value=" << buff.size());
544 catch(
const boost::system::system_error& er)
546 LOG_ERROR(
"Some problems at read, message: " << er.what());
552 LOG_ERROR(
"Some fatal problems at read.");
// Fragment of the teardown sequence. The log messages name three steps: cancel,
// shutdown and close; only the shutdown call itself is visible in this excerpt.
// One error_code is passed to the shutdown call and read by every log message below.
564 boost::system::error_code ec;
569 MDEBUG(
"Problems at cancel: " << ec.message());
// Shut down both directions of the underlying TCP socket, reporting errors via ec.
570 m_ssl_socket->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
572 MDEBUG(
"Problems at shutdown: " << ec.message());
575 MDEBUG(
"Problems at close: " << ec.message());
// Set the m_shutdowned flag to 1; the read wait loops test this flag with atomic_read32.
576 boost::interprocess::ipcdetail::atomic_write32(&
m_shutdowned, 1);
// Timer callback for m_deadline: checks whether the deadline has passed and
// schedules itself again on the timer.
603 void check_deadline()
// The deadline counts as passed when its expiry is not later than the current steady-clock time.
// NOTE(review): the statements guarded by this condition are not all visible here.
608 if (
m_deadline.expires_at() <= std::chrono::steady_clock::now())
// Move the deadline to the maximum representable time point.
619 m_deadline.expires_at(std::chrono::steady_clock::time_point::max());
// Re-arm: wait on the timer again with this same member function as the handler.
623 m_deadline.async_wait(boost::bind(&blocked_mode_client::check_deadline,
this));
// Starts an SSL shutdown on m_ssl_socket and waits for it to finish, with the
// deadline set 2000 ms ahead.
626 void shutdown_ssl() {
// ec starts as would_block, which is the condition the wait loop tests.
628 boost::system::error_code ec = boost::asio::error::would_block;
629 m_deadline.expires_from_now(std::chrono::milliseconds(2000));
// The completion handler stores the operation's error code into ec.
630 m_ssl_socket->async_shutdown(boost::lambda::var(ec) = boost::lambda::_1);
// Keep waiting while ec still holds would_block (loop body not visible here).
631 while (ec == boost::asio::error::would_block)
// Test for an SSL-category error. The value compared against depends on the Boost
// version: stream_truncated from 1.62 on, the packed SSL_R_SHORT_READ code before.
// NOTE(review): the rest of this condition and the branch it guards are not visible.
637 if (ec.category() == boost::asio::error::get_ssl_category() &&
639 #if BOOST_VERSION >= 106200 640 boost::asio::ssl::error::stream_truncated
641 #else // older Boost supports only OpenSSL 1.0, so 1.0-only macros are appropriate 642 ERR_PACK(ERR_LIB_SSL, 0, SSL_R_SHORT_READ)
645 MDEBUG(
"Problems at ssl shutdown: " << ec.message());
// Writes sz bytes starting at data with boost::asio::write, reporting errors via ec.
// Returns bool. Only the call targeting the underlying TCP socket is visible in
// this excerpt; any other branch is not shown.
649 bool write(
const void* data,
size_t sz, boost::system::error_code& ec)
// Write through next_layer(), i.e. the TCP socket beneath the SSL stream.
655 success = boost::asio::write(
m_ssl_socket->next_layer(), boost::asio::buffer(data, sz), ec);
// Starts an asynchronous write of sz bytes starting at data. The completion
// handler assigns the operation's error code to ec, so ec must stay alive until
// the operation completes.
// Two targets appear below; the condition choosing between them is not visible.
659 void async_write(
const void* data,
size_t sz, boost::system::error_code& ec)
// Variant writing through the SSL stream.
662 boost::asio::async_write(*
m_ssl_socket, boost::asio::buffer(data, sz), boost::lambda::var(ec) = boost::lambda::_1);
// Variant writing directly to the underlying TCP socket.
664 boost::asio::async_write(
m_ssl_socket->next_layer(), boost::asio::buffer(data, sz), boost::lambda::var(ec) = boost::lambda::_1);
// Starts an asynchronous read of up to sz bytes into buff. transfer_at_least is
// the completion condition and hndlr the completion handler; both are forwarded
// to boost::asio::async_read.
// Two targets appear below; the condition choosing between them is not visible.
667 void async_read(
char* buff,
size_t sz, boost::asio::detail::transfer_at_least_t transfer_at_least, handler_obj& hndlr)
// Variant reading directly from the underlying TCP socket.
670 boost::asio::async_read(
m_ssl_socket->next_layer(), boost::asio::buffer(buff, sz), transfer_at_least, hndlr);
// Variant reading through the SSL stream.
672 boost::asio::async_read(*
m_ssl_socket, boost::asio::buffer(buff, sz), transfer_at_least, hndlr);
// SSL stream layered over a TCP socket. I/O that bypasses SSL goes through
// m_ssl_socket->next_layer().
679 std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>
m_ssl_socket;
// Fragments that set up and tear down the send-deadline timer.
// NOTE(review): presumably from the constructor and destructor of
// async_blocked_mode_client; the enclosing functions are not visible here.
// Set the send deadline to positive infinity.
703 m_send_deadline.expires_at(boost::posix_time::pos_infin);
// Start the deadline check, which waits on the timer.
706 check_send_deadline();
// Cancel outstanding waits on the send-deadline timer.
710 m_send_deadline.cancel();
716 m_send_deadline.cancel();
// Sends sz bytes starting at data by calling write(), then sets the send
// deadline back to positive infinity.
// Returns bool; the return statements are not part of this excerpt.
721 bool send(
const void* data,
size_t sz)
// Error code filled in by write().
747 boost::system::error_code ec;
// writen receives the value returned by write().
749 size_t writen =
write(data, sz, ec);
// Set the send deadline to positive infinity.
757 m_send_deadline.expires_at(boost::posix_time::pos_infin);
761 catch(
const boost::system::system_error& er)
// Log the failure as a send problem: this is the send path, not connect.
763 LOG_ERROR(
"Some problems at send, message: " << er.what());
// Timer holding the send deadline; it is examined by check_send_deadline().
778 boost::asio::deadline_timer m_send_deadline;
// Timer callback for m_send_deadline: checks whether the send deadline has
// passed and schedules itself again on the timer.
780 void check_send_deadline()
// The deadline counts as passed when its expiry is not later than the timer's current time.
// NOTE(review): the statements guarded by this condition are not all visible here.
785 if (m_send_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now())
// Set the send deadline to positive infinity.
795 m_send_deadline.expires_at(boost::posix_time::pos_infin);
// Re-arm: wait on the timer again with this same member function as the handler.
799 m_send_deadline.async_wait(boost::bind(&async_blocked_mode_client::check_send_deadline,
this));
void async_write(const void *data, size_t sz, boost::system::error_code &ec)
void set_connector(std::function< connect_func > connector)
Change the connection routine (proxy, etc.)
boost::asio::ssl::context m_ctx
uint64_t get_bytes_received() const
void set_ssl(ssl_options_t ssl_options)
std::shared_ptr< boost::asio::ssl::stream< boost::asio::ip::tcp::socket > > m_ssl_socket
bool send(const void *data, size_t sz)
std::unique_ptr< void, close > socket
Unique ZMQ socket handle, calls zmq_close on destruction.
bool send(const void *data, size_t sz)
boost::unique_future< boost::asio::ip::tcp::socket >(const std::string &, const std::string &, boost::asio::steady_timer &) connect_func
bool is_connected(bool *ssl=NULL)
bool connect(const std::string &addr, const std::string &port, std::chrono::milliseconds timeout)
async_blocked_mode_client()
boost::asio::io_service m_io_service
ssl_options_t m_ssl_options
bool connect(const std::string &addr, int port, std::chrono::milliseconds timeout)
Represents a single connection from a client.
bool handshake(boost::asio::ssl::stream< boost::asio::ip::tcp::socket > &socket, boost::asio::ssl::stream_base::handshake_type type, const std::string &host={}) const
unsigned __int64 uint64_t
boost::asio::steady_timer m_deadline
std::unique_ptr< void, terminate > context
Unique ZMQ context handle, calls zmq_term on destruction.
bool write(const void *data, size_t sz, boost::system::error_code &ec)
try_connect_result_t try_connect(const std::string &addr, const std::string &port, std::chrono::milliseconds timeout, epee::net_utils::ssl_support_t ssl_support)
boost::endian::big_uint16_t port
expect< void > success() noexcept
~async_blocked_mode_client()
const T & move(const T &t)
std::function< connect_func > m_connector
bool recv_n(std::string &buff, int64_t sz, std::chrono::milliseconds timeout)
uint64_t get_bytes_sent() const
std::string to_string(t_connection_type type)
boost::asio::io_service & get_io_service()
boost::asio::ssl::context create_context() const
volatile uint32_t m_shutdowned
void async_read(char *buff, size_t sz, boost::asio::detail::transfer_at_least_t transfer_at_least, handler_obj &hndlr)
error
Tracks LMDB error codes.
boost::asio::ip::tcp::socket & get_socket()
std::atomic< uint64_t > m_bytes_received
boost::unique_future< boost::asio::ip::tcp::socket > operator()(const std::string &addr, const std::string &port, boost::asio::steady_timer &) const
bool send(const std::string &buff, std::chrono::milliseconds timeout)
bool recv(std::string &buff, std::chrono::milliseconds timeout)
std::atomic< uint64_t > m_bytes_sent