15 template<
typename UserType>
18 FeedingFanOut(std::string
const& name, std::string
const& unit, std::string
const& description,
19 size_t numberOfElements,
bool withReturn,
24 [[nodiscard]]
bool isReadOnly()
const override {
return false; }
26 [[nodiscard]]
bool isWriteable()
const override {
return true; }
30 void doPreRead(TransferType type)
override;
32 void doPostRead(TransferType type,
bool hasNewData)
override;
34 void doPreWrite(TransferType, VersionNumber)
override;
42 void doPostWrite(TransferType, VersionNumber)
override;
44 [[nodiscard]]
bool mayReplaceOther(
const boost::shared_ptr<const ChimeraTK::TransferElement>&)
const override;
56 void addSlave(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave, VariableNetworkNode&)
override;
68 std::vector<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>>
_returnSlaves;
77 template<
typename UserType>
79 std::string
const& description,
size_t numberOfElements,
bool withReturn,
81 :
FanOut<UserType>(boost::shared_ptr<
ChimeraTK::NDRegisterAccessor<UserType>>()),
83 ChimeraTK::NDRegisterAccessor<UserType>(
"FeedingFanOut:" + name, AccessModeFlags{}, unit, description),
84 _withReturn(withReturn) {
85 ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D.resize(1);
86 ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D[0].resize(numberOfElements);
89 this->_accessModeFlags = {AccessMode::wait_for_new_data};
94 for(
auto el : consumerImplementationPairs) {
103 template<
typename UserType>
105 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave,
VariableNetworkNode& node) {
109 if(slave->getNumberOfSamples() != 0 &&
110 (slave->getNumberOfChannels() != 1 || slave->getNumberOfSamples() != this->getNumberOfSamples())) {
111 std::string what =
"FeedingFanOut::addSlave(): Trying to add a slave '" + slave->getName();
112 what +=
"' with incompatible array shape! Name of fan out: '" + this->getName() +
"'";
113 throw ChimeraTK::logic_error(what);
117 if(!slave->isWriteable()) {
118 throw ChimeraTK::logic_error(
"FeedingFanOut::addSlave() has been called "
119 "with a receiving implementation!");
126 assert(slave->isReadable());
127 assert(slave->getAccessModeFlags().has(AccessMode::wait_for_new_data));
128 _returnSlaves.push_back(slave);
138 template<
typename UserType>
141 std::vector<cppext::future_queue<void>> queueList;
142 for(
auto& slave : _returnSlaves) {
143 queueList.push_back(slave->getReadQueue());
146 auto notificationQueue = cppext::when_any(queueList.begin(), queueList.end());
147 this->_readQueue = notificationQueue.then<
void>(
149 _idxLastUpdate = idx;
151 _returnSlaves[idx]->getReadQueue().pop_wait();
153 catch(detail::DiscardValueException&) {
156 _idxLastUpdate = std::numeric_limits<size_t>::max() - 1;
160 std::launch::deferred);
167 template<
typename UserType>
174 template<
typename UserType>
177 throw ChimeraTK::logic_error(
"Read operation called on write-only variable.");
179 if(this->_disabled) {
183 assert(_idxLastUpdate != std::numeric_limits<size_t>::max() - 1);
184 if(_idxLastUpdate == std::numeric_limits<size_t>::max()) {
185 for(
auto& slave : _returnSlaves) {
186 slave->preRead(TransferType::read);
190 _returnSlaves[_idxLastUpdate]->preRead(type);
196 template<
typename UserType>
199 if(this->_disabled) {
203 if(!hasNewData && type != TransferType::read) {
209 assert(_idxLastUpdate < std::numeric_limits<size_t>::max() - 1);
211 auto _ = cppext::finally([&] {
212 if(!hasNewData || TransferElement::_activeException) {
215 _returnSlaves[_idxLastUpdate]->accessChannel(0).swap(ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D[0]);
219 if(slave == _returnSlaves[_idxLastUpdate]) {
222 if(slave->getNumberOfSamples() != 0) {
223 slave->accessChannel(0) = ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D[0];
225 slave->writeDestructively(this->_versionNumber);
229 _returnSlaves[_idxLastUpdate]->postRead(type, hasNewData);
231 this->_versionNumber = _returnSlaves[_idxLastUpdate]->getVersionNumber();
232 this->_dataValidity = _returnSlaves[_idxLastUpdate]->dataValidity();
237 template<
typename UserType>
239 if(this->_disabled) {
243 if(slave->getNumberOfSamples() != 0) {
245 slave->accessChannel(0).swap(ChimeraTK::NDRegisterAccessor<UserType>::buffer_2D[0]);
251 slave->setDataValidity(this->dataValidity());
265 template<
typename UserType>
267 if(this->_disabled) {
270 bool dataLost =
false;
276 ret = slave->write(versionNumber);
279 ret = slave->writeDestructively(versionNumber);
291 template<
typename UserType>
295 if(this->_disabled) {
298 bool dataLost =
false;
300 bool ret = slave->writeDestructively(versionNumber);
310 template<
typename UserType>
312 if(this->_disabled) {
321 template<
typename UserType>
328 template<
typename UserType>
335 template<
typename UserType>
337 return {boost::enable_shared_from_this<ChimeraTK::TransferElement>::shared_from_this()};
343 template<
typename UserType>
351 template<
typename UserType>
355 for(
auto returnSlave : _returnSlaves) {
356 returnSlave->interrupt();
Base class for several implementations which distribute values from one feeder to multiple consumers.
NDRegisterAccessor implementation which distributes values written to this accessor out to any number...
void replaceTransferElement(boost::shared_ptr< ChimeraTK::TransferElement >) override
bool mayReplaceOther(const boost::shared_ptr< const ChimeraTK::TransferElement > &) const override
bool _finalised
Flag whether finalise() has been called.
std::vector< boost::shared_ptr< ChimeraTK::TransferElement > > getHardwareAccessingElements() override
std::list< boost::shared_ptr< ChimeraTK::TransferElement > > getInternalElements() override
FeedingFanOut(std::string const &name, std::string const &unit, std::string const &description, size_t numberOfElements, bool withReturn, ConsumerImplementationPairs< UserType > const &consumerImplementationPairs)
std::vector< boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > > _returnSlaves
list of return slaves, if any
bool isWriteable() const override
bool doWriteTransferDestructively(ChimeraTK::VersionNumber versionNumber={}) override
bool isReadable() const override
size_t _idxLastUpdate
index to _returnSlaves for the last update
void doPostRead(TransferType type, bool hasNewData) override
void finalise()
Finalise the return channel.
void addSlave(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > slave, VariableNetworkNode &) override
Add a slave to the FanOut.
void doPreRead(TransferType type) override
bool isReadOnly() const override
void doPreWrite(TransferType, VersionNumber) override
bool doWriteTransfer(ChimeraTK::VersionNumber versionNumber) override
void doPostWrite(TransferType, VersionNumber) override
void doReadTransferSynchronously() override
void interrupt() override
bool _withReturn
Flag whether this FeedingFanOut has a return channel. Is specified in the constructor.
Class describing a node of a variable network.
VariableDirection getDirection() const
InvalidityTracer application module.
std::list< std::pair< boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > >, VariableNetworkNode > > ConsumerImplementationPairs
bool withReturn
Presence of return channel.