// Test parameters. runForSeconds appears in the "Keep the test running for ..." message printed
// later, nSenders is the number of sender threads that get started, and idsPerSender is the size
// of the value range each sender cycles through.
// NOTE(review): lengthOfQueue is not used in the visible lines - presumably it sizes the queue q,
// which is created outside this excerpt; confirm.
17 constexpr size_t runForSeconds = 10;
18 constexpr size_t nSenders = 200;
19 constexpr size_t lengthOfQueue = 10;
20 constexpr int idsPerSender = 1000;
// Stop flags polled by the sender and receiver loops; both start out false and are set to true
// by the main thread during shutdown.
22 std::atomic<bool> shutdownSenders, shutdownReceiver;
23 shutdownSenders =
false;
24 shutdownReceiver =
false;
// Start nSenders sender threads. Each lambda captures its index i and q by value and the stop
// flag by reference.
30 std::list<boost::thread> senderThreads;
33 for(
size_t i = 0; i < nSenders; ++i) {
34 senderThreads.emplace_back([i, q, &shutdownSenders]()
mutable {
// Sender i starts at i * idsPerSender; the wrap-around check below keeps nextValue below
// senderFirstValue + idsPerSender.
35 int senderFirstValue = i * idsPerSender;
37 size_t consequtive_fails = 0;
38 int nextValue = senderFirstValue;
// Keep pushing until the main thread sets shutdownSenders.
39 while(!shutdownSenders) {
40 bool success = q.push(nextValue);
// NOTE(review): the handling of `success` and the increment of nextValue are on lines not
// visible in this excerpt.
// Wrap back to the first value once the end of this sender's range is reached.
43 if(nextValue >= senderFirstValue + idsPerSender) nextValue = senderFirstValue;
44 consequtive_fails = 0;
// Once the counter exceeds 100, sleep for 100000 microseconds (100 ms). The counter is
// presumably incremented on a failed push (not visible here) - confirm.
49 if(consequtive_fails > 100) usleep(100000);
// Receiver thread. The lambda captures q by value and its stop flag by reference.
56 boost::thread receiverThread([q, &shutdownReceiver]()
mutable {
// nextValues[i] is initialised to i * idsPerSender, the same starting value the sender with
// index i uses.
57 std::vector<int> nextValues(nSenders);
58 for(
size_t i = 0; i < nSenders; ++i) nextValues[i] = i * idsPerSender;
// Loop until the main thread sets shutdownReceiver.
61 while(!shutdownReceiver) {
// NOTE(review): `value` is obtained on lines not visible in this excerpt (presumably read from q).
// The sender index is recovered from the value by integer division.
64 size_t senderId = value / idsPerSender;
// Advance the bookkeeping entry for that sender and wrap it at the end of the sender's range,
// using the same wrap-around rule as the sender loop.
67 nextValues[senderId]++;
68 if(nextValues[senderId] >= ((
signed)senderId + 1) * idsPerSender) nextValues[senderId] = senderId * idsPerSender;
// Announce the run duration.
// NOTE(review): the actual wait is on lines not visible in this excerpt.
73 std::cout <<
"Keep the test running for " << runForSeconds <<
" seconds..." << std::endl;
// Shutdown order: the receiver is stopped and joined first, while the senders are still running;
// only afterwards are the senders stopped and joined.
78 std::cout <<
"Terminate all threads..." << std::endl;
79 shutdownReceiver =
true;
80 receiverThread.join();
81 std::cout <<
"Receiver thread terminated." << std::endl;
82 shutdownSenders =
true;
83 for(
auto& t : senderThreads) t.join();
84 std::cout <<
"All senders are terminated." << std::endl;
// Test parameters. runForSeconds is the time the main thread sleeps while the workers run,
// nSenders is the number of sender threads that get started, and idsPerSender is the size of the
// value range each sender cycles through.
// NOTE(review): lengthOfQueue is not used in the visible lines - presumably it sizes the queue q,
// which is created outside this excerpt; confirm.
90 constexpr size_t runForSeconds = 10;
91 constexpr size_t nSenders = 200;
92 constexpr size_t lengthOfQueue = 10;
93 constexpr int idsPerSender = 1000;
// Stop flags polled by the sender and receiver loops; both start out false and are set to true
// by the main thread during shutdown.
95 std::atomic<bool> shutdownSenders, shutdownReceiver;
96 shutdownSenders =
false;
97 shutdownReceiver =
false;
// Start nSenders sender threads. Each lambda captures its index i and q by value and the stop
// flag by reference.
103 std::list<boost::thread> senderThreads;
106 for(
size_t i = 0; i < nSenders; ++i) {
107 senderThreads.emplace_back([i, q, &shutdownSenders]()
mutable {
// Sender i starts at i * idsPerSender; the wrap-around check below keeps nextValue below
// senderFirstValue + idsPerSender.
108 int senderFirstValue = i * idsPerSender;
110 size_t consequtive_fails = 0;
111 int nextValue = senderFirstValue;
// Keep pushing until the main thread sets shutdownSenders.
112 while(!shutdownSenders) {
// This sender loop calls push_overwrite() on q.
// NOTE(review): the exact semantics of push_overwrite() and of its return value are defined by
// q's type, which is outside this excerpt - confirm before relying on `success`.
113 bool success = q.push_overwrite(nextValue);
// Wrap back to the first value once the end of this sender's range is reached (the increment
// of nextValue is on a line not visible here).
115 if(nextValue >= senderFirstValue + idsPerSender) nextValue = senderFirstValue;
117 consequtive_fails = 0;
// Once the counter exceeds 100, sleep for 100000 microseconds (100 ms). The counter is
// presumably incremented when `success` is false (not visible here) - confirm.
122 if(consequtive_fails > 100) usleep(100000);
// Receiver thread. The lambda captures q by value and its stop flag by reference.
129 boost::thread receiverThread([q, &shutdownReceiver]()
mutable {
// Loop until the main thread sets shutdownReceiver.
131 while(!shutdownReceiver) {
// NOTE(review): `value` is obtained on lines not visible in this excerpt (presumably read from q).
// The visible check only verifies that the value maps to a valid sender index; no per-sender
// ordering bookkeeping is visible in this receiver.
134 size_t senderId = value / idsPerSender;
135 assert(senderId < nSenders);
// Let the worker threads run for runForSeconds seconds while the main thread sleeps.
141 std::cout <<
"Keep the test running for " << runForSeconds <<
" seconds..." << std::endl;
142 sleep(runForSeconds);
// Shutdown order: the receiver is stopped and joined first, while the senders are still running;
// only afterwards are the senders stopped and joined.
146 std::cout <<
"Terminate all threads..." << std::endl;
147 shutdownReceiver =
true;
148 receiverThread.join();
149 std::cout <<
"Receiver thread terminated." << std::endl;
150 shutdownSenders =
true;
151 for(
auto& t : senderThreads) t.join();
152 std::cout <<
"All senders are terminated." << std::endl;