ChimeraTK-DeviceAccess  03.18.00
AsyncAccessorManager.h
Go to the documentation of this file.
1 // SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2 // SPDX-License-Identifier: LGPL-3.0-or-later
3 #pragma once
4 
6 
7 #include <utility>
8 
9 namespace ChimeraTK::async {
10 
14  struct AsyncVariable {
15  virtual ~AsyncVariable() = default;
16 
22  virtual void send() = 0;
23 
26  virtual void sendException(std::exception_ptr e) = 0;
27 
31  virtual unsigned int getNumberOfChannels() = 0;
32  virtual unsigned int getNumberOfSamples() = 0;
33  virtual const std::string& getUnit() = 0;
34  virtual const std::string& getDescription() = 0;
35 
39  virtual void fillSendBuffer() = 0;
40  };
41 
49  std::type_index type;
50  size_t numberOfWords;
53  };
54 
69  class AsyncAccessorManager : public boost::enable_shared_from_this<AsyncAccessorManager> {
70  public:
71  explicit AsyncAccessorManager(boost::shared_ptr<DeviceBackend> backend, boost::shared_ptr<Domain> asyncDomain)
72  : _backend(std::move(backend)), _asyncDomain(std::move(asyncDomain)) {}
73  virtual ~AsyncAccessorManager() = default;
74 
79  template<typename UserType>
80  boost::shared_ptr<AsyncNDRegisterAccessor<UserType>> subscribe(
81  RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister, AccessModeFlags flags);
82 
87 
90  void sendException(const std::exception_ptr& e);
91 
92  protected:
93  /*** Each implementation must provide a function to create specific AsyncVariables.
94  * When the variable is returned, async accessor is not set yet. This would leave the whole creation
95  * of the async accessor to the backend, would mean a lot of code duplication. It also cannot be
96  * retrieved from the AsyncVariable as this only contains a weak pointer.
97  * If the isActive flag is set, the _sendBuffer must already contain the initial value. The variable itself
98  * is not activated yet as the async accessor is still not set.
99  */
101  createAsyncVariable, std::unique_ptr<AsyncVariable>(AccessorInstanceDescriptor const&));
102 
103  std::map<TransferElementID, std::unique_ptr<AsyncVariable>> _asyncVariables;
104 
105  boost::shared_ptr<DeviceBackend> _backend;
106  boost::shared_ptr<Domain> _asyncDomain;
107 
110 
128  std::list<TransferElementID> _delayedUnsubscriptions;
129 
132  };
133 
134  template<typename SourceType>
136  public:
138  void distribute(SourceType, VersionNumber version);
139 
147  virtual bool prepareIntermediateBuffers() { return true; };
148 
149  protected:
150  SourceType _sourceBuffer;
152  };
153 
165  template<typename UserType>
167  AsyncVariableImpl(size_t nChannels, size_t nElements);
168 
169  void send() final;
170  void sendException(std::exception_ptr e) final;
171  unsigned int getNumberOfChannels() override;
172  unsigned int getNumberOfSamples() override;
173 
175 
176  private:
177  // This weak pointer is private so the user cannot bypass correct locking and nullptr-checking.
178  boost::weak_ptr<AsyncNDRegisterAccessor<UserType>> _asyncAccessor;
179  friend AsyncAccessorManager; // is allowed to set the _asyncAccessor
180  };
181 
182  /********************************************************************************************************************/
183  // Implementations
184  /********************************************************************************************************************/
185 
186  /********************************************************************************************************************/
187  template<typename UserType>
188  void AsyncVariableImpl<UserType>::sendException(std::exception_ptr e) {
189  auto subscriber = _asyncAccessor.lock();
190  if(subscriber.get() != nullptr) { // Solves possible race condition: The subscriber is being destructed.
191  subscriber->sendException(e);
192  }
193  }
194 
195  /********************************************************************************************************************/
196  template<typename UserType>
198  return _sendBuffer.value.size();
199  }
200 
201  /********************************************************************************************************************/
202  template<typename UserType>
204  if(_sendBuffer.value.size() > 0) {
205  return _sendBuffer.value[0].size();
206  }
207  return 0;
208  }
209 
210  /********************************************************************************************************************/
211  template<typename UserType>
212  AsyncVariableImpl<UserType>::AsyncVariableImpl(size_t nChannels, size_t nElements)
213  : _sendBuffer(nChannels, nElements) {}
214 
215  /********************************************************************************************************************/
216  template<typename UserType>
218  auto subscriber = _asyncAccessor.lock();
219  if(subscriber.get() != nullptr) { // Solves possible race condition: The subscriber is being destructed.
220  subscriber->sendDestructively(_sendBuffer);
221  }
222  }
223 
224  /********************************************************************************************************************/
225  template<typename UserType>
226  boost::shared_ptr<AsyncNDRegisterAccessor<UserType>> AsyncAccessorManager::subscribe(
227  RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister, AccessModeFlags flags) {
228  AccessorInstanceDescriptor descriptor{name, typeid(UserType), numberOfWords, wordOffsetInRegister, flags};
229  auto untypedAsyncVariable = CALL_VIRTUAL_FUNCTION_TEMPLATE(createAsyncVariable, UserType, descriptor);
230 
231  auto asyncVariable = dynamic_cast<AsyncVariableImpl<UserType>*>(untypedAsyncVariable.get());
232  // we take all the information we need for the async accessor from AsyncVariable because we cannot use the catalogue
233  // here
234  auto newSubscriber = boost::make_shared<AsyncNDRegisterAccessor<UserType>>(_backend, shared_from_this(),
235  _asyncDomain, name, asyncVariable->getNumberOfChannels(), asyncVariable->getNumberOfSamples(), flags,
236  asyncVariable->getUnit(), asyncVariable->getDescription());
237  // Set the exception backend here. It might be that the accessor is already activated during subscription, and the
238  // backend should be set at that point
239  newSubscriber->setExceptionBackend(_backend);
240 
241  asyncVariable->_asyncAccessor = newSubscriber;
242  // Now that the AsyncVariable is complete we can finally activate it.
243  if(_asyncDomain->unsafeGetIsActive()) {
244  asyncVariable->fillSendBuffer();
245  asyncVariable->send();
246  }
247 
248  _asyncVariables[newSubscriber->getId()] = std::move(untypedAsyncVariable);
249  asyncVariableMapChanged(newSubscriber->getId());
250 
251  return newSubscriber;
252  }
253  /********************************************************************************************************************/
254  template<typename SourceType>
256  if(!_asyncDomain->unsafeGetIsActive()) {
257  return;
258  }
259 
260  _sourceBuffer = data;
261  _version = version;
262 
263  if(prepareIntermediateBuffers()) {
264  assert(_delayedUnsubscriptions.empty());
265  for(auto& var : _asyncVariables) {
266  var.second->fillSendBuffer();
267  _isHoldingDomainLock = this;
268  var.second->send(); // function from the AsyncVariable base class
269  _isHoldingDomainLock = nullptr;
270  }
271  for(auto id : _delayedUnsubscriptions) {
272  unsubscribeImpl(id);
273  }
274  _delayedUnsubscriptions.clear();
275  }
276  }
277 
278 } // namespace ChimeraTK::async
ChimeraTK::async::SourceTypedAsyncAccessorManager::distribute
void distribute(SourceType, VersionNumber version)
Definition: AsyncAccessorManager.h:255
ChimeraTK::async::AsyncVariable::getNumberOfChannels
virtual unsigned int getNumberOfChannels()=0
Helper functions for the creation of an AsyncNDRegisterAccessor.
ChimeraTK::async::AsyncAccessorManager
The AsyncAccessorManager has three main functionalities:
Definition: AsyncAccessorManager.h:69
ChimeraTK::async::AsyncAccessorManager::unsubscribe
void unsubscribe(TransferElementID id)
This function must only be called from the destructor of the AsyncNDRegisterAccessor which is created...
Definition: AsyncAccessorManager.cc:17
ChimeraTK::async::AccessorInstanceDescriptor::type
std::type_index type
Definition: AsyncAccessorManager.h:49
ChimeraTK::async
Definition: design_AsyncNDRegisterAcessor_and_NumericAddressedBackend.dox:1
ChimeraTK::async::AccessorInstanceDescriptor::numberOfWords
size_t numberOfWords
Definition: AsyncAccessorManager.h:50
ChimeraTK::async::AccessorInstanceDescriptor::wordOffsetInRegister
size_t wordOffsetInRegister
Definition: AsyncAccessorManager.h:51
ChimeraTK::async::AsyncAccessorManager::_asyncVariables
std::map< TransferElementID, std::unique_ptr< AsyncVariable > > _asyncVariables
Definition: AsyncAccessorManager.h:103
ChimeraTK::async::AsyncVariableImpl
AsyncVariableImpl contains a weak pointer to an AsyncNDRegisterAccessor<UserType> and a send buffer N...
Definition: AsyncAccessorManager.h:166
ChimeraTK::async::SourceTypedAsyncAccessorManager
Definition: AsyncAccessorManager.h:135
ChimeraTK::async::SourceTypedAsyncAccessorManager::_sourceBuffer
SourceType _sourceBuffer
Definition: AsyncAccessorManager.h:147
ChimeraTK::async::AsyncVariable::send
virtual void send()=0
Send the value from the _sendBuffer of the AsyncVariableImpl.
ChimeraTK::async::AsyncVariable::sendException
virtual void sendException(std::exception_ptr e)=0
Send an exception to all subscribers.
ChimeraTK::async::AccessorInstanceDescriptor::flags
AccessModeFlags flags
Definition: AsyncAccessorManager.h:52
ChimeraTK::async::AccessorInstanceDescriptor::name
RegisterPath name
Definition: AsyncAccessorManager.h:48
ChimeraTK::async::AsyncAccessorManager::sendException
void sendException(const std::exception_ptr &e)
Send an exception to all accessors.
Definition: AsyncAccessorManager.cc:28
ChimeraTK::async::SourceTypedAsyncAccessorManager::_version
VersionNumber _version
Definition: AsyncAccessorManager.h:151
ChimeraTK::async::AsyncAccessorManager::DEFINE_VIRTUAL_FUNCTION_TEMPLATE_VTABLE
DEFINE_VIRTUAL_FUNCTION_TEMPLATE_VTABLE(createAsyncVariable, std::unique_ptr< AsyncVariable >(AccessorInstanceDescriptor const &))
ChimeraTK::async::AsyncVariableImpl::AsyncVariableImpl
AsyncVariableImpl(size_t nChannels, size_t nElements)
Definition: AsyncAccessorManager.h:212
ChimeraTK::async::AccessorInstanceDescriptor
Helper class to have a complete descriton to create an Accessor.
Definition: AsyncAccessorManager.h:47
ChimeraTK::async::AsyncVariableImpl::getNumberOfSamples
unsigned int getNumberOfSamples() override
Definition: AsyncAccessorManager.h:203
ChimeraTK::async::AsyncAccessorManager::_isHoldingDomainLock
static thread_local AsyncAccessorManager * _isHoldingDomainLock
We have to remember that we are holding the domain lock before we lock a weak pointer to an AsyncNDRe...
Definition: AsyncAccessorManager.h:123
ChimeraTK::async::AsyncAccessorManager::~AsyncAccessorManager
virtual ~AsyncAccessorManager()=default
ChimeraTK::async::AsyncAccessorManager::_backend
boost::shared_ptr< DeviceBackend > _backend
Definition: AsyncAccessorManager.h:105
ChimeraTK::async::AsyncAccessorManager::_asyncDomain
boost::shared_ptr< Domain > _asyncDomain
Definition: AsyncAccessorManager.h:106
ChimeraTK::async::AsyncVariable
Typeless base class.
Definition: AsyncAccessorManager.h:14
ChimeraTK::async::AsyncVariableImpl::send
void send() final
Send the value from the _sendBuffer of the AsyncVariableImpl.
Definition: AsyncAccessorManager.h:217
ChimeraTK::async::AsyncVariableImpl::_sendBuffer
NDRegisterAccessor< UserType >::Buffer _sendBuffer
Definition: AsyncAccessorManager.h:174
ChimeraTK::async::AsyncVariable::getDescription
virtual const std::string & getDescription()=0
ChimeraTK::async::AsyncVariableImpl::getNumberOfChannels
unsigned int getNumberOfChannels() override
Helper functions for the creation of an AsyncNDRegisterAccessor.
Definition: AsyncAccessorManager.h:197
ChimeraTK::async::SourceTypedAsyncAccessorManager::prepareIntermediateBuffers
virtual bool prepareIntermediateBuffers()
Implement this function in case there is a step between setting the source buffer and filling the use...
Definition: AsyncAccessorManager.h:147
ChimeraTK::async::AsyncVariable::~AsyncVariable
virtual ~AsyncVariable()=default
ChimeraTK::RegisterPath
Class to store a register path name.
Definition: RegisterPath.h:16
AsyncNDRegisterAccessor.h
ChimeraTK::VersionNumber
Class for generating and holding version numbers without exposing a numeric representation.
Definition: VersionNumber.h:23
ChimeraTK::async::AsyncVariable::fillSendBuffer
virtual void fillSendBuffer()=0
Fill the send buffer with data and version number.
ChimeraTK::async::AsyncVariable::getNumberOfSamples
virtual unsigned int getNumberOfSamples()=0
CALL_VIRTUAL_FUNCTION_TEMPLATE
#define CALL_VIRTUAL_FUNCTION_TEMPLATE(functionName, templateArgument,...)
Execute the virtual function template call using the vtable defined with the DEFINE_VIRTUAL_FUNCTION_...
Definition: VirtualFunctionTemplate.h:70
ChimeraTK::AccessModeFlags
Set of AccessMode flags with additional functionality for an easier handling.
Definition: AccessMode.h:48
ChimeraTK::NDRegisterAccessor::Buffer
Data type to create individual buffers.
Definition: NDRegisterAccessor.h:86
ChimeraTK::TransferElementID
Simple class holding a unique ID for a TransferElement.
Definition: TransferElementID.h:17
ChimeraTK::async::AsyncAccessorManager::unsubscribeImpl
void unsubscribeImpl(TransferElementID id)
Internal helper function to avoid code duplication.
Definition: AsyncAccessorManager.cc:11
ChimeraTK::async::AsyncVariableImpl::sendException
void sendException(std::exception_ptr e) final
Send an exception to all subscribers.
Definition: AsyncAccessorManager.h:188
ChimeraTK::async::AsyncAccessorManager::AsyncAccessorManager
AsyncAccessorManager(boost::shared_ptr< DeviceBackend > backend, boost::shared_ptr< Domain > asyncDomain)
Definition: AsyncAccessorManager.h:71
ChimeraTK::async::AsyncAccessorManager::asyncVariableMapChanged
virtual void asyncVariableMapChanged(TransferElementID)
This virtual function lets derived classes react on subscribe / unsubscribe.
Definition: AsyncAccessorManager.h:109
ChimeraTK::async::AsyncAccessorManager::_delayedUnsubscriptions
std::list< TransferElementID > _delayedUnsubscriptions
If an unsubscription request is coming in while iterating the _asyncVariables container,...
Definition: AsyncAccessorManager.h:128
ChimeraTK::async::AsyncVariable::getUnit
virtual const std::string & getUnit()=0
ChimeraTK::async::AsyncAccessorManager::subscribe
boost::shared_ptr< AsyncNDRegisterAccessor< UserType > > subscribe(RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister, AccessModeFlags flags)
Request a new subscription.
Definition: AsyncAccessorManager.h:226