ChimeraTK-cppext 01.07.01
Loading...
Searching...
No Matches
future_queue.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3#pragma once
4
5#include "semaphore.hpp"
6
7#include <atomic>
8#include <cassert>
9#include <future> // just for std::launch
10#include <vector>
11
12namespace cppext {
13
14 /*********************************************************************************************************************/
15
18 class MOVE_DATA {};
19
22 class SWAP_DATA {};
23
24 namespace detail {
29 } // namespace detail
30
31 /*********************************************************************************************************************/
32
33 namespace detail {
34 struct shared_state_base;
35
36 template<typename T>
37 struct shared_state;
38
39 template<typename T, typename FEATURES, typename TOUT, typename CALLABLE>
40 struct continuation_process_async;
41
50
53
56
59
61 template<typename T>
62 void make_new(size_t length);
63
66 const shared_state_base* operator->() const;
67
69 template<typename T>
71
73 operator bool() const;
74
76 bool operator==(const shared_state_ptr& other) const;
77
79 shared_state_base* get() const;
80
83
84 private:
86 void free();
87
90 };
91
92 template<typename T>
95 p.make_new<T>(length);
96 return p;
97 }
98
99 } // namespace detail
100
101 template<typename T, typename FEATURES>
102 class future_queue;
103
104 /*********************************************************************************************************************/
105
109 public:
112 size_t write_available() const;
113
118 size_t read_available() const;
119
123 bool push_exception(std::exception_ptr exception);
124
127 bool push_overwrite_exception(std::exception_ptr exception);
128
133 bool empty();
134
138 void wait();
139
141 size_t size() const;
142
145 bool operator==(const future_queue_base& other) const;
146 bool operator!=(const future_queue_base& other) const;
147
148 protected:
150
152
155 bool obtain_write_slot(size_t& index);
156
159
163
171
178
181
185
186 template<typename T, typename FEATURES>
187 friend class ::cppext::future_queue;
188
189 template<typename ITERATOR_TYPE>
191
192 template<typename ITERATOR_TYPE>
194
197
198 template<typename T, typename FEATURES, typename TOUT, typename CALLABLE>
200 };
201
202 /*********************************************************************************************************************/
203
228 template<typename T, typename FEATURES = MOVE_DATA>
230 public:
236
240
243 future_queue(const future_queue& other) = default;
244
248
251 template<typename U = T,
252 typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value, int>::type = 0>
253 bool push(U&& t);
254 template<typename U = T,
255 typename std::enable_if<!std::is_same<U, void>::value && std::is_copy_constructible<T>::value, int>::type = 0>
256 bool push(const U& t);
257
259 bool push(void);
260
270 template<typename U = T,
271 typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value, int>::type = 0>
273
274 template<typename U = T,
275 typename std::enable_if<!std::is_same<U, void>::value && std::is_copy_constructible<T>::value, int>::type = 0>
276 bool push_overwrite(const U& t);
277
280
283 template<typename U = T,
284 typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value, int>::type = 0>
285 bool pop(U& t);
286
287 bool pop();
288
291 template<typename U = T,
292 typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value, int>::type = 0>
293 void pop_wait(U& t);
294
295 void pop_wait();
296
303 template<typename U = T,
304 typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value, int>::type = 0>
306
307 template<typename U = T,
308 typename std::enable_if<std::is_same<T, U>::value && std::is_same<U, void>::value, int>::type = 0>
309 void front() const;
310
329 template<typename T2, typename FEATURES2 = MOVE_DATA, typename CALLABLE>
330 future_queue<T2, FEATURES2> then(CALLABLE callable, std::launch policy = std::launch::async);
331
332 typedef T value_type;
333 };
334
335 /*********************************************************************************************************************/
336
337 namespace detail {
338
347
360
364
367 inline void free();
368
370 std::atomic<size_t> reference_count{0};
371
373 std::atomic<size_t> when_any_index;
374
376 size_t nBuffers;
377
380 std::vector<semaphore> semaphores;
381
384 std::vector<std::exception_ptr> exceptions;
385
388 std::atomic<size_t> writeIndex;
389
393 std::atomic<size_t> readIndexMax;
394
396 std::atomic<size_t> readIndex;
397
401
405
409
414
417
420
424
428
432
433 std::atomic<when_any_notification_info> when_any_notification{when_any_notification_info()};
434 };
435
439 template<typename T>
442
444 std::vector<T> buffers;
445 };
446
448 template<>
452
454 // Reduce reference count but atomically keep the old reference counter. Note
455 // that the std::memory_order_relaxed refers to the access to the pointer not
456 // to the reference counter.
457 size_t oldCount = this->reference_count--;
458
459 // Determine whether we need to destroy the shared state depending on possible
460 // internal references.
461 bool executeDelete = false;
462
463 // Standard case: no continuation. If the last user is just destroying its
464 // reference we delete the shared state.
465 if(oldCount == 1 && !this->is_continuation_async && !this->is_continuation_deferred &&
467 executeDelete = true;
468 }
469 // Deferred continuations (incl. when_all) have two internal use counts due to
470 // the two std::functions, so we need to remove those functions first.
471 else if(oldCount == 3 && (this->is_continuation_deferred || this->is_continuation_when_all)) {
474 executeDelete = true;
475 }
476 // Async continuations have one internal use count inside their thread, so we
477 // need to terminate the thread first.
478 else if(oldCount == 2 && this->is_continuation_async) {
479 if(this->continuation_process_async.joinable()) {
480 // Signal termination to internal thread and wait until thread has been
481 // terminated
482 while(this->continuation_process_async_terminated == false) {
483 // Push a detail::TerminateInternalThread exception into the queue which
484 // the internal thread is potentially waiting on.
485 try {
487 }
488 catch(...) {
489 // Special case: the origin queue is a continuation itself (deferred
490 // or when_all) - we need to push the exception to the origin of the
491 // origin to actually reach the internal thread, since
492 // deferred/when_all continuations do not really use their own queue
493 if(this->continuation_origin.d->is_continuation_deferred ||
494 this->continuation_origin.d->is_continuation_when_all) {
495 this->continuation_origin.d->continuation_origin.push_exception(std::current_exception());
496 }
497 // Standard case: just push the exception to the origin queue of the
498 // continuation
499 else {
500 this->continuation_origin.push_exception(std::current_exception());
501 }
502 } // end catch
503 } // end while
504
505 this->continuation_process_async.join();
506 }
507 executeDelete = true;
508 }
509
510 if(executeDelete) {
511 // Now that all potential internal references have been cleared the
512 // reference count must be 0
514
515 // the when_any_notification notifyerQueue may have it's reference count
516 // manually incremented by setNotificationQueue, and must be freed if it exists.
517 if(when_any_notification.load().notifyerQueue) {
518 when_any_notification.load().notifyerQueue->free();
519 }
520 delete this;
521 } // end if executeDelete
522
523 } // end shared_state_base::free()
524
525 } // namespace detail
526
527 /*********************************************************************************************************************/
528 /*********************************************************************************************************************/
530 /*********************************************************************************************************************/
531 /*********************************************************************************************************************/
532
561 template<typename ITERATOR_TYPE>
563 // Add lengthes of all queues - this will be the length of the notification
564 // queue
565 size_t summedLength = 0;
566 for(ITERATOR_TYPE it = begin; it != end; ++it) summedLength += it->size();
567
568 // Create a notification queue, so we can hand it on to the queues
569 future_queue<size_t> notifyerQueue(summedLength);
570
571 // Distribute the pointer to the notification queue to all participating
572 // queues
573 size_t index = 0;
574 for(ITERATOR_TYPE it = begin; it != end; ++it) {
575 size_t nPreviousValues = it->setNotificationQueue(notifyerQueue, index);
576 for(size_t i = 0; i < nPreviousValues; ++i) notifyerQueue.push(index);
577 ++index;
578 }
579
580 return notifyerQueue;
581 }
582
583 /*********************************************************************************************************************/
584
589 template<typename ITERATOR_TYPE>
591 // Create a notification queue in a shared pointer, so we can hand it on to
592 // the queues
593 future_queue<void> notifyerQueue(1);
594
595 // copy the list of participating queues
596 std::vector<future_queue_base> participants;
597 for(auto it = begin; it != end; ++it) participants.push_back(*it);
598
599 // obtain notification queue for any update to any queue
600 auto anyNotify = when_any(begin, end);
601
602 // define function to be executed (inside the notifyer queue) on non-blocking
603 // functions like pop() or empty()
604 notifyerQueue.d->continuation_process_deferred = std::function<void(void)>([notifyerQueue, participants]() mutable {
605 bool empty = false;
606 for(auto& q : participants) {
607 if(q.empty()) {
608 empty = true;
609 break;
610 }
611 }
612 if(!empty) notifyerQueue.push();
613 });
614
615 // define function to be executed (inside the notifyer queue) on blocking
616 // functions like pop_wait() or wait()
617 notifyerQueue.d->continuation_process_deferred_wait =
618 std::function<void(void)>([notifyerQueue, participants, anyNotify]() mutable {
619 while(true) {
620 anyNotify.pop_wait();
621 bool empty = false;
622 for(auto& q : participants) {
623 if(q.empty()) {
624 empty = true;
625 break;
626 }
627 }
628 if(!empty) break;
629 }
630 notifyerQueue.push();
631 });
632
633 // set flag marking the notifyerQueue a when_all continuation and save the
634 // notification queue of the when_any as the origin.
635 notifyerQueue.d->is_continuation_when_all = true;
636 notifyerQueue.d->continuation_origin = anyNotify;
637
638 return notifyerQueue;
639 }
640
641 /*********************************************************************************************************************/
642
643 namespace detail {
644
649 template<typename T>
650 void data_assign(T& a, T&& b, MOVE_DATA) {
651 // in order not to depend on the move assignment operator, which might not
652 // always be available, we perform an in-place destruction followed by an
653 // in-place move construction.
654 a.~T();
655 new(&a) T(std::move(b));
656 }
657
658 template<typename T>
659 void data_assign(T& a, T&& b, SWAP_DATA) {
660 std::swap(a, b);
661 }
662
663 } // namespace detail
664
665 /*********************************************************************************************************************/
666 /*********************************************************************************************************************/
668 /*********************************************************************************************************************/
669 /*********************************************************************************************************************/
670
671 namespace detail {
672
674
676 // Copy the pointer and increase the reference count
677 set(other.get());
678 if(get() != nullptr) get()->reference_count++;
679 }
680
682 // Free previous target, copy the new pointer and increase its reference count
683 free();
684 set(other.get());
685 if(get() != nullptr) get()->reference_count++;
686 return *this;
687 }
688
690 free();
691 }
692
694 return ptr;
695 }
696
698 ptr = ptr_;
699 }
700
701 inline void shared_state_ptr::free() {
702 // Don't do anything if called on a nullptr (i.e. default constructed or
703 // already destroyed)
704 if(get() == nullptr) {
705 return;
706 }
707
708 get()->free();
709 set(nullptr);
710 }
711
712 template<typename T>
714 free();
715 ptr = new shared_state<T>(length);
716 get()->reference_count = 1;
717 }
718
720 assert(get() != nullptr);
721 return get();
722 }
723
725 assert(get() != nullptr);
726 return get();
727 }
728
729 template<typename T>
731 assert(get() != nullptr);
732 return static_cast<shared_state<T>*>(get());
733 }
734
735 inline shared_state_ptr::operator bool() const {
736 return get() != nullptr;
737 }
738
740 return get() == other.get();
741 }
742
743 } // namespace detail
744
745 /*********************************************************************************************************************/
746 /*********************************************************************************************************************/
748 /*********************************************************************************************************************/
749 /*********************************************************************************************************************/
750
752 // Obtain indices in this particular order to ensure consistency. Result might
753 // be too small (but not too big) if writing happens concurrently.
754 size_t l_writeIndex = d->writeIndex;
755 size_t l_readIndex = d->readIndex;
756 if(l_writeIndex - l_readIndex < d->nBuffers - 1) {
757 return d->nBuffers - (l_writeIndex - l_readIndex) - 1;
758 }
759 else {
760 return 0;
761 }
762 }
763
764 inline size_t future_queue_base::read_available() const {
765 // Single consumer, so atomicity doesn't matter
766 return d->readIndexMax - d->readIndex;
767 }
768
770 // if there is no notification queue, atomically increment counter while making sure now notification queue is
771 // placed concurrently
773 do {
774 info = d->when_any_notification.load(std::memory_order_acquire);
775 if(info.notifyerQueue) break;
776 info_n = info;
777 ++info_n.notifyerQueue_previousData;
778 } while(!d->when_any_notification.compare_exchange_weak(info, info_n));
779 return info.notifyerQueue;
780 }
781
783 // if there is a notification queue, push to it
786 n.d.set(notification_queue);
787 bool nret = n.push(d->when_any_index);
788 n.d.set(nullptr); // prevent reference count from being decremented
789 (void)nret;
790 // This assert doesn't really hold. It might spuriously fail during destruction of certain combinations of
791 // continuations and when_any/when_all.
792 // assert(nret == true);
793 }
794 }
795
798 do {
799 info = d->when_any_notification.load(std::memory_order_acquire);
800 if(info.notifyerQueue) break; // no need to deal with this counter if notification queue present
801 info_n = info;
802 assert(info_n.notifyerQueue_previousData > 0);
803 --info_n.notifyerQueue_previousData;
804 } while(!d->when_any_notification.compare_exchange_weak(info, info_n));
805 }
806
807 inline bool future_queue_base::push_exception(std::exception_ptr exception) {
808 // obtain index to write to
809 size_t myIndex;
810 if(!obtain_write_slot(myIndex)) return false;
811
812 // assign the payload data (data buffer is ignored if exception is set)
813 d->exceptions[myIndex % d->nBuffers] = exception;
814
815 // obtain notification queue or increment previous data counter (for when_any)
817
818 // signal receiving end
819 assert(!d->semaphores[myIndex % d->nBuffers].is_ready());
820 d->semaphores[myIndex % d->nBuffers].unlock();
822
823 // deal with when_any notifications
825 return true;
826 }
827
828 inline bool future_queue_base::push_overwrite_exception(std::exception_ptr exception) {
829 assert(d->nBuffers - 1 > 1);
830 bool ret = true;
831
832 // obtain index to write to, if necessary remove old data first
833 size_t myIndex;
835 if(d->semaphores[(myIndex - 1) % d->nBuffers].is_ready_and_reset()) {
836 size_t expectedIndex = myIndex;
837 bool success = d->writeIndex.compare_exchange_strong(expectedIndex, myIndex - 1);
838 if(!success) {
839 // in case of a concurrent push_overwrite(), our data effectively just got overwritten by the other thread
840 // even before writing it...
841 d->semaphores[(myIndex - 1) % d->nBuffers].unlock();
842 return false;
843 }
844 ret = false;
845 }
846 else {
847 return false;
848 }
849 if(!obtain_write_slot(myIndex)) return false;
850 }
851
852 // assign the payload data (data buffer is ignored if exception is set)
853 d->exceptions[myIndex % d->nBuffers] = exception;
854
855 // obtain notification queue or increment previous data counter (for when_any) (unless data was overwritten)
857 if(ret) {
859 }
860
861 // obtain notification queue or increment previous data counter (for when_any)
862 assert(!d->semaphores[myIndex % d->nBuffers].is_ready());
863 d->semaphores[myIndex % d->nBuffers].unlock();
865
866 // deal with when_any notifications (unless data was overwritten)
867 if(ret) {
869 }
870 return ret;
871 }
872
874 if(d->hasFrontOwnership) return false;
875 if(d->is_continuation_deferred || d->is_continuation_when_all) d->continuation_process_deferred();
876 if(d->semaphores[d->readIndex % d->nBuffers].is_ready_and_reset()) {
877 d->hasFrontOwnership = true;
878 return false;
879 }
880 return true;
881 }
882
884 if(d->hasFrontOwnership) return;
885 if(d->is_continuation_deferred || d->is_continuation_when_all) d->continuation_process_deferred_wait();
886 d->semaphores[d->readIndex % d->nBuffers].wait_and_reset();
887 d->hasFrontOwnership = true;
888 }
889
890 inline size_t future_queue_base::size() const {
891 if(!d->is_continuation_deferred) {
892 return d->nBuffers - 1;
893 }
894 else {
895 return d->continuation_origin.size();
896 }
897 }
898
900 return d == other.d;
901 }
902
904 return !(d == other.d);
905 }
906
908
910
912 index = d->writeIndex;
913 while(true) {
914 if(index >= d->readIndex + d->nBuffers - 1) return false; // queue is full
915 bool success = d->writeIndex.compare_exchange_weak(index, index + 1);
916 if(success) break;
917 }
918 return true;
919 }
920
922 size_t l_readIndex = d->readIndex;
923 size_t l_writeIndex = d->writeIndex;
924 size_t l_readIndexMax = d->readIndexMax;
925 if(l_writeIndex >= l_readIndex + d->nBuffers) l_writeIndex = l_readIndex + d->nBuffers - 1;
927 do {
928 for(size_t index = l_readIndexMax; index <= l_writeIndex - 1; ++index) {
929 if(!d->semaphores[index % d->nBuffers].is_ready()) break;
931 }
932 d->readIndexMax.compare_exchange_weak(l_readIndexMax, newReadIndexMax);
933 } while(d->readIndexMax < newReadIndexMax);
934 }
935
938 if(!d->is_continuation_deferred) {
939 d->when_any_index = indexToSend;
940
941 // create new info struct with notification queue
943 info.notifyerQueue = notificationQueue.d.get();
944 info.notifyerQueue_previousData = 0;
945
946 // atomically exchange info struct while making sure it has not been altered at the target in the mean time
948 do {
949 info_o = d->when_any_notification;
950 } while(!d->when_any_notification.compare_exchange_weak(info_o, info));
951
952 // artificially increment the reference count of the notification queue, since we have to store a plain pointer
953 // in the info struct rather than a shared_state_ptr (which is not trivially copyable).
954 info.notifyerQueue->reference_count++;
955
956 return info_o.notifyerQueue_previousData;
957 }
958 else {
959 return d->continuation_origin.setNotificationQueue(notificationQueue, indexToSend);
960 }
961 }
962
963 /*********************************************************************************************************************/
964 /*********************************************************************************************************************/
966 /*********************************************************************************************************************/
967 /*********************************************************************************************************************/
968
969 template<typename T, typename FEATURES>
970 future_queue<T, FEATURES>::future_queue(size_t length) : future_queue_base(detail::make_shared_state<T>(length)) {}
971
972 template<typename T, typename FEATURES>
974
975 /*********************************************************************************************************************/
979 template<typename T, typename FEATURES>
980 template<typename U, typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value, int>::type>
982 // obtain index to write to
983 size_t myIndex;
984 if(!obtain_write_slot(myIndex)) return false;
985
986 // assign the payload data
987 detail::data_assign(future_queue_base::d.cast<T>()->buffers[myIndex % d->nBuffers], std::move(t), FEATURES());
988 d->exceptions[myIndex % d->nBuffers] = nullptr;
989
990 // obtain notification queue or increment previous data counter (for when_any)
991 auto notification_queue = get_notification_queue();
992
993 // signal receiving end
994 assert(!d->semaphores[myIndex % d->nBuffers].is_ready());
995 d->semaphores[myIndex % d->nBuffers].unlock();
996 update_read_index_max(); // basically only for read_available()
997
998 // deal with when_any notifications
999 send_notification(notification_queue);
1000 return true;
1001 }
1002
1004 template<typename T, typename FEATURES>
1005 template<typename U,
1006 typename std::enable_if<!std::is_same<U, void>::value && std::is_copy_constructible<T>::value, int>::type>
1008 // Create copy and pass this copy as an Rvalue reference to the other
1009 // implementation
1010 return push(T(t));
1011 }
1012
1014 template<typename T, typename FEATURES>
1016 static_assert(
1017 std::is_same<T, void>::value, "future_queue<T,FEATURES>::push(void) may only be called for T = void.");
1018 // obtain index to write to
1019 size_t myIndex;
1020 if(!obtain_write_slot(myIndex)) return false;
1021
1022 // assign the payload data
1023 d->exceptions[myIndex % d->nBuffers] = nullptr;
1024
1025 // obtain notification queue or increment previous data counter (for when_any)
1026 auto notification_queue = get_notification_queue();
1027
1028 // signal receiving end
1029 assert(!d->semaphores[myIndex % d->nBuffers].is_ready());
1030 d->semaphores[myIndex % d->nBuffers].unlock();
1031 update_read_index_max(); // basically only for read_available()
1032
1033 // deal with when_any notifications
1034 send_notification(notification_queue);
1035 return true;
1036 }
1037
1040 template<typename T, typename FEATURES>
1041 template<typename U, typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value, int>::type>
1043 assert(d->nBuffers - 1 > 1);
1044 bool ret = true;
1045
1046 // obtain index to write to, if necessary remove old data first
1047 size_t myIndex;
1048 if(!obtain_write_slot(myIndex)) {
1049 if(d->semaphores[(myIndex - 1) % d->nBuffers].is_ready_and_reset()) {
1050 size_t expectedIndex = myIndex;
1051 bool success = d->writeIndex.compare_exchange_strong(expectedIndex, myIndex - 1);
1052 if(!success) {
1053 // in case of a concurrent push_overwrite(), our data effectively just got overwritten by the other thread
1054 // even before writing it...
1055 d->semaphores[(myIndex - 1) % d->nBuffers].unlock();
1056 return false;
1057 }
1058 ret = false;
1059 }
1060 else {
1061 return false;
1062 }
1063 if(!obtain_write_slot(myIndex)) return false;
1064 }
1065
1066 // assign the payload data
1067 detail::data_assign(future_queue_base::d.cast<T>()->buffers[myIndex % d->nBuffers], std::move(t), FEATURES());
1068 d->exceptions[myIndex % d->nBuffers] = nullptr;
1069
1070 // obtain notification queue or increment previous data counter (for when_any) (unless data was overwritten)
1072 if(ret) {
1073 notification_queue = get_notification_queue();
1074 }
1075
1076 // signal receiving end
1077 assert(!d->semaphores[myIndex % d->nBuffers].is_ready());
1078 d->semaphores[myIndex % d->nBuffers].unlock();
1079 update_read_index_max();
1080
1081 // deal with when_any notifications (unless data was overwritten)
1082 if(ret) {
1083 send_notification(notification_queue);
1084 }
1085 return ret;
1086 }
1087
1090 template<typename T, typename FEATURES>
1091 template<typename U,
1092 typename std::enable_if<!std::is_same<U, void>::value && std::is_copy_constructible<T>::value, int>::type>
1094 // Create copy and pass this copy as an Rvalue reference to the other
1095 // implementation
1096 return push_overwrite(T(t));
1097 }
1098
1099 /*********************************************************************************************************************/
1103 template<typename T, typename FEATURES>
1104 template<typename U, typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value, int>::type>
1106 if((d->is_continuation_deferred || d->is_continuation_when_all) && !d->hasFrontOwnership) {
1107 d->continuation_process_deferred();
1108 }
1109 if(d->hasFrontOwnership || d->semaphores[d->readIndex % d->nBuffers].is_ready_and_reset()) {
1110 std::exception_ptr e;
1111 if(d->exceptions[d->readIndex % d->nBuffers]) {
1112 e = d->exceptions[d->readIndex % d->nBuffers];
1113 }
1114 else {
1116 t, std::move(future_queue_base::d.cast<T>()->buffers[d->readIndex % d->nBuffers]), FEATURES());
1117 }
1118 assert(d->readIndex < d->writeIndex);
1119 d->readIndex++;
1120 d->hasFrontOwnership = false;
1121 decrement_previous_data_counter();
1122 if(e) std::rethrow_exception(e);
1123 return true;
1124 }
1125 else {
1126 return false;
1127 }
1128 }
1129
1132 template<typename T, typename FEATURES>
1134 if((d->is_continuation_deferred || d->is_continuation_when_all) && !d->hasFrontOwnership) {
1135 d->continuation_process_deferred();
1136 }
1137 if(d->hasFrontOwnership || d->semaphores[d->readIndex % d->nBuffers].is_ready_and_reset()) {
1138 std::exception_ptr e;
1139 if(d->exceptions[d->readIndex % d->nBuffers]) {
1140 e = d->exceptions[d->readIndex % d->nBuffers];
1141 }
1142 assert(d->readIndex < d->writeIndex);
1143 d->readIndex++;
1144 d->hasFrontOwnership = false;
1145 decrement_previous_data_counter();
1146 if(e) std::rethrow_exception(e);
1147 return true;
1148 }
1149 else {
1150 return false;
1151 }
1152 }
1153
1155 template<typename T, typename FEATURES>
1156 template<typename U, typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value, int>::type>
1158 if(!d->hasFrontOwnership) {
1159 if(d->is_continuation_deferred || d->is_continuation_when_all) d->continuation_process_deferred_wait();
1160 d->semaphores[d->readIndex % d->nBuffers].wait_and_reset();
1161 }
1162 else {
1163 d->hasFrontOwnership = false;
1164 }
1165 std::exception_ptr e;
1166 if(d->exceptions[d->readIndex % d->nBuffers]) {
1167 e = d->exceptions[d->readIndex % d->nBuffers];
1168 }
1169 else {
1171 t, std::move(future_queue_base::d.cast<U>()->buffers[d->readIndex % d->nBuffers]), FEATURES());
1172 }
1173 assert(d->readIndex < d->writeIndex);
1174 d->readIndex++;
1175 decrement_previous_data_counter();
1176 if(e) std::rethrow_exception(e);
1177 }
1178
1181 template<typename T, typename FEATURES>
1183 if(!d->hasFrontOwnership) {
1184 if(d->is_continuation_deferred || d->is_continuation_when_all) d->continuation_process_deferred_wait();
1185 d->semaphores[d->readIndex % d->nBuffers].wait_and_reset();
1186 }
1187 else {
1188 d->hasFrontOwnership = false;
1189 }
1190 std::exception_ptr e;
1191 if(d->exceptions[d->readIndex % d->nBuffers]) {
1192 e = d->exceptions[d->readIndex % d->nBuffers];
1193 }
1194 assert(d->readIndex < d->writeIndex);
1195 d->readIndex++;
1196 decrement_previous_data_counter();
1197 if(e) std::rethrow_exception(e);
1198 }
1199
1200 /*********************************************************************************************************************/
1204 template<typename T, typename FEATURES>
1205 template<typename U, typename std::enable_if<std::is_same<T, U>::value && !std::is_same<U, void>::value, int>::type>
1207 assert(d->hasFrontOwnership);
1208 if(d->exceptions[d->readIndex % d->nBuffers]) std::rethrow_exception(d->exceptions[d->readIndex % d->nBuffers]);
1209 return future_queue_base::d.cast<T>()->buffers[d->readIndex % d->nBuffers];
1210 }
1211
1213 template<typename T, typename FEATURES>
1214 template<typename U, typename std::enable_if<std::is_same<T, U>::value && std::is_same<U, void>::value, int>::type>
1216 assert(d->hasFrontOwnership);
1217 if(d->exceptions[d->readIndex % d->nBuffers]) std::rethrow_exception(d->exceptions[d->readIndex % d->nBuffers]);
1218 }
1219
1220 /*********************************************************************************************************************/
1224 namespace detail {
1225 // ----------------------------------------------------------------------------------------------------------------
1226 // ----------------------------------------------------------------------------------------------------------------
1227 // helper functions used inside future_queue::then()
1228
1229 // ----------------------------------------------------------------------------------------------------------------
1230 // continuation_process_deferred: function to be executed in a deferred
1231 // continuation in non-blocking functions
1232
1233 // continuation_process_deferred for non-void data types
1234 template<typename T, typename FEATURES, typename TOUT, typename CALLABLE>
1239 void operator()() {
1240 // written this way so the callable is able to swap with the internal buffer
1241 if(q_input.empty()) return;
1242 try {
1244 }
1245 catch(...) {
1246 q_output.push_exception(std::current_exception());
1247 }
1248 try {
1249 q_input.pop();
1250 }
1251 catch(...) {
1252 // exception already pushed to the output queue, so ignore here
1253 }
1254 }
1258 };
1259
1260 // continuation_process_deferred for void input and non-void output data types
1261 template<typename FEATURES, typename TOUT, typename CALLABLE>
1274
1275 // continuation_process_deferred for non-void input and void output data types
1276 template<typename T, typename FEATURES, typename CALLABLE>
1281 void operator()() {
1282 // written this way so the callable is able to swap with the internal buffer
1283 if(q_input.empty()) return;
1284 try {
1286 q_output.push();
1287 }
1288 catch(...) {
1289 q_output.push_exception(std::current_exception());
1290 }
1291 try {
1292 q_input.pop();
1293 }
1294 catch(...) {
1295 // exception already pushed to the output queue, so ignore here
1296 }
1297 }
1301 };
1302
1303 // continuation_process_deferred for void input and void output data types
1304 template<typename FEATURES, typename CALLABLE>
1325
1326 // factory for continuation_process_deferred
1327 template<typename T, typename FEATURES, typename TOUT, typename CALLABLE>
1332
1333 // ----------------------------------------------------------------------------------------------------------------
1334 // continuation_process_deferred_wait: function to be executed in a deferred
1335 // continuation in blocking functions
1336
1337 // continuation_process_deferred_wait for non-void data types
1338 template<typename T, typename FEATURES, typename TOUT, typename CALLABLE>
1343 void operator()() {
1344 // written this way so the callable is able to swap with the internal buffer
1345 try {
1346 q_input.wait();
1348 }
1349 catch(...) {
1350 q_output.push_exception(std::current_exception());
1351 }
1352 try {
1353 q_input.pop();
1354 }
1355 catch(...) {
1356 // exception already pushed to the output queue, so ignore here
1357 }
1358 }
1362 };
1363
1364 // continuation_process_deferred_wait for void input and non-void output data
1365 // types
1366 template<typename FEATURES, typename TOUT, typename CALLABLE>
1384
1385 // continuation_process_deferred_wait for non-void input and void output data
1386 // types
1387 template<typename T, typename FEATURES, typename CALLABLE>
1392 void operator()() {
1393 // written this way so the callable is able to swap with the internal buffer
1394 try {
1395 q_input.wait();
1397 q_output.push();
1398 }
1399 catch(...) {
1400 q_output.push_exception(std::current_exception());
1401 }
1402 try {
1403 q_input.pop();
1404 }
1405 catch(...) {
1406 // exception already pushed to the output queue, so ignore here
1407 }
1408 }
1412 };
1413
1414 // continuation_process_deferred_wait for void input and void output data types
1415 template<typename FEATURES, typename CALLABLE>
1434
1435 // factory for continuation_process_deferred_wait
1436 template<typename T, typename FEATURES, typename TOUT, typename CALLABLE>
1441
1442 // ----------------------------------------------------------------------------------------------------------------
1443 // continuation_process_async: function to be executed in the internal thread of
1444 // an async continuation
1445
1446 // continuation_process_async for non-void data types
1447 template<typename T, typename FEATURES, typename TOUT, typename CALLABLE>
1451 void operator()() {
1452 while(true) {
1453 // written this way so the callable is able to swap with the internal
1454 // buffer
1455 q_input.wait();
1456 T* v;
1457 try {
1458 v = &(q_input.front());
1459 // TODO how to handle full output queues?
1461 }
1463 q_output.d->continuation_process_async_terminated = true;
1464 return;
1465 }
1466 catch(...) {
1467 // TODO how to handle full output queues?
1468 q_output.push_exception(std::current_exception());
1469 }
1470 try {
1471 q_input.pop();
1472 }
1473 catch(...) {
1474 // exception already pushed to the output queue, so ignore here
1475 }
1476 }
1477 }
1481 };
1482
1483 // continuation_process_async for void input and non-void output data types
1484 template<typename FEATURES, typename TOUT, typename CALLABLE>
1489 void operator()() {
1490 while(true) {
1491 try {
1492 q_input.pop_wait();
1493 // TODO how to handle full output queues?
1495 }
1497 q_output.d->continuation_process_async_terminated = true;
1498 return;
1499 }
1500 catch(...) {
1501 // TODO how to handle full output queues?
1502 q_output.push_exception(std::current_exception());
1503 }
1504 }
1505 }
1509 };
1510
1511 // continuation_process_async for non-void input and void output data types
1512 template<typename T, typename FEATURES, typename CALLABLE>
1516 void operator()() {
1517 while(true) {
1518 // written this way so the callable is able to swap with the internal
1519 // buffer
1520 q_input.wait();
1521 T* v;
1522 try {
1523 v = &(q_input.front());
1524 callable(*v);
1525 // TODO how to handle full output queues?
1526 q_output.push();
1527 }
1529 q_output.d->continuation_process_async_terminated = true;
1530 return;
1531 }
1532 catch(...) {
1533 // TODO how to handle full output queues?
1534 q_output.push_exception(std::current_exception());
1535 }
1536 try {
1537 q_input.pop();
1538 }
1539 catch(...) {
1540 // exception already pushed to the output queue, so ignore here
1541 }
1542 }
1543 }
1547 };
1548
1549 // continuation_process_async for void input and void output data types
1550 template<typename FEATURES, typename CALLABLE>
1555 void operator()() {
1556 while(true) {
1557 try {
1558 q_input.pop_wait();
1559 callable();
1560 // TODO how to handle full output queues?
1561 q_output.push();
1562 }
1564 q_output.d->continuation_process_async_terminated = true;
1565 return;
1566 }
1567 catch(...) {
1568 // TODO how to handle full output queues?
1569 q_output.push_exception(std::current_exception());
1570 }
1571 }
1572 }
1576 };
1577
1578 // factory for continuation_process_async
1579 template<typename T, typename FEATURES, typename TOUT, typename CALLABLE>
1584 } // namespace detail
1585
1586 // ----------------------------------------------------------------------------------------------------------------
1587 // ----------------------------------------------------------------------------------------------------------------
1588 // actual implementation of future_queue::then()
1589
1590 template<typename T, typename FEATURES>
1591 template<typename T2, typename FEATURES2, typename CALLABLE>
1593 future_queue<T, FEATURES> q_input(*this);
1594 if(policy == std::launch::deferred) {
1595 future_queue<T2, FEATURES2> q_output(1);
1596 q_output.d->continuation_process_deferred =
1597 detail::make_continuation_process_deferred(q_input, q_output, callable);
1598 q_output.d->continuation_process_deferred_wait =
1599 detail::make_continuation_process_deferred_wait(q_input, q_output, callable);
1600 q_output.d->continuation_origin = *this;
1601 q_output.d->is_continuation_deferred = true;
1602 return q_output;
1603 }
1604 else {
1605 future_queue<T2, FEATURES2> q_output(size());
1606 q_output.d->continuation_process_async =
1607 std::thread(detail::make_continuation_process_async(q_input, q_output, callable));
1608 q_output.d->continuation_origin = *this;
1609 q_output.d->is_continuation_async = true;
1610 return q_output;
1611 }
1612 }
1613
1614} // namespace cppext
Feature tag for future_queue: use std::move to store and retrieve data to/from the queue.
Feature tag for future_queue: use std::swap to store and retrieve data to/from the queue.
Exception to be pushed into the queue to signal a termination request for the internal thread of an a...
Type-independent base class for future_queue which does not depend on the template argument.
detail::shared_state_ptr d
pointer to data used to allow sharing the queue (create multiple copies which all refer to the same q...
bool operator==(const future_queue_base &other) const
Check whether two future_queue instances use the same shared state, i.e.
size_t setNotificationQueue(future_queue< size_t, MOVE_DATA > &notificationQueue, size_t indexToSend)
Set the notification queue in the shared state, as done in when_any.
void decrement_previous_data_counter()
Decrement the "previous data" counter used in when_any().
bool operator!=(const future_queue_base &other) const
void wait()
Wait until the queue is not empty.
bool push_exception(std::exception_ptr exception)
Push an exception pointer (inplace of a value) into the queue.
void update_read_index_max()
update readIndexMax after a write operation was completed
void send_notification(cppext::detail::shared_state_base *notification_queue)
Send notification to notification queue (if not nullptr).
cppext::detail::shared_state_base * get_notification_queue()
Atomically return the notification queue or increment the "previous data" counter (for wait_any).
friend future_queue< void, MOVE_DATA > when_all(ITERATOR_TYPE begin, ITERATOR_TYPE end)
This function expects two forward iterators pointing to a region of a container of future_queue objec...
size_t write_available() const
Number of push operations which can be performed before the queue is full.
bool empty()
Check if there is currently no data on the queue.
size_t read_available() const
Number of pop operations which can be performed before the queue is empty.
bool push_overwrite_exception(std::exception_ptr exception)
Like push_exception() but overwrite the last pushed value in case the queue is full.
bool obtain_write_slot(size_t &index)
reserve next available write slot.
friend future_queue< size_t, MOVE_DATA > when_any(ITERATOR_TYPE begin, ITERATOR_TYPE end)
Implementations of non-member functions.
size_t size() const
return length of the queue
A lockfree multi-producer single-consumer queue of a fixed length which the receiver can wait on in c...
void front() const
This front() is for void data types.
void pop_wait()
This pop_wait() is for all data types (for non-void data types the value will be discarded)
bool push_overwrite(U &&t)
Push object t to the queue.
future_queue(const future_queue &other)=default
Copy constructor: After copying the object both *this and the other object will refer to the same que...
bool push(U &&t)
Push object t to the queue.
bool pop()
This pop() is for all data types (for non-void data types the value will be discarded)
future_queue(size_t length)
The length specifies how many objects the queue can contain at a time.
future_queue< T2, FEATURES2 > then(CALLABLE callable, std::launch policy=std::launch::async)
Add continuation: Whenever there is a new element in the queue, process it with the callable and put ...
bool pop(U &t)
Pop object off the queue and store it in t.
future_queue & operator=(const future_queue &other)=default
Copy assignment operator: After the assignment both *this and the other object will refer to the same...
bool push_overwrite()
This version of push_overwrite() is valid only for T=void.
bool push(const U &t)
This push() is for non-void data types passed by Lvalue reference.
void pop_wait(U &t)
Pop object off the queue and store it in t.
U & front()
Obtain the front element of the queue without removing it.
future_queue()
The default constructor creates only a place holder which can later be assigned with a properly const...
bool push(void)
This version of push() is valid only for T=void.
bool push_overwrite(const U &t)
This push_overwrite() is for non-void data types passed by Lvalue reference.
continuation_process_async< T, FEATURES, TOUT, CALLABLE > make_continuation_process_async(future_queue< T, FEATURES > q_input, future_queue< TOUT > q_output, CALLABLE callable)
continuation_process_deferred< T, FEATURES, TOUT, CALLABLE > make_continuation_process_deferred(future_queue< T, FEATURES > q_input, future_queue< TOUT > q_output, CALLABLE callable)
void data_assign(T &a, T &&b, MOVE_DATA)
Helper function to realise the data assignment depending on the selected FEATURES tags.
continuation_process_deferred_wait< T, FEATURES, TOUT, CALLABLE > make_continuation_process_deferred_wait(future_queue< T, FEATURES > q_input, future_queue< TOUT > q_output, CALLABLE callable)
shared_state_ptr make_shared_state(size_t length)
future_queue< size_t > when_any(ITERATOR_TYPE begin, ITERATOR_TYPE end)
Implementations of non-member functions.
future_queue< void > when_all(ITERATOR_TYPE begin, ITERATOR_TYPE end)
This function expects two forward iterators pointing to a region of a container of future_queue objec...
continuation_process_async(future_queue< T, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
continuation_process_async(future_queue< void, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
continuation_process_async(future_queue< void, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
continuation_process_async(future_queue< T, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
continuation_process_deferred(future_queue< T, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
continuation_process_deferred(future_queue< void, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
continuation_process_deferred(future_queue< void, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
continuation_process_deferred_wait(future_queue< T, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
continuation_process_deferred_wait(future_queue< void, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
continuation_process_deferred_wait(future_queue< void, FEATURES > q_input_, future_queue< void > q_output_, CALLABLE callable_)
continuation_process_deferred_wait(future_queue< T, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
continuation_process_deferred(future_queue< T, FEATURES > q_input_, future_queue< TOUT > q_output_, CALLABLE callable_)
Internal base class for holding the data which is shared between multiple instances of the same queue...
std::atomic< size_t > writeIndex
index of the element which will be next written
bool is_continuation_when_all
Flag whether this future_queue is a when_all-type continuation (of many other)
std::vector< semaphore > semaphores
vector of semaphores corresponding to the buffers which allows the receiver to wait for new data
future_queue_base continuation_origin
If either is_continuation_deferred or is_continuation_async is true, this will point to the original ...
size_t nBuffers
the number of buffers we have allocated
bool is_continuation_deferred
Flag whether this future_queue is a deferred-type continuation of another.
std::atomic< size_t > when_any_index
index used in wait_any to identify the queue
std::atomic< when_any_notification_info > when_any_notification
std::atomic< bool > continuation_process_async_terminated
Flag whether the internal thread continuation_process_async has been terminated.
std::vector< std::exception_ptr > exceptions
vector of exception pointers, can be set instead of values through push_exception()
std::atomic< size_t > reference_count
reference count.
std::function< void(void)> continuation_process_deferred
Function to be called for deferred evaluation of a single value if this queue is a continuation.
std::atomic< size_t > readIndex
index of the element which will be next read
bool is_continuation_async
Flag whether this future_queue is a async-type continuation of another.
bool hasFrontOwnership
Flag if the receiver has already ownership over the front element.
std::thread continuation_process_async
Thread handling async-type continuations.
void free()
Decreases the reference count and calls "delete this" where appropriate.
std::atomic< size_t > readIndexMax
maximum index which the receiver is currently allowed to read (after checking its semaphore).
virtual ~shared_state_base()
Destructor must be virtual so the destructor of the derived class gets called.
std::function< void(void)> continuation_process_deferred_wait
Function to be called for deferred evaluation of a single value if this queue is a continuation.
shared_ptr-like smart pointer type for referencing the shared_state.
void make_new(size_t length)
Create new shared_state for type T.
shared_state_base * operator->()
Dereferencing operator.
shared_state< T > * cast()
Cast into shared state for type T.
shared_state_ptr & operator=(const shared_state_ptr &other)
Copy by assignment.
shared_state_base * get() const
Obtain the target pointer.
void set(shared_state_base *ptr_)
Set the target pointer without incrementing the reference counter.
shared_state_ptr()
Default constructor: create empty pointer.
bool operator==(const shared_state_ptr &other) const
Check if two pointers are identical.
Internal class for holding the data which is shared between multiple instances of the same queue.
std::vector< T > buffers
vector of buffers - allocation is done in the constructor
cppext::detail::shared_state_base * notifyerQueue
Notification queue used to realise a wait_any logic.
size_t notifyerQueue_previousData
counter for the number of elements in the queue before when_any has added the notifyerQueue