ChimeraTK-ApplicationCore 04.06.00
Loading...
Searching...
No Matches
FeedingFanOut.h
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3#pragma once
4
5#include "FanOut.h"
6
7namespace ChimeraTK {
8
9 /********************************************************************************************************************/
10
15 template<typename UserType>
16 class FeedingFanOut : public FanOut<UserType>, public ChimeraTK::NDRegisterAccessor<UserType> {
17 public:
18 FeedingFanOut(std::string const& name, std::string const& unit, std::string const& description,
19 size_t numberOfElements, bool withReturn,
20 ConsumerImplementationPairs<UserType> const& consumerImplementationPairs);
21
22 [[nodiscard]] bool isReadable() const override { return _withReturn; }
23
24 [[nodiscard]] bool isReadOnly() const override { return false; }
25
26 [[nodiscard]] bool isWriteable() const override { return true; }
27
28 void doReadTransferSynchronously() override;
29
30 void doPreRead(TransferType type) override;
31
32 void doPostRead(TransferType type, bool hasNewData) override;
33
34 void doPreWrite(TransferType, VersionNumber) override;
35
36 bool doWriteTransfer(ChimeraTK::VersionNumber versionNumber) override;
37
38 // FIXME: https://redmine.msktools.desy.de/issues/12242
39 // NOLINTNEXTLINE(google-default-arguments)
40 bool doWriteTransferDestructively(ChimeraTK::VersionNumber versionNumber = {}) override;
41
42 void doPostWrite(TransferType, VersionNumber) override;
43
44 [[nodiscard]] bool mayReplaceOther(const boost::shared_ptr<const ChimeraTK::TransferElement>&) const override;
45
46 std::list<boost::shared_ptr<ChimeraTK::TransferElement>> getInternalElements() override;
47
48 std::vector<boost::shared_ptr<ChimeraTK::TransferElement>> getHardwareAccessingElements() override;
49
50 void replaceTransferElement(boost::shared_ptr<ChimeraTK::TransferElement>) override;
51
52 void interrupt() override;
53
54 protected:
56 void addSlave(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode&) override;
57
59 void finalise();
60
63
65 bool _finalised{false};
66
68 std::vector<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>> _returnSlaves;
69
71 size_t _idxLastUpdate{std::numeric_limits<size_t>::max()};
72 };
73
74 /********************************************************************************************************************/
75 /********************************************************************************************************************/
76
77 template<typename UserType>
78 FeedingFanOut<UserType>::FeedingFanOut(std::string const& name, std::string const& unit,
79 std::string const& description, size_t numberOfElements, bool withReturn,
80 ConsumerImplementationPairs<UserType> const& consumerImplementationPairs)
81 : FanOut<UserType>(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>()),
82 // We pass default-constructed, empty AccessModeFlags, they may later be determined from _returnSlave
83 ChimeraTK::NDRegisterAccessor<UserType>("FeedingFanOut:" + name, AccessModeFlags{}, unit, description),
84 _withReturn(withReturn) {
85 ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D.resize(1);
86 ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D[0].resize(numberOfElements);
87
88 if(_withReturn) {
89 this->_accessModeFlags = {AccessMode::wait_for_new_data};
90 }
91
92 // Add the consuming accessors
93 // TODO FanOut constructors and addSlave should get refactoring
94 for(auto el : consumerImplementationPairs) {
95 FeedingFanOut<UserType>::addSlave(el.first, el.second);
96 }
97
98 finalise();
99 }
100
101 /********************************************************************************************************************/
102
103 template<typename UserType>
105 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode& node) {
106 assert(!_finalised);
107 // check if array shape is compatible, unless the receiver is a trigger
108 // node, so no data is expected
109 if(slave->getNumberOfSamples() != 0 &&
110 (slave->getNumberOfChannels() != 1 || slave->getNumberOfSamples() != this->getNumberOfSamples())) {
111 std::string what = "FeedingFanOut::addSlave(): Trying to add a slave '" + slave->getName();
112 what += "' with incompatible array shape! Name of fan out: '" + this->getName() + "'";
113 throw ChimeraTK::logic_error(what);
114 }
115
116 // make sure slave is writeable
117 if(!slave->isWriteable()) {
118 throw ChimeraTK::logic_error("FeedingFanOut::addSlave() has been called "
119 "with a receiving implementation!");
120 }
121
122 // handle return channels
123 if(_withReturn) {
124 if(node.getDirection().withReturn) {
125 // These assumptions should be guaranteed by the connection making code which created the PV
126 assert(slave->isReadable());
127 assert(slave->getAccessModeFlags().has(AccessMode::wait_for_new_data));
128 _returnSlaves.push_back(slave);
129 }
130 }
131
132 // add the slave
133 FanOut<UserType>::_slaves.push_back(slave);
134 }
135
136 /********************************************************************************************************************/
137
138 template<typename UserType>
140 // create read queue as when-any continuation from all return slave read queues
141 std::vector<cppext::future_queue<void>> queueList;
142 for(auto& slave : _returnSlaves) {
143 queueList.push_back(slave->getReadQueue());
144 }
145
146 auto notificationQueue = cppext::when_any(queueList.begin(), queueList.end());
147 this->_readQueue = notificationQueue.then<void>(
148 [this](size_t idx) {
149 _idxLastUpdate = idx;
150 try {
151 _returnSlaves[idx]->getReadQueue().pop_wait();
152 }
153 catch(detail::DiscardValueException&) {
154 // This value should never be actually exposed anywhere since the read transfer will be retried,
155 // but we set it anyway to make sure the logic is correct (would trigger asserts if not).
156 _idxLastUpdate = std::numeric_limits<size_t>::max() - 1;
157 throw;
158 }
159 },
160 std::launch::deferred);
161
162 _finalised = true;
163 }
164
165 /********************************************************************************************************************/
166
167 template<typename UserType>
171
172 /********************************************************************************************************************/
173
174 template<typename UserType>
175 void FeedingFanOut<UserType>::doPreRead(TransferType type) {
176 if(!_withReturn) {
177 throw ChimeraTK::logic_error("Read operation called on write-only variable.");
178 }
179 if(this->_disabled) {
180 return;
181 }
182
183 assert(_idxLastUpdate != std::numeric_limits<size_t>::max() - 1);
184 if(_idxLastUpdate == std::numeric_limits<size_t>::max()) {
185 for(auto& slave : _returnSlaves) {
186 slave->preRead(TransferType::read);
187 }
188 }
189 else {
190 _returnSlaves[_idxLastUpdate]->preRead(type);
191 }
192 }
193
194 /********************************************************************************************************************/
195
196 template<typename UserType>
197 void FeedingFanOut<UserType>::doPostRead(TransferType type, bool hasNewData) {
198 assert(_withReturn);
199 if(this->_disabled) {
200 return;
201 }
202
203 if(!hasNewData && type != TransferType::read) {
204 // No post read handling for readNonBlocking and readLatest if there was no new data, since there was actually no
205 // corresponding read operation on any of the underlying accessors (just checking the notification queue).
206 return;
207 }
208
209 assert(_idxLastUpdate < std::numeric_limits<size_t>::max() - 1);
210
211 auto _ = cppext::finally([&] {
212 if(!hasNewData || TransferElement::_activeException) {
213 return;
214 }
215 _returnSlaves[_idxLastUpdate]->accessChannel(0).swap(ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D[0]);
216 // distribute return-channel update to the other slaves
217
218 for(auto& slave : FanOut<UserType>::_slaves) { // send out copies to slaves
219 if(slave == _returnSlaves[_idxLastUpdate]) {
220 continue;
221 }
222 if(slave->getNumberOfSamples() != 0) { // do not send copy if no data is expected (e.g. trigger)
223 slave->accessChannel(0) = ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D[0];
224 }
225 slave->writeDestructively(this->_versionNumber);
226 }
227 });
228
229 _returnSlaves[_idxLastUpdate]->postRead(type, hasNewData);
230
231 this->_versionNumber = _returnSlaves[_idxLastUpdate]->getVersionNumber();
232 this->_dataValidity = _returnSlaves[_idxLastUpdate]->dataValidity();
233 }
234
235 /********************************************************************************************************************/
236
237 template<typename UserType>
238 void FeedingFanOut<UserType>::doPreWrite(TransferType, VersionNumber) {
239 if(this->_disabled) {
240 return;
241 }
242 for(auto& slave : FanOut<UserType>::_slaves) { // send out copies to slaves
243 if(slave->getNumberOfSamples() != 0) { // do not send copy if no data is expected (e.g. trigger)
244 if(slave == FanOut<UserType>::_slaves.front()) { // in case of first slave, swap instead of copy
245 slave->accessChannel(0).swap(ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D[0]);
246 }
247 else { // not the first slave: copy the data from the first slave
248 slave->accessChannel(0) = FanOut<UserType>::_slaves.front()->accessChannel(0);
249 }
250 }
251 slave->setDataValidity(this->dataValidity());
252 }
253
254 // Don't call pre-write on the slaves. Each slave has to do it's own exception handling, so we call the whole
255 // operation in doWriteTansfer(). To fulfill the TransferElement specification we would have to check the
256 // pre-conditions here so no logic error is thrown in the transfer phase (logic_errors are predictable and can
257 // always pre prevented. They should be thrown here already).
258 // FIXME: At the moment we can be lazy about it. logic_errors are not treated in ApplicationCore and the only
259 // effect is that the logic_error would be delayed after postRead() and terminate the application there, and not
260 // after the transfer. Advantage about being lazy: It safes a few virtual function calls.
261 }
262
263 /********************************************************************************************************************/
264
265 template<typename UserType>
266 bool FeedingFanOut<UserType>::doWriteTransfer(ChimeraTK::VersionNumber versionNumber) {
267 if(this->_disabled) {
268 return false;
269 }
270 bool dataLost = false;
271 bool isFirst = true;
272 for(auto& slave : FanOut<UserType>::_slaves) {
273 bool ret;
274 if(isFirst) {
275 isFirst = false;
276 ret = slave->write(versionNumber);
277 }
278 else {
279 ret = slave->writeDestructively(versionNumber);
280 }
281 if(ret) {
282 dataLost = true;
283 }
284 }
285
286 return dataLost;
287 }
288
289 /********************************************************************************************************************/
290
291 template<typename UserType>
292 // FIXME: https://redmine.msktools.desy.de/issues/12242
293 // NOLINTNEXTLINE(google-default-arguments)
294 bool FeedingFanOut<UserType>::doWriteTransferDestructively(ChimeraTK::VersionNumber versionNumber) {
295 if(this->_disabled) {
296 return false;
297 }
298 bool dataLost = false;
299 for(auto& slave : FanOut<UserType>::_slaves) {
300 bool ret = slave->writeDestructively(versionNumber);
301 if(ret) {
302 dataLost = true;
303 }
304 }
305 return dataLost;
306 }
307
308 /********************************************************************************************************************/
309
310 template<typename UserType>
311 void FeedingFanOut<UserType>::doPostWrite(TransferType, VersionNumber) {
312 if(this->_disabled) {
313 return;
314 }
315 // the postWrite() on the slaves has already been called
316 FanOut<UserType>::_slaves.front()->accessChannel(0).swap(ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D[0]);
317 }
318
319 /********************************************************************************************************************/
320
321 template<typename UserType>
322 bool FeedingFanOut<UserType>::mayReplaceOther(const boost::shared_ptr<const ChimeraTK::TransferElement>&) const {
323 return false;
324 }
325
326 /********************************************************************************************************************/
327
328 template<typename UserType>
329 std::list<boost::shared_ptr<ChimeraTK::TransferElement>> FeedingFanOut<UserType>::getInternalElements() {
330 return {};
331 }
332
333 /********************************************************************************************************************/
334
335 template<typename UserType>
336 std::vector<boost::shared_ptr<ChimeraTK::TransferElement>> FeedingFanOut<UserType>::getHardwareAccessingElements() {
337 return {boost::enable_shared_from_this<ChimeraTK::TransferElement>::shared_from_this()};
339 }
340
341 /********************************************************************************************************************/
342
343 template<typename UserType>
344 void FeedingFanOut<UserType>::replaceTransferElement(boost::shared_ptr<ChimeraTK::TransferElement>) {
345 // You can't replace anything here. Just do nothing.
347 }
348
349 /********************************************************************************************************************/
350
351 template<typename UserType>
353 // call the interrut sequences of the fan out (interrupts for fan input and all outputs), and the ndRegisterAccessor
355 for(auto returnSlave : _returnSlaves) {
356 returnSlave->interrupt();
357 }
358 }
359
360 /********************************************************************************************************************/
361
362} /* namespace ChimeraTK */
Base class for several implementations which distribute values from one feeder to multiple consumers.
Definition FanOut.h:42
virtual void interrupt()
Definition FanOut.h:114
NDRegisterAccessor implementation which distributes values written to this accessor out to any number...
void replaceTransferElement(boost::shared_ptr< ChimeraTK::TransferElement >) override
bool mayReplaceOther(const boost::shared_ptr< const ChimeraTK::TransferElement > &) const override
bool _finalised
Flag whether finalise() has been called.
std::vector< boost::shared_ptr< ChimeraTK::TransferElement > > getHardwareAccessingElements() override
std::list< boost::shared_ptr< ChimeraTK::TransferElement > > getInternalElements() override
FeedingFanOut(std::string const &name, std::string const &unit, std::string const &description, size_t numberOfElements, bool withReturn, ConsumerImplementationPairs< UserType > const &consumerImplementationPairs)
std::vector< boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > > _returnSlaves
list of return slaves, if any
bool isWriteable() const override
bool doWriteTransferDestructively(ChimeraTK::VersionNumber versionNumber={}) override
bool isReadable() const override
size_t _idxLastUpdate
index to _returnSlaves for the last update
void doPostRead(TransferType type, bool hasNewData) override
void finalise()
Finalise the return channel.
void addSlave(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > slave, VariableNetworkNode &) override
Add a slave to the FanOut.
void doPreRead(TransferType type) override
bool isReadOnly() const override
void doPreWrite(TransferType, VersionNumber) override
bool doWriteTransfer(ChimeraTK::VersionNumber versionNumber) override
void doPostWrite(TransferType, VersionNumber) override
void doReadTransferSynchronously() override
void interrupt() override
bool _withReturn
Flag whether this FeedingFanOut has a return channel. Is specified in the constructor.
Class describing a node of a variable network.
VariableDirection getDirection() const
InvalidityTracer application module.
std::list< std::pair< boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > >, VariableNetworkNode > > ConsumerImplementationPairs
Definition FanOut.h:18
bool withReturn
Presence of return channel.
Definition Flags.h:21