ChimeraTK-DeviceAccess-DoocsBackend  01.09.02
ZMQSubscriptionManager.cc
Go to the documentation of this file.
2 
4 
5 namespace ChimeraTK { namespace DoocsBackendNamespace {
6 
7  /******************************************************************************************************************/
8 
9  pthread_t ZMQSubscriptionManager::pthread_t_invalid;
10 
11  /******************************************************************************************************************/
12 
13  ZMQSubscriptionManager::ZMQSubscriptionManager() {
14  pthread_t_invalid = pthread_self();
15  }
16 
17  /******************************************************************************************************************/
18 
19  ZMQSubscriptionManager::~ZMQSubscriptionManager() {
20  std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
21 
22  for(auto subscription = subscriptionMap.begin(); subscription != subscriptionMap.end();) {
23  {
24  std::unique_lock<std::mutex> listeners_lock(subscription->second.listeners_mutex);
25  subscription->second.listeners.clear();
26  }
27  lock.unlock();
28  deactivateSubscription(subscription->first);
29  lock.lock();
30  subscription = subscriptionMap.erase(subscription);
31  }
32  }
33 
34  /******************************************************************************************************************/
35 
36  void ZMQSubscriptionManager::subscribe(const std::string& path, DoocsBackendRegisterAccessorBase* accessor) {
37  bool newSubscription = false;
38 
39  std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
40 
41  // check if subscription is already in the map
42  newSubscription = subscriptionMap.find(path) == subscriptionMap.end();
43 
44  // gain lock for listener, to exclude concurrent access with the zmq_callback()
45  std::unique_lock<std::mutex> listeners_lock(subscriptionMap[path].listeners_mutex);
46 
47  // add accessor to list of listeners
48  subscriptionMap[path].listeners.push_back(accessor);
49 
50  // subscriptionMap is no longer used below this point
51  lock.unlock();
52 
53  // Set flag whether ZMQ is activated for this accessor
54  accessor->isActiveZMQ = accessor->_backend->_asyncReadActivated;
55 
56  // create subscription if not yet existing. must be done after the previous steps to make sure the initial value
57  // is not lost
58  if(newSubscription) {
59  activateSubscription(path); // just establish the ZeroMQ subscription - listeners are still deactivated
60  }
61 
62  // If required, poll the initial value and push it into the queue. This must be done after the subcription has been
63  // made.
64  if(accessor->isActiveZMQ && subscriptionMap[path].gotInitialValue) {
65  listeners_lock.unlock(); // lock no longer required and pollInitialValue might take a while...
66  pollInitialValue(path, {accessor});
67  }
68  } // namespace DoocsBackendNamespace
69 
70  /******************************************************************************************************************/
71 
72  void ZMQSubscriptionManager::pollInitialValue(
73  const std::string& path, const std::list<DoocsBackendRegisterAccessorBase*>& accessors) {
74  // Poll initial value vie RPC
75  doocs::EqData src, dst;
76  doocs::EqAdr adr;
77  EqCall eq;
78  adr.adr(path);
79  auto rc = eq.get(&adr, &src, &dst);
80  if(rc && DoocsBackend::isCommunicationError(dst.error())) {
81  // communication error: push to queues
82  for(auto accessor : accessors) {
83  pushError(accessor, "ZeroMQ connection for " + path + " interrupted: " + dst.get_string());
84  }
85  }
86  else {
87  // no communication error: push data
88  for(auto accessor : accessors) {
89  accessor->notifications.push_overwrite(dst);
90  }
91  }
92  }
93 
94  /******************************************************************************************************************/
95 
96  void ZMQSubscriptionManager::unsubscribe(const std::string& path, DoocsBackendRegisterAccessorBase* accessor) {
97  std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
98 
99  // ignore if no subscription exists
100  if(subscriptionMap.find(path) == subscriptionMap.end()) return;
101 
102  // gain lock for listener, to exclude concurrent access with the zmq_callback()
103  std::unique_lock<std::mutex> listeners_lock(subscriptionMap[path].listeners_mutex);
104 
105  // ignore if subscription exists but not for this accessor
106  if(std::find(subscriptionMap[path].listeners.begin(), subscriptionMap[path].listeners.end(), accessor) ==
107  subscriptionMap[path].listeners.end())
108  return;
109 
110  // remove accessor from list of listeners
111  subscriptionMap[path].listeners.erase(
112  std::remove(subscriptionMap[path].listeners.begin(), subscriptionMap[path].listeners.end(), accessor));
113 
114  // if no listener left, delete the subscription
115  if(subscriptionMap[path].listeners.empty()) {
116  // temporarily unlock the locks which might block the ZQM subscription thread
117  listeners_lock.unlock();
118  lock.unlock();
119 
120  // remove ZMQ subscription. This will also join the ZMQ subscription thread
121  deactivateSubscription(path);
122 
123  // obtain locks again
124  lock.lock();
125 
126  // remove subscription from map
127  subscriptionMap.erase(subscriptionMap.find(path));
128  }
129  }
130 
131  /******************************************************************************************************************/
132 
133  void ZMQSubscriptionManager::activateSubscription(const std::string& path) {
134  // precondition: subscriptionMap_mutex an the subscription's listeners_mutex must be locked
135 
136  // do nothing if already active
137  if(subscriptionMap[path].active) return;
138 
139  // subscribe to property
140  doocs::EqData dst;
141  doocs::EqAdr ea;
142  ea.adr(path);
143  dmsg_t tag;
144  int err = dmsg_attach(&ea, &dst, (void*)&(subscriptionMap[path]), &zmq_callback, &tag);
145  if(err) {
147  throw ChimeraTK::runtime_error(
148  std::string("Cannot subscribe to DOOCS property '" + path + "' via ZeroMQ: ") + dst.get_string());
149  }
150 
151  // run dmsg_start() once
152  std::unique_lock<std::mutex> lck(dmsgStartCalled_mutex);
153  if(!dmsgStartCalled) {
154  dmsg_start();
155  dmsgStartCalled = true;
156  }
157 
158  // set active flag
159  subscriptionMap[path].active = true;
160  }
161 
162  /******************************************************************************************************************/
163 
164  void ZMQSubscriptionManager::deactivateSubscription(const std::string& path) {
165  // do nothing if already inactive
166  {
167  std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
168  std::unique_lock<std::mutex> listeners_lock(subscriptionMap[path].listeners_mutex);
169  if(!subscriptionMap[path].active) return;
170 
171  // Wait until we have seen any reaction from ZMQ. This is to work around a race condition in DOOCS's ZMQ
172  // subcription mechanism
173  while(not subscriptionMap[path].started) subscriptionMap[path].startedCv.wait(listeners_lock);
174 
175  // clear active flag
176  subscriptionMap[path].active = false;
177  subscriptionMap[path].started = false;
178  }
179 
180  // remove ZMQ subscription. This will also join the ZMQ subscription thread
181  doocs::EqAdr ea;
182  ea.adr(path);
183  dmsg_detach(&ea, nullptr); // nullptr = remove all subscriptions for that address
184  }
185 
186  /******************************************************************************************************************/
187 
189  std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
190 
191  for(auto& subscription : subscriptionMap) {
192  std::unique_lock<std::mutex> listeners_lock(subscription.second.listeners_mutex);
193  std::list<DoocsBackendRegisterAccessorBase*> listeners;
194  for(auto& listener : subscription.second.listeners) {
195  if(listener->_backend.get() == backend) {
196  listener->isActiveZMQ = true;
197  if(subscription.second.gotInitialValue) {
198  // If the DOOCS initial value was already seen by the callback, put listener to list for initial value poll
199  listeners.push_back(listener);
200  }
201  }
202  }
203  if(subscription.second.gotInitialValue) {
204  // Poll initial values if and only if the DOOCS initial value was already seen by the callback function. It is
205  // important to do this decision together with setting the isActiveZMQ under the same lock, to avoid a race
206  // condition which might lead to duplicate initial values (the callback function also uses the same lock
207  // when setting gotInitialValue and checking isActiveZMQ).
208  listeners_lock.unlock();
209  pollInitialValue(subscription.first, listeners);
210  listeners_lock.lock();
211  }
212  }
213  }
214 
215  /******************************************************************************************************************/
216 
218  std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
219 
220  for(auto& subscription : subscriptionMap) {
221  for(auto& listener : subscription.second.listeners) {
222  if(listener->_backend.get() == backend) {
223  listener->isActiveZMQ = false;
224  }
225  }
226  }
227  }
228 
229  /******************************************************************************************************************/
230 
231  void ZMQSubscriptionManager::pushError(DoocsBackendRegisterAccessorBase* listener, const std::string& message) {
232  // Don't push exceptions into deactivated listeners.
233  // The check in subscription.second.hasException is not sufficient because it is reset in open(),
234  // but activateAsyncRead() might not have been called when the next setException comes in.
235  if(!listener->isActiveZMQ) return;
236 
237  // First deactivate the listener to avoid race conditions with pushing the exception. Nothing must be pushed
238  // after the exception until a succefful open() and activateAsyncRead(). (see. Spec B.9.3.1 and B.9.3.2)
239  listener->isActiveZMQ = false;
240 
241  // Now finally put the exception to the queue
242  try {
243  throw ChimeraTK::runtime_error(message);
244  }
245  catch(...) {
246  listener->notifications.push_overwrite_exception(std::current_exception());
247  }
248  }
249 
250  /******************************************************************************************************************/
251 
253  std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
254 
255  for(auto& subscription : subscriptionMap) {
256  std::unique_lock<std::mutex> listeners_lock(subscription.second.listeners_mutex);
257  for(auto& listener : subscription.second.listeners) {
258  // only handle listeners belonging to the current backend
259  if(listener->_backend.get() != backend) continue;
260  pushError(listener, "Exception reported by another accessor.");
261  }
262  }
263  }
264 
265  /******************************************************************************************************************/
266 
267  void ZMQSubscriptionManager::zmq_callback(void* self_, doocs::EqData* data, dmsg_info_t* info) {
268  // obtain pointer to subscription object
269  auto* subscription = static_cast<ZMQSubscriptionManager::Subscription*>(self_);
270 
271  // Make sure the stamp is used from the ZeroMQ header. TODO: Is this really wanted?
272  data->time(info->sec, info->usec);
273  data->mpnum(info->ident);
274 
275  std::unique_lock<std::mutex> lock(subscription->listeners_mutex);
276 
277  // As long as we get a callback from ZMQ, we consider it started
278  if(not subscription->started) {
279  subscription->started = true;
280  subscription->startedCv.notify_all();
281  }
282 
283  // store thread id of the thread calling this function, if not yet done
284  if(pthread_equal(subscription->zqmThreadId, pthread_t_invalid)) {
285  subscription->zqmThreadId = pthread_self();
286  }
287 
288  // check for error
289  if(!DoocsBackend::isCommunicationError(data->error())) {
290  // Set flag that we have received an initial value. (If the flag is not set, any received value is by
291  // definition the initial value.) This must be done independent from listener activation status, because this
292  // is to keep track of the DOOCS-provided initial value.
293  if(!subscription->gotInitialValue) subscription->gotInitialValue = true;
294 
295  // data has been received: push the data
296  for(auto& listener : subscription->listeners) {
297  if(listener->isActiveZMQ) {
298  // push data to listener queue
299  listener->notifications.push_overwrite(*data);
300  }
301  }
302  }
303  else {
304  // Clear initial value flag: we will get a new initial value from DOOCS after the DOOCS-internal recovery
305  // (independent of the backend's error state and recovery!)
306  subscription->gotInitialValue = false;
307 
308  // Push exception to the listeners
309  for(auto& listener : subscription->listeners) {
310  pushError(listener, "ZeroMQ connection interrupted: " + data->get_string());
311  lock.unlock();
312  listener->_backend->informRuntimeError(listener->_path);
313  lock.lock();
314  }
315  }
316  }
317 
318  /******************************************************************************************************************/
319 
320 }} // namespace ChimeraTK::DoocsBackendNamespace
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::activateAllListeners
void activateAllListeners(DoocsBackend *backend)
Activate all listeners for the given backend. Should be called from DoocsBackend::activateAsyncRead()...
Definition: ZMQSubscriptionManager.cc:188
ZMQSubscriptionManager.h
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::unsubscribe
void unsubscribe(const std::string &path, DoocsBackendRegisterAccessorBase *accessor)
Unregister accessor subscription.
Definition: ZMQSubscriptionManager.cc:96
ChimeraTK::DoocsBackendRegisterAccessorBase
This is the untemplated base class which unifies all data members not depending on the UserType.
Definition: DoocsBackendRegisterAccessor.h:25
ChimeraTK::DoocsBackendRegisterAccessorBase::_path
std::string _path
register path
Definition: DoocsBackendRegisterAccessor.h:28
ChimeraTK::DoocsBackendRegisterAccessorBase::isActiveZMQ
bool isActiveZMQ
flag whether it should receive updates from the ZeroMQ subscription.
Definition: DoocsBackendRegisterAccessor.h:56
ChimeraTK::DoocsBackendRegisterAccessorBase::notifications
cppext::future_queue< doocs::EqData > notifications
future_queue used to notify the TransferFuture about completed transfers
Definition: DoocsBackendRegisterAccessor.h:59
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::deactivateAllListenersAndPushException
void deactivateAllListenersAndPushException(DoocsBackend *backend)
Deactivate all listeners for the given backend and push exceptions into the queues.
Definition: ZMQSubscriptionManager.cc:252
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::deactivateAllListeners
void deactivateAllListeners(DoocsBackend *backend)
Deactivate all listeners for the given backend. Should be called from DoocsBackend::close().
Definition: ZMQSubscriptionManager.cc:217
ChimeraTK::DoocsBackend::isCommunicationError
static bool isCommunicationError(int doocs_error)
Definition: DoocsBackend.cc:400
ChimeraTK
Definition: spec_DoocsBackend.dox:2
ChimeraTK::DoocsBackend
Backend to access DOOCS control system servers.
Definition: DoocsBackend.h:56
ChimeraTK::DoocsBackendNamespace::ZMQSubscriptionManager::subscribe
void subscribe(const std::string &path, DoocsBackendRegisterAccessorBase *accessor)
Register accessor subscription.
Definition: ZMQSubscriptionManager.cc:36
DoocsBackendRegisterAccessor.h
ChimeraTK::DoocsBackendRegisterAccessorBase::_backend
boost::shared_ptr< DoocsBackend > _backend
Pointer to the backend.
Definition: DoocsBackendRegisterAccessor.h:65