3#define BOOST_TEST_MODULE test_future_queue
4#include <boost/test/included/unit_test.hpp>
5using namespace boost::unit_test_framework;
10#include <boost/lockfree/queue.hpp>
11#include <boost/lockfree/spsc_queue.hpp>
12#include <boost/next_prior.hpp>
13#include <boost/thread/future.hpp>
28 helper_iterator(std::list<std::unique_ptr<boost::lockfree::spsc_queue<boost::shared_future<int32_t>>>>::iterator _it)
43 std::unique_ptr<boost::lockfree::spsc_queue<boost::shared_future<int32_t>>>& q = *it;
51 std::unique_ptr<boost::lockfree::spsc_queue<boost::shared_future<int32_t>>>&
get_queue() {
return *it; }
54 std::list<std::unique_ptr<boost::lockfree::spsc_queue<boost::shared_future<int32_t>>>>::iterator it;
60 typedef boost::shared_future<int32_t> value_type;
61 typedef size_t difference_type;
62 typedef std::forward_iterator_tag iterator_category;
68BOOST_AUTO_TEST_SUITE(testPerformance)
73 std::cout <<
"Measure performance of future_queue with spin-waiting" << std::endl;
77 auto start = std::chrono::steady_clock::now();
79 std::thread sender([&theQueue] {
81 while(theQueue.push(i & 0xFFFF) ==
false) usleep(1);
87 while(theQueue.pop(val) ==
false)
continue;
90 auto end = std::chrono::steady_clock::now();
91 std::chrono::duration<double> diff = end - start;
92 std::cout <<
"Time for " <<
nTransfers <<
" transfers: " << diff.count() <<
" s\n";
93 std::cout <<
"Average time per transfer: " << diff.count() / (double)
nTransfers * 1e6 <<
" us\n";
100#ifdef ENABLE_BOOST_LOCKFREE_QUEUE_PERFORMANCE_MEASUREMENT
103 std::cout <<
"Measure performance of boost::lockfree::queue" << std::endl;
105 boost::lockfree::queue<int32_t> theQueue(
queueLength);
107 auto start = std::chrono::steady_clock::now();
109 std::thread sender([&theQueue] {
111 while(theQueue.push(i & 0xFFFF) ==
false) usleep(1);
117 while(theQueue.pop(val) ==
false)
continue;
120 auto end = std::chrono::steady_clock::now();
121 std::chrono::duration<double> diff = end - start;
122 std::cout <<
"Time for " <<
nTransfers <<
" transfers: " << diff.count() <<
" s\n";
123 std::cout <<
"Average time per transfer: " << diff.count() / (double)
nTransfers * 1e6 <<
" us\n";
132#ifdef ENABLE_BOOST_LOCKFREE_QUEUE_PERFORMANCE_MEASUREMENT
135 std::cout <<
"Measure performance of boost::lockfree::spsc_queue" << std::endl;
137 boost::lockfree::spsc_queue<int32_t> theQueue(
queueLength);
139 auto start = std::chrono::steady_clock::now();
141 std::thread sender([&theQueue] {
143 while(theQueue.push(i & 0xFFFF) ==
false) usleep(1);
149 while(theQueue.pop(val) ==
false)
continue;
152 auto end = std::chrono::steady_clock::now();
153 std::chrono::duration<double> diff = end - start;
154 std::cout <<
"Time for " <<
nTransfers <<
" transfers: " << diff.count() <<
" s\n";
155 std::cout <<
"Average time per transfer: " << diff.count() / (double)
nTransfers * 1e6 <<
" us\n";
165 std::cout <<
"Measure performance of future_queue with pop_wait" << std::endl;
169 auto start = std::chrono::steady_clock::now();
171 std::thread sender([&theQueue] {
173 while(theQueue.push(i & 0xFFFF) ==
false) usleep(1);
179 theQueue.pop_wait(val);
182 auto end = std::chrono::steady_clock::now();
183 std::chrono::duration<double> diff = end - start;
184 std::cout <<
"Time for " <<
nTransfers <<
" transfers: " << diff.count() <<
" s\n";
185 std::cout <<
"Average time per transfer: " << diff.count() / (double)
nTransfers * 1e6 <<
" us\n";
192#ifdef ENABLE_BOOST_LOCKFREE_QUEUE_PERFORMANCE_MEASUREMENT
195 std::cout <<
"Measure performance of "
196 "boost::lockfree::spsc_queue<std::shared_future<T>>"
199 boost::lockfree::spsc_queue<boost::shared_future<int32_t>> theQueue(
queueLength);
200 boost::promise<int32_t> thePromise;
201 theQueue.push(thePromise.get_future().share());
203 auto start = std::chrono::steady_clock::now();
205 std::thread sender([&theQueue, &thePromise] {
207 boost::promise<int32_t> newPromise;
208 auto newFuture = newPromise.get_future().share();
209 while(theQueue.push(newFuture) ==
false) usleep(1);
210 thePromise.set_value(i & 0xFFFF);
211 thePromise = std::move(newPromise);
216 boost::shared_future<int32_t> theFuture;
217 theQueue.pop(theFuture);
221 auto end = std::chrono::steady_clock::now();
222 std::chrono::duration<double> diff = end - start;
223 std::cout <<
"Time for " <<
nTransfers <<
" transfers: " << diff.count() <<
" s\n";
224 std::cout <<
"Average time per transfer: " << diff.count() / (double)
nTransfers * 1e6 <<
" us\n";
234 std::cout <<
"Measure performance of future_queue with when_any" << std::endl;
236 static_assert(
nTransfers %
nQueues == 0,
"nQueues must be an integer divider of nTransfers.");
238 std::vector<cppext::future_queue<int32_t>> vectorOfQueues;
241 auto notificationQueue = when_any(vectorOfQueues.begin(), vectorOfQueues.end());
245 std::vector<std::thread> senders;
246 for(
auto& q : vectorOfQueues) {
247 senders.emplace_back([&q, &b1, &b2] {
251 while(q.push(i & 0xFFFF) ==
false) usleep(1);
257 auto start = std::chrono::steady_clock::now();
262 notificationQueue.pop_wait(
id);
264 vectorOfQueues[id].pop(val);
267 auto end = std::chrono::steady_clock::now();
268 std::chrono::duration<double> diff = end - start;
269 std::cout <<
"Time for " <<
nTransfers <<
" transfers: " << diff.count() <<
" s\n";
270 std::cout <<
"Average time per transfer: " << diff.count() / (double)
nTransfers * 1e6 <<
" us\n";
272 for(
auto& t : senders) t.join();
277#ifdef ENABLE_BOOST_LOCKFREE_QUEUE_PERFORMANCE_MEASUREMENT
280 std::cout <<
"Measure performance of "
281 "boost::lockfree::spsc_queue<boost::shared_future<T>> with "
285 static_assert(
nTransfers %
nQueues == 0,
"nQueues must be an integer divider of nTransfers.");
287 std::list<std::unique_ptr<boost::lockfree::spsc_queue<boost::shared_future<int32_t>>>> listOfQueues;
288 for(
size_t i = 0; i <
nQueues; ++i) {
289 listOfQueues.emplace_back(
new boost::lockfree::spsc_queue<boost::shared_future<int32_t>>(
queueLength));
294 std::vector<std::thread> senders;
295 for(
auto& q : listOfQueues) {
296 senders.emplace_back([&q, &b1, &b2, &b3] {
297 boost::promise<int32_t> thePromise;
298 q->push(thePromise.get_future().share());
302 boost::promise<int32_t> newPromise;
303 auto newFuture = newPromise.get_future().share();
304 while(q->push(newFuture) ==
false) usleep(1);
305 thePromise.set_value(i & 0xFFFF);
306 thePromise = std::move(newPromise);
313 auto start = std::chrono::steady_clock::now();
318 auto& theQueue = ret.get_queue();
319 boost::shared_future<int32_t> theFuture;
320 theQueue->pop(theFuture);
324 auto end = std::chrono::steady_clock::now();
325 std::chrono::duration<double> diff = end - start;
326 std::cout <<
"Time for " <<
nTransfers <<
" transfers: " << diff.count() <<
" s\n";
327 std::cout <<
"Average time per transfer: " << diff.count() / (double)
nTransfers * 1e6 <<
" us\n";
330 for(
auto& t : senders) t.join();
337BOOST_AUTO_TEST_SUITE_END()
bool operator!=(const helper_iterator &other) const
helper_iterator(std::list< std::unique_ptr< boost::lockfree::spsc_queue< boost::shared_future< int32_t > > > >::iterator _it)
std::unique_ptr< boost::lockfree::spsc_queue< boost::shared_future< int32_t > > > & get_queue()
helper_iterator operator++()
helper_iterator operator++(int)
boost::shared_future< int32_t > & operator*()
bool operator==(const helper_iterator &other) const