9#include <ChimeraTK/cppext/finally.hpp>
11#include <boost/thread/exceptions.hpp>
22 :
ApplicationModule(application,
"/Devices/" + Utilities::escapeName(deviceAliasOrCDD, false),
""),
23 _device(deviceAliasOrCDD), _deviceAliasOrCDD(deviceAliasOrCDD), _owner{application} {
24 auto involvedBackends =
_device.getInvolvedBackendIDs();
29 int64_t recoveryGroupSize{1};
31 for(
auto backendID : involvedBackends) {
32 if(existingDeviceManager->_recoveryGroup->recoveryBackendIDs.contains(backendID)) {
35 involvedBackends.merge(existingDeviceManager->_recoveryGroup->recoveryBackendIDs);
43 if(recoveryGroupSize > 1) {
49 new(&
_recoveryGroup->recoveryBarrier) std::barrier(recoveryGroupSize);
56 std::vector<VariableNetworkNode> rv;
59 auto catalog =
_device.getRegisterCatalogue();
62 for(
const auto& reg : catalog) {
64 if(reg.getNumberOfDimensions() > 1) {
71 if(reg.isWriteable()) {
77 if(reg.getSupportedAccessModes().has(AccessMode::wait_for_new_data)) {
86 const auto* valTyp = &(reg.getDataDescriptor().minimumDataType().getAsTypeInfo());
88 if(reg.getTags().contains(SystemTags::reverseRecovery) && reg.isReadable()) {
93 rv.emplace_back(reg.getRegisterName(),
_deviceAliasOrCDD, reg.getRegisterName(), updateMode, direction,
94 reg.isReadable(), *valTyp, reg.getNumberOfElements());
95 for(
const auto& tag : reg.getTags()) {
96 rv.back().addTag(tag);
115 boost::unique_lock<boost::shared_mutex> errorLock(
_errorMutex);
134 if(deviceManager.get() ==
this) {
140 (errMsg.find(
"[in device ") == std::string::npos) ?
"[in device " +
_deviceAliasOrCDD +
"]" :
"";
141 deviceManager->reportException(errMsg + deviceInfo);
150 boost::shared_lock<boost::shared_mutex> errorLock(
_errorMutex);
176 bool firstSuccess =
true;
179 std::unique_ptr<RegisterCatalogue> catalogue;
193 boost::this_thread::interruption_point();
200 catalogue = std::make_unique<RegisterCatalogue>(
_device.getRegisterCatalogue());
202 catch(ChimeraTK::runtime_error& e) {
213 }
while(!
_device.isFunctional());
215 boost::unique_lock<boost::shared_mutex> errorLock(
_errorMutex);
227 if(!catalogue->getRegister(writeMe).isWriteable()) {
229 throw ChimeraTK::logic_error(std::string(writeMe) +
" is not writeable!");
234 if(!catalogue->getRegister(readMe).isReadable()) {
236 throw ChimeraTK::logic_error(std::string(readMe) +
" is not readable!");
255 std::lock_guard<std::mutex> openCloseLock(
_recoveryGroup->_initHandlerOpenCloseMutex);
264 catch(ChimeraTK::runtime_error& e) {
298 boost::unique_lock<boost::shared_mutex> recoveryLock(
_recoveryMutex);
301 _recoveryHelpers.sort([](boost::shared_ptr<RecoveryHelper>& a, boost::shared_ptr<RecoveryHelper>& b) {
302 return a->writeOrder < b->writeOrder;
306 if(recoveryHelper->versionNumber != VersionNumber{
nullptr}) {
307 recoveryHelper->accessor->write();
308 recoveryHelper->wasWritten =
true;
312 if(recoveryHelper->accessor->isReadable()) {
313 recoveryHelper->notificationQueue.push();
318 catch(ChimeraTK::runtime_error& e) {
353 recoveryLock.unlock();
369 firstSuccess =
false;
384 boost::this_thread::interruption_point();
386 boost::this_thread::interruption_point();
// Prepare the DeviceManager before its main loop runs.
// NOTE(review): this listing omits some original lines (the embedded line numbers jump,
// and closing braces are not shown), so only the visible statements are described here.
425 void DeviceManager::prepare() {
// Set the current version number from a default-constructed argument.
427 setCurrentVersionNumber({});
// Publish FAULT together with an explanatory message: the device has not been opened yet.
428 _deviceError.write(StatusOutput::Status::FAULT,
"Attempting to open device...");
// Only when the application's testable mode is enabled: bump its device initialisation counter.
433 if(Application::getInstance().getTestableMode()._enabled) {
434 ++_owner->getTestableMode()._deviceInitialisationCounter;
440 void DeviceManager::addInitialisationHandler(std::function<
void(
ChimeraTK::Device&)> initialisationHandler) {
441 _initialisationHandlers.push_back(std::move(initialisationHandler));
446 void DeviceManager::addRecoveryAccessor(boost::shared_ptr<RecoveryHelper> recoveryAccessor) {
447 _recoveryHelpers.push_back(std::move(recoveryAccessor));
452 uint64_t DeviceManager::writeOrder() {
453 return ++_writeOrderCounter;
458 boost::shared_lock<boost::shared_mutex> DeviceManager::getRecoverySharedLock() {
459 return boost::shared_lock<boost::shared_mutex>(_recoveryMutex);
464 void DeviceManager::waitForInitialValues() {
465 _initialValueLatch.wait();
470 std::list<EntityOwner*> DeviceManager::getInputModulesRecursively(std::list<EntityOwner*> startList) {
476 if(startList.empty()) {
477 startList.push_back(
this);
484 size_t DeviceManager::getCircularNetworkHash()
const {
490 void DeviceManager::incrementDataFaultCounter() {
491 throw ChimeraTK::logic_error(
"incrementDataFaultCounter() called on a DeviceManager. This is probably "
492 "caused by incorrect ownership of variables/accessors or VariableGroups.");
497 void DeviceManager::decrementDataFaultCounter() {
498 throw ChimeraTK::logic_error(
"decrementDataFaultCounter() called on a DeviceManager. This is probably "
499 "caused by incorrect ownership of variables/accessors or VariableGroups.");
// Stop the module thread and wait until it has been joined.
// NOTE(review): this listing omits some original lines (the embedded line numbers jump,
// and closing braces are not shown), so the exact nesting below is inferred — confirm
// against the full file.
504 void DeviceManager::terminate() {
// Nothing to stop unless the thread is still joinable.
505 if(_moduleThread.joinable()) {
// Request interruption of the module thread.
506 _moduleThread.interrupt();
// Retry joining in 10 ms slices until the join succeeds.
508 while(!_moduleThread.try_join_for(boost::chrono::milliseconds(10))) {
// Push a boost::thread_interrupted exception into _errorQueue; presumably this wakes a
// module thread that is blocked reading the queue — TODO confirm.
510 _errorQueue.push_exception(std::make_exception_ptr(boost::thread_interrupted()));
// Check that the thread is no longer joinable at this point.
516 assert(!_moduleThread.joinable());
521 bool DeviceManager::RecoveryGroup::waitForRecoveryStage(
RecoveryStage stage) {
523 throw boost::thread_interrupted();
525 boost::this_thread::interruption_point();
527 recoveryBarrier.arrive_and_wait();
530 throw boost::thread_interrupted();
532 boost::this_thread::interruption_point();
546 return !(errorAtStage == stage);
552 assert((errorAtStage == RecoveryStage::NO_ERROR) || (errorAtStage == stage));
553 errorAtStage = stage;
// Clear the recorded error stage and then synchronise on the recovery barrier.
// NOTE(review): this listing omits some original lines (embedded line 560 and the closing
// brace are not shown); only the visible statements are described.
558 void DeviceManager::RecoveryGroup::resetErrorAtStage() {
// Reset the recorded error stage to NO_ERROR.
559 errorAtStage = RecoveryStage::NO_ERROR;
// Arrive at recoveryBarrier and block until the barrier phase completes.
561 recoveryBarrier.arrive_and_wait();
// Boost interruption point, so a pending thread interruption can take effect after the barrier.
562 boost::this_thread::interruption_point();
detail::TestableMode & getTestableMode()
Get the TestableMode control object of this application.
static void registerThread(const std::string &name)
Register the thread in the application system and give it a name.
std::map< std::string, boost::shared_ptr< DeviceManager > > const & getDeviceManagerMap()
Access the device manager map.
static Application & getInstance()
Obtain instance of the application.
void setCurrentVersionNumber(VersionNumber versionNumber) override
Set the current version number.
std::atomic< int64_t > _synchronousTransferCounter
bool _deviceHasError
The error flag whether the device is functional.
boost::latch _initialValueLatch
std::list< RegisterPath > _readRegisterPaths
std::vector< VariableNetworkNode > getNodesList() const
Create and return list of VariableNetworkNodes for all device registers.
bool _isHoldingInitialValueLatch
Latch to halt accessors until initial values can be received.
VersionNumber getExceptionVersionNumber()
Use this function to read the exception version number.
VersionNumber _exceptionVersionNumber
Version number of the last exception.
std::list< RegisterPath > _writeRegisterPaths
boost::shared_mutex _errorMutex
Mutex to protect deviceHasError.
VoidOutput deviceBecameFunctional
A trigger that indicates that the device just became available again after an error (in contrast to the err...
void reportException(const std::string &errMsg)
Use this function to report an exception.
std::shared_ptr< RecoveryGroup > _recoveryGroup
cppext::future_queue< std::string > _errorQueue
Queue used for communication between reportException() and the moduleThread.
void mainLoopImpl()
This function tries to open the device and sets the deviceError.
std::list< std::function< void(ChimeraTK::Device &)> > _initialisationHandlers
DeviceManager(Application *application, const std::string &deviceAliasOrCDD)
Create DeviceManager which handles device exceptions and performs the recovery.
boost::shared_mutex _recoveryMutex
Mutex for writing the DeviceModule::writeRecoveryOpen.
void mainLoop() override
Wrapper around the actual main loop implementation to add unsubscribing from the barrier to allow a c...
std::string _deviceAliasOrCDD
std::list< boost::shared_ptr< RecoveryHelper > > _recoveryHelpers
List of TransferElements to be written after the device has been recovered.
StatusWithMessage _deviceError
A VariableGroup for exception status and message.
const std::string & getName() const
Get the name of the module instance.
bool write(ChimeraTK::VersionNumber versionNumber)=delete
InvalidityTracer application module.
@ shutdown
The application is in the process of shutting down.
UpdateMode
Enum to define the update mode of variables.
Logger::StreamProxy logger(Logger::Severity severity, std::string context)
Convenience function to obtain the logger stream.
static std::mutex globalDeviceOpenMutex
Protect the device open actions for all DeviceManagers and groups.
void writeOk()
Set status to OK, clear the message and write the outputs.
ScalarOutput< std::string > _message
void write(StatusOutput::Status status, std::string message)
Set the status and the message and write the outputs.
Struct to define the direction of variables.
bool withReturn
Presence of return channel.
constexpr std::string_view cdd