ChimeraTK-DeviceAccess  03.18.00
DomainsContainer.h
Go to the documentation of this file.
1 // SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2 // SPDX-License-Identifier: LGPL-3.0-or-later
3 #pragma once
4 
5 #include "DomainImpl.h"
6 
7 #include <ChimeraTK/cppext/future_queue.hpp>
8 
9 #include <iostream>
10 #include <map>
11 #include <sstream>
12 #include <thread>
13 
14 namespace ChimeraTK::async {
15 
35  public:
37 
42  void sendExceptions(const std::string& exceptionMessage);
43 
46 
83  template<typename BackendType, typename BackendSpecificDataType, typename UserDataType>
84  boost::shared_ptr<AsyncNDRegisterAccessor<UserDataType>> subscribe(boost::shared_ptr<BackendType> backend,
85  size_t domainId, bool activate, RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister,
86  AccessModeFlags flags);
87 
92  boost::shared_ptr<Domain> getDomain(size_t key);
93 
100  void forEach(const std::function<void(size_t, boost::shared_ptr<Domain>&)>& executeMe);
101 
102  protected:
103  std::atomic_bool _isSendingExceptions{false};
104 
106  void distributeExceptions();
107 
108  cppext::future_queue<std::string> _startExceptionDistribution{2};
109  std::thread _distributorThread;
111  std::atomic_bool _threadIsRunning{false}; // Cache whether the thread is running so we don't have to lock the mutex
112  // each time to be able to ask the thread itself.
113  class StopThread : public std::exception {};
114 
115  std::mutex _domainsMutex;
116  std::map<size_t, boost::weak_ptr<Domain>> _domains;
117  };
118 
119  /********************************************************************************************************************/
120 
121  template<typename BackendType, typename BackendSpecificDataType, typename UserDataType>
122  boost::shared_ptr<AsyncNDRegisterAccessor<UserDataType>> DomainsContainer::subscribe(
123  boost::shared_ptr<BackendType> backend, size_t domainId, bool activate, RegisterPath name, size_t numberOfWords,
124  size_t wordOffsetInRegister, AccessModeFlags flags) {
125  std::lock_guard<std::mutex> domainsLock(_domainsMutex);
126 
127  auto domain = _domains[domainId].lock();
128  boost::shared_ptr<DomainImpl<BackendSpecificDataType>> domainImpl;
129 
130  bool domainCreated{false};
131  if(domain) {
132  domainImpl = boost::dynamic_pointer_cast<DomainImpl<BackendSpecificDataType>>(domain);
133  assert(domainImpl);
134  }
135  else {
136  // The domain does not exist, create it
137  domainCreated = true;
138  domainImpl = boost::make_shared<DomainImpl<BackendSpecificDataType>>(backend, domainId);
139  // start the thread if not already running
140  { // thread lock scope
141  auto threadCreationLock = std::lock_guard(_threadCreationMutex);
142 
143  if(!_distributorThread.joinable()) {
144  _distributorThread = std::thread([&] { distributeExceptions(); });
145  _threadIsRunning = true;
146  }
147  } // end of thread lock scope
148  _domains[domainId] = domainImpl;
149  }
150 
151  auto newSubscriber = domainImpl->template subscribe<UserDataType>(name, numberOfWords, wordOffsetInRegister, flags);
152 
153  // Only activate a newly created domain after the subscription is done. This allows to distribute the intial value
154  // from here, and not from inside the subscription.
155  if(domainCreated && activate) {
156  auto subscriptionDone = backend->activateSubscription(domainId, domainImpl);
157  // wait until the backend reports that the subscription is ready before proceeding with the creation of the
158  // accessor, which will poll the initial value.
159  subscriptionDone.wait();
160  auto [value, version] = backend->template getAsyncDomainInitialValue<BackendSpecificDataType>(domainId);
161  domainImpl->activate(value, version);
162  }
163 
164  return newSubscriber;
165  }
166 
167 } // namespace ChimeraTK::async
ChimeraTK::async::DomainsContainer::_threadCreationMutex
std::mutex _threadCreationMutex
Definition: DomainsContainer.h:110
ChimeraTK::async::DomainsContainer::StopThread
Definition: DomainsContainer.h:113
ChimeraTK::async::DomainsContainer::sendExceptions
void sendExceptions(const std::string &exceptionMessage)
Request the sending of exceptions.
Definition: DomainsContainer.cc:65
ChimeraTK::async::DomainsContainer::_isSendingExceptions
std::atomic_bool _isSendingExceptions
Definition: DomainsContainer.h:103
ChimeraTK::async::DomainsContainer::forEach
void forEach(const std::function< void(size_t, boost::shared_ptr< Domain > &)> &executeMe)
Iterate all Domains under the container lock.
Definition: DomainsContainer.cc:91
ChimeraTK::async::DomainsContainer::_threadIsRunning
std::atomic_bool _threadIsRunning
Definition: DomainsContainer.h:111
ChimeraTK::async::DomainsContainer::isSendingExceptions
bool isSendingExceptions()
Check whether an exception distribution is started and not completed yet.
Definition: DomainsContainer.h:45
ChimeraTK::async
Definition: design_AsyncNDRegisterAcessor_and_NumericAddressedBackend.dox:1
DomainImpl.h
ChimeraTK::async::DomainsContainer::getDomain
boost::shared_ptr< Domain > getDomain(size_t key)
Return the shared pointer to the Domain for a key.
Definition: DomainsContainer.cc:82
ChimeraTK::async::DomainsContainer::_domains
std::map< size_t, boost::weak_ptr< Domain > > _domains
Definition: DomainsContainer.h:116
ChimeraTK::async::DomainsContainer::distributeExceptions
void distributeExceptions()
Endless loop executed in the thread.
Definition: DomainsContainer.cc:12
ChimeraTK::async::DomainsContainer::_startExceptionDistribution
cppext::future_queue< std::string > _startExceptionDistribution
Definition: DomainsContainer.h:108
ChimeraTK::async::DomainsContainer
The DomainsContainer has a container with Domains and is performing actions on all of them.
Definition: DomainsContainer.h:34
ChimeraTK::async::DomainsContainer::~DomainsContainer
~DomainsContainer()
Definition: DomainsContainer.cc:41
ChimeraTK::async::DomainsContainer::_domainsMutex
std::mutex _domainsMutex
Definition: DomainsContainer.h:115
ChimeraTK::async::DomainsContainer::subscribe
boost::shared_ptr< AsyncNDRegisterAccessor< UserDataType > > subscribe(boost::shared_ptr< BackendType > backend, size_t domainId, bool activate, RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister, AccessModeFlags flags)
Get an accessor from a particular domain.
Definition: DomainsContainer.h:122
ChimeraTK::RegisterPath
Class to store a register path name.
Definition: RegisterPath.h:16
ChimeraTK::AccessModeFlags
Set of AccessMode flags with additional functionality for an easier handling.
Definition: AccessMode.h:48
ChimeraTK::async::DomainsContainer::_distributorThread
std::thread _distributorThread
Definition: DomainsContainer.h:109