8#include <ChimeraTK/cppext/future_queue.hpp>
11#include <initializer_list>
15 namespace DataConsistencyGroupDetail {
16 class HistorizedMatcher;
89 [[nodiscard]] std::size_t
getIndex()
const;
101 [[nodiscard]]
bool isReady()
const;
117 bool accepted{
false};
137 ReadAnyGroup(std::initializer_list<std::reference_wrapper<TransferElementAbstractor>> list);
142 ReadAnyGroup(std::initializer_list<boost::shared_ptr<TransferElement>> list);
147 template<
typename ITERATOR>
168 void add(boost::shared_ptr<TransferElement> element);
242 void readUntilAll(
const std::vector<TransferElementID>& ids);
247 void readUntilAll(
const std::vector<TransferElementAbstractor>& elements);
// Interrupt via the group's first push element: calls interrupt() on the high-level implementation
// element of push_elements.front().
// NOTE(review): std::vector::front() on an empty push_elements is undefined behaviour — confirm that
// callers only invoke this on a group holding at least one push element.
283 void interrupt() { push_elements.front().getHighLevelImplElement()->interrupt(); }
287 void handlePreRead();
290 bool isFinalised{
false};
293 std::vector<TransferElementAbstractor> push_elements;
296 std::vector<TransferElementAbstractor> poll_elements;
299 cppext::future_queue<size_t> notification_queue;
304 size_t _lastOperationIndex{std::numeric_limits<size_t>::max()};
316 : accepted(other.accepted), index(other.index), valid(other.valid), _owner(other._owner) {
325 if(this->valid && !this->accepted) {
342 if(this->valid && !this->accepted) {
352 this->accepted = other.accepted;
353 this->index = other.index;
354 this->valid = other.valid;
355 this->_owner = other._owner;
369 this->accepted =
true;
370 bool hasSeenException =
false;
372 _owner->push_elements[index].getHighLevelImplElement()->_readQueue.pop_wait();
375 _owner->push_elements[index].getHighLevelImplElement()->_activeException = std::current_exception();
376 hasSeenException =
true;
378 catch(boost::thread_interrupted&) {
379 _owner->push_elements[index].getHighLevelImplElement()->_activeException = std::current_exception();
380 hasSeenException =
true;
382 catch(detail::DiscardValueException&) {
384 _owner->_lastOperationIndex = std::numeric_limits<size_t>::max() - 1;
387 _owner->_lastOperationIndex = index;
388 _owner->push_elements[index].getHighLevelImplElement()->postRead(
TransferType::read, !hasSeenException);
398 return _owner->push_elements[index].getId();
416 return _owner->push_elements[index];
422 return this->valid && !this->accepted;
428 : index(index_), valid(true), _owner(owner) {}
439 for(
const auto& element : list)
add(element);
446 for(
const auto& element : list)
add(element);
452 template<
typename ITERATOR>
454 for(ITERATOR it = first; it != last; ++it)
add(*it);
462 this->isFinalised = other.isFinalised;
463 this->push_elements = std::move(other.push_elements);
464 this->poll_elements = std::move(other.poll_elements);
465 this->_lastOperationIndex = other._lastOperationIndex;
466 this->notification_queue = std::move(other.notification_queue);
467 for(
auto& e : push_elements) {
468 e.getHighLevelImplElement()->setInReadAnyGroup(
this);
478 "add() is no longer allowed.");
482 "Cannot add non-readable accessor for register " + element.
getName() +
" to ReadAnyGroup.");
491 push_elements.push_back(element);
494 poll_elements.push_back(element);
516 "finalise() is no longer allowed.");
518 std::vector<cppext::future_queue<void>> queueList;
519 bool groupEmpty =
true;
520 for(
auto& e : push_elements) {
521 queueList.push_back(e.getHighLevelImplElement()->_readQueue);
527 notification_queue = cppext::when_any(queueList.begin(), queueList.end());
536 notification = this->
waitAny();
537 }
while(!notification.
accept());
541 return notification.
getId();
554 }
while(!notification.
accept());
558 return notification.
getId();
562 inline void ReadAnyGroup::handlePreRead() {
571 if(_lastOperationIndex == std::numeric_limits<size_t>::max()) {
572 for(
auto& elem : push_elements) {
576 else if(_lastOperationIndex != std::numeric_limits<size_t>::max() - 1) {
579 push_elements[_lastOperationIndex].getHighLevelImplElement()->preRead(
TransferType::read);
590 notification_queue.pop_wait(index);
593 return {index,
this};
599 restart_after_discard_value:
601 if(notification_queue.empty()) {
614 auto id = notification_queue.front();
617 if(push_elements[
id].getHighLevelImplElement()->_readQueue.empty()) {
621 push_elements[id].getHighLevelImplElement()->_readQueue.front();
623 catch(detail::DiscardValueException&) {
625 notification_queue.pop();
627 push_elements[id].getHighLevelImplElement()->_readQueue.pop();
629 catch(detail::DiscardValueException&) {
630 goto restart_after_discard_value;
639 catch(boost::thread_interrupted&) {
644 std::cout <<
"Fatal ERROR in ReadAnyGroup: Found unexpected exception on the read queue. Terminating!"
657 for(
auto& e : poll_elements) {
667 if(
read ==
id)
return;
680 std::map<TransferElementID, bool> found;
681 for(
const auto&
id : ids) {
685 size_t leftToFind = ids.size();
688 if(found.count(
read) == 0)
continue;
689 if(found[
read])
continue;
692 if(leftToFind == 0)
return;
699 std::map<TransferElementID, bool> found;
700 for(
const auto& elem : elements) {
701 found[elem.getId()] =
false;
704 size_t leftToFind = elements.size();
707 if(found.count(
read) == 0)
continue;
708 if(found[
read])
continue;
711 if(leftToFind == 0)
return;
bool has(AccessMode flag) const
Check if a certain flag is in the set.
Data consistency matching via history of available data.
Notification object returned by waitAny().
bool accept()
Accept the notification.
TransferElementID getId()
Return the ID of the transfer element for which this notification has been generated.
TransferElementAbstractor getTransferElement()
Return the transfer element for which this notification has been generated.
Notification & operator=(Notification &&other) noexcept
Assign this notification from another notification.
bool isReady() const
Tell whether this notification is valid and has not been accepted yet.
~Notification()
Destructor.
Notification()
Create an empty notification.
friend class ReadAnyGroup
Notification(const Notification &)=delete
Notification & operator=(const Notification &)=delete
std::size_t getIndex() const
Return the index of the transfer element for which this notification has been generated.
Group several registers (= TransferElement) to allow waiting for an update of any of the registers.
Notification waitAny()
Wait until one of the elements has received an update notification, but do not actually process the update.
ReadAnyGroup(ReadAnyGroup &&other) noexcept
TransferElementID readAny()
Wait until one of the elements in this group has received an update.
ReadAnyGroup(const ReadAnyGroup &)=delete
void add(TransferElementAbstractor &element)
Add register to group.
TransferElementID readAnyNonBlocking()
Read the next available update in the group, but do not block if no update is available.
void readUntilAll(const std::vector< TransferElementID > &ids)
Wait until all of the given TransferElements have received an update, storing each update to the respective user buffer.
ReadAnyGroup & operator=(ReadAnyGroup &&other) noexcept
void readUntil(const TransferElementID &id)
Wait until the given TransferElement has received an update and store it to its user buffer.
void processPolled()
Process polled transfer elements (update them if new values are available).
Notification waitAnyNonBlocking()
Check if an update is available in the group, but do not block if no update is available.
void finalise()
Finalise the group.
void interrupt()
Convenience function to interrupt any running readAny/waitAny by calling interrupt on one of the push elements in the group.
ReadAnyGroup()
Construct empty group.
Base class for register accessor abstractors independent of the UserType.
const boost::shared_ptr< TransferElement > & getHighLevelImplElement()
Obtain the highest level implementation TransferElement.
bool isReadable() const
Check if transfer element is readable.
ReadAnyGroup * getReadAnyGroup() const
Obtain the ReadAnyGroup this TransferElement is part of, or nullptr if not in a ReadAnyGroup.
AccessModeFlags getAccessModeFlags() const
Return the AccessModeFlags for this TransferElement.
TransferElementID getId() const
Obtain unique ID for the actual implementation of this TransferElement.
const std::string & getName() const
Returns the name that identifies the process variable.
Simple class holding a unique ID for a TransferElement.
Exception thrown when a logic error has occurred.
Exception thrown when a runtime error has occurred.
@ wait_for_new_data
Make any read blocking until new data has arrived since the last read.