Electroneum
abstract_tcp_server2.h
Go to the documentation of this file.
1 
7 // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
8 // All rights reserved.
9 //
10 // Redistribution and use in source and binary forms, with or without
11 // modification, are permitted provided that the following conditions are met:
12 // * Redistributions of source code must retain the above copyright
13 // notice, this list of conditions and the following disclaimer.
14 // * Redistributions in binary form must reproduce the above copyright
15 // notice, this list of conditions and the following disclaimer in the
16 // documentation and/or other materials provided with the distribution.
17 // * Neither the name of the Andrey N. Sabelnikov nor the
18 // names of its contributors may be used to endorse or promote products
19 // derived from this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
22 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
24 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
25 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
26 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
28 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 //
32 
33 
34 
35 #ifndef _ABSTRACT_TCP_SERVER2_H_
36 #define _ABSTRACT_TCP_SERVER2_H_
37 
38 
39 #include <string>
40 #include <vector>
41 #include <boost/noncopyable.hpp>
42 #include <boost/shared_ptr.hpp>
43 #include <atomic>
44 #include <cassert>
45 #include <map>
46 #include <memory>
47 
48 #include <boost/asio.hpp>
49 #include <boost/asio/ssl.hpp>
50 #include <boost/array.hpp>
51 #include <boost/noncopyable.hpp>
52 #include <boost/shared_ptr.hpp>
53 #include <boost/enable_shared_from_this.hpp>
54 #include <boost/interprocess/detail/atomic.hpp>
55 #include <boost/thread/thread.hpp>
56 #include "net_utils_base.h"
57 #include "syncobj.h"
58 #include "connection_basic.hpp"
60 
61 #undef ELECTRONEUM_DEFAULT_LOG_CATEGORY
62 #define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net"
63 
64 #define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT 1000
65 
66 namespace epee
67 {
68 namespace net_utils
69 {
70 
72  {
74  protected:
76  };
77 
78 
79  /************************************************************************/
80  /* */
81  /************************************************************************/
83  template<class t_protocol_handler>
84  class connection
85  : public boost::enable_shared_from_this<connection<t_protocol_handler> >,
86  private boost::noncopyable,
87  public i_service_endpoint,
88  public connection_basic
89  {
90  public:
91  typedef typename t_protocol_handler::connection_context t_connection_context;
92 
94  {
97  {}
98 
100  typename t_protocol_handler::config_type config;
102  };
103 
105  explicit connection( boost::asio::io_service& io_service,
106  boost::shared_ptr<shared_state> state,
107  t_connection_type connection_type,
108  epee::net_utils::ssl_support_t ssl_support);
109 
110  explicit connection( boost::asio::ip::tcp::socket&& sock,
111  boost::shared_ptr<shared_state> state,
112  t_connection_type connection_type,
113  epee::net_utils::ssl_support_t ssl_support);
114 
115 
116 
117  virtual ~connection() noexcept(false);
118 
120  bool start(bool is_income, bool is_multithreaded);
121 
122  // `real_remote` is the actual endpoint (if connection is to proxy, etc.)
123  bool start(bool is_income, bool is_multithreaded, network_address real_remote);
124 
125  void get_context(t_connection_context& context_){context_ = context;}
126 
127  void call_back_starter();
128 
129  void save_dbg_log();
130 
131 
132  bool speed_limit_is_enabled() const;
133 
134  bool cancel();
135 
136  private:
137  //----------------- i_service_endpoint ---------------------
138  virtual bool do_send(const void* ptr, size_t cb);
139  virtual bool do_send_chunk(const void* ptr, size_t cb);
140  virtual bool send_done();
141  virtual bool close();
142  virtual bool call_run_once_service_io();
143  virtual bool request_callback();
144  virtual boost::asio::io_service& get_io_service();
145  virtual bool add_ref();
146  virtual bool release();
147  //------------------------------------------------------
148  boost::shared_ptr<connection<t_protocol_handler> > safe_shared_from_this();
149  bool shutdown();
151  void handle_receive(const boost::system::error_code& e,
152  std::size_t bytes_transferred);
153 
155  void handle_read(const boost::system::error_code& e,
156  std::size_t bytes_transferred);
157 
159  void handle_write(const boost::system::error_code& e, size_t cb);
160 
162  void reset_timer(boost::posix_time::milliseconds ms, bool add);
163  boost::posix_time::milliseconds get_default_timeout();
164  boost::posix_time::milliseconds get_timeout_from_bytes_read(size_t bytes);
165 
167  unsigned int host_count(const std::string &host, int delta = 0);
168 
170  boost::array<char, 8192> buffer_;
171  size_t buffer_ssl_init_fill;
172 
173  t_connection_context context;
174 
175  // TODO what do they mean about wait on destructor?? --rfree :
176  //this should be the last one, because it could be wait on destructor, while other activities possible on other threads
177  t_protocol_handler m_protocol_handler;
178  //typename t_protocol_handler::config_type m_dummy_config;
179  size_t m_reference_count = 0; // reference count managed through add_ref/release support
180  boost::shared_ptr<connection<t_protocol_handler> > m_self_ref; // the reference to hold
181  critical_section m_self_refs_lock;
182  critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk()
183  critical_section m_shutdown_lock; // held while shutting down
184 
185  t_connection_type m_connection_type;
186 
187  // for calculate speed (last 60 sec)
188  network_throttle m_throttle_speed_in;
189  network_throttle m_throttle_speed_out;
190  boost::mutex m_throttle_speed_in_mutex;
191  boost::mutex m_throttle_speed_out_mutex;
192 
193  boost::asio::deadline_timer m_timer;
194  bool m_local;
195  bool m_ready_to_close;
196  std::string m_host;
197 
198  public:
199  void setRpcStation();
200  };
201 
202 
203  /************************************************************************/
204  /* */
205  /************************************************************************/
206  template<class t_protocol_handler>
208  : private boost::noncopyable
209  {
210  enum try_connect_result_t
211  {
212  CONNECT_SUCCESS,
213  CONNECT_FAILURE,
214  CONNECT_NO_SSL,
215  };
216 
217  public:
218  typedef boost::shared_ptr<connection<t_protocol_handler> > connection_ptr;
219  typedef typename t_protocol_handler::connection_context t_connection_context;
222 
223  boosted_tcp_server(t_connection_type connection_type);
224  explicit boosted_tcp_server(boost::asio::io_service& external_io_service, t_connection_type connection_type);
226 
227  std::map<std::string, t_connection_type> server_type_map;
228  void create_server_type_map();
229 
232 
234  bool run_server(size_t threads_count, bool wait = true, const boost::thread::attributes& attrs = boost::thread::attributes());
235 
237  bool timed_wait_server_stop(uint64_t wait_mseconds);
238 
240  void send_stop_signal();
241 
242  bool is_stop_signal_sent() const noexcept { return m_stop_signal_sent; };
243 
244  const std::atomic<bool>& get_stop_signal() const noexcept { return m_stop_signal_sent; }
245 
246  void set_threads_prefix(const std::string& prefix_name);
247 
248  bool deinit_server(){return true;}
249 
250  size_t get_threads_count(){return m_threads_count;}
251 
253 
255  {
256  default_remote = std::move(remote);
257  }
258 
260  try_connect_result_t try_connect(connection_ptr new_connection_l, const std::string& adr, const std::string& port, boost::asio::ip::tcp::socket &sock_, const boost::asio::ip::tcp::endpoint &remote_endpoint, const std::string &bind_ip, uint32_t conn_timeout, epee::net_utils::ssl_support_t ssl_support);
262  template<class t_callback>
263  bool connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeot, const t_callback &cb, const std::string& bind_ip = "0.0.0.0", epee::net_utils::ssl_support_t ssl_support = epee::net_utils::ssl_support_t::e_ssl_support_autodetect);
264 
265  typename t_protocol_handler::config_type& get_config_object()
266  {
267  assert(m_state != nullptr); // always set in constructor
268  return m_state->config;
269  }
270 
271  int get_binded_port(){return m_port;}
272 
274  {
275  assert(m_state != nullptr); // always set in constructor
276  auto connections_count = m_state->sock_count > 0 ? (m_state->sock_count - 1) : 0; // Socket count minus listening socket
277  return connections_count;
278  }
279 
280  boost::asio::io_service& get_io_service(){return io_service_;}
281 
283  {
285 
286  virtual bool call_handler(){return true;}
287 
288  idle_callback_conext_base(boost::asio::io_service& io_serice):
289  m_timer(io_serice)
290  {}
291  boost::asio::deadline_timer m_timer;
292  };
293 
294  template <class t_handler>
296  {
297  idle_callback_conext(boost::asio::io_service& io_serice, t_handler& h, uint64_t period):
298  idle_callback_conext_base(io_serice),
299  m_handler(h)
300  {this->m_period = period;}
301 
302  t_handler m_handler;
303  virtual bool call_handler()
304  {
305  return m_handler();
306  }
308  };
309 
310  template<class t_handler>
311  bool add_idle_handler(t_handler t_callback, uint64_t timeout_ms)
312  {
313  boost::shared_ptr<idle_callback_conext<t_handler>> ptr(new idle_callback_conext<t_handler>(io_service_, t_callback, timeout_ms));
314  //needed call handler here ?...
315  ptr->m_timer.expires_from_now(boost::posix_time::milliseconds(ptr->m_period));
316  ptr->m_timer.async_wait(boost::bind(&boosted_tcp_server<t_protocol_handler>::global_timer_handler<t_handler>, this, ptr));
317  return true;
318  }
319 
320  template<class t_handler>
321  bool global_timer_handler(/*const boost::system::error_code& err, */boost::shared_ptr<idle_callback_conext<t_handler>> ptr)
322  {
323  //if handler return false - he don't want to be called anymore
324  if(!ptr->call_handler())
325  return true;
326  ptr->m_timer.expires_from_now(boost::posix_time::milliseconds(ptr->m_period));
327  ptr->m_timer.async_wait(boost::bind(&boosted_tcp_server<t_protocol_handler>::global_timer_handler<t_handler>, this, ptr));
328  return true;
329  }
330 
331  template<class t_handler>
332  bool async_call(t_handler t_callback)
333  {
334  io_service_.post(t_callback);
335  return true;
336  }
337 
338  private:
340  bool worker_thread();
342  void handle_accept(const boost::system::error_code& e);
343 
344  bool is_thread_worker();
345 
346  const boost::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state;
347 
349  struct worker
350  {
351  worker()
352  : io_service(), work(io_service)
353  {}
354 
355  boost::asio::io_service io_service;
356  boost::asio::io_service::work work;
357  };
358  std::unique_ptr<worker> m_io_service_local_instance;
359  boost::asio::io_service& io_service_;
360 
362  boost::asio::ip::tcp::acceptor acceptor_;
363  epee::net_utils::network_address default_remote;
364 
365  std::atomic<bool> m_stop_signal_sent;
366  uint32_t m_port;
367  std::string m_address;
368  std::string m_thread_name_prefix; //TODO: change to enum server_type, now used
369  size_t m_threads_count;
370  std::vector<boost::shared_ptr<boost::thread> > m_threads;
371  boost::thread::id m_main_thread_id;
372  critical_section m_threads_lock;
373  volatile uint32_t m_thread_index; // TODO change to std::atomic
374 
375  t_connection_type m_connection_type;
376 
378  connection_ptr new_connection_;
379 
380  boost::mutex connections_mutex;
381  std::set<connection_ptr> connections_;
382  }; // class <>boosted_tcp_server
383 
384 
385 } // namespace
386 } // namespace
387 
388 #include "abstract_tcp_server2.inl"
389 
390 #endif
idle_callback_conext(boost::asio::io_service &io_serice, t_handler &h, uint64_t period)
t_protocol_handler::connection_context t_connection_context
boosted_tcp_server(t_connection_type connection_type)
::std::string string
Definition: gtest-port.h:1097
std::unique_ptr< void, close > socket
Unique ZMQ socket handle, calls zmq_close on destruction.
Definition: zmq.h:101
void send_stop_signal()
Stop the server.
bool init_server(uint32_t port, const std::string address="0.0.0.0", ssl_options_t ssl_options=ssl_support_t::e_ssl_support_autodetect)
bool run_server(size_t threads_count, bool wait=true, const boost::thread::attributes &attrs=boost::thread::attributes())
Run the server&#39;s io_service loop.
bool add_connection(t_connection_context &out, boost::asio::ip::tcp::socket &&sock, network_address real_remote, epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
boost::asio::io_service & get_io_service()
const std::atomic< bool > & get_stop_signal() const noexcept
bool start(bool is_income, bool is_multithreaded)
Start the first asynchronous operation for the connection.
bool add_idle_handler(t_handler t_callback, uint64_t timeout_ms)
void set_connection_filter(i_connection_filter *pfilter)
std::map< std::string, t_connection_type > server_type_map
unsigned int uint32_t
Definition: stdint.h:126
connection(boost::asio::io_service &io_service, boost::shared_ptr< shared_state > state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support)
Construct a connection with the given io_service.
bool timed_wait_server_stop(uint64_t wait_mseconds)
wait for service workers stop
boost::shared_ptr< connection< t_protocol_handler > > connection_ptr
Represents a single connection from a client.
unsigned __int64 uint64_t
Definition: stdint.h:136
#define false
Definition: stdbool.h:38
try_connect_result_t try_connect(connection_ptr new_connection_l, const std::string &adr, const std::string &port, boost::asio::ip::tcp::socket &sock_, const boost::asio::ip::tcp::endpoint &remote_endpoint, const std::string &bind_ip, uint32_t conn_timeout, epee::net_utils::ssl_support_t ssl_support)
virtual bool is_remote_host_allowed(const epee::net_utils::network_address &address)=0
bool connect_async(const std::string &adr, const std::string &port, uint32_t conn_timeot, const t_callback &cb, const std::string &bind_ip="0.0.0.0", epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
void set_default_remote(epee::net_utils::network_address remote)
boost::endian::big_uint16_t port
Definition: socks.cpp:60
t_protocol_handler::connection_context t_connection_context
void set_threads_prefix(const std::string &prefix_name)
virtual ~connection() noexcept(false)
const T & move(const T &t)
Definition: gtest-port.h:1317
void get_context(t_connection_context &context_)
Definition: blake256.h:37
const char * address
Definition: multisig.cpp:37
implementaion for throttling of connection (count and rate-limit speed etc)
bool global_timer_handler(boost::shared_ptr< idle_callback_conext< t_handler >> ptr)
Definition: worker.h:82
t_protocol_handler::config_type config
bool async_call(t_handler t_callback)
base for connection, contains e.g. the ratelimit hooks
bool connect(const std::string &adr, const std::string &port, uint32_t conn_timeot, t_connection_context &cn, const std::string &bind_ip="0.0.0.0", epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
bool speed_limit_is_enabled() const
tells us should we be sleeping here (e.g. do not sleep on RPC connections)
t_protocol_handler::config_type & get_config_object()