ChimeraTK-DeviceAccess 03.25.00
Loading...
Searching...
No Matches
AsyncAccessorManager.h
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3#pragma once
4
5#include "../VersionNumber.h"
7
8#include <utility>
9
10namespace ChimeraTK::async {
11
16 virtual ~AsyncVariable() = default;
17
23 virtual void send() = 0;
24
27 virtual void sendException(std::exception_ptr e) = 0;
28
32 virtual unsigned int getNumberOfChannels() = 0;
33 virtual unsigned int getNumberOfSamples() = 0;
34 virtual const std::string& getUnit() = 0;
35 virtual const std::string& getDescription() = 0;
36
40 virtual void fillSendBuffer() = 0;
41 };
42
55
// Manager that owns the AsyncVariables of one async domain and hands out the matching
// AsyncNDRegisterAccessors via subscribe().
// subscribe() passes shared_from_this() to every accessor it creates, so the manager has to be owned by a
// boost::shared_ptr by the time subscribe() is called.
// NOTE(review): unsubscribeImpl(), asyncVariableMapChanged() and _isHoldingDomainLock are used further down in
// this file, but their declarations are not visible in this class body as listed here.
70 class AsyncAccessorManager : public boost::enable_shared_from_this<AsyncAccessorManager> {
71 public:
// Store the backend and the async domain; both shared pointers are moved into the members.
72 explicit AsyncAccessorManager(boost::shared_ptr<DeviceBackend> backend, boost::shared_ptr<Domain> asyncDomain)
73 : _backend(std::move(backend)), _asyncDomain(std::move(asyncDomain)) {}
74 virtual ~AsyncAccessorManager() = default;
75
// Request a new subscription. Returns the newly created accessor for the requested register.
// The template definition is located further down in this file.
80 template<typename UserType>
81 boost::shared_ptr<AsyncNDRegisterAccessor<UserType>> subscribe(
82 RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister, AccessModeFlags flags);
83
88
// Send an exception to all accessors.
91 void sendException(const std::exception_ptr& e);
92
93 protected:
94 /*** Each implementation must provide a function to create specific AsyncVariables.
95 * When the variable is returned, the async accessor is not set yet. Leaving the whole creation
96 * of the async accessor to the backend would mean a lot of code duplication. The accessor also cannot be
97 * retrieved from the AsyncVariable as this only contains a weak pointer.
98 * If the isActive flag is set, the _sendBuffer must already contain the initial value. The variable itself
99 * is not activated yet as the async accessor is still not set.
100 */
// NOTE(review): the line that opens this macro invocation is missing from this listing; presumably it is
// DEFINE_VIRTUAL_FUNCTION_TEMPLATE_VTABLE( -- confirm against the original header.
102 createAsyncVariable, std::unique_ptr<AsyncVariable>(AccessorInstanceDescriptor const&));
103
// Owning container of all AsyncVariables. subscribe() stores each variable under the ID of the accessor it
// created for it.
104 std::map<TransferElementID, std::unique_ptr<AsyncVariable>> _asyncVariables;
105
// Backend and async domain handed to every accessor created in subscribe(). The domain is also queried for
// its active state before an initial value is sent.
106 boost::shared_ptr<DeviceBackend> _backend;
107 boost::shared_ptr<Domain> _asyncDomain;
108
111
// IDs whose unsubscription has been postponed. The distribute() implementation in this file hands each of
// them to unsubscribeImpl() after it has finished iterating _asyncVariables and then clears the list.
129 std::list<TransferElementID> _delayedUnsubscriptions;
130
133 };
134
135 template<typename SourceType>
137 public:
139
148
156 virtual bool prepareIntermediateBuffers() { return true; };
157
158 protected:
159 SourceType _sourceBuffer;
161 };
162
174 template<typename UserType>
176 AsyncVariableImpl(size_t nChannels, size_t nElements);
177
178 void send() final;
179 void sendException(std::exception_ptr e) final;
180 unsigned int getNumberOfChannels() override;
181 unsigned int getNumberOfSamples() override;
182
184
185 private:
186 // This weak pointer is private so the user cannot bypass correct locking and nullptr-checking.
187 boost::weak_ptr<AsyncNDRegisterAccessor<UserType>> _asyncAccessor;
188 friend AsyncAccessorManager; // is allowed to set the _asyncAccessor
189
190 VersionNumber _lastSentVersion{nullptr};
191 };
192
193 /********************************************************************************************************************/
194 // Implementations
195 /********************************************************************************************************************/
196
197 /********************************************************************************************************************/
198 template<typename UserType>
200 auto subscriber = _asyncAccessor.lock();
201 if(subscriber.get() != nullptr) { // Solves possible race condition: The subscriber is being destructed.
202 subscriber->sendException(e);
203 }
204 }
205
206 /********************************************************************************************************************/
207 template<typename UserType>
209 return _sendBuffer.value.size();
210 }
211
212 /********************************************************************************************************************/
213 template<typename UserType>
215 if(_sendBuffer.value.size() > 0) {
216 return _sendBuffer.value[0].size();
217 }
218 return 0;
219 }
220
221 /********************************************************************************************************************/
// Construct the variable and create its send buffer from the requested channel and element counts.
// NOTE(review): presumably the buffer then holds nChannels channels of nElements samples each, since
// getNumberOfChannels() and getNumberOfSamples() read these sizes back from _sendBuffer.value -- confirm
// against the Buffer constructor.
222 template<typename UserType>
223 AsyncVariableImpl<UserType>::AsyncVariableImpl(size_t nChannels, size_t nElements)
224 : _sendBuffer(nChannels, nElements) {}
225
226 /********************************************************************************************************************/
/**
 * Send the value from the _sendBuffer to the subscribed accessor.
 *
 * The version number of the send buffer must never be smaller than the one sent last. The last sent
 * version is recorded even if the subscriber no longer exists.
 *
 * @throws ChimeraTK::logic_error if the version number in the send buffer is smaller than the last sent one.
 */
template<typename UserType>
void AsyncVariableImpl<UserType>::send() {
  auto subscriber = _asyncAccessor.lock();

  if(_lastSentVersion > _sendBuffer.versionNumber) {
    // The subscriber might be in the middle of its destruction, in which case the weak pointer cannot be
    // locked any more. Never dereference it unchecked just to build the error message.
    const std::string subscriberName =
        (subscriber.get() != nullptr) ? subscriber->getName() : std::string("(subscriber already destroyed)");
    throw ChimeraTK::logic_error(std::format("Trying to send decreased version {} < {} on AsyncVariable {}.",
        _sendBuffer.versionNumber, _lastSentVersion, subscriberName));
  }
  _lastSentVersion = _sendBuffer.versionNumber;

  if(subscriber.get() != nullptr) { // Solves possible race condition: The subscriber is being destructed.
    subscriber->sendDestructively(_sendBuffer);
  }
}
241
242 /********************************************************************************************************************/
243 template<typename UserType>
244 boost::shared_ptr<AsyncNDRegisterAccessor<UserType>> AsyncAccessorManager::subscribe(
245 RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister, AccessModeFlags flags) {
246 AccessorInstanceDescriptor descriptor{name, typeid(UserType), numberOfWords, wordOffsetInRegister, flags};
247 auto untypedAsyncVariable = CALL_VIRTUAL_FUNCTION_TEMPLATE(createAsyncVariable, UserType, descriptor);
248
249 auto asyncVariable = dynamic_cast<AsyncVariableImpl<UserType>*>(untypedAsyncVariable.get());
250 // we take all the information we need for the async accessor from AsyncVariable because we cannot use the catalogue
251 // here
252 auto newSubscriber = boost::make_shared<AsyncNDRegisterAccessor<UserType>>(_backend, shared_from_this(),
253 _asyncDomain, name, asyncVariable->getNumberOfChannels(), asyncVariable->getNumberOfSamples(), flags,
254 asyncVariable->getUnit(), asyncVariable->getDescription());
255 // Set the exception backend here. It might be that the accessor is already activated during subscription, and the
256 // backend should be set at that point
257 newSubscriber->setExceptionBackend(_backend);
258
259 asyncVariable->_asyncAccessor = newSubscriber;
260 // Now that the AsyncVariable is complete we can finally activate it.
261 if(_asyncDomain->unsafeGetIsActive()) {
262 asyncVariable->fillSendBuffer();
263 asyncVariable->send();
264 }
265
266 _asyncVariables[newSubscriber->getId()] = std::move(untypedAsyncVariable);
267 asyncVariableMapChanged(newSubscriber->getId());
268
269 return newSubscriber;
270 }
271 /********************************************************************************************************************/
272 template<typename SourceType>
274 if(!_asyncDomain->unsafeGetIsActive()) {
275 return version;
276 }
277
278 _sourceBuffer = data;
279 _version = version;
280
281 if(prepareIntermediateBuffers()) {
282 assert(_delayedUnsubscriptions.empty());
283 for(auto& var : _asyncVariables) {
284 var.second->fillSendBuffer();
285 _isHoldingDomainLock = this;
286 var.second->send(); // function from the AsyncVariable base class
287 _isHoldingDomainLock = nullptr;
288 }
289 for(auto id : _delayedUnsubscriptions) {
290 unsubscribeImpl(id);
291 }
292 _delayedUnsubscriptions.clear();
293 }
294
295 return _version;
296 }
297
298} // namespace ChimeraTK::async
#define CALL_VIRTUAL_FUNCTION_TEMPLATE(functionName, templateArgument,...)
Execute the virtual function template call using the vtable defined with the DEFINE_VIRTUAL_FUNCTION_...
Set of AccessMode flags with additional functionality for an easier handling.
Definition AccessMode.h:48
Class to store a register path name.
Simple class holding a unique ID for a TransferElement.
Class for generating and holding version numbers without exposing a numeric representation.
The AsyncAccessorManager has three main functionalities:
boost::shared_ptr< AsyncNDRegisterAccessor< UserType > > subscribe(RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister, AccessModeFlags flags)
Request a new subscription.
DEFINE_VIRTUAL_FUNCTION_TEMPLATE_VTABLE(createAsyncVariable, std::unique_ptr< AsyncVariable >(AccessorInstanceDescriptor const &))
AsyncAccessorManager(boost::shared_ptr< DeviceBackend > backend, boost::shared_ptr< Domain > asyncDomain)
std::map< TransferElementID, std::unique_ptr< AsyncVariable > > _asyncVariables
void unsubscribeImpl(TransferElementID id)
Internal helper function to avoid code duplication.
void sendException(const std::exception_ptr &e)
Send an exception to all accessors.
boost::shared_ptr< DeviceBackend > _backend
void unsubscribe(TransferElementID id)
This function must only be called from the destructor of the AsyncNDRegisterAccessor which is created...
std::list< TransferElementID > _delayedUnsubscriptions
If an unsubscription request is coming in while iterating the _asyncVariables container,...
static thread_local AsyncAccessorManager * _isHoldingDomainLock
We have to remember that we are holding the domain lock before we lock a weak pointer to an AsyncNDRe...
virtual void asyncVariableMapChanged(TransferElementID)
This virtual function lets derived classes react on subscribe / unsubscribe.
VersionNumber distribute(SourceType, VersionNumber version)
Distribute the given data to the subscribers.
virtual bool prepareIntermediateBuffers()
Implement this function in case there is a step between setting the source buffer and filling the use...
Exception thrown when a logic error has occurred.
Definition Exception.h:51
STL namespace.
Data type to create individual buffers.
Helper class to have a complete description to create an Accessor.
virtual unsigned int getNumberOfSamples()=0
virtual const std::string & getUnit()=0
virtual void sendException(std::exception_ptr e)=0
Send an exception to all subscribers.
virtual const std::string & getDescription()=0
virtual void send()=0
Send the value from the _sendBuffer of the AsyncVariableImpl.
virtual unsigned int getNumberOfChannels()=0
Helper functions for the creation of an AsyncNDRegisterAccessor.
virtual void fillSendBuffer()=0
Fill the send buffer with data and version number.
virtual ~AsyncVariable()=default
AsyncVariableImpl contains a weak pointer to an AsyncNDRegisterAccessor<UserType> and a send buffer N...
void sendException(std::exception_ptr e) final
Send an exception to all subscribers.
unsigned int getNumberOfChannels() override
Helper functions for the creation of an AsyncNDRegisterAccessor.
void send() final
Send the value from the _sendBuffer of the AsyncVariableImpl.
NDRegisterAccessor< UserType >::Buffer _sendBuffer
AsyncVariableImpl(size_t nChannels, size_t nElements)