ChimeraTK-DeviceAccess-DoocsBackend 01.11.02
Loading...
Searching...
No Matches
ZMQSubscriptionManager.cc
Go to the documentation of this file.
2
4
5namespace ChimeraTK { namespace DoocsBackendNamespace {
6
7 /******************************************************************************************************************/
8
9 pthread_t ZMQSubscriptionManager::pthread_t_invalid;
10
11 /******************************************************************************************************************/
12
13 ZMQSubscriptionManager::ZMQSubscriptionManager() {
14 pthread_t_invalid = pthread_self();
15 }
16
17 /******************************************************************************************************************/
18
19 ZMQSubscriptionManager::~ZMQSubscriptionManager() {
20 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
21
22 for(auto subscription = subscriptionMap.begin(); subscription != subscriptionMap.end();) {
23 {
24 std::unique_lock<std::mutex> listeners_lock(subscription->second.listeners_mutex);
25 subscription->second.listeners.clear();
26 }
27 lock.unlock();
28 deactivateSubscription(subscription->first);
29 lock.lock();
30 subscription = subscriptionMap.erase(subscription);
31 }
32 }
33
34 /******************************************************************************************************************/
35
37 bool newSubscription = false;
38
39 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
40
41 // check if subscription is already in the map
42 newSubscription = subscriptionMap.find(path) == subscriptionMap.end();
43
44 // gain lock for listener, to exclude concurrent access with the zmq_callback()
45 std::unique_lock<std::mutex> listeners_lock(subscriptionMap[path].listeners_mutex);
46
47 // add accessor to list of listeners
48 subscriptionMap[path].listeners.push_back(accessor);
49
50 // subscriptionMap is no longer used below this point
51 lock.unlock();
52
53 // Set flag whether ZMQ is activated for this accessor
54 accessor->isActiveZMQ = accessor->_backend->_asyncReadActivated;
55
56 // create subscription if not yet existing. must be done after the previous steps to make sure the initial value
57 // is not lost
58 if(newSubscription) {
59 activateSubscription(path); // just establish the ZeroMQ subscription - listeners are still deactivated
60 }
61
62 // If required, poll the initial value and push it into the queue. This must be done after the subcription has been
63 // made.
64 if(accessor->isActiveZMQ && subscriptionMap[path].gotInitialValue) {
65 listeners_lock.unlock(); // lock no longer required and pollInitialValue might take a while...
66 pollInitialValue(path, {accessor});
67 }
68 } // namespace DoocsBackendNamespace
69
70 /******************************************************************************************************************/
71
72 void ZMQSubscriptionManager::pollInitialValue(
73 const std::string& path, const std::list<DoocsBackendRegisterAccessorBase*>& accessors) {
74 // Poll initial value vie RPC
75 doocs::EqData src, dst;
76 doocs::EqAdr adr;
77 EqCall eq;
78 adr.adr(path);
79 auto rc = eq.get(&adr, &src, &dst);
80 if(rc && DoocsBackend::isCommunicationError(dst.error())) {
81 // communication error: push to queues
82 for(auto accessor : accessors) {
83 pushError(accessor, "ZeroMQ connection for " + path + " interrupted: " + dst.get_string());
84 }
85 }
86 else {
87 // no communication error: push data
88 for(auto accessor : accessors) {
89 accessor->notifications.push_overwrite(dst);
90 }
91 }
92 }
93
94 /******************************************************************************************************************/
95
97 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
98
99 // ignore if no subscription exists
100 if(subscriptionMap.find(path) == subscriptionMap.end()) return;
101
102 // gain lock for listener, to exclude concurrent access with the zmq_callback()
103 std::unique_lock<std::mutex> listeners_lock(subscriptionMap[path].listeners_mutex);
104
105 // ignore if subscription exists but not for this accessor
106 if(std::find(subscriptionMap[path].listeners.begin(), subscriptionMap[path].listeners.end(), accessor) ==
107 subscriptionMap[path].listeners.end())
108 return;
109
110 // remove accessor from list of listeners
111 subscriptionMap[path].listeners.erase(
112 std::remove(subscriptionMap[path].listeners.begin(), subscriptionMap[path].listeners.end(), accessor));
113
114 // if no listener left, delete the subscription
115 if(subscriptionMap[path].listeners.empty()) {
116 // temporarily unlock the locks which might block the ZQM subscription thread
117 listeners_lock.unlock();
118 lock.unlock();
119
120 // remove ZMQ subscription. This will also join the ZMQ subscription thread
121 deactivateSubscription(path);
122
123 // obtain locks again
124 lock.lock();
125
126 // remove subscription from map
127 subscriptionMap.erase(subscriptionMap.find(path));
128 }
129 }
130
131 /******************************************************************************************************************/
132
133 void ZMQSubscriptionManager::activateSubscription(const std::string& path) {
134 // precondition: subscriptionMap_mutex an the subscription's listeners_mutex must be locked
135
136 // do nothing if already active
137 if(subscriptionMap[path].active) return;
138
139 // subscribe to property
140 doocs::EqData dst;
141 doocs::EqAdr ea;
142 ea.adr(path);
143 dmsg_t tag;
144 int err = dmsg_attach(&ea, &dst, (void*)&(subscriptionMap[path]), &zmq_callback, &tag);
145 if(err) {
147 throw ChimeraTK::runtime_error(
148 std::string("Cannot subscribe to DOOCS property '" + path + "' via ZeroMQ: ") + dst.get_string());
149 }
150
151 // run dmsg_start() once
152 std::unique_lock<std::mutex> lck(dmsgStartCalled_mutex);
153 if(!dmsgStartCalled) {
154 dmsg_start();
155 dmsgStartCalled = true;
156 }
157
158 // set active flag
159 subscriptionMap[path].active = true;
160 }
161
162 /******************************************************************************************************************/
163
164 void ZMQSubscriptionManager::deactivateSubscription(const std::string& path) {
165 // do nothing if already inactive
166 {
167 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
168 std::unique_lock<std::mutex> listeners_lock(subscriptionMap[path].listeners_mutex);
169 if(!subscriptionMap[path].active) return;
170
171 // Wait until we have seen any reaction from ZMQ. This is to work around a race condition in DOOCS's ZMQ
172 // subcription mechanism
173 while(not subscriptionMap[path].started) subscriptionMap[path].startedCv.wait(listeners_lock);
174
175 // clear active flag
176 subscriptionMap[path].active = false;
177 subscriptionMap[path].started = false;
178 }
179
180 // remove ZMQ subscription. This will also join the ZMQ subscription thread
181 doocs::EqAdr ea;
182 ea.adr(path);
183 dmsg_detach(&ea, nullptr); // nullptr = remove all subscriptions for that address
184 }
185
186 /******************************************************************************************************************/
187
189 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
190
191 for(auto& subscription : subscriptionMap) {
192 std::unique_lock<std::mutex> listeners_lock(subscription.second.listeners_mutex);
193 std::list<DoocsBackendRegisterAccessorBase*> listeners;
194 for(auto& listener : subscription.second.listeners) {
195 if(listener->_backend.get() == backend) {
196 listener->isActiveZMQ = true;
197 if(subscription.second.gotInitialValue) {
198 // If the DOOCS initial value was already seen by the callback, put listener to list for initial value poll
199 listeners.push_back(listener);
200 }
201 }
202 }
203 if(subscription.second.gotInitialValue) {
204 // Poll initial values if and only if the DOOCS initial value was already seen by the callback function. It is
205 // important to do this decision together with setting the isActiveZMQ under the same lock, to avoid a race
206 // condition which might lead to duplicate initial values (the callback function also uses the same lock
207 // when setting gotInitialValue and checking isActiveZMQ).
208 listeners_lock.unlock();
209 pollInitialValue(subscription.first, listeners);
210 listeners_lock.lock();
211 }
212 }
213 }
214
215 /******************************************************************************************************************/
216
218 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
219
220 for(auto& subscription : subscriptionMap) {
221 for(auto& listener : subscription.second.listeners) {
222 if(listener->_backend.get() == backend) {
223 listener->isActiveZMQ = false;
224 }
225 }
226 }
227 }
228
229 /******************************************************************************************************************/
230
231 void ZMQSubscriptionManager::pushError(DoocsBackendRegisterAccessorBase* listener, const std::string& message) {
232 // Don't push exceptions into deactivated listeners.
233 // The check in subscription.second.hasException is not sufficient because it is reset in open(),
234 // but activateAsyncRead() might not have been called when the next setException comes in.
235 if(!listener->isActiveZMQ) return;
236
237 // First deactivate the listener to avoid race conditions with pushing the exception. Nothing must be pushed
238 // after the exception until a succefful open() and activateAsyncRead(). (see. Spec B.9.3.1 and B.9.3.2)
239 listener->isActiveZMQ = false;
240
241 // Now finally put the exception to the queue
242 try {
243 throw ChimeraTK::runtime_error(message);
244 }
245 catch(...) {
246 listener->notifications.push_overwrite_exception(std::current_exception());
247 }
248 }
249
250 /******************************************************************************************************************/
251
253 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
254
255 for(auto& subscription : subscriptionMap) {
256 std::unique_lock<std::mutex> listeners_lock(subscription.second.listeners_mutex);
257 for(auto& listener : subscription.second.listeners) {
258 // only handle listeners belonging to the current backend
259 if(listener->_backend.get() != backend) continue;
260 pushError(listener, "Exception reported by another accessor.");
261 }
262 }
263 }
264
265 /******************************************************************************************************************/
266
267 void ZMQSubscriptionManager::zmq_callback(void* self_, doocs::EqData* data, dmsg_info_t* info) {
268 // obtain pointer to subscription object
269 auto* subscription = static_cast<ZMQSubscriptionManager::Subscription*>(self_);
270
271 // Make sure the stamp is used from the ZeroMQ header. TODO: Is this really wanted?
272 data->time(info->sec, info->usec);
273 data->mpnum(info->ident);
274
275 std::unique_lock<std::mutex> lock(subscription->listeners_mutex);
276
277 // As long as we get a callback from ZMQ, we consider it started
278 if(not subscription->started) {
279 subscription->started = true;
280 subscription->startedCv.notify_all();
281 }
282
283 // store thread id of the thread calling this function, if not yet done
284 if(pthread_equal(subscription->zqmThreadId, pthread_t_invalid)) {
285 subscription->zqmThreadId = pthread_self();
286 }
287
288 // check for error
289 if(!DoocsBackend::isCommunicationError(data->error())) {
290 // Set flag that we have received an initial value. (If the flag is not set, any received value is by
291 // definition the initial value.) This must be done independent from listener activation status, because this
292 // is to keep track of the DOOCS-provided initial value.
293 if(!subscription->gotInitialValue) subscription->gotInitialValue = true;
294
295 // data has been received: push the data
296 for(auto& listener : subscription->listeners) {
297 if(listener->isActiveZMQ) {
298 // push data to listener queue
299 listener->notifications.push_overwrite(*data);
300 }
301 }
302 }
303 else {
304 // Clear initial value flag: we will get a new initial value from DOOCS after the DOOCS-internal recovery
305 // (independent of the backend's error state and recovery!)
306 subscription->gotInitialValue = false;
307
308 // Push exception to the listeners
309 for(auto& listener : subscription->listeners) {
310 pushError(listener, "ZeroMQ connection interrupted: " + data->get_string());
311 lock.unlock();
312 listener->_backend->informRuntimeError(listener->_path);
313 lock.lock();
314 }
315 }
316 }
317
318 /******************************************************************************************************************/
319
320}} // namespace ChimeraTK::DoocsBackendNamespace
Backend to access DOOCS control system servers.
static bool isCommunicationError(int doocs_error)
void deactivateAllListenersAndPushException(DoocsBackend *backend)
Deactivate all listeners for the given backend and push exceptions into the queues.
void activateAllListeners(DoocsBackend *backend)
Activate all listeners for the given backend. Should be called from DoocsBackend::activateAsyncRead()...
void unsubscribe(const std::string &path, DoocsBackendRegisterAccessorBase *accessor)
Unregister accessor subscription.
void deactivateAllListeners(DoocsBackend *backend)
Deactivate all listeners the given backend. Should be called from DoocsBackend::close().
void subscribe(const std::string &path, DoocsBackendRegisterAccessorBase *accessor)
Register accessor subscription.
This is the untemplated base class which unifies all data members not depending on the UserType.
boost::shared_ptr< DoocsBackend > _backend
Pointer to the backend.
bool isActiveZMQ
flag whether it should receive updates from the ZeroMQ subscription.
cppext::future_queue< doocs::EqData > notifications
future_queue used to notify the TransferFuture about completed transfers