10#include <ChimeraTK/NDRegisterAccessor.h>
11#include <ChimeraTK/ReadAnyGroup.h>
12#include <ChimeraTK/SupportedUserTypes.h>
13#include <ChimeraTK/SystemTags.h>
15#include <boost/smart_ptr/shared_ptr.hpp>
26 template<
typename UserType>
29 ThreadedFanOut(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
42 VersionNumber
readInitialValues(boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> accessor);
55 template<
typename UserType>
62 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave,
VariableNetworkNode& consumer)
override;
71 std::vector<boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>>>
_inputChannels;
81 template<
typename UserType>
84 :
FanOut<UserType>(feedingImpl) {
85 assert(feedingImpl->getAccessModeFlags().has(AccessMode::wait_for_new_data));
86 for(
auto el : consumerImplementationPairs) {
93 template<
typename UserType>
98 catch(ChimeraTK::logic_error& e) {
105 template<
typename UserType>
107 if(this->_disabled) {
110 assert(!_thread.joinable());
111 _thread = boost::thread([
this] { this->
run(); });
116 template<
typename UserType>
119 if(_thread.joinable()) {
124 assert(!_thread.joinable());
126 catch(boost::thread_resource_error& e) {
133 template<
typename UserType>
137 _testableModeReached =
true;
139 ChimeraTK::VersionNumber
version{
nullptr};
143 boost::this_thread::interruption_point();
147 if(slave->getNumberOfSamples() != 0) {
150 slave->setDataValidity(validity);
151 bool dataLoss = slave->writeDestructively(
version);
157 boost::this_thread::interruption_point();
165 template<
typename UserType>
167 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> accessor) {
173 return accessor->getVersionNumber();
179 template<
typename UserType>
181 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> feedingImpl,
183 :
ThreadedFanOut<UserType>(feedingImpl, consumerImplementationPairs) {
187 for(
auto el : consumerImplementationPairs) {
194 template<
typename UserType>
196 boost::shared_ptr<ChimeraTK::NDRegisterAccessor<UserType>> slave,
VariableNetworkNode& consumer) {
201 if(consumer.
getTags().contains(ChimeraTK::SystemTags::reverseRecovery)) {
202 _initialValueProvider = slave;
207 _inputChannels.push_back(slave);
213 template<
typename UserType>
217 _testableModeReached =
true;
219 std::map<TransferElementID, boost::shared_ptr<NDRegisterAccessor<UserType>>> accessors;
221 accessors[acc->getId()] = acc;
225 TransferElementID changedVariable = _initialValueProvider->getId();
227 auto version = readInitialValues(_initialValueProvider);
229 ReadAnyGroup group(_inputChannels.begin(), _inputChannels.end());
233 for(
auto& [
id, accessor] : accessors) {
235 if(
id == changedVariable) {
240 if(accessor->getNumberOfSamples() != 0) {
241 accessor->accessChannel(0) = accessors[changedVariable]->accessChannel(0);
244 bool dataLoss = accessor->writeDestructively(
version);
252 boost::this_thread::interruption_point();
253 changedVariable = group.readAny();
254 boost::this_thread::interruption_point();
256 version = accessors[changedVariable]->getVersionNumber();
detail::TestableMode & getTestableMode()
Get the TestableMode control object of this application.
static void registerThread(const std::string &name)
Register the thread in the application system and give it a name.
static Application & getInstance()
Obtain instance of the application.
static void incrementDataLossCounter(const std::string &name)
Increment counter for how many write() operations have overwritten unread data.
std::atomic< bool > _testableModeReached
Flag used by the testable mode to identify whether a thread within the EntityOwner has reached the po...
Base class for several implementations which distribute values from one feeder to multiple consumers.
virtual void addSlave(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > slave, VariableNetworkNode &)
Add a slave to the FanOut.
Base class for internal modules which are created by the variable connection code (e....
FanOut implementation with an internal thread which waits for new data which is read from the given f...
VersionNumber readInitialValues(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > accessor)
void activate() override
Activate synchronisation thread if needed.
void deactivate() override
Deactivate synchronisation thread if running.
boost::thread _thread
Thread handling the synchronisation, if needed.
~ThreadedFanOut() override
ThreadedFanOut(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > feedingImpl, ConsumerImplementationPairs< UserType > const &consumerImplementationPairs)
virtual void run()
Synchronise feeder and the consumers.
Same as ThreadedFanOut but with return channel.
boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > _initialValueProvider
ThreadedFanOutWithReturn(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > feedingImpl, ConsumerImplementationPairs< UserType > const &consumerImplementationPairs)
std::vector< boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > > _inputChannels
void addSlave(boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > > slave, VariableNetworkNode &consumer) override
Add a slave to the FanOut.
void run() override
Synchronise feeder and the consumers.
boost::thread _thread
Thread handling the synchronisation, if needed.
Class describing a node of a variable network.
const std::unordered_set< std::string > & getTags() const
VariableDirection getDirection() const
InvalidityTracer application module.
@ run
Actual run phase with full multi threading.
std::list< std::pair< boost::shared_ptr< ChimeraTK::NDRegisterAccessor< UserType > >, VariableNetworkNode > > ConsumerImplementationPairs
bool withReturn
Presence of return channel.