Electroneum
clt.cpp
Go to the documentation of this file.
1 // Copyrights(c) 2017-2021, The Electroneum Project
2 // Copyrights(c) 2014-2019, The Monero Project
3 //
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without modification, are
7 // permitted provided that the following conditions are met:
8 //
9 // 1. Redistributions of source code must retain the above copyright notice, this list of
10 // conditions and the following disclaimer.
11 //
12 // 2. Redistributions in binary form must reproduce the above copyright notice, this list
13 // of conditions and the following disclaimer in the documentation and/or other
14 // materials provided with the distribution.
15 //
16 // 3. Neither the name of the copyright holder nor the names of its contributors may be
17 // used to endorse or promote products derived from this software without specific
18 // prior written permission.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
21 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
22 // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
23 // THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
27 // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28 // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 //
30 // Parts of this file are originally copyright (c) 2012-2013 The Cryptonote developers
31 
32 #include <atomic>
33 #include <chrono>
34 #include <functional>
35 #include <numeric>
36 #include <boost/thread/thread.hpp>
37 #include <vector>
38 
39 #include "gtest/gtest.h"
40 
41 #include "include_base_utils.h"
42 #include "misc_language.h"
43 #include "misc_log_ex.h"
45 #include "common/util.h"
46 
47 #include "net_load_tests.h"
48 
49 using namespace net_load_tests;
50 
51 namespace
52 {
53  const size_t CONNECTION_COUNT = 100000;
54  const size_t CONNECTION_TIMEOUT = 10000;
55  const size_t DEFAULT_OPERATION_TIMEOUT = 30000;
56  const size_t RESERVED_CONN_CNT = 1;
57 
58  template<typename t_predicate>
59  bool busy_wait_for(size_t timeout_ms, const t_predicate& predicate, size_t sleep_ms = 10)
60  {
61  for (size_t i = 0; i < timeout_ms / sleep_ms; ++i)
62  {
63  if (predicate())
64  return true;
65  //std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
66  epee::misc_utils::sleep_no_w(static_cast<long>(sleep_ms));
67  }
68  return false;
69  }
70 
71  class t_connection_opener_1
72  {
73  public:
74  t_connection_opener_1(test_tcp_server& tcp_server, size_t open_request_target)
75  : m_tcp_server(tcp_server)
76  , m_open_request_target(open_request_target)
77  , m_next_id(0)
78  , m_error_count(0)
79  , m_connections(open_request_target)
80  {
81  for (auto& conn_id : m_connections)
82  conn_id = boost::uuids::nil_uuid();
83  }
84 
85  bool open()
86  {
87  size_t id = m_next_id.fetch_add(1, std::memory_order_relaxed);
88  if (m_open_request_target <= id)
89  return false;
90 
91  bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
92  if (!ec)
93  {
94  m_connections[id] = context.m_connection_id;
95  }
96  else
97  {
98  m_error_count.fetch_add(1, std::memory_order_relaxed);
99  }
100  });
101 
102  if (!r)
103  {
104  m_error_count.fetch_add(1, std::memory_order_relaxed);
105  }
106 
107  return true;
108  }
109 
110  bool close(size_t id)
111  {
112  if (!m_connections[id].is_nil())
113  {
114  m_tcp_server.get_config_object().close(m_connections[id]);
115  return true;
116  }
117  else
118  {
119  return false;
120  }
121  }
122 
123  size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }
124 
125  private:
126  test_tcp_server& m_tcp_server;
127  size_t m_open_request_target;
128  std::atomic<size_t> m_next_id;
129  std::atomic<size_t> m_error_count;
130  std::vector<boost::uuids::uuid> m_connections;
131  };
132 
133  class t_connection_opener_2
134  {
135  public:
136  t_connection_opener_2(test_tcp_server& tcp_server, size_t open_request_target, size_t max_opened_connection_count)
137  : m_tcp_server(tcp_server)
138  , m_open_request_target(open_request_target)
139  , m_open_request_count(0)
140  , m_error_count(0)
141  , m_open_close_test_helper(tcp_server, open_request_target, max_opened_connection_count)
142  {
143  }
144 
145  bool open_and_close()
146  {
147  size_t req_count = m_open_request_count.fetch_add(1, std::memory_order_relaxed);
148  if (m_open_request_target <= req_count)
149  return false;
150 
151  bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
152  if (!ec)
153  {
154  m_open_close_test_helper.handle_new_connection(context.m_connection_id);
155  }
156  else
157  {
158  m_error_count.fetch_add(1, std::memory_order_relaxed);
159  }
160  });
161 
162  if (!r)
163  {
164  m_error_count.fetch_add(1, std::memory_order_relaxed);
165  }
166 
167  return true;
168  }
169 
170  void close_remaining_connections()
171  {
172  m_open_close_test_helper.close_remaining_connections();
173  }
174 
175  size_t opened_connection_count() const { return m_open_close_test_helper.opened_connection_count(); }
176  size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }
177 
178  private:
179  test_tcp_server& m_tcp_server;
180  size_t m_open_request_target;
181  std::atomic<size_t> m_open_request_count;
182  std::atomic<size_t> m_error_count;
183  open_close_test_helper m_open_close_test_helper;
184  };
185 
186  class net_load_test_clt : public ::testing::Test
187  {
188  public:
189  net_load_test_clt()
190  : m_tcp_server(epee::net_utils::e_connection_type_RPC) // RPC disables network limit for unit tests
191  {
192  }
193  protected:
194  virtual void SetUp()
195  {
196  m_thread_count = (std::max)(min_thread_count, boost::thread::hardware_concurrency() / 2);
197 
198  m_tcp_server.get_config_object().set_handler(&m_commands_handler);
199  m_tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
200 
201  ASSERT_TRUE(m_tcp_server.init_server(clt_port, "127.0.0.1"));
202  ASSERT_TRUE(m_tcp_server.run_server(m_thread_count, false));
203 
204  // Connect to server
205  std::atomic<int> conn_status(0);
206  m_cmd_conn_id = boost::uuids::nil_uuid();
207  ASSERT_TRUE(m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
208  if (!ec)
209  {
210  m_cmd_conn_id = context.m_connection_id;
211  }
212  else
213  {
214  LOG_ERROR("Connection error: " << ec.message());
215  }
216  conn_status.store(1, std::memory_order_seq_cst);
217  }));
218 
219  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "connect_async timed out";
220  ASSERT_EQ(1, conn_status.load(std::memory_order_seq_cst));
221  ASSERT_FALSE(m_cmd_conn_id.is_nil());
222 
223  conn_status.store(0, std::memory_order_seq_cst);
225  ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_RESET_STATISTICS::response>(m_cmd_conn_id, CMD_RESET_STATISTICS::ID, req,
226  m_tcp_server.get_config_object(), [&](int code, const CMD_RESET_STATISTICS::response& rsp, const test_connection_context&) {
227  conn_status.store(code, std::memory_order_seq_cst);
228  }));
229 
230  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "reset statistics timed out";
231  ASSERT_LT(0, conn_status.load(std::memory_order_seq_cst));
232  }
233 
234  virtual void TearDown()
235  {
236  m_tcp_server.send_stop_signal();
237  ASSERT_TRUE(m_tcp_server.timed_wait_server_stop(DEFAULT_OPERATION_TIMEOUT));
238  }
239 
240  static void TearDownTestCase()
241  {
242  // Stop server
243  test_levin_commands_handler *commands_handler_ptr = new test_levin_commands_handler();
244  test_levin_commands_handler &commands_handler = *commands_handler_ptr;
246  tcp_server.get_config_object().set_handler(commands_handler_ptr, [](epee::levin::levin_commands_handler<test_connection_context> *handler)->void { delete handler; });
247  tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
248 
249  if (!tcp_server.init_server(clt_port, "127.0.0.1")) return;
250  if (!tcp_server.run_server(2, false)) return;
251 
252  // Connect to server and invoke shutdown command
253  std::atomic<int> conn_status(0);
254  boost::uuids::uuid cmd_conn_id = boost::uuids::nil_uuid();
255  tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
256  cmd_conn_id = context.m_connection_id;
257  conn_status.store(!ec ? 1 : -1, std::memory_order_seq_cst);
258  });
259 
260  if (!busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) return;
261  if (1 != conn_status.load(std::memory_order_seq_cst)) return;
262 
264 
265  busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != commands_handler.close_connection_counter(); });
266  }
267 
268  template<typename Func>
269  static auto call_func(size_t /*thread_index*/, const Func& func, int) -> decltype(func())
270  {
271  func();
272  }
273 
274  template<typename Func>
275  static auto call_func(size_t thread_index, const Func& func, long) -> decltype(func(thread_index))
276  {
277  func(thread_index);
278  }
279 
280  template<typename Func>
281  void parallel_exec(const Func& func)
282  {
283  unit_test::call_counter properly_finished_threads;
284  std::vector<boost::thread> threads(m_thread_count);
285  for (size_t i = 0; i < threads.size(); ++i)
286  {
287  threads[i] = boost::thread([&, i] {
288  call_func(i, func, 0);
289  properly_finished_threads.inc();
290  });
291  }
292 
293  for (auto& th : threads)
294  th.join();
295 
296  ASSERT_EQ(properly_finished_threads.get(), m_thread_count);
297  }
298 
299  void get_server_statistics(CMD_GET_STATISTICS::response& statistics)
300  {
301  std::atomic<int> req_status(0);
303  ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_GET_STATISTICS::response>(m_cmd_conn_id, CMD_GET_STATISTICS::ID, req,
304  m_tcp_server.get_config_object(), [&](int code, const CMD_GET_STATISTICS::response& rsp, const test_connection_context&) {
305  if (0 < code)
306  {
307  statistics = rsp;
308  }
309  else
310  {
311  LOG_ERROR("Get server statistics error: " << code);
312  }
313  req_status.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
314  }));
315 
316  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != req_status.load(std::memory_order_seq_cst); })) << "get_server_statistics timed out";
317  ASSERT_EQ(1, req_status.load(std::memory_order_seq_cst));
318  }
319 
320  template <typename t_predicate>
321  bool busy_wait_for_server_statistics(CMD_GET_STATISTICS::response& statistics, const t_predicate& predicate)
322  {
323  for (size_t i = 0; i < 30; ++i)
324  {
325  get_server_statistics(statistics);
326  if (predicate(statistics))
327  {
328  return true;
329  }
330 
331  //std::this_thread::sleep_for(std::chrono::seconds(1));
333  }
334 
335  return false;
336  }
337 
338  void ask_for_data_requests(size_t request_size = 0)
339  {
341  req.request_size = request_size;
342  epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object());
343  }
344 
345  protected:
346  test_tcp_server m_tcp_server;
347  test_levin_commands_handler m_commands_handler;
348  size_t m_thread_count;
349  boost::uuids::uuid m_cmd_conn_id;
350  };
351 }
352 
353 TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_client)
354 {
355  // Open connections
356  t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
357  parallel_exec([&] {
358  while (connection_opener.open());
359  });
360 
361  // Wait for all open requests to complete
362  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
363  LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
364  " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
365 
366  // Check
367  ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
368  ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
369  ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
370 
371  // Close connections
372  parallel_exec([&](size_t thread_idx) {
373  for (size_t i = thread_idx; i < CONNECTION_COUNT; i += m_thread_count)
374  {
375  connection_opener.close(i);
376  }
377  });
378 
379  // Wait for all opened connections to close
380  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
381  LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
382  " / " << m_commands_handler.close_connection_counter());
383 
384  // Check all connections are closed
385  ASSERT_EQ(m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT, m_commands_handler.close_connection_counter());
386  ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
387 
388  // Wait for server to handle all open and close requests
390  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
391  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
392 
393  // Check server status
394  // It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
395  ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
396  ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
397 
398  // Request data from server, it causes to close rest connections
399  ask_for_data_requests();
400 
401  // Wait for server to close rest connections
402  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
403  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
404 
405  // Check server status. All connections should be closed
406  ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
407  ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
408 }
409 
410 TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_server)
411 {
412  // Open connections
413  t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
414  parallel_exec([&] {
415  while (connection_opener.open());
416  });
417 
418  // Wait for all open requests to complete
419  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
420  LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
421  " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
422 
423  // Check
424  ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
425  ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
426  ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
427 
428  // Wait for server accepts all connections
430  int last_new_connection_counter = -1;
431  busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
432  if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
433  else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
434  });
435 
436  // Close connections
438  ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
439 
440  // Wait for all opened connections to close
441  busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); });
442  LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
443  " / " << m_commands_handler.close_connection_counter());
444 
445  // It's OK, if server didn't close all connections, because it could accept not all our connections
446  ASSERT_LE(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
447  ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
448 
449  // Wait for server to handle all open and close requests
450  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
451  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
452 
453  // Check server status
454  ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
455  ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
456 
457  // Close rest connections
458  m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
459  if (ctx.m_connection_id != m_cmd_conn_id)
460  {
461  CMD_DATA_REQUEST::request req;
462  bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
463  m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
464  if (code <= 0)
465  {
466  LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
467  }
468  });
469  if (!r)
470  LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
471  }
472  return true;
473  });
474 
475  // Wait for all opened connections to close
476  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
477  LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
478  " / " << m_commands_handler.close_connection_counter());
479 
480  // Check
481  ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
482  ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
483 }
484 
485 TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_client)
486 {
487  static const size_t MAX_OPENED_CONN_COUNT = 100;
488 
489  // Open/close connections
490  t_connection_opener_2 connection_opener(m_tcp_server, CONNECTION_COUNT, MAX_OPENED_CONN_COUNT);
491  parallel_exec([&] {
492  while (connection_opener.open_and_close());
493  });
494 
495  // Wait for all open requests to complete
496  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
497  LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
498  " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
499 
500  // Check
501  ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
502  ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
503 
504  // Wait for all close requests to complete
505  EXPECT_TRUE(busy_wait_for(4 * DEFAULT_OPERATION_TIMEOUT, [&](){ return connection_opener.opened_connection_count() <= MAX_OPENED_CONN_COUNT; }));
506  LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());
507 
508  // Check
509  ASSERT_EQ(MAX_OPENED_CONN_COUNT, connection_opener.opened_connection_count());
510 
511  connection_opener.close_remaining_connections();
512 
513  // Wait for all close requests to complete
514  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; }));
515  LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());
516 
517  ASSERT_EQ(m_commands_handler.new_connection_counter(), m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT);
518  ASSERT_EQ(0, connection_opener.opened_connection_count());
519  ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
520 
521  // Wait for server to handle all open and close requests
523  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
524  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
525 
526  // Check server status
527  // It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
528  ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
529  ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
530 
531  // Request data from server, it causes to close rest connections
532  ask_for_data_requests();
533 
534  // Wait for server to close rest connections
535  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
536  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
537 
538  // Check server status. All connections should be closed
539  ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
540  ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
541 }
542 
543 TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_server)
544 {
545  static const size_t MAX_OPENED_CONN_COUNT = 100;
546 
547  // Init test
548  std::atomic<int> test_state(0);
550  req_start.open_request_target = CONNECTION_COUNT;
551  req_start.max_opened_conn_count = MAX_OPENED_CONN_COUNT;
552  ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_START_OPEN_CLOSE_TEST::response>(m_cmd_conn_id, CMD_START_OPEN_CLOSE_TEST::ID, req_start,
553  m_tcp_server.get_config_object(), [&](int code, const CMD_START_OPEN_CLOSE_TEST::response&, const test_connection_context&) {
554  test_state.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
555  }));
556 
557  // Wait for server response
558  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 1 == test_state.load(std::memory_order_seq_cst); }));
559  ASSERT_EQ(1, test_state.load(std::memory_order_seq_cst));
560 
561  // Open connections
562  t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
563  parallel_exec([&] {
564  while (connection_opener.open());
565  });
566 
567  // Wait for all open requests to complete
568  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
569  LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
570  " / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
571  LOG_PRINT_L0("actual number of opened connections: " << m_tcp_server.get_config_object().get_connections_count());
572 
573  ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
574  ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
575 
576  // Wait for server accepts all connections
578  int last_new_connection_counter = -1;
579  busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
580  if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
581  else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
582  });
583 
584  // Ask server to close rest connections
586  ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
587 
588  // Wait for almost all connections to be closed by server
589  busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; });
590 
591  // It's OK, if there are opened connections, because server could accept not all our connections
592  ASSERT_LE(m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT, m_commands_handler.new_connection_counter());
593  ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
594 
595  // Wait for server to handle all open and close requests
596  busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
597  LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
598 
599  // Check server status
600  ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
601  ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
602 
603  // Close rest connections
604  m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
605  if (ctx.m_connection_id != m_cmd_conn_id)
606  {
607  CMD_DATA_REQUEST::request req;
608  bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
609  m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
610  if (code <= 0)
611  {
612  LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
613  }
614  });
615  if (!r)
616  LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
617  }
618  return true;
619  });
620 
621  // Wait for all opened connections to close
622  EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
623  LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
624  " / " << m_commands_handler.close_connection_counter());
625 
626  // Check
627  ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
628  ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
629 }
630 
631 int main(int argc, char** argv)
632 {
633  TRY_ENTRY();
636  //set up logging options
637  mlog_configure(mlog_get_default_log_path("net_load_tests_clt.log"), true);
638 
639  ::testing::InitGoogleTest(&argc, argv);
640  return RUN_ALL_TESTS();
641  CATCH_ENTRY_L0("main", 1);
642 }
int main(int argc, char **argv)
Definition: clt.cpp:631
#define EXPECT_TRUE(condition)
Definition: gtest.h:1859
bool get_set_enable_assert(bool set=false, bool v=false)
Definition: misc_log_ex.h:138
boost::uuids::uuid uuid
std::string mlog_get_default_log_path(const char *default_filename)
Definition: mlog.cpp:72
const std::string clt_port("36230")
void mlog_configure(const std::string &filename_base, bool console, const std::size_t max_log_file_size=MAX_LOG_FILE_SIZE, const std::size_t max_log_files=MAX_LOG_FILES)
Definition: mlog.cpp:148
bool notify_remote_command2(int command, const t_arg &out_struct, t_transport &transport)
#define LOG_PRINT_L0(x)
Definition: misc_log_ex.h:99
connections_container m_connections
bool init_server(uint32_t port, const std::string address="0.0.0.0", ssl_options_t ssl_options=ssl_support_t::e_ssl_support_autodetect)
bool run_server(size_t threads_count, bool wait=true, const boost::thread::attributes &attrs=boost::thread::attributes())
Run the server&#39;s io_service loop.
TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_client)
Definition: clt.cpp:353
#define ASSERT_FALSE(condition)
Definition: gtest.h:1868
bool on_startup()
Definition: util.cpp:778
#define ASSERT_EQ(val1, val2)
Definition: gtest.h:1956
#define TRY_ENTRY()
Definition: misc_log_ex.h:151
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
#define ASSERT_GT(val1, val2)
Definition: gtest.h:1976
std::unique_ptr< void, terminate > context
Unique ZMQ context handle, calls zmq_term on destruction.
Definition: zmq.h:98
bool sleep_no_w(long ms)
bool connect_async(const std::string &adr, const std::string &port, uint32_t conn_timeot, const t_callback &cb, const std::string &bind_ip="0.0.0.0", epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
#define ASSERT_LT(val1, val2)
Definition: gtest.h:1968
const std::string srv_port("36231")
#define LOG_ERROR(x)
Definition: misc_log_ex.h:98
#define ASSERT_TRUE(condition)
Definition: gtest.h:1865
const unsigned int min_thread_count
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
Definition: gtest.h:2232
#define ASSERT_LE(val1, val2)
Definition: gtest.h:1964
#define CATCH_ENTRY_L0(lacation, return_val)
Definition: misc_log_ex.h:165
const boost::uuids::uuid m_connection_id
size_t get() volatile const
t_protocol_handler::config_type & get_config_object()