ChimeraTK-cppext 01.05.02
Loading...
Searching...
No Matches
testPerformance.cc
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3#define BOOST_TEST_MODULE test_future_queue
4#include <boost/test/included/unit_test.hpp>
5using namespace boost::unit_test_framework;
6
7#include "barrier.hpp"
8#include "future_queue.hpp"
9
10#include <boost/lockfree/queue.hpp>
11#include <boost/lockfree/spsc_queue.hpp>
12#include <boost/next_prior.hpp>
13#include <boost/thread/future.hpp>
14
#include <cstddef>
#include <iterator>
#include <thread>
17
// Capacity argument passed to the constructor of every queue created by the test cases in this file.
constexpr size_t queueLength = 1000;
// Total number of values moved through the queue(s) in each measurement. Spelled as an integer literal so that no
// implicit floating-point to integer conversion is involved in initialising a size_t.
constexpr size_t nTransfers = 1000000;
constexpr size_t nQueues = 10; // only for when_any & related
// Enable the following define to also build the boost::lockfree based test cases, which exist just for comparison:
// #define ENABLE_BOOST_LOCKFREE_QUEUE_PERFORMANCE_MEASUREMENT
23
24/*********************************************************************************************************************/
25
27 public:
28 helper_iterator(std::list<std::unique_ptr<boost::lockfree::spsc_queue<boost::shared_future<int32_t>>>>::iterator _it)
29 : it(_it) {}
30
32 ++it;
33 return *this;
34 }
35
37 auto rval = *this;
38 ++it;
39 return rval;
40 }
41
42 boost::shared_future<int32_t>& operator*() {
43 std::unique_ptr<boost::lockfree::spsc_queue<boost::shared_future<int32_t>>>& q = *it;
44 return q->front();
45 }
46
47 bool operator!=(const helper_iterator& other) const { return it != other.it; }
48
49 bool operator==(const helper_iterator& other) const { return it == other.it; }
50
51 std::unique_ptr<boost::lockfree::spsc_queue<boost::shared_future<int32_t>>>& get_queue() { return *it; }
52
53 private:
54 std::list<std::unique_ptr<boost::lockfree::spsc_queue<boost::shared_future<int32_t>>>>::iterator it;
55};
56
57namespace std {
58 template<>
59 struct iterator_traits<helper_iterator> {
60 typedef boost::shared_future<int32_t> value_type;
61 typedef size_t difference_type;
62 typedef std::forward_iterator_tag iterator_category;
63 };
64} // namespace std
65
66/*********************************************************************************************************************/
67
68BOOST_AUTO_TEST_SUITE(testPerformance)
69
70/*********************************************************************************************************************/
71
72BOOST_AUTO_TEST_CASE(future_queue_spin_wait) {
73 std::cout << "Measure performance of future_queue with spin-waiting" << std::endl;
74
76
77 auto start = std::chrono::steady_clock::now();
78
79 std::thread sender([&theQueue] {
80 for(size_t i = 0; i < nTransfers; ++i) {
81 while(theQueue.push(i & 0xFFFF) == false) usleep(1);
82 }
83 }); // end thread sender
84
85 for(size_t i = 0; i < nTransfers; ++i) {
86 int32_t val;
87 while(theQueue.pop(val) == false) continue;
88 }
89
90 auto end = std::chrono::steady_clock::now();
91 std::chrono::duration<double> diff = end - start;
92 std::cout << "Time for " << nTransfers << " transfers: " << diff.count() << " s\n";
93 std::cout << "Average time per transfer: " << diff.count() / (double)nTransfers * 1e6 << " us\n";
94
95 sender.join();
96}
97
98/*********************************************************************************************************************/
99
100#ifdef ENABLE_BOOST_LOCKFREE_QUEUE_PERFORMANCE_MEASUREMENT
101
102BOOST_AUTO_TEST_CASE(boost_queue_spin_wait) {
103 std::cout << "Measure performance of boost::lockfree::queue" << std::endl;
104
105 boost::lockfree::queue<int32_t> theQueue(queueLength);
106
107 auto start = std::chrono::steady_clock::now();
108
109 std::thread sender([&theQueue] {
110 for(size_t i = 0; i < nTransfers; ++i) {
111 while(theQueue.push(i & 0xFFFF) == false) usleep(1);
112 }
113 }); // end thread sender
114
115 for(size_t i = 0; i < nTransfers; ++i) {
116 int32_t val;
117 while(theQueue.pop(val) == false) continue;
118 }
119
120 auto end = std::chrono::steady_clock::now();
121 std::chrono::duration<double> diff = end - start;
122 std::cout << "Time for " << nTransfers << " transfers: " << diff.count() << " s\n";
123 std::cout << "Average time per transfer: " << diff.count() / (double)nTransfers * 1e6 << " us\n";
124
125 sender.join();
126}
127
128#endif
129
130/*********************************************************************************************************************/
131
132#ifdef ENABLE_BOOST_LOCKFREE_QUEUE_PERFORMANCE_MEASUREMENT
133
134BOOST_AUTO_TEST_CASE(boost_spsc_queue_spin_wait) {
135 std::cout << "Measure performance of boost::lockfree::spsc_queue" << std::endl;
136
137 boost::lockfree::spsc_queue<int32_t> theQueue(queueLength);
138
139 auto start = std::chrono::steady_clock::now();
140
141 std::thread sender([&theQueue] {
142 for(size_t i = 0; i < nTransfers; ++i) {
143 while(theQueue.push(i & 0xFFFF) == false) usleep(1);
144 }
145 }); // end thread sender
146
147 for(size_t i = 0; i < nTransfers; ++i) {
148 int32_t val;
149 while(theQueue.pop(val) == false) continue;
150 }
151
152 auto end = std::chrono::steady_clock::now();
153 std::chrono::duration<double> diff = end - start;
154 std::cout << "Time for " << nTransfers << " transfers: " << diff.count() << " s\n";
155 std::cout << "Average time per transfer: " << diff.count() / (double)nTransfers * 1e6 << " us\n";
156
157 sender.join();
158}
159
160#endif
161
162/*********************************************************************************************************************/
163
164BOOST_AUTO_TEST_CASE(future_queue_pop_wait) {
165 std::cout << "Measure performance of future_queue with pop_wait" << std::endl;
166
168
169 auto start = std::chrono::steady_clock::now();
170
171 std::thread sender([&theQueue] {
172 for(size_t i = 0; i < nTransfers; ++i) {
173 while(theQueue.push(i & 0xFFFF) == false) usleep(1);
174 }
175 }); // end thread sender
176
177 for(size_t i = 0; i < nTransfers; ++i) {
178 int32_t val;
179 theQueue.pop_wait(val);
180 }
181
182 auto end = std::chrono::steady_clock::now();
183 std::chrono::duration<double> diff = end - start;
184 std::cout << "Time for " << nTransfers << " transfers: " << diff.count() << " s\n";
185 std::cout << "Average time per transfer: " << diff.count() / (double)nTransfers * 1e6 << " us\n";
186
187 sender.join();
188}
189
190/*********************************************************************************************************************/
191
192#ifdef ENABLE_BOOST_LOCKFREE_QUEUE_PERFORMANCE_MEASUREMENT
193
194BOOST_AUTO_TEST_CASE(boost_spsc_queue_of_futures) {
195 std::cout << "Measure performance of "
196 "boost::lockfree::spsc_queue<std::shared_future<T>>"
197 << std::endl;
198
199 boost::lockfree::spsc_queue<boost::shared_future<int32_t>> theQueue(queueLength);
200 boost::promise<int32_t> thePromise;
201 theQueue.push(thePromise.get_future().share());
202
203 auto start = std::chrono::steady_clock::now();
204
205 std::thread sender([&theQueue, &thePromise] {
206 for(size_t i = 0; i < nTransfers; ++i) {
207 boost::promise<int32_t> newPromise;
208 auto newFuture = newPromise.get_future().share();
209 while(theQueue.push(newFuture) == false) usleep(1);
210 thePromise.set_value(i & 0xFFFF);
211 thePromise = std::move(newPromise);
212 }
213 }); // end thread sender
214
215 for(size_t i = 0; i < nTransfers; ++i) {
216 boost::shared_future<int32_t> theFuture;
217 theQueue.pop(theFuture);
218 theFuture.get();
219 }
220
221 auto end = std::chrono::steady_clock::now();
222 std::chrono::duration<double> diff = end - start;
223 std::cout << "Time for " << nTransfers << " transfers: " << diff.count() << " s\n";
224 std::cout << "Average time per transfer: " << diff.count() / (double)nTransfers * 1e6 << " us\n";
225
226 sender.join();
227}
228
229#endif
230
231/*********************************************************************************************************************/
232
233BOOST_AUTO_TEST_CASE(future_queue_when_any) {
234 std::cout << "Measure performance of future_queue with when_any" << std::endl;
235
236 static_assert(nTransfers % nQueues == 0, "nQueues must be an integer divider of nTransfers.");
237
238 std::vector<cppext::future_queue<int32_t>> vectorOfQueues;
239 for(size_t i = 0; i < nQueues; ++i) vectorOfQueues.emplace_back(queueLength);
240
241 auto notificationQueue = when_any(vectorOfQueues.begin(), vectorOfQueues.end());
242
243 cppext::barrier b1(nQueues + 1), b2(nQueues + 1);
244
245 std::vector<std::thread> senders;
246 for(auto& q : vectorOfQueues) {
247 senders.emplace_back([&q, &b1, &b2] {
248 b1.wait();
249 b2.wait();
250 for(size_t i = 0; i < nTransfers / nQueues; ++i) {
251 while(q.push(i & 0xFFFF) == false) usleep(1);
252 }
253 }); // end thread sender
254 }
255
256 b1.wait();
257 auto start = std::chrono::steady_clock::now();
258 b2.wait();
259
260 for(size_t i = 0; i < nTransfers; ++i) {
261 size_t id;
262 notificationQueue.pop_wait(id);
263 int32_t val;
264 vectorOfQueues[id].pop(val);
265 }
266
267 auto end = std::chrono::steady_clock::now();
268 std::chrono::duration<double> diff = end - start;
269 std::cout << "Time for " << nTransfers << " transfers: " << diff.count() << " s\n";
270 std::cout << "Average time per transfer: " << diff.count() / (double)nTransfers * 1e6 << " us\n";
271
272 for(auto& t : senders) t.join();
273}
274
275/*********************************************************************************************************************/
276
277#ifdef ENABLE_BOOST_LOCKFREE_QUEUE_PERFORMANCE_MEASUREMENT
278
279BOOST_AUTO_TEST_CASE(boost_spsc_queue_wait_for_any) {
280 std::cout << "Measure performance of "
281 "boost::lockfree::spsc_queue<boost::shared_future<T>> with "
282 "wait_for_any"
283 << std::endl;
284
285 static_assert(nTransfers % nQueues == 0, "nQueues must be an integer divider of nTransfers.");
286
287 std::list<std::unique_ptr<boost::lockfree::spsc_queue<boost::shared_future<int32_t>>>> listOfQueues;
288 for(size_t i = 0; i < nQueues; ++i) {
289 listOfQueues.emplace_back(new boost::lockfree::spsc_queue<boost::shared_future<int32_t>>(queueLength));
290 }
291
292 cppext::barrier b1(nQueues + 1), b2(nQueues + 1), b3(nQueues + 1);
293
294 std::vector<std::thread> senders;
295 for(auto& q : listOfQueues) {
296 senders.emplace_back([&q, &b1, &b2, &b3] {
297 boost::promise<int32_t> thePromise;
298 q->push(thePromise.get_future().share());
299 b1.wait();
300 b2.wait();
301 for(size_t i = 0; i < nTransfers / nQueues; ++i) {
302 boost::promise<int32_t> newPromise;
303 auto newFuture = newPromise.get_future().share();
304 while(q->push(newFuture) == false) usleep(1);
305 thePromise.set_value(i & 0xFFFF);
306 thePromise = std::move(newPromise);
307 }
308 b3.wait();
309 }); // end thread sender
310 }
311
312 b1.wait();
313 auto start = std::chrono::steady_clock::now();
314 b2.wait();
315
316 for(size_t i = 0; i < nTransfers; ++i) {
317 auto ret = boost::wait_for_any(helper_iterator(listOfQueues.begin()), helper_iterator(listOfQueues.end()));
318 auto& theQueue = ret.get_queue();
319 boost::shared_future<int32_t> theFuture;
320 theQueue->pop(theFuture);
321 theFuture.get();
322 }
323
324 auto end = std::chrono::steady_clock::now();
325 std::chrono::duration<double> diff = end - start;
326 std::cout << "Time for " << nTransfers << " transfers: " << diff.count() << " s\n";
327 std::cout << "Average time per transfer: " << diff.count() / (double)nTransfers * 1e6 << " us\n";
328
329 b3.wait();
330 for(auto& t : senders) t.join();
331}
332
333#endif
334
335/*********************************************************************************************************************/
336
337BOOST_AUTO_TEST_SUITE_END()
bool operator!=(const helper_iterator &other) const
helper_iterator(std::list< std::unique_ptr< boost::lockfree::spsc_queue< boost::shared_future< int32_t > > > >::iterator _it)
std::unique_ptr< boost::lockfree::spsc_queue< boost::shared_future< int32_t > > > & get_queue()
helper_iterator operator++()
helper_iterator operator++(int)
boost::shared_future< int32_t > & operator*()
bool operator==(const helper_iterator &other) const
STL namespace.
constexpr size_t nTransfers
constexpr size_t nQueues
BOOST_AUTO_TEST_CASE(future_queue_spin_wait)
constexpr size_t queueLength