36 #include <boost/thread/thread.hpp> 39 #include "gtest/gtest.h" 53 const size_t CONNECTION_COUNT = 100000;
54 const size_t CONNECTION_TIMEOUT = 10000;
55 const size_t DEFAULT_OPERATION_TIMEOUT = 30000;
56 const size_t RESERVED_CONN_CNT = 1;
58 template<
typename t_predicate>
59 bool busy_wait_for(
size_t timeout_ms,
const t_predicate& predicate,
size_t sleep_ms = 10)
61 for (
size_t i = 0; i < timeout_ms / sleep_ms; ++i)
71 class t_connection_opener_1
74 t_connection_opener_1(
test_tcp_server& tcp_server,
size_t open_request_target)
75 : m_tcp_server(tcp_server)
76 , m_open_request_target(open_request_target)
82 conn_id = boost::uuids::nil_uuid();
87 size_t id = m_next_id.fetch_add(1, std::memory_order_relaxed);
88 if (m_open_request_target <=
id)
98 m_error_count.fetch_add(1, std::memory_order_relaxed);
104 m_error_count.fetch_add(1, std::memory_order_relaxed);
110 bool close(
size_t id)
123 size_t error_count()
const {
return m_error_count.load(std::memory_order_relaxed); }
127 size_t m_open_request_target;
128 std::atomic<size_t> m_next_id;
129 std::atomic<size_t> m_error_count;
133 class t_connection_opener_2
136 t_connection_opener_2(
test_tcp_server& tcp_server,
size_t open_request_target,
size_t max_opened_connection_count)
137 : m_tcp_server(tcp_server)
138 , m_open_request_target(open_request_target)
139 , m_open_request_count(0)
141 , m_open_close_test_helper(tcp_server, open_request_target, max_opened_connection_count)
145 bool open_and_close()
147 size_t req_count = m_open_request_count.fetch_add(1, std::memory_order_relaxed);
148 if (m_open_request_target <= req_count)
154 m_open_close_test_helper.handle_new_connection(
context.m_connection_id);
158 m_error_count.fetch_add(1, std::memory_order_relaxed);
164 m_error_count.fetch_add(1, std::memory_order_relaxed);
170 void close_remaining_connections()
172 m_open_close_test_helper.close_remaining_connections();
175 size_t opened_connection_count()
const {
return m_open_close_test_helper.opened_connection_count(); }
176 size_t error_count()
const {
return m_error_count.load(std::memory_order_relaxed); }
180 size_t m_open_request_target;
181 std::atomic<size_t> m_open_request_count;
182 std::atomic<size_t> m_error_count;
196 m_thread_count = (std::max)(
min_thread_count, boost::thread::hardware_concurrency() / 2);
198 m_tcp_server.get_config_object().set_handler(&m_commands_handler);
199 m_tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
202 ASSERT_TRUE(m_tcp_server.run_server(m_thread_count,
false));
205 std::atomic<int> conn_status(0);
206 m_cmd_conn_id = boost::uuids::nil_uuid();
210 m_cmd_conn_id = context.m_connection_id;
214 LOG_ERROR(
"Connection error: " << ec.message());
216 conn_status.store(1, std::memory_order_seq_cst);
219 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{
return 0 != conn_status.load(std::memory_order_seq_cst); })) <<
"connect_async timed out";
220 ASSERT_EQ(1, conn_status.load(std::memory_order_seq_cst));
223 conn_status.store(0, std::memory_order_seq_cst);
227 conn_status.store(code, std::memory_order_seq_cst);
230 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{
return 0 != conn_status.load(std::memory_order_seq_cst); })) <<
"reset statistics timed out";
231 ASSERT_LT(0, conn_status.load(std::memory_order_seq_cst));
234 virtual void TearDown()
236 m_tcp_server.send_stop_signal();
237 ASSERT_TRUE(m_tcp_server.timed_wait_server_stop(DEFAULT_OPERATION_TIMEOUT));
240 static void TearDownTestCase()
253 std::atomic<int> conn_status(0);
256 cmd_conn_id =
context.m_connection_id;
257 conn_status.store(!ec ? 1 : -1, std::memory_order_seq_cst);
260 if (!busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{
return 0 != conn_status.load(std::memory_order_seq_cst); }))
return;
261 if (1 != conn_status.load(std::memory_order_seq_cst))
return;
268 template<
typename Func>
269 static auto call_func(
size_t ,
const Func& func,
int) -> decltype(func())
274 template<
typename Func>
275 static auto call_func(
size_t thread_index,
const Func& func,
long) -> decltype(func(thread_index))
280 template<
typename Func>
281 void parallel_exec(
const Func& func)
284 std::vector<boost::thread> threads(m_thread_count);
285 for (
size_t i = 0; i < threads.size(); ++i)
287 threads[i] = boost::thread([&, i] {
288 call_func(i, func, 0);
289 properly_finished_threads.
inc();
293 for (
auto& th : threads)
296 ASSERT_EQ(properly_finished_threads.
get(), m_thread_count);
301 std::atomic<int> req_status(0);
311 LOG_ERROR(
"Get server statistics error: " << code);
313 req_status.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
316 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{
return 0 != req_status.load(std::memory_order_seq_cst); })) <<
"get_server_statistics timed out";
317 ASSERT_EQ(1, req_status.load(std::memory_order_seq_cst));
320 template <
typename t_predicate>
323 for (
size_t i = 0; i < 30; ++i)
325 get_server_statistics(statistics);
326 if (predicate(statistics))
338 void ask_for_data_requests(
size_t request_size = 0)
348 size_t m_thread_count;
353 TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_client)
356 t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
358 while (connection_opener.open());
362 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{
return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
363 LOG_PRINT_L0(
"number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
364 " / " << connection_opener.error_count() <<
" (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) <<
")");
367 ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
368 ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
369 ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
372 parallel_exec([&](
size_t thread_idx) {
373 for (
size_t i = thread_idx; i < CONNECTION_COUNT; i += m_thread_count)
375 connection_opener.close(i);
380 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{
return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
381 LOG_PRINT_L0(
"number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
382 " / " << m_commands_handler.close_connection_counter());
385 ASSERT_EQ(m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT, m_commands_handler.close_connection_counter());
386 ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
391 LOG_PRINT_L0(
"server statistics: " << srv_stat.to_string());
395 ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
396 ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
399 ask_for_data_requests();
403 LOG_PRINT_L0(
"server statistics: " << srv_stat.to_string());
406 ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
407 ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
410 TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_server)
413 t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
415 while (connection_opener.open());
419 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){
return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
420 LOG_PRINT_L0(
"number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
421 " / " << connection_opener.error_count() <<
" (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) <<
")");
424 ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
425 ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
426 ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
430 int last_new_connection_counter = -1;
433 else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
441 busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){
return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); });
442 LOG_PRINT_L0(
"number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
443 " / " << m_commands_handler.close_connection_counter());
446 ASSERT_LE(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
447 ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
451 LOG_PRINT_L0(
"server statistics: " << srv_stat.to_string());
454 ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
455 ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
461 CMD_DATA_REQUEST::request req;
462 bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
463 m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
466 LOG_PRINT_L0(
"Failed to invoke CMD_DATA_REQUEST. code = " << code);
476 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){
return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
477 LOG_PRINT_L0(
"number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
478 " / " << m_commands_handler.close_connection_counter());
481 ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
482 ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
485 TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_client)
487 static const size_t MAX_OPENED_CONN_COUNT = 100;
490 t_connection_opener_2 connection_opener(m_tcp_server, CONNECTION_COUNT, MAX_OPENED_CONN_COUNT);
492 while (connection_opener.open_and_close());
496 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){
return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
497 LOG_PRINT_L0(
"number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
498 " / " << connection_opener.error_count() <<
" (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) <<
")");
501 ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
502 ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
505 EXPECT_TRUE(busy_wait_for(4 * DEFAULT_OPERATION_TIMEOUT, [&](){
return connection_opener.opened_connection_count() <= MAX_OPENED_CONN_COUNT; }));
506 LOG_PRINT_L0(
"actual number of opened connections: " << connection_opener.opened_connection_count());
509 ASSERT_EQ(MAX_OPENED_CONN_COUNT, connection_opener.opened_connection_count());
511 connection_opener.close_remaining_connections();
514 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){
return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; }));
515 LOG_PRINT_L0(
"actual number of opened connections: " << connection_opener.opened_connection_count());
517 ASSERT_EQ(m_commands_handler.new_connection_counter(), m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT);
518 ASSERT_EQ(0, connection_opener.opened_connection_count());
519 ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
524 LOG_PRINT_L0(
"server statistics: " << srv_stat.to_string());
528 ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
529 ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
532 ask_for_data_requests();
536 LOG_PRINT_L0(
"server statistics: " << srv_stat.to_string());
539 ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
540 ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
543 TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_server)
545 static const size_t MAX_OPENED_CONN_COUNT = 100;
548 std::atomic<int> test_state(0);
552 ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_START_OPEN_CLOSE_TEST::response>(m_cmd_conn_id, CMD_START_OPEN_CLOSE_TEST::ID, req_start,
554 test_state.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
558 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{
return 1 == test_state.load(std::memory_order_seq_cst); }));
559 ASSERT_EQ(1, test_state.load(std::memory_order_seq_cst));
562 t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
564 while (connection_opener.open());
568 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){
return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
569 LOG_PRINT_L0(
"number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
570 " / " << connection_opener.error_count() <<
" (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) <<
")");
571 LOG_PRINT_L0(
"actual number of opened connections: " << m_tcp_server.get_config_object().get_connections_count());
573 ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
574 ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
578 int last_new_connection_counter = -1;
581 else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
589 busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){
return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; });
592 ASSERT_LE(m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT, m_commands_handler.new_connection_counter());
593 ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
597 LOG_PRINT_L0(
"server statistics: " << srv_stat.to_string());
600 ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
601 ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
607 CMD_DATA_REQUEST::request req;
608 bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
609 m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
612 LOG_PRINT_L0(
"Failed to invoke CMD_DATA_REQUEST. code = " << code);
622 EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){
return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
623 LOG_PRINT_L0(
"number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
624 " / " << m_commands_handler.close_connection_counter());
627 ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
628 ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
631 int main(
int argc,
char** argv)
int main(int argc, char **argv)
#define EXPECT_TRUE(condition)
bool get_set_enable_assert(bool set=false, bool v=false)
std::string mlog_get_default_log_path(const char *default_filename)
const std::string clt_port("36230")
void mlog_configure(const std::string &filename_base, bool console, const std::size_t max_log_file_size=MAX_LOG_FILE_SIZE, const std::size_t max_log_files=MAX_LOG_FILES)
bool notify_remote_command2(int command, const t_arg &out_struct, t_transport &transport)
connections_container m_connections
bool init_server(uint32_t port, const std::string address="0.0.0.0", ssl_options_t ssl_options=ssl_support_t::e_ssl_support_autodetect)
bool run_server(size_t threads_count, bool wait=true, const boost::thread::attributes &attrs=boost::thread::attributes())
Run the server's io_service loop.
TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_client)
uint64_t max_opened_conn_count
uint64_t close_connection_counter
#define ASSERT_FALSE(condition)
#define ASSERT_EQ(val1, val2)
GTEST_API_ void InitGoogleTest(int *argc, char **argv)
#define ASSERT_GT(val1, val2)
size_t close_connection_counter() const
std::unique_ptr< void, terminate > context
Unique ZMQ context handle, calls zmq_term on destruction.
bool connect_async(const std::string &adr, const std::string &port, uint32_t conn_timeot, const t_callback &cb, const std::string &bind_ip="0.0.0.0", epee::net_utils::ssl_support_t ssl_support=epee::net_utils::ssl_support_t::e_ssl_support_autodetect)
#define ASSERT_LT(val1, val2)
const std::string srv_port("36231")
uint64_t open_request_target
uint64_t new_connection_counter
#define ASSERT_TRUE(condition)
const unsigned int min_thread_count
int RUN_ALL_TESTS() GTEST_MUST_USE_RESULT_
#define ASSERT_LE(val1, val2)
#define CATCH_ENTRY_L0(lacation, return_val)
const boost::uuids::uuid m_connection_id
size_t get() volatile const
t_protocol_handler::config_type & get_config_object()