ChimeraTK-DeviceAccess 03.25.00
Loading...
Searching...
No Matches
DomainsContainer.h
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3#pragma once
4
5#include "DomainImpl.h"
6
7#include <ChimeraTK/cppext/future_queue.hpp>
8
9#include <iostream>
10#include <map>
11#include <sstream>
12#include <thread>
13
14namespace ChimeraTK::async {
15
35 public:
37
// Request the sending of exceptions.
// exceptionMessage: message string associated with the exception that is to be sent.
// NOTE(review): the definition is not visible in this header; presumably the message is handed over to the
// distributor thread through _startExceptionDistribution - confirm against the implementation.
42 void sendExceptions(const std::string& exceptionMessage);
43
46
// Get an accessor from a particular domain.
// The domain registered under domainId is looked up while holding _domainsMutex. If no live domain is found, a
// DomainImpl<BackendSpecificDataType> is created from (backend, domainId), the exception distributor thread is
// started if it is not joinable yet, and the new domain is stored in _domains.
// name, numberOfWords, wordOffsetInRegister and flags are forwarded to DomainImpl::subscribe<UserDataType>().
// Only a domain that has been newly created by this call is activated, and only if activate is true; in that case
// the call waits for backend->activateSubscription() to complete before the initial value is fetched from the
// backend and passed to the domain.
// Returns the accessor obtained from the domain.
83 template<typename BackendType, typename BackendSpecificDataType, typename UserDataType>
84 boost::shared_ptr<AsyncNDRegisterAccessor<UserDataType>> subscribe(boost::shared_ptr<BackendType> backend,
85 size_t domainId, bool activate, RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister,
86 AccessModeFlags flags);
87
// Return the shared pointer to the Domain for a key.
// NOTE(review): _domains stores weak pointers, so the returned pointer is presumably empty if no domain is alive
// for the key - confirm against the implementation.
92 boost::shared_ptr<Domain> getDomain(size_t key);
93
// Iterate all Domains under the container lock.
// executeMe is the callable to run; it takes a size_t and a reference to a boost::shared_ptr<Domain>.
// NOTE(review): presumably the size_t is the domain key as used in _domains, and the lock is _domainsMutex, which
// subscribe() also takes - if so, executeMe must not call subscribe(). Confirm against the implementation.
100 void forEach(const std::function<void(size_t, boost::shared_ptr<Domain>&)>& executeMe);
101
102 protected:
// Atomic flag, initially false.
// NOTE(review): presumably true while an exception distribution is started and not completed yet (the state
// reported by isSendingExceptions()) - confirm.
103 std::atomic_bool _isSendingExceptions{false};
104
107
// Queue of std::string, constructed with the argument 2.
// NOTE(review): presumably carries the exception messages from sendExceptions() to the distributor thread - confirm.
108 cppext::future_queue<std::string> _startExceptionDistribution{2};
111 std::atomic_bool _threadIsRunning{false}; // Cache whether the thread is running so we don't have to lock the mutex
112 // each time to be able to ask the thread itself.
// Exception type without any payload.
// NOTE(review): presumably thrown to terminate the distributor thread - confirm.
113 class StopThread : public std::exception {};
114
// Mutex held by subscribe() for its whole duration while it reads and modifies _domains.
115 std::mutex _domainsMutex;
// Domains by domain ID. Only weak pointers are stored, so the map itself does not keep a domain alive.
116 std::map<size_t, boost::weak_ptr<Domain>> _domains;
117 };
118
119 /********************************************************************************************************************/
120
121 template<typename BackendType, typename BackendSpecificDataType, typename UserDataType>
122 boost::shared_ptr<AsyncNDRegisterAccessor<UserDataType>> DomainsContainer::subscribe(
123 boost::shared_ptr<BackendType> backend, size_t domainId, bool activate, RegisterPath name, size_t numberOfWords,
124 size_t wordOffsetInRegister, AccessModeFlags flags) {
125 std::lock_guard<std::mutex> domainsLock(_domainsMutex);
126
127 auto domain = _domains[domainId].lock();
128 boost::shared_ptr<DomainImpl<BackendSpecificDataType>> domainImpl;
129
130 bool domainCreated{false};
131 if(domain) {
132 domainImpl = boost::dynamic_pointer_cast<DomainImpl<BackendSpecificDataType>>(domain);
133 assert(domainImpl);
134 }
135 else {
136 // The domain does not exist, create it
137 domainCreated = true;
138 domainImpl = boost::make_shared<DomainImpl<BackendSpecificDataType>>(backend, domainId);
139 // start the thread if not already running
140 { // thread lock scope
141 auto threadCreationLock = std::lock_guard(_threadCreationMutex);
142
143 if(!_distributorThread.joinable()) {
144 _distributorThread = std::thread([&] { distributeExceptions(); });
145 _threadIsRunning = true;
146 }
147 } // end of thread lock scope
148 _domains[domainId] = domainImpl;
149 }
150
151 auto newSubscriber = domainImpl->template subscribe<UserDataType>(name, numberOfWords, wordOffsetInRegister, flags);
152
153 // Only activate a newly created domain after the subscription is done. This allows to distribute the intial value
154 // from here, and not from inside the subscription.
155 if(domainCreated && activate) {
156 auto subscriptionDone = backend->activateSubscription(domainId, domainImpl);
157 // wait until the backend reports that the subscription is ready before proceeding with the creation of the
158 // accessor, which will poll the initial value.
159 subscriptionDone.wait();
160 auto [value, version] = backend->template getAsyncDomainInitialValue<BackendSpecificDataType>(domainId);
161 domainImpl->activate(value, version);
162 }
163
164 return newSubscriber;
165 }
166
167} // namespace ChimeraTK::async
Set of AccessMode flags with additional functionality for easier handling.
Definition AccessMode.h:48
Class to store a register path name.
The DomainsContainer has a container with Domains and is performing actions on all of them.
std::map< size_t, boost::weak_ptr< Domain > > _domains
boost::shared_ptr< AsyncNDRegisterAccessor< UserDataType > > subscribe(boost::shared_ptr< BackendType > backend, size_t domainId, bool activate, RegisterPath name, size_t numberOfWords, size_t wordOffsetInRegister, AccessModeFlags flags)
Get an accessor from a particular domain.
cppext::future_queue< std::string > _startExceptionDistribution
void sendExceptions(const std::string &exceptionMessage)
Request the sending of exceptions.
void forEach(const std::function< void(size_t, boost::shared_ptr< Domain > &)> &executeMe)
Iterate all Domains under the container lock.
boost::shared_ptr< Domain > getDomain(size_t key)
Return the shared pointer to the Domain for a key.
void distributeExceptions()
Endless loop executed in the thread.
bool isSendingExceptions()
Check whether an exception distribution is started and not completed yet.