// Test parameters.
// NOTE(review): nQueues is used below but is not defined in the lines visible here.
// Duration in seconds for which the worker threads are left running (passed to sleep() further down).
19 constexpr size_t runForSeconds = 10;
// Number of queues handed to each sender thread.
21 constexpr size_t nQueuesPerSender = 20;
// Number of queues handed to each receiver thread.
22 constexpr size_t nQueuesPerReceiver = 50;
// Compile-time check: nQueues must be an exact multiple of the per-sender queue count.
24 static_assert(
nQueues % nQueuesPerSender == 0,
"nQueues and nQueuesPerSender do not fit together");
// Compile-time check: nQueues must be an exact multiple of the per-receiver queue count.
25 static_assert(
nQueues % nQueuesPerReceiver == 0,
"nQueues and nQueuesPerReceiver do not fit together");
// Number of sender iterations in the launch loop below (exact division, see the static_assert above).
26 constexpr size_t nSenders =
nQueues / nQueuesPerSender;
// Number of receiver iterations in the launch loop below (exact division, see the static_assert above).
27 constexpr size_t nReceivers =
nQueues / nQueuesPerReceiver;
// Stop flags polled by the worker lambdas in their while(!flag) loops.
// Both are explicitly set to false here and set to true in the shutdown sequence at the end.
29 std::atomic<bool> shutdownSenders, shutdownReceivers;
30 shutdownSenders =
false;
31 shutdownReceivers =
false;
// Mersenne Twister engine seeded from std::random_device; used below to draw the per-queue constructor argument.
33 std::random_device rd;
34 std::mt19937 gen(rd());
// Create nQueues queues of int, stored in a std::list.
37 std::list<cppext::future_queue<int>> qlist;
// Each queue is constructed with a random integer drawn uniformly from [2, 123].
// Presumably this is the queue length (going by the name qlength) - confirm against the future_queue constructor.
38 std::uniform_int_distribution<> qlength(2, 123);
39 for(
size_t iq = 0; iq <
nQueues; ++iq) {
40 qlist.emplace_back(qlength(gen));
// NOTE(review): the closing brace of this loop is not visible in this listing.
// Containers holding the launched sender and receiver threads; both are joined in the shutdown sequence at the end.
44 std::list<boost::thread> senderThreads, receiverThreads;
// Launch the sender threads: iterate over the queue list, giving each sender nQueuesPerSender queues.
47 auto qit = qlist.begin();
48 for(
size_t i = 0; i < nSenders; ++i) {
// Collect the queue objects for this sender; each element is constructed from *qit.
// NOTE(review): the statement advancing qit is not visible in this listing; the assert on qit == qlist.end()
// below implies it is advanced once per collected queue - confirm in the full file.
50 std::vector<cppext::future_queue<int>> myqueues;
51 for(
size_t k = 0; k < nQueuesPerSender; ++k) {
52 myqueues.emplace_back(*qit);
// Thread body: myqueues is captured by copy (the lambda is mutable so push() can be called on the copies),
// the stop flag is captured by reference.
56 senderThreads.emplace_back([myqueues, &shutdownSenders]()
mutable {
// Next value to be sent on each queue, all starting at 0.
57 std::vector<int> nextValues;
58 for(
size_t j = 0; j < myqueues.size(); ++j) nextValues.push_back(0);
// Per-queue counter of failed push attempts in a row (value-initialised to 0).
60 std::vector<size_t> consequtive_fails(nQueuesPerSender);
// Keep pushing to every queue in turn until the stop flag is set.
61 while(!shutdownSenders) {
62 for(
size_t k = 0; k < nQueuesPerSender; ++k) {
63 bool success = myqueues[k].push(nextValues[k]);
// NOTE(review): the branching on `success` is not visible in this listing. Presumably the counter is reset
// on success and the following lines (increment, assert, sleeps) run only on failure - confirm in the full file.
66 consequtive_fails[k] = 0;
69 ++consequtive_fails[k];
// Fail the test (when asserts are enabled) once the counter reaches 1030.
70 assert(consequtive_fails[k] < 1030);
// Back off: 1000 microseconds once the counter exceeds 100, plus a further 1000000 microseconds once it exceeds 1000.
71 if(consequtive_fails[k] > 100) usleep(1000);
72 if(consequtive_fails[k] > 1000) usleep(1000000);
// One more push of the current value to every queue; the result is ignored.
// NOTE(review): this appears to follow the send loop, but the closing braces are not visible in this listing.
78 for(
size_t k = 0; k < nQueuesPerSender; ++k) myqueues[k].push(nextValues[k]);
// Sanity check: the iterator has reached the end of the queue list.
81 assert(qit == qlist.end());
// Launch the receiver threads: each iteration collects nQueuesPerReceiver queues.
// NOTE(review): qit was asserted equal to qlist.end() above; a reset to qlist.begin() is not visible in this
// listing and presumably sits on an omitted line - confirm in the full file.
85 for(
size_t j = 0; j < nReceivers; ++j) {
// Collect the queue objects for this receiver; each element is constructed from *qit.
87 std::vector<cppext::future_queue<int>> myqueues;
88 for(
size_t k = 0; k < nQueuesPerReceiver; ++k) {
89 myqueues.emplace_back(*qit);
// Two receiver variants follow. NOTE(review): how one of them is selected per iteration is not visible in this listing.
// Variant 1: call pop_wait() on each queue in turn and check the received value against the expected one.
95 receiverThreads.emplace_back([myqueues, &shutdownReceivers]()
mutable {
// Expected next value for each queue, all starting at 0.
96 std::vector<int> nextValues;
97 for(
size_t l = 0; l < myqueues.size(); ++l) nextValues.push_back(0);
99 while(!shutdownReceivers) {
100 for(
size_t k = 0; k < nQueuesPerReceiver; ++k) {
// NOTE(review): the declaration of `value` and any update of nextValues[k] are not visible in this listing.
102 myqueues[k].pop_wait(value);
103 assert(value == nextValues[k]);
// Variant 2: wait on a when_any() notification queue built over all of this receiver's queues.
111 receiverThreads.emplace_back([myqueues, &shutdownReceivers]()
mutable {
// Expected next value for each queue, all starting at 0.
112 std::vector<int> nextValues;
113 for(
size_t l = 0; l < myqueues.size(); ++l) nextValues.push_back(0);
115 auto notifyer = when_any(myqueues.begin(), myqueues.end());
117 while(!shutdownReceivers) {
// pop_wait() on the notifier yields `id`, which is then used as an index into myqueues.
// NOTE(review): the declarations of `id` and `value` are not visible in this listing.
119 notifyer.pop_wait(
id);
// The queue indicated by the notifier must not be empty.
121 assert(myqueues[
id].empty() ==
false);
// NOTE(review): no use of `ret` is visible in this listing.
122 bool ret = myqueues[id].pop(value);
125 assert(value == nextValues[
id]);
// Sanity check: the iterator has reached the end of the queue list.
131 assert(qit == qlist.end());
// Let the sender and receiver threads run for the configured duration.
134 std::cout <<
"Keep the test running for " << runForSeconds <<
" seconds..." << std::endl;
135 sleep(runForSeconds);
// Shutdown sequence: receivers are stopped and joined first, then the senders.
// Presumably the senders are kept running so that receivers waiting in pop_wait() still get data
// and can observe their stop flag - confirm.
139 std::cout <<
"Terminate all threads..." << std::endl;
140 shutdownReceivers =
true;
141 for(
auto& t : receiverThreads) t.join();
142 std::cout <<
"All receivers are terminated." << std::endl;
// All receivers have been joined; now stop and join the senders.
143 shutdownSenders =
true;
144 for(
auto& t : senderThreads) t.join();
145 std::cout <<
"All senders are terminated." << std::endl;