ChimeraTK-cppext 01.05.02
Loading...
Searching...
No Matches
testStresstest.cc
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3#define BOOST_TEST_MODULE test_future_queue
4#include <boost/test/included/unit_test.hpp>
5using namespace boost::unit_test_framework;
6
7#include "future_queue.hpp"
9
10#include <boost/thread/thread.hpp>
11
12#include <random>
13
14BOOST_AUTO_TEST_SUITE(testStresstest)
15
16/*********************************************************************************************************************/
17
19 constexpr size_t runForSeconds = 10;
20 constexpr size_t nQueues = 500;
21 constexpr size_t nQueuesPerSender = 20;
22 constexpr size_t nQueuesPerReceiver = 50;
23
24 static_assert(nQueues % nQueuesPerSender == 0, "nQueues and nQueuesPerSender do not fit together");
25 static_assert(nQueues % nQueuesPerReceiver == 0, "nQueues and nQueuesPerReceiver do not fit together");
26 constexpr size_t nSenders = nQueues / nQueuesPerSender;
27 constexpr size_t nReceivers = nQueues / nQueuesPerReceiver;
28
29 std::atomic<bool> shutdownSenders, shutdownReceivers;
30 shutdownSenders = false;
31 shutdownReceivers = false;
32
33 std::random_device rd;
34 std::mt19937 gen(rd());
35
36 // create queues, with each a random length
37 std::list<cppext::future_queue<int>> qlist;
38 std::uniform_int_distribution<> qlength(2, 123);
39 for(size_t iq = 0; iq < nQueues; ++iq) {
40 qlist.emplace_back(qlength(gen));
41 }
42
43 // list of threads so we can collect them later
44 std::list<boost::thread> senderThreads, receiverThreads;
45
46 // launch sender threads
47 auto qit = qlist.begin();
48 for(size_t i = 0; i < nSenders; ++i) {
49 // build list of queues
50 std::vector<cppext::future_queue<int>> myqueues;
51 for(size_t k = 0; k < nQueuesPerSender; ++k) {
52 myqueues.emplace_back(*qit);
53 ++qit;
54 }
55
56 senderThreads.emplace_back([myqueues, &shutdownSenders]() mutable {
57 std::vector<int> nextValues;
58 for(size_t j = 0; j < myqueues.size(); ++j) nextValues.push_back(0);
59 // 'endless' loop to send data
60 std::vector<size_t> consequtive_fails(nQueuesPerSender);
61 while(!shutdownSenders) {
62 for(size_t k = 0; k < nQueuesPerSender; ++k) {
63 bool success = myqueues[k].push(nextValues[k]);
64 if(success) {
65 ++nextValues[k];
66 consequtive_fails[k] = 0;
67 }
68 else {
69 ++consequtive_fails[k];
70 assert(consequtive_fails[k] < 1030);
71 if(consequtive_fails[k] > 100) usleep(1000);
72 if(consequtive_fails[k] > 1000) usleep(1000000);
73 }
74 }
75 }
76 // send one more value before shutting down, so the receiving side does
77 // not hang
78 for(size_t k = 0; k < nQueuesPerSender; ++k) myqueues[k].push(nextValues[k]);
79 }); // end sender thread
80 }
81 assert(qit == qlist.end());
82
83 // launch receiver threads
84 qit = qlist.begin();
85 for(size_t j = 0; j < nReceivers; ++j) {
86 // build list of queues and next values to send
87 std::vector<cppext::future_queue<int>> myqueues;
88 for(size_t k = 0; k < nQueuesPerReceiver; ++k) {
89 myqueues.emplace_back(*qit);
90 ++qit;
91 }
92
93 int type = j % 2; // alternate the different receiver types
94 if(type == 0) { // first type: go through all queues and wait on each once
95 receiverThreads.emplace_back([myqueues, &shutdownReceivers]() mutable {
96 std::vector<int> nextValues;
97 for(size_t l = 0; l < myqueues.size(); ++l) nextValues.push_back(0);
98 // 'endless' loop to send data
99 while(!shutdownReceivers) {
100 for(size_t k = 0; k < nQueuesPerReceiver; ++k) {
101 int value;
102 myqueues[k].pop_wait(value);
103 assert(value == nextValues[k]);
104 ++nextValues[k];
105 }
106 }
107 }); // end receiver thread for first type
108 }
109 else if(type == 1) { // second type: use when_any
110 // launch the thread
111 receiverThreads.emplace_back([myqueues, &shutdownReceivers]() mutable {
112 std::vector<int> nextValues;
113 for(size_t l = 0; l < myqueues.size(); ++l) nextValues.push_back(0);
114 // obtain notification queue
115 auto notifyer = when_any(myqueues.begin(), myqueues.end());
116 // 'endless' loop to send data
117 while(!shutdownReceivers) {
118 size_t id;
119 notifyer.pop_wait(id);
120 int value;
121 assert(myqueues[id].empty() == false);
122 bool ret = myqueues[id].pop(value);
123 (void)ret;
124 assert(ret);
125 assert(value == nextValues[id]);
126 ++nextValues[id];
127 }
128 }); // end receiver thread for first type
129 }
130 }
131 assert(qit == qlist.end());
132
133 // run the test for N seconds
134 std::cout << "Keep the test running for " << runForSeconds << " seconds..." << std::endl;
135 sleep(runForSeconds);
136
137 // Shutdown all threads and join them. It is important do to that before the
138 // queues get destroyed.
139 std::cout << "Terminate all threads..." << std::endl;
140 shutdownReceivers = true;
141 for(auto& t : receiverThreads) t.join();
142 std::cout << "All receivers are terminated." << std::endl;
143 shutdownSenders = true;
144 for(auto& t : senderThreads) t.join();
145 std::cout << "All senders are terminated." << std::endl;
146}
147
148/*********************************************************************************************************************/
149
150BOOST_AUTO_TEST_SUITE_END()
constexpr size_t nQueues
BOOST_AUTO_TEST_CASE(stresstest)