ChimeraTK-DeviceAccess 03.25.00
Loading...
Searching...
No Matches
ReadAnyGroup.h
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3#pragma once
4
5#include "TransferElement.h"
7
8#include <ChimeraTK/cppext/future_queue.hpp>
9
10#include <functional>
11#include <initializer_list>
12
13namespace ChimeraTK {
14
15 namespace DataConsistencyGroupDetail {
16 class HistorizedMatcher;
17 } // namespace DataConsistencyGroupDetail
18
26 // friend because it needs to decorate our push_elements
28
29 public:
35 public:
36 friend class ReadAnyGroup;
37
43
48 Notification(Notification&& other) noexcept;
49
54
61 Notification& operator=(Notification&& other) noexcept;
62
74 bool accept();
75
81 [[nodiscard]] TransferElementID getId();
82
89 [[nodiscard]] std::size_t getIndex() const;
90
97
101 [[nodiscard]] bool isReady() const;
102
103 /*
104 * Notifications cannot be copied because each notification can only be accepted once.
105 */
106 Notification(const Notification&) = delete;
108
109 private:
114 Notification(std::size_t index, ReadAnyGroup* owner);
115
117 bool accepted{false};
118
120 std::size_t index{};
121
123 bool valid{false};
124
126 ReadAnyGroup* _owner{nullptr};
127 };
128
133
137 ReadAnyGroup(std::initializer_list<std::reference_wrapper<TransferElementAbstractor>> list);
138
142 ReadAnyGroup(std::initializer_list<boost::shared_ptr<TransferElement>> list);
143
147 template<typename ITERATOR>
148 ReadAnyGroup(ITERATOR first, ITERATOR last);
149
159 void add(TransferElementAbstractor& element);
160
161 ReadAnyGroup(const ReadAnyGroup&) = delete;
162 ReadAnyGroup(ReadAnyGroup&& other) noexcept { operator=(std::move(other)); }
163 ReadAnyGroup& operator=(ReadAnyGroup&& other) noexcept;
164
168 void add(boost::shared_ptr<TransferElement> element);
169
185 void finalise();
186
203
212
224 void readUntil(const TransferElementID& id);
225
229 void readUntil(const TransferElementAbstractor& element);
230
242 void readUntilAll(const std::vector<TransferElementID>& ids);
243
247 void readUntilAll(const std::vector<TransferElementAbstractor>& elements);
248
260 Notification waitAny();
261
270 Notification waitAnyNonBlocking();
271
277 void processPolled();
278
283 void interrupt() { push_elements.front().getHighLevelImplElement()->interrupt(); }
284
285 private:
287 void handlePreRead();
288
290 bool isFinalised{false};
291
293 std::vector<TransferElementAbstractor> push_elements;
294
296 std::vector<TransferElementAbstractor> poll_elements;
297
299 cppext::future_queue<size_t> notification_queue;
300
304 size_t _lastOperationIndex{std::numeric_limits<size_t>::max()};
305 };
306
307 /********************************************************************************************************************/
308 /* Implementations of ReadAnyGroup::Notification */
309 /********************************************************************************************************************/
310
312
313 /********************************************************************************************************************/
314
316 : accepted(other.accepted), index(other.index), valid(other.valid), _owner(other._owner) {
317 other.valid = false;
318 }
319
320 /********************************************************************************************************************/
321
323 // It is important that each received notification is consumed. This means that we have to accept a notification
324 // before we can destroy it.
325 if(this->valid && !this->accepted) {
326 try {
327 this->accept();
328 }
329 catch(...) {
330 // Do not let exceptions escape the destructor, rather terminate the application instead. This will still print
331 // the exception message.
332 std::terminate();
333 }
334 }
335 }
336
337 /********************************************************************************************************************/
338
340 // It is important that each received notification is consumed. This means that we have to accept this notification
341 // before we can overwrite it with another one.
342 if(this->valid && !this->accepted) {
343 try {
344 this->accept();
345 }
346 catch(...) {
347 // Do not let exceptions escape the move operation, rather terminate the application instead. This will still
348 // print the exception message.
349 std::terminate();
350 }
351 }
352 this->accepted = other.accepted;
353 this->index = other.index;
354 this->valid = other.valid;
355 this->_owner = other._owner;
356 other.valid = false;
357 return *this;
358 }
359
360 /********************************************************************************************************************/
361
363 if(!this->valid) {
364 throw ChimeraTK::logic_error("This notification object is invalid.");
365 }
366 if(this->accepted) {
367 throw ChimeraTK::logic_error("This notification has already been accepted.");
368 }
369 this->accepted = true;
370 bool hasSeenException = false;
371 try {
372 _owner->push_elements[index].getHighLevelImplElement()->_readQueue.pop_wait();
373 }
375 _owner->push_elements[index].getHighLevelImplElement()->_activeException = std::current_exception();
376 hasSeenException = true;
377 }
378 catch(boost::thread_interrupted&) {
379 _owner->push_elements[index].getHighLevelImplElement()->_activeException = std::current_exception();
380 hasSeenException = true;
381 }
382 catch(detail::DiscardValueException&) {
383 // we must not call postRead() in this case, hence we do not call preRead()
384 _owner->_lastOperationIndex = std::numeric_limits<size_t>::max() - 1;
385 return false;
386 }
387 _owner->_lastOperationIndex = index;
388 _owner->push_elements[index].getHighLevelImplElement()->postRead(TransferType::read, !hasSeenException);
389 return true;
390 }
391
392 /********************************************************************************************************************/
393
395 if(!this->valid) {
396 throw ChimeraTK::logic_error("This notification object is invalid.");
397 }
398 return _owner->push_elements[index].getId();
399 }
400
401 /********************************************************************************************************************/
402
403 inline std::size_t ReadAnyGroup::Notification::getIndex() const {
404 if(!this->valid) {
405 throw ChimeraTK::logic_error("This notification object is invalid.");
406 }
407 return this->index;
408 }
409
410 /********************************************************************************************************************/
411
413 if(!this->valid) {
414 throw ChimeraTK::logic_error("This notification object is invalid.");
415 }
416 return _owner->push_elements[index];
417 }
418
419 /********************************************************************************************************************/
420
422 return this->valid && !this->accepted;
423 }
424
425 /********************************************************************************************************************/
426
427 inline ReadAnyGroup::Notification::Notification(std::size_t index_, ReadAnyGroup* owner)
428 : index(index_), valid(true), _owner(owner) {}
429
430 /********************************************************************************************************************/
431 /* Implementations of ReadAnyGroup */
432 /********************************************************************************************************************/
433
// Construct an empty group. All members keep their in-class initialisers, so the group is not finalised yet:
// elements have to be added with add() and the group has to be completed with finalise() afterwards.
434 inline ReadAnyGroup::ReadAnyGroup() = default;
435
436 /********************************************************************************************************************/
437
438 inline ReadAnyGroup::ReadAnyGroup(std::initializer_list<std::reference_wrapper<TransferElementAbstractor>> list) {
439 for(const auto& element : list) add(element);
440 finalise();
441 }
442
443 /********************************************************************************************************************/
444
445 inline ReadAnyGroup::ReadAnyGroup(std::initializer_list<boost::shared_ptr<TransferElement>> list) {
446 for(const auto& element : list) add(element);
447 finalise();
448 }
449
450 /********************************************************************************************************************/
451
452 template<typename ITERATOR>
453 ReadAnyGroup::ReadAnyGroup(ITERATOR first, ITERATOR last) {
454 for(ITERATOR it = first; it != last; ++it) add(*it);
455 finalise();
456 }
457
458 /********************************************************************************************************************/
459
461 // we need non-default implementation because we have to move pointers to ReadAnyGroup
462 this->isFinalised = other.isFinalised;
463 this->push_elements = std::move(other.push_elements);
464 this->poll_elements = std::move(other.poll_elements);
465 this->_lastOperationIndex = other._lastOperationIndex;
466 this->notification_queue = std::move(other.notification_queue);
467 for(auto& e : push_elements) {
468 e.getHighLevelImplElement()->setInReadAnyGroup(this);
469 }
470 return *this;
471 }
472
473 /********************************************************************************************************************/
474
476 if(isFinalised) {
477 throw ChimeraTK::logic_error("ReadAnyGroup has already been finalised, calling "
478 "add() is no longer allowed.");
479 }
480 if(!element.isReadable()) {
482 "Cannot add non-readable accessor for register " + element.getName() + " to ReadAnyGroup.");
483 }
484 if(element.getReadAnyGroup() == this) {
485 return;
486 }
487 if(element.getReadAnyGroup() != nullptr) {
488 throw ChimeraTK::logic_error(element.getName() + " is already in a different ReadAnyGroup");
489 }
491 push_elements.push_back(element);
492 }
493 else {
494 poll_elements.push_back(element);
495 }
496 // set flag on the accessor that it is now in a ReadAnyGroup:
497 // We do this for push-types only, since poll-types technically still allow calling read() without the ReadAnyGroup,
498 // although its documentation states that would not be allowed.
500 element.getHighLevelImplElement()->setInReadAnyGroup(this);
501 }
502 }
503
504 /********************************************************************************************************************/
505
506 inline void ReadAnyGroup::add(boost::shared_ptr<TransferElement> element) {
507 TransferElementAbstractor a(std::move(element));
508 add(a);
509 }
510
511 /********************************************************************************************************************/
512
514 if(isFinalised) {
515 throw ChimeraTK::logic_error("ReadAnyGroup has already been finalised, calling "
516 "finalise() is no longer allowed.");
517 }
518 std::vector<cppext::future_queue<void>> queueList;
519 bool groupEmpty = true;
520 for(auto& e : push_elements) {
521 queueList.push_back(e.getHighLevelImplElement()->_readQueue);
522 groupEmpty = false;
523 }
524 if(groupEmpty) {
525 throw ChimeraTK::logic_error("ReadAnyGroup has no element with AccessMode::wait_for_new_data.");
526 }
527 notification_queue = cppext::when_any(queueList.begin(), queueList.end());
528 isFinalised = true;
529 }
530
531 /********************************************************************************************************************/
532
534 Notification notification;
535 do {
536 notification = this->waitAny();
537 } while(!notification.accept());
538
539 this->processPolled();
540
541 return notification.getId();
542 }
543
544 /********************************************************************************************************************/
545
547 Notification notification;
548 do {
549 notification = this->waitAnyNonBlocking();
550 if(!notification.isReady()) {
551 this->processPolled();
552 return {};
553 }
554 } while(!notification.accept());
555
556 this->processPolled();
557
558 return notification.getId();
559 }
560 /********************************************************************************************************************/
561
562 inline void ReadAnyGroup::handlePreRead() {
563 // preRead() and postRead() must be called in pairs. Hence we call all preReads here before waiting for transfers to
564 // finish. postRead() will be called when accepting the notification. We can call preRead() repeatedly on the same
565 // element, even if no transfer and call to postRead() have happened. It is just ignored (see Transfer element spec
566 // B.5.2). Since this has a performance impact which might be significant on big applications, we try to avoid
567 // unnecessary calls anyway.
568
569 // Notice : This has the side effect that decorators can block here, for instance for the setup phase. This is used
570 // by ApplicationCore in testable mode.
571 if(_lastOperationIndex == std::numeric_limits<size_t>::max()) {
572 for(auto& elem : push_elements) {
573 elem.getHighLevelImplElement()->preRead(TransferType::read);
574 }
575 }
576 else if(_lastOperationIndex != std::numeric_limits<size_t>::max() - 1) {
577 // Note: _lastOperationIndex is set to std::numeric_limits<size_t>::max() - 1 in case a DiscardValueException
578 // has been seen, in which case no postRead() is called.
579 push_elements[_lastOperationIndex].getHighLevelImplElement()->preRead(TransferType::read);
580 }
581 }
582
583 /********************************************************************************************************************/
584
586 handlePreRead();
587
588 // Wait for notification
589 std::size_t index;
590 notification_queue.pop_wait(index);
591 // clazy has a false positive warning about index being uninitialised.
592 // NOLINTNEXTLINE(clang-analyzer-core.CallAndMessage)
593 return {index, this};
594 }
595
596 /********************************************************************************************************************/
597
599 restart_after_discard_value:
600 // check if update is available
601 if(notification_queue.empty()) {
602 // If no notification is present, do not even execute preRead. This is necessary for two reasons:
603 // - We always used TransferType::read to avoid mixing TransferType::read and TransferType::readNonBlocking
604 // in the same transfer of the same variable. We can do this even in this non-blocking case, if we already know
605 // that there will be an update read, since there will not be any difference beyond this point.
606 // - In ApplicationCore testable mode, the testable mode lock must be released in a preRead before any blocking
607 // read operation. If preRead is called here when no update is available, no preRead will be called in a
608 // possible subsequent blocking readAny(), hence there would be no way to release the testable mode lock in the
609 // right place.
610 return {};
611 }
612
613 // if update is available, peek into the queue to check whether a DiscardValueException will be read
614 auto id = notification_queue.front();
615 try {
616 // call to empty() necessary before call to front(), to gain ownership of front element
617 if(push_elements[id].getHighLevelImplElement()->_readQueue.empty()) {
618 // cannot place the call to empty() into the assert, since it must be executed also in release builds
619 assert(false);
620 }
621 push_elements[id].getHighLevelImplElement()->_readQueue.front();
622 }
623 catch(detail::DiscardValueException&) {
624 // Remove discarded transfer from the queues and go back to square one
625 notification_queue.pop();
626 try {
627 push_elements[id].getHighLevelImplElement()->_readQueue.pop();
628 }
629 catch(detail::DiscardValueException&) {
630 goto restart_after_discard_value;
631 }
632 assert(false); // we must never end up at this point
633 }
635 // While peeking we found another runtime which is stored in the queue.
636 // Don't let it through, but leave it on the queue and continue with the waitAny().
637 // It will be handled later.
638 }
639 catch(boost::thread_interrupted&) {
640 // Also suppress the thread_interrupted exception here.
641 // It will stay on the queue, hence it's not lost and will be handled later.
642 }
643 catch(...) {
644 std::cout << "Fatal ERROR in ReadAnyGroup: Found unexpected exception on the read queue. Terminating!"
645 << std::endl;
646 std::terminate();
647 }
648
649 // now that we know that an update is available, we can defer to waitAny()
650 return waitAny();
651 }
652
653 /********************************************************************************************************************/
654
656 // update all poll-type elements in the group
657 for(auto& e : poll_elements) {
658 if(!e.getAccessModeFlags().has(AccessMode::wait_for_new_data)) e.readLatest();
659 }
660 }
661
662 /********************************************************************************************************************/
663
665 while(true) {
666 auto read = readAny();
667 if(read == id) return;
668 }
669 }
670
671 /********************************************************************************************************************/
672
674 readUntil(element.getId());
675 }
676
677 /********************************************************************************************************************/
678
679 inline void ReadAnyGroup::readUntilAll(const std::vector<TransferElementID>& ids) {
680 std::map<TransferElementID, bool> found;
681 for(const auto& id : ids) {
682 found[id] = false; // initialise map so we can tell from the map if a
683 // variable is in the vector
684 }
685 size_t leftToFind = ids.size();
686 while(true) {
687 auto read = readAny();
688 if(found.count(read) == 0) continue; // variable is not in the vector ids
689 if(found[read]) continue; // variable has been read already
690 found[read] = true;
691 --leftToFind;
692 if(leftToFind == 0) return;
693 }
694 }
695
696 /********************************************************************************************************************/
697
698 inline void ReadAnyGroup::readUntilAll(const std::vector<TransferElementAbstractor>& elements) {
699 std::map<TransferElementID, bool> found;
700 for(const auto& elem : elements) {
701 found[elem.getId()] = false; // initialise map so we can tell from the map
702 // if a variable is in the vector
703 }
704 size_t leftToFind = elements.size();
705 while(true) {
706 auto read = readAny();
707 if(found.count(read) == 0) continue; // variable is not in the vector elements
708 if(found[read]) continue; // variable has been read already
709 found[read] = true;
710 --leftToFind;
711 if(leftToFind == 0) return;
712 }
713 }
714
715 /********************************************************************************************************************/
716
717} /* namespace ChimeraTK */
bool has(AccessMode flag) const
Check if a certain flag is in the set.
Definition AccessMode.cc:20
Data consistency matching via history of available data.
Notification object returned by waitAny().
bool accept()
Accept the notification.
TransferElementID getId()
Return the ID of the transfer element for which this notification has been generated.
TransferElementAbstractor getTransferElement()
Return the transfer element for which this notification has been generated.
Notification & operator=(Notification &&other) noexcept
Assign this notification from another notification.
bool isReady() const
Tell whether this notification is valid and has not been accepted yet.
Notification()
Create an empty notification.
Notification(const Notification &)=delete
Notification & operator=(const Notification &)=delete
std::size_t getIndex() const
Return the index of the transfer element for which this notification has been generated.
Group several registers (= TransferElement) to allow waiting for an update of any of the registers.
Notification waitAny()
Wait until one of the elements received an update notification, but do not actually process the updat...
ReadAnyGroup(ReadAnyGroup &&other) noexcept
TransferElementID readAny()
Wait until one of the elements in this group has received an update.
ReadAnyGroup(const ReadAnyGroup &)=delete
void add(TransferElementAbstractor &element)
Add register to group.
TransferElementID readAnyNonBlocking()
Read the next available update in the group, but do not block if no update is available.
void readUntilAll(const std::vector< TransferElementID > &ids)
Wait until all of the given TransferElements have received an update and store each update to the respective user buffer.
ReadAnyGroup & operator=(ReadAnyGroup &&other) noexcept
void readUntil(const TransferElementID &id)
Wait until the given TransferElement has received an update and store it to its user buffer.
void processPolled()
Process polled transfer elements (update them if new values are available).
Notification waitAnyNonBlocking()
Check if an update is available in the group, but do not block if no update is available.
void finalise()
Finalise the group.
void interrupt()
Convenience function to interrupt any running readAny/waitAny by calling interrupt on one of the push...
ReadAnyGroup()
Construct empty group.
Base class for register accessors abstractors independent of the UserType.
const boost::shared_ptr< TransferElement > & getHighLevelImplElement()
Obtain the highest level implementation TransferElement.
bool isReadable() const
Check if transfer element is readable.
ReadAnyGroup * getReadAnyGroup() const
Obtain the ReadAnyGroup this TransferElement is part of, or nullptr if not in a ReadAnyGroup.
AccessModeFlags getAccessModeFlags() const
Return the AccessModeFlags for this TransferElement.
TransferElementID getId() const
Obtain unique ID for the actual implementation of this TransferElement.
const std::string & getName() const
Returns the name that identifies the process variable.
Simple class holding a unique ID for a TransferElement.
Exception thrown when a logic error has occurred.
Definition Exception.h:51
Exception thrown when a runtime error has occurred.
Definition Exception.h:18
@ wait_for_new_data
Make any read blocking until new data has arrived since the last read.