5 namespace ChimeraTK {
namespace DoocsBackendNamespace {
9 pthread_t ZMQSubscriptionManager::pthread_t_invalid;
13 ZMQSubscriptionManager::ZMQSubscriptionManager() {
14 pthread_t_invalid = pthread_self();
19 ZMQSubscriptionManager::~ZMQSubscriptionManager() {
20 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
22 for(
auto subscription = subscriptionMap.begin(); subscription != subscriptionMap.end();) {
24 std::unique_lock<std::mutex> listeners_lock(subscription->second.listeners_mutex);
25 subscription->second.listeners.clear();
28 deactivateSubscription(subscription->first);
30 subscription = subscriptionMap.erase(subscription);
37 bool newSubscription =
false;
39 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
42 newSubscription = subscriptionMap.find(path) == subscriptionMap.end();
45 std::unique_lock<std::mutex> listeners_lock(subscriptionMap[path].listeners_mutex);
48 subscriptionMap[path].listeners.push_back(accessor);
59 activateSubscription(path);
64 if(accessor->
isActiveZMQ && subscriptionMap[path].gotInitialValue) {
65 listeners_lock.unlock();
66 pollInitialValue(path, {accessor});
72 void ZMQSubscriptionManager::pollInitialValue(
73 const std::string& path,
const std::list<DoocsBackendRegisterAccessorBase*>& accessors) {
75 doocs::EqData src, dst;
79 auto rc = eq.get(&adr, &src, &dst);
82 for(
auto accessor : accessors) {
83 pushError(accessor,
"ZeroMQ connection for " + path +
" interrupted: " + dst.get_string());
88 for(
auto accessor : accessors) {
89 accessor->notifications.push_overwrite(dst);
97 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
100 if(subscriptionMap.find(path) == subscriptionMap.end())
return;
103 std::unique_lock<std::mutex> listeners_lock(subscriptionMap[path].listeners_mutex);
106 if(std::find(subscriptionMap[path].listeners.begin(), subscriptionMap[path].listeners.end(), accessor) ==
107 subscriptionMap[path].listeners.end())
111 subscriptionMap[path].listeners.erase(
112 std::remove(subscriptionMap[path].listeners.begin(), subscriptionMap[path].listeners.end(), accessor));
115 if(subscriptionMap[path].listeners.empty()) {
117 listeners_lock.unlock();
121 deactivateSubscription(path);
127 subscriptionMap.erase(subscriptionMap.find(path));
133 void ZMQSubscriptionManager::activateSubscription(
const std::string& path) {
137 if(subscriptionMap[path].active)
return;
144 int err = dmsg_attach(&ea, &dst, (
void*)&(subscriptionMap[path]), &zmq_callback, &tag);
147 throw ChimeraTK::runtime_error(
148 std::string(
"Cannot subscribe to DOOCS property '" + path +
"' via ZeroMQ: ") + dst.get_string());
152 std::unique_lock<std::mutex> lck(dmsgStartCalled_mutex);
153 if(!dmsgStartCalled) {
155 dmsgStartCalled =
true;
159 subscriptionMap[path].active =
true;
164 void ZMQSubscriptionManager::deactivateSubscription(
const std::string& path) {
167 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
168 std::unique_lock<std::mutex> listeners_lock(subscriptionMap[path].listeners_mutex);
169 if(!subscriptionMap[path].active)
return;
173 while(not subscriptionMap[path].started) subscriptionMap[path].startedCv.wait(listeners_lock);
176 subscriptionMap[path].active =
false;
177 subscriptionMap[path].started =
false;
183 dmsg_detach(&ea,
nullptr);
189 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
191 for(
auto& subscription : subscriptionMap) {
192 std::unique_lock<std::mutex> listeners_lock(subscription.second.listeners_mutex);
193 std::list<DoocsBackendRegisterAccessorBase*> listeners;
194 for(
auto& listener : subscription.second.listeners) {
195 if(listener->_backend.get() == backend) {
196 listener->isActiveZMQ =
true;
197 if(subscription.second.gotInitialValue) {
199 listeners.push_back(listener);
203 if(subscription.second.gotInitialValue) {
208 listeners_lock.unlock();
209 pollInitialValue(subscription.first, listeners);
210 listeners_lock.lock();
218 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
220 for(
auto& subscription : subscriptionMap) {
221 for(
auto& listener : subscription.second.listeners) {
222 if(listener->_backend.get() == backend) {
223 listener->isActiveZMQ =
false;
243 throw ChimeraTK::runtime_error(message);
246 listener->
notifications.push_overwrite_exception(std::current_exception());
253 std::unique_lock<std::mutex> lock(subscriptionMap_mutex);
255 for(
auto& subscription : subscriptionMap) {
256 std::unique_lock<std::mutex> listeners_lock(subscription.second.listeners_mutex);
257 for(
auto& listener : subscription.second.listeners) {
259 if(listener->
_backend.get() != backend)
continue;
260 pushError(listener,
"Exception reported by another accessor.");
267 void ZMQSubscriptionManager::zmq_callback(
void* self_, doocs::EqData* data, dmsg_info_t* info) {
269 auto* subscription =
static_cast<ZMQSubscriptionManager::Subscription*
>(self_);
272 data->time(info->sec, info->usec);
273 data->mpnum(info->ident);
275 std::unique_lock<std::mutex> lock(subscription->listeners_mutex);
278 if(not subscription->started) {
279 subscription->started =
true;
280 subscription->startedCv.notify_all();
284 if(pthread_equal(subscription->zqmThreadId, pthread_t_invalid)) {
285 subscription->zqmThreadId = pthread_self();
293 if(!subscription->gotInitialValue) subscription->gotInitialValue =
true;
296 for(
auto& listener : subscription->listeners) {
306 subscription->gotInitialValue =
false;
309 for(
auto& listener : subscription->listeners) {
310 pushError(listener,
"ZeroMQ connection interrupted: " + data->get_string());