ChimeraTK-cppext 01.05.02
Loading...
Searching...
No Matches
testStresstestMultiproducer.cc
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3#define BOOST_TEST_MODULE test_future_queue
4#include <boost/test/included/unit_test.hpp>
5using namespace boost::unit_test_framework;
6
7#include "future_queue.hpp"
9
10#include <boost/thread/thread.hpp>
11
12BOOST_AUTO_TEST_SUITE(testStresstestultiproducer)
13
14/*********************************************************************************************************************/
15
16BOOST_AUTO_TEST_CASE(stresstestMultiproducer) {
17 constexpr size_t runForSeconds = 10;
18 constexpr size_t nSenders = 200;
19 constexpr size_t lengthOfQueue = 10;
20 constexpr int idsPerSender = 1000;
21
22 std::atomic<bool> shutdownSenders, shutdownReceiver;
23 shutdownSenders = false;
24 shutdownReceiver = false;
25
26 // create the queue
27 cppext::future_queue<int> q(lengthOfQueue);
28
29 // list of threads so we can collect them later
30 std::list<boost::thread> senderThreads;
31
32 // launch sender threads
33 for(size_t i = 0; i < nSenders; ++i) {
34 senderThreads.emplace_back([i, q, &shutdownSenders]() mutable {
35 int senderFirstValue = i * idsPerSender;
36 // 'endless' loop to send data
37 size_t consequtive_fails = 0;
38 int nextValue = senderFirstValue;
39 while(!shutdownSenders) {
40 bool success = q.push(nextValue);
41 if(success) {
42 ++nextValue;
43 if(nextValue >= senderFirstValue + idsPerSender) nextValue = senderFirstValue;
44 consequtive_fails = 0;
45 }
46 else {
47 ++consequtive_fails;
48 BOOST_CHECK_TS(consequtive_fails < 1000);
49 if(consequtive_fails > 100) usleep(100000);
50 }
51 }
52 }); // end sender thread
53 }
54
55 // launch receiver thread
56 boost::thread receiverThread([q, &shutdownReceiver]() mutable {
57 std::vector<int> nextValues(nSenders);
58 for(size_t i = 0; i < nSenders; ++i) nextValues[i] = i * idsPerSender;
59
60 // 'endless' loop to receive data
61 while(!shutdownReceiver) {
62 int value;
63 q.pop_wait(value);
64 size_t senderId = value / idsPerSender;
65 BOOST_CHECK_TS(senderId < nSenders);
66 BOOST_CHECK_EQUAL_TS(value, nextValues[senderId]);
67 nextValues[senderId]++;
68 if(nextValues[senderId] >= ((signed)senderId + 1) * idsPerSender) nextValues[senderId] = senderId * idsPerSender;
69 }
70 }); // end receiver thread
71
72 // run the test for N seconds
73 std::cout << "Keep the test running for " << runForSeconds << " seconds..." << std::endl;
74 sleep(runForSeconds);
75
76 // Shutdown all threads and join them. It is important do to that before the
77 // queues get destroyed.
78 std::cout << "Terminate all threads..." << std::endl;
79 shutdownReceiver = true;
80 receiverThread.join();
81 std::cout << "Receiver thread terminated." << std::endl;
82 shutdownSenders = true;
83 for(auto& t : senderThreads) t.join();
84 std::cout << "All senders are terminated." << std::endl;
85}
86
87/*********************************************************************************************************************/
88
89BOOST_AUTO_TEST_CASE(stresstestMultiproducerOverwrite) {
90 constexpr size_t runForSeconds = 10;
91 constexpr size_t nSenders = 200;
92 constexpr size_t lengthOfQueue = 10;
93 constexpr int idsPerSender = 1000;
94
95 std::atomic<bool> shutdownSenders, shutdownReceiver;
96 shutdownSenders = false;
97 shutdownReceiver = false;
98
99 // create the queue
100 cppext::future_queue<int> q(lengthOfQueue);
101
102 // list of threads so we can collect them later
103 std::list<boost::thread> senderThreads;
104
105 // launch sender threads
106 for(size_t i = 0; i < nSenders; ++i) {
107 senderThreads.emplace_back([i, q, &shutdownSenders]() mutable {
108 int senderFirstValue = i * idsPerSender;
109 // 'endless' loop to send data
110 size_t consequtive_fails = 0;
111 int nextValue = senderFirstValue;
112 while(!shutdownSenders) {
113 bool success = q.push_overwrite(nextValue);
114 ++nextValue;
115 if(nextValue >= senderFirstValue + idsPerSender) nextValue = senderFirstValue;
116 if(success) {
117 consequtive_fails = 0;
118 }
119 else {
120 ++consequtive_fails;
121 BOOST_CHECK_TS(consequtive_fails < 1000);
122 if(consequtive_fails > 100) usleep(100000);
123 }
124 }
125 }); // end sender thread
126 }
127
128 // launch receiver thread
129 boost::thread receiverThread([q, &shutdownReceiver]() mutable {
130 // 'endless' loop to receive data
131 while(!shutdownReceiver) {
132 int value;
133 q.pop_wait(value);
134 size_t senderId = value / idsPerSender;
135 assert(senderId < nSenders);
136 (void)senderId; // avoid warning in Release builds
137 }
138 }); // end receiver thread
139
140 // run the test for N seconds
141 std::cout << "Keep the test running for " << runForSeconds << " seconds..." << std::endl;
142 sleep(runForSeconds);
143
144 // Shutdown all threads and join them. It is important do to that before the
145 // queues get destroyed.
146 std::cout << "Terminate all threads..." << std::endl;
147 shutdownReceiver = true;
148 receiverThread.join();
149 std::cout << "Receiver thread terminated." << std::endl;
150 shutdownSenders = true;
151 for(auto& t : senderThreads) t.join();
152 std::cout << "All senders are terminated." << std::endl;
153}
154
155/*********************************************************************************************************************/
156
157BOOST_AUTO_TEST_SUITE_END()
BOOST_AUTO_TEST_CASE(stresstestMultiproducer)
#define BOOST_CHECK_EQUAL_TS(a, b)
#define BOOST_CHECK_TS(condition)