ChimeraTK-DeviceAccess 03.25.00
Loading...
Searching...
No Matches
AsyncNDRegisterAccessor.cc
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
5
7
8namespace ChimeraTK::async {
9
10 template<typename UserType>
12 _accessorManager->unsubscribe(this->getId());
13 }
14
15 template<typename UserType>
16 AsyncNDRegisterAccessor<UserType>::AsyncNDRegisterAccessor(boost::shared_ptr<DeviceBackend> backend,
17 boost::shared_ptr<AsyncAccessorManager> manager, boost::shared_ptr<Domain> asyncDomain, std::string const& name,
18 size_t nChannels, size_t nElements, AccessModeFlags accessModeFlags, std::string const& unit,
19 std::string const& description)
20
21 : NDRegisterAccessor<UserType>(name, accessModeFlags, unit, description), _backend(std::move(backend)),
22 _accessorManager(std::move(manager)), _asyncDomain(std::move(asyncDomain)), _receiveBuffer(nChannels, nElements) {
23 // Don't throw a ChimeraTK::logic_error here. They are for mistakes an application is doing when using DeviceAccess.
24 // If an AsyncNDRegisterAccessor is created without wait_for_new_data it is a mistake in the backend, which is not
25 // part of the application.
26 assert(accessModeFlags.has(AccessMode::wait_for_new_data));
27 buffer_2D.resize(nChannels);
28 for(auto& chan : buffer_2D) chan.resize(nElements);
29
30 // The sequence to initialise the queue:
31 // * write once and then read,
32 // * repeat n+1 times to make sure all buffers inside the queue have been replaced with
33 // a properly sized buffer, so it can be swapped out and used for data
34 for(size_t i = 0; i < _queueSize + 1; ++i) {
35 Buffer b1(nChannels, nElements);
36 _dataTransportQueue.push(std::move(b1));
37 Buffer b2(nChannels, nElements);
38 _dataTransportQueue.pop(b2); // here b2 is swapped into the queue and transported "backwards"
39 }
40
41 this->_readQueue = _dataTransportQueue.template then<void>(
42 [&](Buffer& buf) { std::swap(_receiveBuffer, buf); }, std::launch::deferred);
43 }
44
45 /********************************************************************************************************************/
46 template<typename UserType>
47 void AsyncNDRegisterAccessor<UserType>::doPostRead([[maybe_unused]] TransferType type, bool updateDataBuffer) {
48 if(updateDataBuffer) {
49 // do not update meta data if updateDataBuffer == false, since this is the equivalent to a backend
50 // implementation, not a decorator
51 this->_versionNumber = _receiveBuffer.versionNumber;
52 this->_dataValidity = _receiveBuffer.dataValidity;
53 // Do not overwrite the vectors in the first layer of the 2D array. Accessing code might have stored them.
54 // Instead, swap the received data into the channel vectors.
55 auto source = _receiveBuffer.value.begin(); // the received data is the source as it is moved into the user buffer
56 auto destination = this->buffer_2D.begin();
57 for(; source != _receiveBuffer.value.end(); ++source, ++destination) {
58 destination->swap(*source);
59 }
60 }
61 }
62
63 /********************************************************************************************************************/
64 template<typename UserType>
66 if(_asyncDomain->unsafeGetIsActive()) {
67 _dataTransportQueue.push_overwrite(std::move(data));
68 }
69 }
70
72} // namespace ChimeraTK::async
#define INSTANTIATE_TEMPLATE_FOR_CHIMERATK_USER_TYPES(TemplateClass)
Set of AccessMode flags with additional functionality for an easier handling.
Definition AccessMode.h:48
bool has(AccessMode flag) const
Check if a certain flag is in the set.
Definition AccessMode.cc:20
N-dimensional register accessor.
std::vector< std::vector< UserType > > buffer_2D
Buffer of converted data elements.
cppext::future_queue< void > _readQueue
The queue for asynchronous read transfers.
The AsyncNDRegisterAccessor implements a data transport queue with typed data as continuation of the ...
void doPostRead(TransferType type, bool updateDataBuffer) override
Backend specific implementation of postRead().
void sendDestructively(typename NDRegisterAccessor< UserType >::Buffer &data)
You can only send destructively.
cppext::future_queue< Buffer, cppext::SWAP_DATA > _dataTransportQueue
AsyncNDRegisterAccessor(boost::shared_ptr< DeviceBackend > backend, boost::shared_ptr< AsyncAccessorManager > manager, boost::shared_ptr< Domain > asyncDomain, std::string const &name, size_t nChannels, size_t nElements, AccessModeFlags accessModeFlags, std::string const &unit=std::string(TransferElement::unitNotSet), std::string const &description=std::string())
In addition to the arguments of the NDRegisterAccessor constructor, you need an AsyncAccessorManager ...
@ wait_for_new_data
Make any read blocking until new data has arrived since the last read.
TransferType
Used to indicate the applicable operation on a Transferelement.
STL namespace.
Data type to create individual buffers.