ChimeraTK-DeviceAccess-DoocsBackend  01.09.02
ZMQSubscriptionManager.h
Go to the documentation of this file.
1 #ifndef CHIMERATK_DOOCS_BACKEND_ZMQSUBSCRIPTIONMANAGER_H
2 #define CHIMERATK_DOOCS_BACKEND_ZMQSUBSCRIPTIONMANAGER_H
3 
4 #include <eq_fct.h>
5 
6 #include <ChimeraTK/Exception.h>
7 
8 #include <boost/shared_ptr.hpp>
9 
10 #include <map>
11 #include <mutex>
12 #include <pthread.h>
13 #include <string>
14 #include <vector>
15 
16 namespace ChimeraTK {
17 
18  class DoocsBackendRegisterAccessorBase;
19  class DoocsBackend;
20 
21  namespace DoocsBackendNamespace {
22 
24  public:
26  static ZMQSubscriptionManager manager;
27  return manager;
28  }
29 
31  void subscribe(const std::string& path, DoocsBackendRegisterAccessorBase* accessor);
32 
34  void unsubscribe(const std::string& path, DoocsBackendRegisterAccessorBase* accessor);
35 
37  void activateAllListeners(DoocsBackend* backend);
38 
40  void deactivateAllListeners(DoocsBackend* backend);
41 
45 
46  private:
49 
50  // Activate ZeroMQ subscription.
51  // Caller need to own subscriptionMap_mutex already.
52  void activateSubscription(const std::string& path);
53 
54  // Deactivate ZeroMQ subscription. Caller must not own subscriptionMap_mutex or any listeners_mutex.
55  void deactivateSubscription(const std::string& path);
56 
57  // Poll initial value via RPC call and push it into the queues
58  void pollInitialValue(const std::string& path, const std::list<DoocsBackendRegisterAccessorBase*>& accessors);
59 
60  // Push error to listener
61  static void pushError(DoocsBackendRegisterAccessorBase* listener, const std::string& message);
62 
64  bool dmsgStartCalled{false};
65  std::mutex dmsgStartCalled_mutex;
66 
68  struct Subscription {
69  Subscription() : zqmThreadId(ZMQSubscriptionManager::pthread_t_invalid) {}
70 
74  std::vector<DoocsBackendRegisterAccessorBase*> listeners;
75 
77  std::mutex listeners_mutex;
78 
82  pthread_t zqmThreadId;
83 
87  bool active{false};
88 
91  bool started{false};
92  std::condition_variable startedCv{};
93 
100  bool gotInitialValue{false};
101  };
102 
106  static pthread_t pthread_t_invalid;
107 
109  std::map<std::string, Subscription> subscriptionMap;
110 
112  std::mutex subscriptionMap_mutex;
113 
118  static void zmq_callback(void* self, doocs::EqData* data, dmsg_info_t* info);
119  };
120 
121  } // namespace DoocsBackendNamespace
122 
123 } // namespace ChimeraTK
124 
125 #endif // CHIMERATK_DOOCS_BACKEND_ZMQSUBSCRIPTIONMANAGER_H
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::activateAllListeners
void activateAllListeners(DoocsBackend *backend)
Activate all listeners for the given backend. Should be called from DoocsBackend::activateAsyncRead()...
Definition: ZMQSubscriptionManager.cc:188
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::unsubscribe
void unsubscribe(const std::string &path, DoocsBackendRegisterAccessorBase *accessor)
Unregister accessor subscription.
Definition: ZMQSubscriptionManager.cc:96
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager
Definition: ZMQSubscriptionManager.h:23
ChimeraTK::DoocsBackendRegisterAccessorBase
This is the untemplated base class which unifies all data members not depending on the UserType.
Definition: DoocsBackendRegisterAccessor.h:25
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::getInstance
static ZMQSubscriptionManager & getInstance()
Definition: ZMQSubscriptionManager.h:25
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::deactivateAllListenersAndPushException
void deactivateAllListenersAndPushException(DoocsBackend *backend)
Deactivate all listeners for the given backend and push exceptions into the queues.
Definition: ZMQSubscriptionManager.cc:252
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::deactivateAllListeners
void deactivateAllListeners(DoocsBackend *backend)
Deactivate all listeners the given backend. Should be called from DoocsBackend::close().
Definition: ZMQSubscriptionManager.cc:217
ChimeraTK
Definition: spec_DoocsBackend.dox:2
ChimeraTK::DoocsBackend
Backend to access DOOCS control system servers.
Definition: DoocsBackend.h:56
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::subscribe
void subscribe(const std::string &path, DoocsBackendRegisterAccessorBase *accessor)
Register accessor subscription.
Definition: ZMQSubscriptionManager.cc:36