5#include "../VersionNumber.h"
72 explicit AsyncAccessorManager(boost::shared_ptr<DeviceBackend> backend, boost::shared_ptr<Domain> asyncDomain)
80 template<
typename UserType>
81 boost::shared_ptr<AsyncNDRegisterAccessor<UserType>>
subscribe(
135 template<
typename SourceType>
174 template<
typename UserType>
187 boost::weak_ptr<AsyncNDRegisterAccessor<UserType>> _asyncAccessor;
198 template<
typename UserType>
200 auto subscriber = _asyncAccessor.lock();
201 if(subscriber.get() !=
nullptr) {
202 subscriber->sendException(e);
207 template<
typename UserType>
209 return _sendBuffer.value.size();
213 template<
typename UserType>
215 if(_sendBuffer.value.size() > 0) {
216 return _sendBuffer.value[0].size();
222 template<
typename UserType>
224 : _sendBuffer(nChannels, nElements) {}
227 template<
typename UserType>
229 auto subscriber = _asyncAccessor.lock();
231 if(_lastSentVersion > _sendBuffer.versionNumber) {
233 _sendBuffer.versionNumber, _lastSentVersion, subscriber->getName()));
235 _lastSentVersion = _sendBuffer.versionNumber;
237 if(subscriber.get() !=
nullptr) {
238 subscriber->sendDestructively(_sendBuffer);
243 template<
typename UserType>
252 auto newSubscriber = boost::make_shared<AsyncNDRegisterAccessor<UserType>>(
_backend, shared_from_this(),
253 _asyncDomain, name, asyncVariable->getNumberOfChannels(), asyncVariable->getNumberOfSamples(), flags,
254 asyncVariable->getUnit(), asyncVariable->getDescription());
257 newSubscriber->setExceptionBackend(
_backend);
259 asyncVariable->_asyncAccessor = newSubscriber;
263 asyncVariable->send();
266 _asyncVariables[newSubscriber->getId()] = std::move(untypedAsyncVariable);
269 return newSubscriber;
272 template<
typename SourceType>
274 if(!_asyncDomain->unsafeGetIsActive()) {
278 _sourceBuffer = data;
281 if(prepareIntermediateBuffers()) {
282 assert(_delayedUnsubscriptions.empty());
283 for(
auto& var : _asyncVariables) {
284 var.second->fillSendBuffer();
285 _isHoldingDomainLock =
this;
287 _isHoldingDomainLock =
nullptr;
289 for(
auto id : _delayedUnsubscriptions) {
292 _delayedUnsubscriptions.clear();
#define CALL_VIRTUAL_FUNCTION_TEMPLATE(functionName, templateArgument,...)
Execute the virtual function template call using the vtable defined with the DEFINE_VIRTUAL_FUNCTION_...
Set of AccessMode flags with additional functionality for an easier handling.
Class to store a register path name.
Simple class holding a unique ID for a TransferElement.
Class for generating and holding version numbers without exposing a numeric representation.
The AsyncAccessorManager has three main functionalities:
boost::shared_ptr< AsyncNDRegisterAccessor< UserType > > subscribe(RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister, AccessModeFlags flags)
Request a new subscription.
boost::shared_ptr< Domain > _asyncDomain
DEFINE_VIRTUAL_FUNCTION_TEMPLATE_VTABLE(createAsyncVariable, std::unique_ptr< AsyncVariable >(AccessorInstanceDescriptor const &))
virtual ~AsyncAccessorManager()=default
AsyncAccessorManager(boost::shared_ptr< DeviceBackend > backend, boost::shared_ptr< Domain > asyncDomain)
std::map< TransferElementID, std::unique_ptr< AsyncVariable > > _asyncVariables
void unsubscribeImpl(TransferElementID id)
Internal helper function to avoid code duplication.
void sendException(const std::exception_ptr &e)
Send an exception to all accessors.
boost::shared_ptr< DeviceBackend > _backend
void unsubscribe(TransferElementID id)
This function must only be called from the destructor of the AsyncNDRegisterAccessor which is created...
std::list< TransferElementID > _delayedUnsubscriptions
If an unsubscription request is coming in while iterating the _asyncVariables container,...
static thread_local AsyncAccessorManager * _isHoldingDomainLock
We have to remember that we are holding the domain lock before we lock a weak pointer to an AsyncNDRe...
virtual void asyncVariableMapChanged(TransferElementID)
This virtual function lets derived classes react on subscribe / unsubscribe.
VersionNumber distribute(SourceType, VersionNumber version)
Distribute the given data to the subsribers.
virtual bool prepareIntermediateBuffers()
Implement this function in case there is a step between setting the source buffer and filling the use...
Exception thrown when a logic error has occured.
Data type to create individual buffers.
Helper class to have a complete descriton to create an Accessor.
size_t wordOffsetInRegister
virtual unsigned int getNumberOfSamples()=0
virtual const std::string & getUnit()=0
virtual void sendException(std::exception_ptr e)=0
Send an exception to all subscribers.
virtual const std::string & getDescription()=0
virtual void send()=0
Send the value from the _sendBuffer of the AsyncVariableImpl.
virtual unsigned int getNumberOfChannels()=0
Helper functions for the creation of an AsyncNDRegisterAccessor.
virtual void fillSendBuffer()=0
Fill the send buffer with data and version number.
virtual ~AsyncVariable()=default
AsyncVariableImpl contains a weak pointer to an AsyncNDRegisterAccessor<UserType> and a send buffer N...
void sendException(std::exception_ptr e) final
Send an exception to all subscribers.
unsigned int getNumberOfChannels() override
Helper functions for the creation of an AsyncNDRegisterAccessor.
unsigned int getNumberOfSamples() override
void send() final
Send the value from the _sendBuffer of the AsyncVariableImpl.
NDRegisterAccessor< UserType >::Buffer _sendBuffer
AsyncVariableImpl(size_t nChannels, size_t nElements)