ChimeraTK-ApplicationCore 04.06.00
Loading...
Searching...
No Matches
ThreadedFanOut.h
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3#pragma once
4
5#include "Application.h"
6#include "FanOut.h"
7#include "InternalModule.h"
9
10#include <ChimeraTK/NDRegisterAccessor.h>
11#include <ChimeraTK/ReadAnyGroup.h>
12#include <ChimeraTK/SupportedUserTypes.h>
13#include <ChimeraTK/SystemTags.h>
14
15#include <boost/smart_ptr/shared_ptr.hpp>
16
17#include <string>
18
19namespace ChimeraTK {
20
21 /********************************************************************************************************************/
22
/**
 * FanOut implementation with an internal thread. The thread executes run(), which writes a copy of the feeder's
 * current value to every slave each time the feeder delivers new data.
 */
26 template<typename UserType>
27 class ThreadedFanOut : public FanOut<UserType>, public InternalModule {
28 public:
/**
 * Pass feedingImpl to the FanOut base class and register every entry of consumerImplementationPairs as a slave.
 * The definition asserts that feedingImpl has AccessMode::wait_for_new_data.
 */
29 ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
30 ConsumerImplementationPairs<UserType> const& consumerImplementationPairs);
31
/** Calls deactivate() so that the thread is stopped before the object is destroyed. */
32 ~ThreadedFanOut() override;
33
/** Activate synchronisation thread if needed (does nothing when the FanOut is disabled). */
34 void activate() override;
35
/** Deactivate synchronisation thread if running: interrupt it and join it. */
36 void deactivate() override;
37
/** Synchronise feeder and the consumers. This is the function executed inside _thread; it loops until interrupted. */
40 virtual void run();
41
/**
 * Perform a read() on the given accessor while the testable-mode lock is released, re-acquire the lock afterwards
 * and return the version number of the accessor.
 */
42 VersionNumber readInitialValues(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> accessor);
43
44 protected:
/** Thread handling the synchronisation, if needed. Started by activate(), joined by deactivate(). */
46 boost::thread _thread;
47
49 // VariableNetwork& _network;
50 };
51
52 /********************************************************************************************************************/
53
/**
 * Same as ThreadedFanOut but with return channel: run() waits on several input channels (feeder plus slaves with a
 * return channel) and distributes each received value to all other accessors.
 */
55 template<typename UserType>
56 class ThreadedFanOutWithReturn : public ThreadedFanOut<UserType> {
57 public:
/** Construct the base ThreadedFanOut and register the feeder as the first input channel and initial value provider. */
58 ThreadedFanOutWithReturn(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
59 ConsumerImplementationPairs<UserType> const& consumerImplementationPairs);
60
/**
 * Record the role of a slave for this class: a consumer tagged with SystemTags::reverseRecovery becomes the
 * initial value provider, and a consumer with a return channel is appended to the input channels. The definition
 * in this file does not forward to FanOut::addSlave (that call is commented out there).
 */
61 void addSlave(
62 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) override;
63
/** Synchronise feeder and the consumers, including values arriving through return channels. */
64 void run() override;
65
66 protected:
// NOTE(review): this declares a second member with the same name as ThreadedFanOut::_thread. None of the
// definitions visible in this file refer to this one (activate()/deactivate() are members of the base class and
// use the base member) - confirm whether it is needed at all.
68 boost::thread _thread;
69
/** Accessor handed to readInitialValues() at the start of run(). Set to the feeder in the constructor. */
70 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> _initialValueProvider;
/** Accessors from which run() builds its ReadAnyGroup, i.e. the channels it waits on for new data. */
71 std::vector<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>> _inputChannels;
72
73 // using ThreadedFanOut<UserType>::_network;
76 };
77
78 /********************************************************************************************************************/
79 /********************************************************************************************************************/
80
/**
 * Constructor: hand the feeder to the FanOut base class and register all consumers as slaves.
 *
 * @param feedingImpl accessor providing the data to distribute; must have AccessMode::wait_for_new_data
 *        (checked by assert only, i.e. not in builds with NDEBUG defined).
 * @param consumerImplementationPairs list of (accessor, network node) pairs, one per consumer.
 */
81 template<typename UserType>
82 ThreadedFanOut<UserType>::ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
83 ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
84 : FanOut<UserType>(feedingImpl) /*, _network(network)*/ {
85 assert(feedingImpl->getAccessModeFlags().has(AccessMode::wait_for_new_data));
// The loop variable is a copy on purpose: the list is const, but addSlave() takes the node by non-const reference.
86 for(auto el : consumerImplementationPairs) {
// Explicitly qualified, so always FanOut's own addSlave() is called here, not an override of a derived class.
87 FanOut<UserType>::addSlave(el.first, el.second);
88 }
89 }
90
91 /********************************************************************************************************************/
92
/**
 * Destructor: stop the synchronisation thread via deactivate(). A ChimeraTK::logic_error escaping from deactivate()
 * terminates the program instead of propagating out of the destructor.
 */
// NOTE(review): the listing line carrying the function signature (between 93 and 95) is absent from this listing.
93 template<typename UserType>
95 try {
96 deactivate();
97 }
98 catch(ChimeraTK::logic_error& e) {
99 std::terminate();
100 }
101 }
102
103 /********************************************************************************************************************/
104
/**
 * Activate synchronisation thread if needed: start _thread executing run(), unless this FanOut is disabled.
 * Must not be called while the thread is already running (checked by assert).
 */
// NOTE(review): the listing line carrying the function signature (between 105 and 107) is absent from this listing.
105 template<typename UserType>
107 if(this->_disabled) {
108 return;
109 }
110 assert(!_thread.joinable());
// The lambda captures this, so the object has to stay alive until the thread has been joined in deactivate().
111 _thread = boost::thread([this] { this->run(); });
112 }
113
114 /********************************************************************************************************************/
115
/**
 * Deactivate synchronisation thread if running: request interruption of _thread and wait until it has terminated.
 * Calling this while no thread is running has no effect.
 */
// NOTE(review): the listing lines carrying the function signature (between 116 and 118) and line 121 are absent
// from this listing. Presumably line 121 calls FanOut's interrupt() to wake up a blocking read - TODO confirm.
116 template<typename UserType>
118 try {
119 if(_thread.joinable()) {
// Takes effect at the boost::this_thread::interruption_point() calls inside run().
120 _thread.interrupt();
122 _thread.join();
123 }
124 assert(!_thread.joinable());
125 }
126 catch(boost::thread_resource_error& e) {
// Only reported through assert; in builds with NDEBUG defined the exception is silently dropped.
127 assert(false);
128 }
129 }
130
131 /********************************************************************************************************************/
132
/**
 * Synchronise feeder and the consumers. After reading the initial value from the feeder, loop forever: write a copy
 * of the feeder's value, data validity and version number to every slave, then obtain the next value. The loop is
 * only left through a boost thread interruption.
 */
// NOTE(review): several lines are absent from this listing: the function signature (134-135), the body of the
// dataLoss branch (153) and the statement following "receive data" (158, presumably the blocking read of the
// feeder - TODO confirm).
133 template<typename UserType>
136 Application::getInstance().getTestableMode().lock("start", true);
// Tell the testable mode that this thread has acquired the lock for the first time.
137 _testableModeReached = true;
138
139 ChimeraTK::VersionNumber version{nullptr};
140 version = readInitialValues(FanOut<UserType>::_impl);
141 while(true) {
142 // send out copies to slaves
143 boost::this_thread::interruption_point();
144 auto validity = FanOut<UserType>::_impl->dataValidity();
145 for(auto& slave : FanOut<UserType>::_slaves) {
146 // do not send copy if no data is expected (e.g. trigger)
147 if(slave->getNumberOfSamples() != 0) {
// Only channel 0 is copied - assumes single-channel accessors, TODO confirm.
148 slave->accessChannel(0) = FanOut<UserType>::_impl->accessChannel(0);
149 }
150 slave->setDataValidity(validity);
151 bool dataLoss = slave->writeDestructively(version);
152 if(dataLoss) {
154 }
155 }
156 // receive data
157 boost::this_thread::interruption_point();
159 version = FanOut<UserType>::_impl->getVersionNumber();
160 }
161 }
162
163 /********************************************************************************************************************/
164
/**
 * Read the initial value from the given accessor.
 *
 * The testable-mode lock is released before the read() and taken again afterwards if this thread does not hold it.
 *
 * @param accessor accessor to read from.
 * @return version number of the accessor after the read.
 */
// NOTE(review): the first line of the function signature (listing line 166) is absent from this listing.
165 template<typename UserType>
167 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> accessor) {
168 Application::getInstance().getTestableMode().unlock("readInitialValues");
169 accessor->read();
// Re-acquire only if the lock is not already held by this thread.
170 if(!Application::getInstance().getTestableMode().testLock()) {
171 Application::getInstance().getTestableMode().lock("readInitialValues", true);
172 }
173 return accessor->getVersionNumber();
174 }
175
176 /********************************************************************************************************************/
177 /********************************************************************************************************************/
178
/**
 * Constructor: construct the ThreadedFanOut base (which registers all consumers as slaves), then make the feeder
 * the first input channel and the default initial value provider.
 */
// NOTE(review): the first line of the signature (listing line 180) and the loop body (listing line 188) are absent
// from this listing. Presumably the loop calls this class's addSlave() for every pair - TODO confirm.
179 template<typename UserType>
181 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
182 ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
183 : ThreadedFanOut<UserType>(feedingImpl, consumerImplementationPairs) {
184 _inputChannels.push_back(feedingImpl);
185 // By default, we take the initial value from the feeder
186 _initialValueProvider = feedingImpl;
187 for(auto el : consumerImplementationPairs) {
189 }
190 }
191
192 /********************************************************************************************************************/
193
/**
 * Record the role of a slave for the return-channel handling. The slave is not added to the FanOut's slave list
 * here (see TODO below).
 *
 * @param slave accessor of the consumer.
 * @param consumer network node of the consumer; its tags and direction decide how the slave is used.
 */
// NOTE(review): the first line of the function signature (listing line 195) is absent from this listing.
194 template<typename UserType>
196 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& consumer) {
197 // TODO Adding slaves is currently done by the ThreadedFanOut base class constructor.
198 // Refactor constructors and addSlaves for all FanOuts?
199 // FanOut<UserType>::addSlave(slave, consumer);
200
// A consumer tagged for reverse recovery provides the initial value instead of the feeder. If several consumers
// carry the tag, the one added last wins.
201 if(consumer.getTags().contains(ChimeraTK::SystemTags::reverseRecovery)) {
202 _initialValueProvider = slave;
203 // FIXME: Do we need to check here that there is only one reverse recovery accessor
204 }
205
// Consumers with a return channel can send data back, so run() has to wait on them as well.
206 if(consumer.getDirection().withReturn) {
207 _inputChannels.push_back(slave);
208 }
209 }
210
211 /********************************************************************************************************************/
212
/**
 * Synchronise feeder and the consumers, with return channels. After reading the initial value from
 * _initialValueProvider, loop forever: write the value of the accessor that changed last to all other accessors in
 * the map, then wait until any input channel delivers new data. The loop is only left through a boost thread
 * interruption.
 */
// NOTE(review): the function signature (listing lines 214-215) and listing line 223 are absent from this listing.
// Presumably line 223 adds the feeder to the accessors map - TODO confirm.
213 template<typename UserType>
216 Application::getInstance().getTestableMode().lock("start", true);
// Tell the testable mode that this thread has acquired the lock for the first time.
217 _testableModeReached = true;
218
// Lookup table from accessor ID to accessor, used to find the source accessor after readAny().
219 std::map<TransferElementID, boost::shared_ptr<NDRegisterAccessor<UserType>>> accessors;
220 for(auto& acc : FanOut<UserType>::_slaves) {
221 accessors[acc->getId()] = acc;
222 }
224
// ID of the accessor whose value is distributed in the next iteration; initially the initial value provider.
225 TransferElementID changedVariable = _initialValueProvider->getId();
226
227 auto version = readInitialValues(_initialValueProvider);
228
229 ReadAnyGroup group(_inputChannels.begin(), _inputChannels.end());
230
231 while(true) {
232 // send out copies to all receivers (slaves and return channel of feeding node)
233 for(auto& [id, accessor] : accessors) {
234 // do not feed back value to the accessor we got it from
235 if(id == changedVariable) {
236 continue;
237 }
238
239 // do not send copy if no data is expected (e.g. trigger)
240 if(accessor->getNumberOfSamples() != 0) {
// NOTE(review): std::map::operator[] inserts an empty pointer if changedVariable is not a key of the map, which
// would then be dereferenced here - relies on every input channel being present in the map, TODO confirm.
241 accessor->accessChannel(0) = accessors[changedVariable]->accessChannel(0);
242 }
243
// NOTE(review): unlike ThreadedFanOut::run(), no setDataValidity() call is visible here - confirm whether the data
// validity is intentionally not forwarded.
244 bool dataLoss = accessor->writeDestructively(version);
245
246 if(dataLoss) {
247 Application::incrementDataLossCounter(accessor->getName());
248 }
249 }
250
251 // receive data
252 boost::this_thread::interruption_point();
253 changedVariable = group.readAny();
254 boost::this_thread::interruption_point();
255
256 version = accessors[changedVariable]->getVersionNumber();
257 }
258 }
259
260 /********************************************************************************************************************/
261
262} /* namespace ChimeraTK */
VersionNumber version
detail::TestableMode & getTestableMode()
Get the TestableMode control object of this application.
static void registerThread(const std::string &name)
Register the thread in the application system and give it a name.
static Application & getInstance()
Obtain instance of the application.
static void incrementDataLossCounter(const std::string &name)
Increment counter for how many write() operations have overwritten unread data.
std::atomic< bool > _testableModeReached
Flag used by the testable mode to identify whether a thread within the EntityOwner has reached the po...
Base class for several implementations which distribute values from one feeder to multiple consumers.
Definition FanOut.h:42
virtual void interrupt()
Definition FanOut.h:114
virtual void addSlave(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > slave, VariableNetworkNode &)
Add a slave to the FanOut.
Definition FanOut.h:68
Base class for internal modules which are created by the variable connection code (e....
FanOut implementation with an internal thread which waits for new data which is read from the given f...
VersionNumber readInitialValues(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > accessor)
void activate() override
Activate synchronisation thread if needed.
void deactivate() override
Deactivate synchronisation thread if running.
boost::thread _thread
Thread handling the synchronisation, if needed.
ThreadedFanOut(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > feedingImpl, ConsumerImplementationPairs< UserType > const &consumerImplementationPairs)
virtual void run()
Synchronise feeder and the consumers.
Same as ThreadedFanOut but with return channel.
boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > _initialValueProvider
ThreadedFanOutWithReturn(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > feedingImpl, ConsumerImplementationPairs< UserType > const &consumerImplementationPairs)
std::vector< boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > > _inputChannels
void addSlave(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > slave, VariableNetworkNode &consumer) override
Add a slave to the FanOut.
void run() override
Synchronise feeder and the consumers.
boost::thread _thread
Thread handling the synchronisation, if needed.
Class describing a node of a variable network.
const std::unordered_set< std::string > & getTags() const
VariableDirection getDirection() const
InvalidityTracer application module.
@ run
Actual run phase with full multi threading.
std::list< std::pair< boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > >, VariableNetworkNode > > ConsumerImplementationPairs
Definition FanOut.h:18
bool withReturn
Presence of return channel.
Definition Flags.h:21