7 #include <initializer_list>
9 #include <ChimeraTK/cppext/future_queue.hpp>
82 [[nodiscard]] std::size_t
getIndex()
const;
94 [[nodiscard]]
bool isReady()
const;
110 bool accepted{
false};
130 ReadAnyGroup(std::initializer_list<std::reference_wrapper<TransferElementAbstractor>> list);
135 ReadAnyGroup(std::initializer_list<boost::shared_ptr<TransferElement>> list);
140 template<
typename ITERATOR>
152 void add(TransferElementAbstractor& element);
157 void add(boost::shared_ptr<TransferElement> element);
213 void readUntil(
const TransferElementID&
id);
218 void readUntil(
const TransferElementAbstractor& element);
231 void readUntilAll(
const std::vector<TransferElementID>& ids);
236 void readUntilAll(
const std::vector<TransferElementAbstractor>& elements);
272 void interrupt() { push_elements.front().getHighLevelImplElement()->interrupt(); }
/**
 * Helper defined further down in this file. Its visible branches depend on _lastOperationIndex: for the value
 * numeric_limits<size_t>::max() it loops over all push_elements; for any value other than max() - 1 it calls
 * preRead(TransferType::read) on push_elements[_lastOperationIndex].
 */
276 void handlePreRead();
/**
 * Starts out false. add() and finalise() throw std::logic_error with an "already been finalised" message -
 * presumably when this flag is set; confirm against their definitions.
 */
279 bool isFinalised{
false};
/**
 * Elements whose _readQueue is combined into notification_queue by finalise(). The values taken from
 * notification_queue and the index held by a Notification are used as positions in this vector.
 * NOTE(review): presumably these are the elements with AccessMode::wait_for_new_data - confirm in add().
 */
282 std::vector<TransferElementAbstractor> push_elements;
/**
 * Elements appended by add() that are kept separately from push_elements; they are iterated as a whole in a
 * loop further down in this file.
 * NOTE(review): presumably the elements without AccessMode::wait_for_new_data - confirm in add().
 */
285 std::vector<TransferElementAbstractor> poll_elements;
/**
 * Assigned in finalise() from cppext::when_any() over the _readQueue of every entry of push_elements. The
 * size_t values read from it are used as indices into push_elements.
 */
288 cppext::future_queue<size_t> notification_queue;
/**
 * Bookkeeping value consumed by handlePreRead(). Values visible in this file:
 *  - numeric_limits<size_t>::max() (initial value): handlePreRead() loops over all push_elements.
 *  - numeric_limits<size_t>::max() - 1: stored in the detail::DiscardValueException handler of the
 *    Notification code; handlePreRead() then takes neither of its two visible branches.
 *  - any other value: an index into push_elements, stored just before postRead() is called on that element;
 *    handlePreRead() calls preRead() on exactly that element.
 */
293 size_t _lastOperationIndex{std::numeric_limits<size_t>::max()};
305 : accepted(other.accepted), index(other.index), valid(other.valid), _owner(other._owner) {
314 if(this->valid && !this->accepted) {
331 if(this->valid && !this->accepted) {
341 this->accepted = other.accepted;
342 this->index = other.index;
343 this->valid = other.valid;
344 this->_owner = other._owner;
353 throw std::logic_error(
"This notification object is invalid.");
356 throw std::logic_error(
"This notification has already been accepted.");
358 this->accepted =
true;
360 _owner->push_elements[index].getHighLevelImplElement()->_readQueue.pop_wait();
363 _owner->push_elements[index].getHighLevelImplElement()->_activeException = std::current_exception();
365 catch(boost::thread_interrupted&) {
366 _owner->push_elements[index].getHighLevelImplElement()->_activeException = std::current_exception();
368 catch(detail::DiscardValueException&) {
370 _owner->_lastOperationIndex = std::numeric_limits<size_t>::max() - 1;
373 _owner->_lastOperationIndex = index;
374 _owner->push_elements[index].getHighLevelImplElement()->postRead(
TransferType::read,
true);
382 throw std::logic_error(
"This notification object is invalid.");
384 return _owner->push_elements[index].getId();
391 throw std::logic_error(
"This notification object is invalid.");
400 throw std::logic_error(
"This notification object is invalid.");
402 return _owner->push_elements[index];
408 return this->valid && !this->accepted;
414 : index(index_), valid(true), _owner(owner) {}
425 for(
const auto& element : list)
add(element);
432 for(
const auto& element : list)
add(element);
438 template<
typename ITERATOR>
440 for(ITERATOR it = first; it != last; ++it)
add(*it);
448 throw std::logic_error(
"ReadAnyGroup has already been finalised, calling "
449 "add() is no longer allowed.");
452 throw std::logic_error(
453 "Cannot add non-readable accessor for register " + element.
getName() +
" to ReadAnyGroup.");
456 push_elements.push_back(element);
459 poll_elements.push_back(element);
480 throw std::logic_error(
"ReadAnyGroup has already been finalised, calling "
481 "finalise() is no longer allowed.");
483 std::vector<cppext::future_queue<void>> queueList;
484 bool groupEmpty =
true;
485 for(
auto& e : push_elements) {
486 queueList.push_back(e.getHighLevelImplElement()->_readQueue);
490 throw std::logic_error(
"ReadAnyGroup has no element with AccessMode::wait_for_new_data.");
492 notification_queue = cppext::when_any(queueList.begin(), queueList.end());
501 notification = this->
waitAny();
502 }
while(!notification.
accept());
506 return notification.
getId();
519 }
while(!notification.
accept());
523 return notification.
getId();
527 inline void ReadAnyGroup::handlePreRead() {
536 if(_lastOperationIndex == std::numeric_limits<size_t>::max()) {
537 for(
auto& elem : push_elements) {
541 else if(_lastOperationIndex != std::numeric_limits<size_t>::max() - 1) {
544 push_elements[_lastOperationIndex].getHighLevelImplElement()->preRead(
TransferType::read);
555 notification_queue.pop_wait(index);
558 return {index,
this};
564 restart_after_discard_value:
566 if(notification_queue.empty()) {
579 auto id = notification_queue.front();
582 if(push_elements[
id].getHighLevelImplElement()->_readQueue.empty()) {
586 push_elements[id].getHighLevelImplElement()->_readQueue.front();
588 catch(detail::DiscardValueException&) {
590 notification_queue.pop();
592 push_elements[id].getHighLevelImplElement()->_readQueue.pop();
594 catch(detail::DiscardValueException&) {
595 goto restart_after_discard_value;
604 catch(boost::thread_interrupted&) {
609 std::cout <<
"Fatal ERROR in ReadAnyGroup: Found unexpected exception on the read queue. Terminating!"
622 for(
auto& e : poll_elements) {
632 if(
read ==
id)
return;
645 std::map<TransferElementID, bool> found;
646 for(
const auto&
id : ids) {
650 size_t leftToFind = ids.size();
653 if(found.count(
read) == 0)
continue;
654 if(found[
read])
continue;
657 if(leftToFind == 0)
return;
664 std::map<TransferElementID, bool> found;
665 for(
const auto& elem : elements) {
666 found[elem.getId()] =
false;
669 size_t leftToFind = elements.size();
672 if(found.count(
read) == 0)
continue;
673 if(found[
read])
continue;
676 if(leftToFind == 0)
return;