Electroneum
abstract_tcp_server_cp.h
Go to the documentation of this file.
1 // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are met:
6 // * Redistributions of source code must retain the above copyright
7 // notice, this list of conditions and the following disclaimer.
8 // * Redistributions in binary form must reproduce the above copyright
9 // notice, this list of conditions and the following disclaimer in the
10 // documentation and/or other materials provided with the distribution.
11 // * Neither the name of the Andrey N. Sabelnikov nor the
12 // names of its contributors may be used to endorse or promote products
13 // derived from this software without specific prior written permission.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16 // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
19 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 //
26 
27 
28 #ifndef _LEVIN_CP_SERVER_H_
29 #define _LEVIN_CP_SERVER_H_
30 
31 #include <winsock2.h>
32 #include <rpc.h>
33 #include <string>
34 #include <map>
35 #include <boost/shared_ptr.hpp>
36 
37 #include "misc_log_ex.h"
38 //#include "threads_helper.h"
39 #include "syncobj.h"
40 #define ENABLE_PROFILING
41 #include "profile_tools.h"
42 #include "net_utils_base.h"
43 #include "pragma_comp_defs.h"
44 
45 #undef ELECTRONEUM_DEFAULT_LOG_CATEGORY
46 #define ELECTRONEUM_DEFAULT_LOG_CATEGORY "net"
47 
48 #define LEVIN_DEFAULT_DATA_BUFF_SIZE 2000
49 
50 namespace epee
51 {
52 namespace net_utils
53 {
54 
55  template<class TProtocol>
56  class cp_server_impl//: public abstract_handler
57  {
58  public:
59  cp_server_impl(/*abstract_handler* phandler = NULL*/);
60  virtual ~cp_server_impl();
61 
62  bool init_server(int port_no);
63  bool deinit_server();
64  bool run_server(int threads_count = 0);
65  bool send_stop_signal();
66  bool is_stop_signal();
67  virtual bool on_net_idle(){return true;}
69  typename TProtocol::config_type& get_config_object(){return m_config;}
70  private:
// Tag stored in io_data_base::m_op_type to tell which kind of overlapped
// request a buffer currently represents.
enum overlapped_operation_type
{
  op_type_recv = 0, // receive request (not assigned anywhere in this header)
  op_type_send = 1, // assigned by connection::handle_send before ::WSASend
  op_type_stop = 2  // assigned by connection::query_shutdown before posting to the completion port
};
77 
78  struct io_data_base
79  {
80  OVERLAPPED m_overlapped;
81  WSABUF DataBuf;
82  overlapped_operation_type m_op_type;
83  DWORD TotalBuffBytes;
84  volatile LONG m_is_in_use;
85  char Buffer[1];
86  };
87 
88 PRAGMA_WARNING_PUSH
89 PRAGMA_WARNING_DISABLE_VS(4355)
90  template<class TProtocol>
91  struct connection: public net_utils::i_service_endpoint
92  {
93  connection(typename TProtocol::config_type& ref_config):m_sock(INVALID_SOCKET), m_tprotocol_handler(this, ref_config, context), m_psend_data(NULL), m_precv_data(NULL), m_asked_to_shutdown(0), m_connection_shutwoned(0)
94  {
95  }
96 
97  //connection():m_sock(INVALID_SOCKET), m_tprotocol_handler(this, m_dummy_config, context), m_psend_data(NULL), m_precv_data(NULL), m_asked_to_shutdown(0), m_connection_shutwoned(0)
98  //{
99  //}
100 
101  connection<TProtocol>& operator=(const connection<TProtocol>& obj)
102  {
103  return *this;
104  }
105 
106  bool init_buffers()
107  {
108  m_psend_data = (io_data_base*)new char[sizeof(io_data_base) + LEVIN_DEFAULT_DATA_BUFF_SIZE-1];
109  m_psend_data->TotalBuffBytes = LEVIN_DEFAULT_DATA_BUFF_SIZE;
110  m_precv_data = (io_data_base*)new char[sizeof(io_data_base) + LEVIN_DEFAULT_DATA_BUFF_SIZE-1];
111  m_precv_data->TotalBuffBytes = LEVIN_DEFAULT_DATA_BUFF_SIZE;
112  return true;
113  }
114 
115  bool query_shutdown()
116  {
117  if(!::InterlockedCompareExchange(&m_asked_to_shutdown, 1, 0))
118  {
119  m_psend_data->m_op_type = op_type_stop;
120  ::PostQueuedCompletionStatus(m_completion_port, 0, (ULONG_PTR)this, &m_psend_data->m_overlapped);
121  }
122  return true;
123  }
124 
125  //bool set_config(typename TProtocol::config_type& config)
126  //{
127  // this->~connection();
128  // new(this) connection<TProtocol>(config);
129  // return true;
130  //}
131  ~connection()
132  {
133  if(m_psend_data)
134  delete m_psend_data;
136  if(m_precv_data)
137  delete m_precv_data;
138  }
139  virtual bool handle_send(const void* ptr, size_t cb)
140  {
141  PROFILE_FUNC("[handle_send]");
142  if(m_psend_data->TotalBuffBytes < cb)
143  resize_send_buff((DWORD)cb);
144 
145  ZeroMemory(&m_psend_data->m_overlapped, sizeof(OVERLAPPED));
146  m_psend_data->DataBuf.len = (u_long)cb;//m_psend_data->TotalBuffBytes;
147  m_psend_data->DataBuf.buf = m_psend_data->Buffer;
148  memcpy(m_psend_data->DataBuf.buf, ptr, cb);
149  m_psend_data->m_op_type = op_type_send;
150  InterlockedExchange(&m_psend_data->m_is_in_use, 1);
151  DWORD bytes_sent = 0;
152  DWORD flags = 0;
153  int res = 0;
154  {
155  PROFILE_FUNC("[handle_send] ::WSASend");
156  res = ::WSASend(m_sock, &(m_psend_data->DataBuf), 1, &bytes_sent, flags, &(m_psend_data->m_overlapped), NULL);
157  }
158 
159  if(res == SOCKET_ERROR )
160  {
161  int err = ::WSAGetLastError();
162  if(WSA_IO_PENDING == err )
163  return true;
164  }
165  LOG_ERROR("BIG FAIL: WSASend error code not correct, res=" << res << " last_err=" << err);
166  ::InterlockedExchange(&m_psend_data->m_is_in_use, 0);
167  query_shutdown();
168  //closesocket(m_psend_data);
169  return false;
170  }else if(0 == res)
171  {
172  ::InterlockedExchange(&m_psend_data->m_is_in_use, 0);
173  if(!bytes_sent || bytes_sent != cb)
174  {
175  int err = ::WSAGetLastError();
176  LOG_ERROR("BIG FAIL: WSASend immediatly complete? but bad results, res=" << res << " last_err=" << err);
177  query_shutdown();
178  return false;
179  }else
180  {
181  return true;
182  }
183  }
184 
185  return true;
186  }
187  bool resize_send_buff(DWORD new_size)
188  {
189  if(m_psend_data->TotalBuffBytes >= new_size)
190  return true;
191 
192  delete m_psend_data;
193  m_psend_data = (io_data_base*)new char[sizeof(io_data_base) + new_size-1];
194  m_psend_data->TotalBuffBytes = new_size;
195  LOG_PRINT("Connection buffer resized up to " << new_size, LOG_LEVEL_3);
196  return true;
197  }
198 
199 
200  SOCKET m_sock;
202  TProtocol m_tprotocol_handler;
203  typename TProtocol::config_type m_dummy_config;
204  io_data_base* m_precv_data;
205  io_data_base* m_psend_data;
206  HANDLE m_completion_port;
207  volatile LONG m_asked_to_shutdown;
208  volatile LONG m_connection_shutwoned;
209  };
210 PRAGMA_WARNING_POP
211 
212  bool worker_thread_member();
213  static unsigned CALLBACK worker_thread(void* param);
214 
215  bool add_new_connection(SOCKET new_sock, long ip_from, int port_from);
216  bool shutdown_connection(connection<TProtocol>* pconn);
217 
218 
219  typedef std::map<SOCKET, boost::shared_ptr<connection<TProtocol> > > connections_container;
224  int m_port;
225  volatile LONG m_stop;
226  //abstract_handler* m_phandler;
228  volatile LONG m_worker_thread_counter;
229  typename TProtocol::config_type m_config;
230  };
231 }
232 }
233 #include "abstract_tcp_server_cp.inl"
234 
235 
236 #endif //_LEVIN_CP_SERVER_H_
const char * res
Definition: hmac_keccak.cpp:41
bool add_new_connection(SOCKET new_sock, long ip_from, int port_from)
virtual bool handle_send(const void *ptr, size_t cb)
#define LEVIN_DEFAULT_DATA_BUFF_SIZE
bool run_server(int threads_count=0)
connection(typename TProtocol::config_type &ref_config)
connections_container m_connections
PRAGMA_WARNING_POP bool worker_thread_member()
std::map< SOCKET, boost::shared_ptr< connection< TProtocol > > > connections_container
critical_section m_connections_lock
#define SOCKET
bool shutdown_connection(connection< TProtocol > *pconn)
#define INVALID_SOCKET
TProtocol::config_type & get_config_object()
bool init_buffers()
bool query_shutdown()
volatile LONG m_worker_thread_counter
#define LOG_ERROR(x)
Definition: misc_log_ex.h:98
void * memcpy(void *a, const void *b, size_t c)
TProtocol::config_type m_config
connection< TProtocol > & operator=(const connection< TProtocol > &obj)
#define PROFILE_FUNC(immortal_ptr_str)
Definition: profile_tools.h:51