34 struct shared_state_base;
39 template<
typename T,
typename FEATURES,
typename TOUT,
typename CALLABLE>
40 struct continuation_process_async;
73 operator bool()
const;
101 template<
typename T,
typename FEATURES>
186 template<
typename T,
typename FEATURES>
187 friend class ::cppext::future_queue;
189 template<
typename ITERATOR_TYPE>
192 template<
typename ITERATOR_TYPE>
198 template<
typename T,
typename FEATURES,
typename TOUT,
typename CALLABLE>
228 template<
typename T,
typename FEATURES = MOVE_DATA>
251 template<
typename U = T,
252 typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value,
int>::type = 0>
254 template<
typename U = T,
255 typename std::enable_if<!std::is_same<U, void>::value && std::is_copy_constructible<T>::value,
int>::type = 0>
270 template<
typename U = T,
271 typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value,
int>::type = 0>
274 template<
typename U = T,
275 typename std::enable_if<!std::is_same<U, void>::value && std::is_copy_constructible<T>::value,
int>::type = 0>
283 template<
typename U = T,
284 typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value,
int>::type = 0>
291 template<
typename U = T,
292 typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value,
int>::type = 0>
303 template<
typename U = T,
304 typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value,
int>::type = 0>
307 template<
typename U = T,
308 typename std::enable_if<std::is_same<T, U>::value && std::is_same<U, void>::value,
int>::type = 0>
329 template<
typename T2,
typename FEATURES2 = MOVE_DATA,
typename CALLABLE>
467 executeDelete =
true;
494 this->continuation_origin.d->is_continuation_when_all) {
561 template<
typename ITERATOR_TYPE>
580 return notifyerQueue;
589 template<
typename ITERATOR_TYPE>
604 notifyerQueue.d->continuation_process_deferred = std::function<void(void)>([notifyerQueue,
participants]()
mutable {
612 if(!empty) notifyerQueue.push();
617 notifyerQueue.d->continuation_process_deferred_wait =
630 notifyerQueue.push();
635 notifyerQueue.d->is_continuation_when_all =
true;
636 notifyerQueue.d->continuation_origin =
anyNotify;
638 return notifyerQueue;
655 new(&
a) T(std::move(
b));
701 inline void shared_state_ptr::free() {
704 if(
get() ==
nullptr) {
735 inline shared_state_ptr::operator
bool()
const {
736 return get() !=
nullptr;
766 return d->readIndexMax -
d->readIndex;
774 info =
d->when_any_notification.load(std::memory_order_acquire);
775 if(
info.notifyerQueue)
break;
777 ++
info_n.notifyerQueue_previousData;
778 }
while(!
d->when_any_notification.compare_exchange_weak(
info,
info_n));
779 return info.notifyerQueue;
787 bool nret =
n.push(
d->when_any_index);
799 info =
d->when_any_notification.load(std::memory_order_acquire);
800 if(
info.notifyerQueue)
break;
803 --
info_n.notifyerQueue_previousData;
804 }
while(!
d->when_any_notification.compare_exchange_weak(
info,
info_n));
813 d->exceptions[
myIndex %
d->nBuffers] = exception;
820 d->semaphores[
myIndex %
d->nBuffers].unlock();
835 if(
d->semaphores[(
myIndex - 1) %
d->nBuffers].is_ready_and_reset()) {
841 d->semaphores[(
myIndex - 1) %
d->nBuffers].unlock();
853 d->exceptions[
myIndex %
d->nBuffers] = exception;
863 d->semaphores[
myIndex %
d->nBuffers].unlock();
874 if(
d->hasFrontOwnership)
return false;
875 if(
d->is_continuation_deferred ||
d->is_continuation_when_all)
d->continuation_process_deferred();
876 if(
d->semaphores[
d->readIndex %
d->nBuffers].is_ready_and_reset()) {
877 d->hasFrontOwnership =
true;
884 if(
d->hasFrontOwnership)
return;
885 if(
d->is_continuation_deferred ||
d->is_continuation_when_all)
d->continuation_process_deferred_wait();
886 d->semaphores[
d->readIndex %
d->nBuffers].wait_and_reset();
887 d->hasFrontOwnership =
true;
891 if(!
d->is_continuation_deferred) {
892 return d->nBuffers - 1;
895 return d->continuation_origin.size();
914 if(
index >=
d->readIndex +
d->nBuffers - 1)
return false;
929 if(!
d->semaphores[
index %
d->nBuffers].is_ready())
break;
938 if(!
d->is_continuation_deferred) {
944 info.notifyerQueue_previousData = 0;
949 info_o =
d->when_any_notification;
950 }
while(!
d->when_any_notification.compare_exchange_weak(
info_o,
info));
954 info.notifyerQueue->reference_count++;
956 return info_o.notifyerQueue_previousData;
969 template<
typename T,
typename FEATURES>
972 template<
typename T,
typename FEATURES>
979 template<
typename T,
typename FEATURES>
984 if(!obtain_write_slot(
myIndex))
return false;
988 d->exceptions[
myIndex % d->nBuffers] =
nullptr;
995 d->semaphores[
myIndex % d->nBuffers].unlock();
996 update_read_index_max();
1004 template<
typename T,
typename FEATURES>
1005 template<
typename U,
1006 typename std::enable_if<!std::is_same<U, void>::value && std::is_copy_constructible<T>::value,
int>::type>
1014 template<
typename T,
typename FEATURES>
1017 std::is_same<T, void>::value,
"future_queue<T,FEATURES>::push(void) may only be called for T = void.");
1020 if(!obtain_write_slot(
myIndex))
return false;
1023 d->exceptions[
myIndex % d->nBuffers] =
nullptr;
1030 d->semaphores[
myIndex % d->nBuffers].unlock();
1031 update_read_index_max();
1040 template<
typename T,
typename FEATURES>
1043 assert(d->nBuffers - 1 > 1);
1048 if(!obtain_write_slot(
myIndex)) {
1049 if(d->semaphores[(
myIndex - 1) % d->nBuffers].is_ready_and_reset()) {
1055 d->semaphores[(
myIndex - 1) % d->nBuffers].unlock();
1063 if(!obtain_write_slot(
myIndex))
return false;
1068 d->exceptions[
myIndex % d->nBuffers] =
nullptr;
1078 d->semaphores[
myIndex % d->nBuffers].unlock();
1079 update_read_index_max();
1090 template<
typename T,
typename FEATURES>
1091 template<
typename U,
1092 typename std::enable_if<!std::is_same<U, void>::value && std::is_copy_constructible<T>::value,
int>::type>
1096 return push_overwrite(T(
t));
1103 template<
typename T,
typename FEATURES>
1106 if((d->is_continuation_deferred || d->is_continuation_when_all) && !d->hasFrontOwnership) {
1107 d->continuation_process_deferred();
1109 if(d->hasFrontOwnership || d->semaphores[d->readIndex % d->nBuffers].is_ready_and_reset()) {
1110 std::exception_ptr
e;
1111 if(d->exceptions[d->readIndex % d->nBuffers]) {
1112 e = d->exceptions[d->readIndex % d->nBuffers];
1118 assert(d->readIndex < d->writeIndex);
1120 d->hasFrontOwnership =
false;
1121 decrement_previous_data_counter();
1122 if(
e) std::rethrow_exception(
e);
1132 template<
typename T,
typename FEATURES>
1134 if((d->is_continuation_deferred || d->is_continuation_when_all) && !d->hasFrontOwnership) {
1135 d->continuation_process_deferred();
1137 if(d->hasFrontOwnership || d->semaphores[d->readIndex % d->nBuffers].is_ready_and_reset()) {
1138 std::exception_ptr
e;
1139 if(d->exceptions[d->readIndex % d->nBuffers]) {
1140 e = d->exceptions[d->readIndex % d->nBuffers];
1142 assert(d->readIndex < d->writeIndex);
1144 d->hasFrontOwnership =
false;
1145 decrement_previous_data_counter();
1146 if(
e) std::rethrow_exception(
e);
1155 template<
typename T,
typename FEATURES>
1158 if(!d->hasFrontOwnership) {
1159 if(d->is_continuation_deferred || d->is_continuation_when_all) d->continuation_process_deferred_wait();
1160 d->semaphores[d->readIndex % d->nBuffers].wait_and_reset();
1163 d->hasFrontOwnership =
false;
1165 std::exception_ptr
e;
1166 if(d->exceptions[d->readIndex % d->nBuffers]) {
1167 e = d->exceptions[d->readIndex % d->nBuffers];
1173 assert(d->readIndex < d->writeIndex);
1175 decrement_previous_data_counter();
1176 if(
e) std::rethrow_exception(
e);
1181 template<
typename T,
typename FEATURES>
1183 if(!d->hasFrontOwnership) {
1184 if(d->is_continuation_deferred || d->is_continuation_when_all) d->continuation_process_deferred_wait();
1185 d->semaphores[d->readIndex % d->nBuffers].wait_and_reset();
1188 d->hasFrontOwnership =
false;
1190 std::exception_ptr
e;
1191 if(d->exceptions[d->readIndex % d->nBuffers]) {
1192 e = d->exceptions[d->readIndex % d->nBuffers];
1194 assert(d->readIndex < d->writeIndex);
1196 decrement_previous_data_counter();
1197 if(
e) std::rethrow_exception(
e);
1204 template<
typename T,
typename FEATURES>
1207 assert(d->hasFrontOwnership);
1208 if(d->exceptions[d->readIndex % d->nBuffers]) std::rethrow_exception(d->exceptions[d->readIndex % d->nBuffers]);
1213 template<
typename T,
typename FEATURES>
1216 assert(d->hasFrontOwnership);
1217 if(d->exceptions[d->readIndex % d->nBuffers]) std::rethrow_exception(d->exceptions[d->readIndex % d->nBuffers]);
1234 template<
typename T,
typename FEATURES,
typename TOUT,
typename CALLABLE>
1261 template<
typename FEATURES,
typename TOUT,
typename CALLABLE>
1276 template<
typename T,
typename FEATURES,
typename CALLABLE>
1304 template<
typename FEATURES,
typename CALLABLE>
1327 template<
typename T,
typename FEATURES,
typename TOUT,
typename CALLABLE>
1330 return {q_input, q_output, callable};
1338 template<
typename T,
typename FEATURES,
typename TOUT,
typename CALLABLE>
1366 template<
typename FEATURES,
typename TOUT,
typename CALLABLE>
1387 template<
typename T,
typename FEATURES,
typename CALLABLE>
1415 template<
typename FEATURES,
typename CALLABLE>
1436 template<
typename T,
typename FEATURES,
typename TOUT,
typename CALLABLE>
1439 return {q_input, q_output, callable};
1447 template<
typename T,
typename FEATURES,
typename TOUT,
typename CALLABLE>
1463 q_output.
d->continuation_process_async_terminated =
true;
1484 template<
typename FEATURES,
typename TOUT,
typename CALLABLE>
1497 q_output.
d->continuation_process_async_terminated =
true;
1512 template<
typename T,
typename FEATURES,
typename CALLABLE>
1529 q_output.
d->continuation_process_async_terminated =
true;
1550 template<
typename FEATURES,
typename CALLABLE>
1564 q_output.
d->continuation_process_async_terminated =
true;
1579 template<
typename T,
typename FEATURES,
typename TOUT,
typename CALLABLE>
1582 return {q_input, q_output, callable};
1590 template<
typename T,
typename FEATURES>
1591 template<
typename T2,
typename FEATURES2,
typename CALLABLE>
1594 if(
policy == std::launch::deferred) {
1596 q_output.d->continuation_process_deferred =
1598 q_output.d->continuation_process_deferred_wait =
1600 q_output.d->continuation_origin = *
this;
1601 q_output.d->is_continuation_deferred =
true;
1606 q_output.d->continuation_process_async =
1608 q_output.d->continuation_origin = *
this;
1609 q_output.d->is_continuation_async =
true;
Feature tag for future_queue: use std::move to store and retrieve data to/from the queue.
Feature tag for future_queue: use std::swap to store and retrieve data to/from the queue.
Exception to be pushed into the queue to signal a termination request for the internal thread of an a...
Type-independent base class for future_queue which does not depend on the template argument.
detail::shared_state_ptr d
pointer to data used to allow sharing the queue (create multiple copies which all refer to the same q...
bool operator==(const future_queue_base &other) const
Check whether two future_queue instances use the same shared state, i.e.
size_t setNotificationQueue(future_queue< size_t, MOVE_DATA > &notificationQueue, size_t indexToSend)
Set the notification queue in the shared state, as done in when_any.
void decrement_previous_data_counter()
Decrement the "previous data" counter used in when_any().
bool operator!=(const future_queue_base &other) const
void wait()
Wait until the queue is not empty.
bool push_exception(std::exception_ptr exception)
Push an exception pointer (in place of a value) into the queue.
void update_read_index_max()
update readIndexMax after a write operation was completed
void send_notification(cppext::detail::shared_state_base *notification_queue)
Send notification to notification queue (if not nullptr).
cppext::detail::shared_state_base * get_notification_queue()
Atomically return the notification queue or increment the "previous data" counter (for wait_any).
friend future_queue< void, MOVE_DATA > when_all(ITERATOR_TYPE begin, ITERATOR_TYPE end)
This function expects two forward iterators pointing to a region of a container of future_queue objec...
size_t write_available() const
Number of push operations which can be performed before the queue is full.
bool empty()
Check if there is currently no data on the queue.
size_t read_available() const
Number of pop operations which can be performed before the queue is empty.
bool push_overwrite_exception(std::exception_ptr exception)
Like push_exception() but overwrite the last pushed value in case the queue is full.
bool obtain_write_slot(size_t &index)
reserve next available write slot.
friend future_queue< size_t, MOVE_DATA > when_any(ITERATOR_TYPE begin, ITERATOR_TYPE end)
Implementations of non-member functions.
size_t size() const
return length of the queue
A lockfree multi-producer single-consumer queue of a fixed length which the receiver can wait on in c...
void front() const
This front() is for void data types.
void pop_wait()
This pop_wait() is for all data types (for non-void data types the value will be discarded)
bool push_overwrite(U &&t)
Push object t to the queue.
future_queue(const future_queue &other)=default
Copy constructor: After copying the object both *this and the other object will refer to the same que...
bool push(U &&t)
Push object t to the queue.
bool pop()
This pop() is for all data types (for non-void data types the value will be discarded)
future_queue(size_t length)
The length specifies how many objects the queue can contain at a time.
future_queue< T2, FEATURES2 > then(CALLABLE callable, std::launch policy=std::launch::async)
Add continuation: Whenever there is a new element in the queue, process it with the callable and put ...
bool pop(U &t)
Pop object off the queue and store it in t.
future_queue & operator=(const future_queue &other)=default
Copy assignment operator: After the assignment both *this and the other object will refer to the same...
bool push_overwrite()
This version of push_overwrite() is valid only for T=void.
bool push(const U &t)
This push() is for non-void data types passed by Lvalue reference.
void pop_wait(U &t)
Pop object off the queue and store it in t.
U & front()
Obtain the front element of the queue without removing it.
future_queue()
The default constructor creates only a place holder which can later be assigned with a properly const...
bool push(void)
This version of push() is valid only for T=void.
bool push_overwrite(const U &t)
This push_overwrite() is for non-void data types passed by Lvalue reference.
continuation_process_async< T, FEATURES, TOUT, CALLABLE > make_continuation_process_async(future_queue< T, FEATURES > q_input, future_queue< TOUT > q_output, CALLABLE callable)
continuation_process_deferred< T, FEATURES, TOUT, CALLABLE > make_continuation_process_deferred(future_queue< T, FEATURES > q_input, future_queue< TOUT > q_output, CALLABLE callable)
void data_assign(T &a, T &&b, MOVE_DATA)
Helper function to realise the data assignment depending on the selected FEATURES tags.
continuation_process_deferred_wait< T, FEATURES, TOUT, CALLABLE > make_continuation_process_deferred_wait(future_queue< T, FEATURES > q_input, future_queue< TOUT > q_output, CALLABLE callable)
shared_state_ptr make_shared_state(size_t length)
future_queue< size_t > when_any(ITERATOR_TYPE begin, ITERATOR_TYPE end)
Implementations of non-member functions.
future_queue< void > when_all(ITERATOR_TYPE begin, ITERATOR_TYPE end)
This function expects two forward iterators pointing to a region of a container of future_queue objec...
continuation_process_async(future_queue< T, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
future_queue< void > q_output
future_queue< T, FEATURES > q_input
future_queue< void, FEATURES > q_input
future_queue< TOUT > q_output
continuation_process_async(future_queue< void, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
future_queue< void > q_output
continuation_process_async(future_queue< void, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
future_queue< void, FEATURES > q_input
continuation_process_async(future_queue< T, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
future_queue< T, FEATURES > q_input
future_queue< TOUT > q_output
continuation_process_deferred(future_queue< T, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
future_queue< void > q_output
future_queue< T, FEATURES > q_input
future_queue< TOUT > q_output
continuation_process_deferred(future_queue< void, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
future_queue< void, FEATURES > q_input
continuation_process_deferred(future_queue< void, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
future_queue< void > q_output
future_queue< void, FEATURES > q_input
future_queue< void > q_output
future_queue< T, FEATURES > q_input
continuation_process_deferred_wait(future_queue< T, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
future_queue< TOUT > q_output
future_queue< void, FEATURES > q_input
continuation_process_deferred_wait(future_queue< void, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
future_queue< void, FEATURES > q_input
continuation_process_deferred_wait(future_queue< void, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
future_queue< void > q_output
future_queue< T, FEATURES > q_input
future_queue< TOUT > q_output
continuation_process_deferred_wait(future_queue< T, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
future_queue< TOUT > q_output
continuation_process_deferred(future_queue< T, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
future_queue< T, FEATURES > q_input
shared_state(size_t length)
Internal base class for holding the data which is shared between multiple instances of the same queue...
std::atomic< size_t > writeIndex
index of the element which will be next written
bool is_continuation_when_all
Flag whether this future_queue is a when_all-type continuation (of many others)
std::vector< semaphore > semaphores
vector of semaphores corresponding to the buffers which allows the receiver to wait for new data
future_queue_base continuation_origin
If either is_continuation_deferred or is_continuation_async is true, this will point to the original ...
shared_state_base(size_t length)
size_t nBuffers
the number of buffers we have allocated
bool is_continuation_deferred
Flag whether this future_queue is a deferred-type continuation of another.
std::atomic< size_t > when_any_index
index used in wait_any to identify the queue
std::atomic< when_any_notification_info > when_any_notification
std::atomic< bool > continuation_process_async_terminated
Flag whether the internal thread continuation_process_async has been terminated.
std::vector< std::exception_ptr > exceptions
vector of exception pointers, can be set instead of values through push_exception()
std::atomic< size_t > reference_count
reference count.
std::function< void(void)> continuation_process_deferred
Function to be called for deferred evaluation of a single value if this queue is a continuation.
std::atomic< size_t > readIndex
index of the element which will be next read
bool is_continuation_async
Flag whether this future_queue is an async-type continuation of another.
bool hasFrontOwnership
Flag if the receiver has already ownership over the front element.
std::thread continuation_process_async
Thread handling async-type continuations.
void free()
Decreases the reference count and calls "delete this" where appropriate.
std::atomic< size_t > readIndexMax
maximum index which the receiver is currently allowed to read (after checking its semaphore).
virtual ~shared_state_base()
Destructor must be virtual so the destructor of the derived class gets called.
std::function< void(void)> continuation_process_deferred_wait
Function to be called for deferred evaluation of a single value if this queue is a continuation.
shared_ptr-like smart pointer type for referencing the shared_state.
void make_new(size_t length)
Create new shared_state for type T.
shared_state_base * operator->()
Dereferencing operator.
shared_state< T > * cast()
Cast into shared state for type T.
shared_state_ptr & operator=(const shared_state_ptr &other)
Copy by assignment.
shared_state_base * get() const
Obtain the target pointer.
void set(shared_state_base *ptr_)
Set the target pointer without incrementing the reference counter.
shared_state_ptr()
Default constructor: create empty pointer.
~shared_state_ptr()
Destructor.
bool operator==(const shared_state_ptr &other) const
Check if two pointers are identical.
Internal class for holding the data which is shared between multiple instances of the same queue.
shared_state(size_t length)
std::vector< T > buffers
vector of buffers - allocation is done in the constructor
cppext::detail::shared_state_base * notifyerQueue
Notification queue used to realise a wait_any logic.
size_t notifyerQueue_previousData
counter for the number of elements in the queue before when_any has added the notifyerQueue