ChimeraTK-DeviceAccess  03.18.00
ReadAnyGroup.h
Go to the documentation of this file.
1 // SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2 // SPDX-License-Identifier: LGPL-3.0-or-later
3 #pragma once
4 
5 #include "TransferElement.h"
7 #include <initializer_list>
8 
9 #include <ChimeraTK/cppext/future_queue.hpp>
10 
11 #include <functional>
12 
13 namespace ChimeraTK {
14 
21  class ReadAnyGroup {
22  public:
27  class Notification {
28  public:
29  friend class ReadAnyGroup;
30 
35  Notification();
36 
41  Notification(Notification&& other) noexcept;
42 
46  ~Notification();
47 
54  Notification& operator=(Notification&& other) noexcept;
55 
67  bool accept();
68 
74  [[nodiscard]] TransferElementID getId();
75 
82  [[nodiscard]] std::size_t getIndex() const;
83 
90 
94  [[nodiscard]] bool isReady() const;
95 
96  /*
97  * Notifications cannot be copied because each notification can only be accepted once.
98  */
99  Notification(const Notification&) = delete;
100  Notification& operator=(const Notification&) = delete;
101 
102  private:
107  Notification(std::size_t index, ReadAnyGroup* owner);
108 
110  bool accepted{false};
111 
113  std::size_t index{};
114 
116  bool valid{false};
117 
119  ReadAnyGroup* _owner{nullptr};
120  };
121 
125  ReadAnyGroup();
126 
130  ReadAnyGroup(std::initializer_list<std::reference_wrapper<TransferElementAbstractor>> list);
131 
135  ReadAnyGroup(std::initializer_list<boost::shared_ptr<TransferElement>> list);
136 
140  template<typename ITERATOR>
141  ReadAnyGroup(ITERATOR first, ITERATOR last);
142 
152  void add(TransferElementAbstractor& element);
153 
157  void add(boost::shared_ptr<TransferElement> element);
158 
174  void finalise();
175 
191  TransferElementID readAny();
192 
200  TransferElementID readAnyNonBlocking();
201 
213  void readUntil(const TransferElementID& id);
214 
218  void readUntil(const TransferElementAbstractor& element);
219 
231  void readUntilAll(const std::vector<TransferElementID>& ids);
232 
236  void readUntilAll(const std::vector<TransferElementAbstractor>& elements);
237 
249  Notification waitAny();
250 
259  Notification waitAnyNonBlocking();
260 
266  void processPolled();
267 
272  void interrupt() { push_elements.front().getHighLevelImplElement()->interrupt(); }
273 
274  private:
276  void handlePreRead();
277 
279  bool isFinalised{false};
280 
282  std::vector<TransferElementAbstractor> push_elements;
283 
285  std::vector<TransferElementAbstractor> poll_elements;
286 
288  cppext::future_queue<size_t> notification_queue;
289 
293  size_t _lastOperationIndex{std::numeric_limits<size_t>::max()};
294  };
295 
296  /********************************************************************************************************************/
297  /* Implementations of ReadAnyGroup::Notification */
298  /********************************************************************************************************************/
299 
300  inline ReadAnyGroup::Notification::Notification() = default;
301 
302  /********************************************************************************************************************/
303 
305  : accepted(other.accepted), index(other.index), valid(other.valid), _owner(other._owner) {
306  other.valid = false;
307  }
308 
309  /********************************************************************************************************************/
310 
312  // It is important that each received notification is consumed. This means that we have to accept a notification
313  // before we can destroy it.
314  if(this->valid && !this->accepted) {
315  try {
316  this->accept();
317  }
318  catch(...) {
319  // Do not let exceptions escape the destructor, rather terminate the application instead. This will still print
320  // the exception message.
321  std::terminate();
322  }
323  }
324  }
325 
326  /********************************************************************************************************************/
327 
329  // It is important that each received notification is consumed. This means that we have to accept this notification
330  // before we can overwrite it with another one.
331  if(this->valid && !this->accepted) {
332  try {
333  this->accept();
334  }
335  catch(...) {
336  // Do not let exceptions escape the move operation, rather terminate the application instead. This will still
337  // print the exception message.
338  std::terminate();
339  }
340  }
341  this->accepted = other.accepted;
342  this->index = other.index;
343  this->valid = other.valid;
344  this->_owner = other._owner;
345  other.valid = false;
346  return *this;
347  }
348 
349  /********************************************************************************************************************/
350 
352  if(!this->valid) {
353  throw std::logic_error("This notification object is invalid.");
354  }
355  if(this->accepted) {
356  throw std::logic_error("This notification has already been accepted.");
357  }
358  this->accepted = true;
359  try {
360  _owner->push_elements[index].getHighLevelImplElement()->_readQueue.pop_wait();
361  }
362  catch(ChimeraTK::runtime_error&) {
363  _owner->push_elements[index].getHighLevelImplElement()->_activeException = std::current_exception();
364  }
365  catch(boost::thread_interrupted&) {
366  _owner->push_elements[index].getHighLevelImplElement()->_activeException = std::current_exception();
367  }
368  catch(detail::DiscardValueException&) {
369  // we must not call postRead() in this case, hence we do not call preRead()
370  _owner->_lastOperationIndex = std::numeric_limits<size_t>::max() - 1;
371  return false;
372  }
373  _owner->_lastOperationIndex = index;
374  _owner->push_elements[index].getHighLevelImplElement()->postRead(TransferType::read, true);
375  return true;
376  }
377 
378  /********************************************************************************************************************/
379 
381  if(!this->valid) {
382  throw std::logic_error("This notification object is invalid.");
383  }
384  return _owner->push_elements[index].getId();
385  }
386 
387  /********************************************************************************************************************/
388 
389  inline std::size_t ReadAnyGroup::Notification::getIndex() const {
390  if(!this->valid) {
391  throw std::logic_error("This notification object is invalid.");
392  }
393  return this->index;
394  }
395 
396  /********************************************************************************************************************/
397 
399  if(!this->valid) {
400  throw std::logic_error("This notification object is invalid.");
401  }
402  return _owner->push_elements[index];
403  }
404 
405  /********************************************************************************************************************/
406 
408  return this->valid && !this->accepted;
409  }
410 
411  /********************************************************************************************************************/
412 
413  inline ReadAnyGroup::Notification::Notification(std::size_t index_, ReadAnyGroup* owner)
414  : index(index_), valid(true), _owner(owner) {}
415 
416  /********************************************************************************************************************/
417  /* Implementations of ReadAnyGroup */
418  /********************************************************************************************************************/
419 
420  inline ReadAnyGroup::ReadAnyGroup() = default;
421 
422  /********************************************************************************************************************/
423 
424  inline ReadAnyGroup::ReadAnyGroup(std::initializer_list<std::reference_wrapper<TransferElementAbstractor>> list) {
425  for(const auto& element : list) add(element);
426  finalise();
427  }
428 
429  /********************************************************************************************************************/
430 
431  inline ReadAnyGroup::ReadAnyGroup(std::initializer_list<boost::shared_ptr<TransferElement>> list) {
432  for(const auto& element : list) add(element);
433  finalise();
434  }
435 
436  /********************************************************************************************************************/
437 
438  template<typename ITERATOR>
439  ReadAnyGroup::ReadAnyGroup(ITERATOR first, ITERATOR last) {
440  for(ITERATOR it = first; it != last; ++it) add(*it);
441  finalise();
442  }
443 
444  /********************************************************************************************************************/
445 
447  if(isFinalised) {
448  throw std::logic_error("ReadAnyGroup has already been finalised, calling "
449  "add() is no longer allowed.");
450  }
451  if(!element.isReadable()) {
452  throw std::logic_error(
453  "Cannot add non-readable accessor for register " + element.getName() + " to ReadAnyGroup.");
454  }
456  push_elements.push_back(element);
457  }
458  else {
459  poll_elements.push_back(element);
460  }
461  // set flag on the accessor that it is now in a ReadAnyGroup:
462  // We do this for push-types only, since poll-types technically still allow calling read() without the ReadAnyGroup,
463  // although its documentation states that would not be allowed.
465  element.getHighLevelImplElement()->_isInReadAnyGroup = true;
466  }
467  }
468 
469  /********************************************************************************************************************/
470 
471  inline void ReadAnyGroup::add(boost::shared_ptr<TransferElement> element) {
472  TransferElementAbstractor a(std::move(element));
473  add(a);
474  }
475 
476  /********************************************************************************************************************/
477 
478  inline void ReadAnyGroup::finalise() {
479  if(isFinalised) {
480  throw std::logic_error("ReadAnyGroup has already been finalised, calling "
481  "finalise() is no longer allowed.");
482  }
483  std::vector<cppext::future_queue<void>> queueList;
484  bool groupEmpty = true;
485  for(auto& e : push_elements) {
486  queueList.push_back(e.getHighLevelImplElement()->_readQueue);
487  groupEmpty = false;
488  }
489  if(groupEmpty) {
490  throw std::logic_error("ReadAnyGroup has no element with AccessMode::wait_for_new_data.");
491  }
492  notification_queue = cppext::when_any(queueList.begin(), queueList.end());
493  isFinalised = true;
494  }
495 
496  /********************************************************************************************************************/
497 
499  Notification notification;
500  do {
501  notification = this->waitAny();
502  } while(!notification.accept());
503 
504  this->processPolled();
505 
506  return notification.getId();
507  }
508 
509  /********************************************************************************************************************/
510 
512  Notification notification;
513  do {
514  notification = this->waitAnyNonBlocking();
515  if(!notification.isReady()) {
516  this->processPolled();
517  return {};
518  }
519  } while(!notification.accept());
520 
521  this->processPolled();
522 
523  return notification.getId();
524  }
525  /********************************************************************************************************************/
526 
527  inline void ReadAnyGroup::handlePreRead() {
528  // preRead() and postRead() must be called in pairs. Hence we call all preReads here before waiting for transfers to
529  // finish. postRead() will be called when accepting the notification. We can call preRead() repeatedly on the same
530  // element, even if no transfer and call to postRead() have happened. It is just ignored (see Transfer element spec
531  // B.5.2). Since this has a performance impact which might be significant on big applications, we try to avoid
532  // unnecessary calls anyway.
533 
534  // Notice : This has the side effect that decorators can block here, for instance for the setup phase. This is used
535  // by ApplicationCore in testable mode.
536  if(_lastOperationIndex == std::numeric_limits<size_t>::max()) {
537  for(auto& elem : push_elements) {
538  elem.getHighLevelImplElement()->preRead(TransferType::read);
539  }
540  }
541  else if(_lastOperationIndex != std::numeric_limits<size_t>::max() - 1) {
542  // Note: _lastOperationIndex is set to std::numeric_limits<size_t>::max() - 1 in case a DiscardValueException
543  // has been seen, in which case no postRead() is called.
544  push_elements[_lastOperationIndex].getHighLevelImplElement()->preRead(TransferType::read);
545  }
546  }
547 
548  /********************************************************************************************************************/
549 
551  handlePreRead();
552 
553  // Wait for notification
554  std::size_t index;
555  notification_queue.pop_wait(index);
556  // clazy has a false positive warning about index being uninitialised.
557  // NOLINTNEXTLINE(clang-analyzer-core.CallAndMessage)
558  return {index, this};
559  }
560 
561  /********************************************************************************************************************/
562 
564  restart_after_discard_value:
565  // check if update is available
566  if(notification_queue.empty()) {
567  // If no notification is present, do not even execute preRead. This is necessary for two reasons:
568  // - We always used TransferType::read to avoid mixing TransferType::read and TransferType::readNonBlocking
569  // in the same transfer of the same variable. We can do this even in this non-blocking case, if we already know
570  // that there will be an update read, since there will not be any difference beyond this point.
571  // - In ApplicationCore testable mode, the testable mode lock must be released in a preRead before any blocking
572  // read operation. If preRead is called here when no update is available, no preRead will be called in a
573  // possible subsequend blocking readAny(), hence there would be no way to release the testable mode lock in the
574  // right place.
575  return {};
576  }
577 
578  // if update is available, peek into the queue to check whether a DiscardValueException will be read
579  auto id = notification_queue.front();
580  try {
581  // call to empty() necessary before call to front(), to gain ownership of front element
582  if(push_elements[id].getHighLevelImplElement()->_readQueue.empty()) {
583  // cannot place the call to empty() into the assert, since it must be executed also in release builds
584  assert(false);
585  }
586  push_elements[id].getHighLevelImplElement()->_readQueue.front();
587  }
588  catch(detail::DiscardValueException&) {
589  // Remove discarded transfer from the queues and go back to square one
590  notification_queue.pop();
591  try {
592  push_elements[id].getHighLevelImplElement()->_readQueue.pop();
593  }
594  catch(detail::DiscardValueException&) {
595  goto restart_after_discard_value;
596  }
597  assert(false); // we must never end up at this point
598  }
599  catch(ChimeraTK::runtime_error&) {
600  // While peeking we found another runtime which is stored in the queue.
601  // Don't let it through, but leave it on the queue and continue with the waitAny().
602  // It will be handled later.
603  }
604  catch(boost::thread_interrupted&) {
605  // Also suppress the thread_interrupted exception here.
606  // It will stay on the queue, hence it's not lost and will be handled later.
607  }
608  catch(...) {
609  std::cout << "Fatal ERROR in ReadAnyGroup: Found unexpected exception on the read queue. Terminating!"
610  << std::endl;
611  std::terminate();
612  }
613 
614  // now that we know that an update is available, we can defer to waitAny()
615  return waitAny();
616  }
617 
618  /********************************************************************************************************************/
619 
621  // update all poll-type elements in the group
622  for(auto& e : poll_elements) {
623  if(!e.getAccessModeFlags().has(AccessMode::wait_for_new_data)) e.readLatest();
624  }
625  }
626 
627  /********************************************************************************************************************/
628 
630  while(true) {
631  auto read = readAny();
632  if(read == id) return;
633  }
634  }
635 
636  /********************************************************************************************************************/
637 
639  readUntil(element.getId());
640  }
641 
642  /********************************************************************************************************************/
643 
644  inline void ReadAnyGroup::readUntilAll(const std::vector<TransferElementID>& ids) {
645  std::map<TransferElementID, bool> found;
646  for(const auto& id : ids) {
647  found[id] = false; // initialise map so we can tell from the map if a
648  // variable is in the vector
649  }
650  size_t leftToFind = ids.size();
651  while(true) {
652  auto read = readAny();
653  if(found.count(read) == 0) continue; // variable is not in the vector ids
654  if(found[read]) continue; // variable has been read already
655  found[read] = true;
656  --leftToFind;
657  if(leftToFind == 0) return;
658  }
659  }
660 
661  /********************************************************************************************************************/
662 
663  inline void ReadAnyGroup::readUntilAll(const std::vector<TransferElementAbstractor>& elements) {
664  std::map<TransferElementID, bool> found;
665  for(const auto& elem : elements) {
666  found[elem.getId()] = false; // initialise map so we can tell from the map
667  // if a variable is in the vector
668  }
669  size_t leftToFind = elements.size();
670  while(true) {
671  auto read = readAny();
672  if(found.count(read) == 0) continue; // variable is not in the vector elements
673  if(found[read]) continue; // variable has been read already
674  found[read] = true;
675  --leftToFind;
676  if(leftToFind == 0) return;
677  }
678  }
679 
680  /********************************************************************************************************************/
681 
682 } /* namespace ChimeraTK */
ChimeraTK::TransferElementAbstractor::getId
TransferElementID getId() const
Obtain unique ID for the actual implementation of this TransferElement.
Definition: TransferElementAbstractor.h:192
ChimeraTK::TransferElementAbstractor::getAccessModeFlags
AccessModeFlags getAccessModeFlags() const
Return the AccessModeFlags for this TransferElement.
Definition: TransferElementAbstractor.h:51
ChimeraTK::ReadAnyGroup::add
void add(TransferElementAbstractor &element)
Add register to group.
Definition: ReadAnyGroup.h:446
ChimeraTK::ReadAnyGroup::Notification::isReady
bool isReady() const
Tell whether this notification is valid and has not been accepted yet.
Definition: ReadAnyGroup.h:407
ChimeraTK::ReadAnyGroup::Notification::~Notification
~Notification()
Destructor.
Definition: ReadAnyGroup.h:311
ChimeraTK::ReadAnyGroup::readUntilAll
void readUntilAll(const std::vector< TransferElementID > &ids)
Wait until all of the given TransferElements has received an update and store it to its user buffer.
Definition: ReadAnyGroup.h:644
ChimeraTK::ReadAnyGroup::waitAnyNonBlocking
Notification waitAnyNonBlocking()
Check if an update is available in the group, but do not block if no update is available.
Definition: ReadAnyGroup.h:563
ChimeraTK::ReadAnyGroup::Notification::ReadAnyGroup
friend class ReadAnyGroup
Definition: ReadAnyGroup.h:29
ChimeraTK::ReadAnyGroup::Notification
Notification object returned by waitAny().
Definition: ReadAnyGroup.h:27
TransferElement.h
ChimeraTK::ReadAnyGroup::Notification::getId
TransferElementID getId()
Return the ID of the transfer element for which this notification has been generated.
Definition: ReadAnyGroup.h:380
ChimeraTK::ReadAnyGroup::Notification::getTransferElement
TransferElementAbstractor getTransferElement()
Return the transfer element for which this notification has been generated.
Definition: ReadAnyGroup.h:398
ChimeraTK::ReadAnyGroup::Notification::getIndex
std::size_t getIndex() const
Return the index of the transfer element for which this notifiaction has been generated.
Definition: ReadAnyGroup.h:389
ChimeraTK::ReadAnyGroup::interrupt
void interrupt()
Convenience function to interrupt any running readAny/waitAny by calling interrupt on one of the push...
Definition: ReadAnyGroup.h:272
ChimeraTK::runtime_error
Exception thrown when a runtime error has occured.
Definition: Exception.h:18
ChimeraTK::ReadAnyGroup::Notification::operator=
Notification & operator=(Notification &&other) noexcept
Assign this notification from another notification.
Definition: ReadAnyGroup.h:328
ChimeraTK::AccessModeFlags::has
bool has(AccessMode flag) const
Check if a certain flag is in the set.
Definition: AccessMode.cc:20
ChimeraTK::TransferElementAbstractor
Base class for register accessors abstractors independent of the UserType.
Definition: TransferElementAbstractor.h:28
ChimeraTK::TransferElementAbstractor::isReadable
bool isReadable() const
Check if transfer element is readable.
Definition: TransferElementAbstractor.h:107
ChimeraTK::ReadAnyGroup::readAnyNonBlocking
TransferElementID readAnyNonBlocking()
Read the next available update in the group, but do not block if no update is available.
Definition: ReadAnyGroup.h:511
ChimeraTK::ReadAnyGroup::Notification::accept
bool accept()
Accept the notification.
Definition: ReadAnyGroup.h:351
ChimeraTK::ReadAnyGroup::processPolled
void processPolled()
Process polled transfer elements (update them if new values are available).
Definition: ReadAnyGroup.h:620
TransferElementAbstractor.h
ChimeraTK::AccessMode::wait_for_new_data
@ wait_for_new_data
Make any read blocking until new data has arrived since the last read.
ChimeraTK::ReadAnyGroup::readUntil
void readUntil(const TransferElementID &id)
Wait until the given TransferElement has received an update and store it to its user buffer.
Definition: ReadAnyGroup.h:629
ChimeraTK::ReadAnyGroup::finalise
void finalise()
Finalise the group.
Definition: ReadAnyGroup.h:478
ChimeraTK::TransferElementAbstractor::getHighLevelImplElement
const boost::shared_ptr< TransferElement > & getHighLevelImplElement()
Obtain the highest level implementation TransferElement.
Definition: TransferElementAbstractor.h:145
ChimeraTK::ReadAnyGroup::readAny
TransferElementID readAny()
Wait until one of the elements in this group has received an update.
Definition: ReadAnyGroup.h:498
ChimeraTK::TransferElementAbstractor::getName
const std::string & getName() const
Returns the name that identifies the process variable.
Definition: TransferElementAbstractor.h:37
ChimeraTK::ReadAnyGroup
Group several registers (= TransferElement) to allow waiting for an update of any of the registers.
Definition: ReadAnyGroup.h:21
ChimeraTK::ReadAnyGroup::ReadAnyGroup
ReadAnyGroup()
Construct empty group.
ChimeraTK::TransferElementID
Simple class holding a unique ID for a TransferElement.
Definition: TransferElementID.h:17
ChimeraTK
Definition: DummyBackend.h:16
ChimeraTK::ReadAnyGroup::Notification::Notification
Notification()
Create an empty notification.
ChimeraTK::TransferType::read
@ read
ChimeraTK::ReadAnyGroup::waitAny
Notification waitAny()
Wait until one of the elements received an update notification, but do not actually process the updat...
Definition: ReadAnyGroup.h:550