ChimeraTK-cppext 01.05.02
All Classes Namespaces Files Functions Variables Typedefs Friends Macros Pages
example2.cc
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3/**********************************************************************************************************************
4 *
5 * VERY IMPORTANT NOTE!
6 *
7 * Whenever this file is changed, please update the example in the README.md
8 *file as well!
9 *
10 *********************************************************************************************************************/
11
12#include <future_queue.hpp>
13
14#include <iostream>
15#include <thread>
16#include <unistd.h>
17
18// define 3 future_queue of int with a length of 5
19static cppext::future_queue<int> inputQueue1(5);
20static cppext::future_queue<int> inputQueue2(5);
21static cppext::future_queue<std::string> inputQueue3(5);
22
23// define functions sending data in separate threads
25 for(int i = 0; i < 10; ++i) {
26 usleep(100000); // wait 0.1 second
27 inputQueue1.push(i);
28 }
29}
31 for(int i = 20; i < 30; ++i) {
32 inputQueue2.push(i);
33 }
34}
36 for(int i = 20; i < 30; ++i) {
37 usleep(100000); // wait 0.1 second
38 inputQueue3.push("Value " + std::to_string(i));
39 }
40}
41
42int main() {
43 // setup continuations
44 std::vector<cppext::future_queue<double>> temp; // we should have a convenience function to avoid needing a
45 // temporary container...
46 temp.push_back(inputQueue1.then<double>([](int x) { return x / 2.; }));
47 temp.push_back(inputQueue2.then<double>([](int x) { return x * 3.; }));
48
49 // process both results of the continuations together
50 auto when12 = cppext::when_all(temp.begin(), temp.end());
51 auto result12 = when12.then<double>([temp]() mutable {
52 double x, y;
53 temp[0].pop(x);
54 temp[1].pop(y);
55 return x + y;
56 });
57
58 // launch sender threads
59 std::thread myThread1(&senderThread1);
60 std::thread myThread2(&senderThread2);
61 std::thread myThread3(&senderThread3);
62
63 // create notification queue when any result is ready
64 std::vector<cppext::future_queue_base> temp2;
65 temp2.push_back(result12);
66 temp2.push_back(inputQueue3);
67 auto ready = cppext::when_any(temp2.begin(), temp2.end());
68
69 // receive 10 values and print them
70 for(size_t i = 0; i < 10; ++i) {
71 size_t index;
72 ready.pop_wait(index);
73 if(index == 0) {
74 double value;
75 result12.pop(value);
76 std::cout << value << std::endl;
77 }
78 else {
79 std::string value;
80 inputQueue3.pop(value);
81 std::cout << value << std::endl;
82 }
83 }
84
85 myThread1.join();
86 myThread2.join();
87 myThread3.join();
88 return 0;
89}
void senderThread1()
Definition example2.cc:24
void senderThread2()
Definition example2.cc:30
int main()
Definition example2.cc:42
void senderThread3()
Definition example2.cc:35
future_queue< size_t > when_any(ITERATOR_TYPE begin, ITERATOR_TYPE end)
Implementations of non-member functions.
future_queue< void > when_all(ITERATOR_TYPE begin, ITERATOR_TYPE end)
This function expects two forward iterators pointing to a region of a container of future_queue objec...