ChimeraTK-DeviceAccess-DoocsBackend  01.09.02
testZeroMQ.cpp
Go to the documentation of this file.
1 
2 #include "eq_dummy.h"
3 
4 #include <random>
5 #include <thread>
6 
7 #define BOOST_TEST_MODULE testZeroMQ
8 #include <doocs-server-test-helper/doocsServerTestHelper.h>
9 
10 #include <ChimeraTK/Device.h>
11 #include <ChimeraTK/TransferGroup.h>
12 
13 #include <boost/test/included/unit_test.hpp>
14 // For CHECK_TIMEOUT
15 #include <doocs-server-test-helper/ThreadedDoocsServer.h>
16 #include <doocs/EqCall.h>
17 
18 #include <ChimeraTK/UnifiedBackendTest.h>
19 
20 using namespace boost::unit_test_framework;
21 using namespace ChimeraTK;
22 
23 /**********************************************************************************************************************/
24 
25 class DoocsLauncher : public ThreadedDoocsServer {
26  public:
28  : ThreadedDoocsServer("testZeroMQ.conf", boost::unit_test::framework::master_test_suite().argc,
29  boost::unit_test::framework::master_test_suite().argv, eq_dummy::createServer()) {
30  // set CDDs for the two doocs addresses used in the test
31  DoocsServer1 = "(doocs:doocs://localhost:" + rpcNo() + "/F/D)";
32  DoocsServer2 = "(doocs:doocs://localhost:" + rpcNo() + "/F/D/MYDUMMY)";
33  // wait until server has started (both the update thread and the rpc thread)
34  doocs::EqCall eq;
35  doocs::EqAdr ea;
36  doocs::EqData src, dst;
37  ea.adr("doocs://localhost:" + rpcNo() + "/F/D/MYDUMMY/SOME_ZMQINT");
38  while(eq.get(&ea, &src, &dst)) usleep(100000);
39  }
40 
41  static std::string DoocsServer1;
42  static std::string DoocsServer2;
43 };
44 
45 std::string DoocsLauncher::DoocsServer1;
46 std::string DoocsLauncher::DoocsServer2;
47 
49 
50 /**********************************************************************************************************************/
51 
52 BOOST_AUTO_TEST_CASE(testZeroMQ) {
53  ChimeraTK::Device device;
54  device.open(DoocsLauncher::DoocsServer1);
55  device.activateAsyncRead();
56 
57  ScalarRegisterAccessor<int32_t> acc(
58  device.getScalarRegisterAccessor<int32_t>("MYDUMMY/SOME_ZMQINT", 0, {AccessMode::wait_for_new_data}));
59 
60  BOOST_CHECK(acc.readNonBlocking() == false);
61 
62  // Send updates until the ZMQ interface is initialised (this is done in the background unfortunately).
63  // For some reason, only one single successful update does not necessarily mean the connection is working
64  // reliably already, so we make sure that we got 10 successful updates (note that ic is incremented and
65  // checked only if readNonBlocking() was successful). Without this trick, the test is failing almost always
66  // when compiling with TSAN.
67  size_t ic = 0;
68  while(!acc.readNonBlocking() || ++ic < 10) {
69  DoocsServerTestHelper::runUpdate();
70  }
71  // empty the queue
72  usleep(100000);
73  acc.readLatest();
74  usleep(100000);
75 
76  BOOST_CHECK(acc.readNonBlocking() == false);
77 
78  // check a simple read
79  DoocsServerTestHelper::doocsSet("//MYDUMMY/SOME_ZMQINT", 1);
80  DoocsServerTestHelper::runUpdate();
81 
82  CHECK_TIMEOUT(acc.readNonBlocking() == true, 300000);
83  BOOST_CHECK_EQUAL(acc, 2);
84  BOOST_CHECK(acc.readNonBlocking() == false);
85 
86  // test having 3 updates in the queue
87  DoocsServerTestHelper::runUpdate();
88  DoocsServerTestHelper::runUpdate();
89  DoocsServerTestHelper::runUpdate();
90 
91  acc.read();
92  BOOST_CHECK_EQUAL(acc, 3);
93  CHECK_TIMEOUT(acc.readNonBlocking() == true, 30000);
94  BOOST_CHECK_EQUAL(acc, 4);
95  acc.read();
96  BOOST_CHECK_EQUAL(acc, 5);
97 
98  // test if read really blocks when having no update in the queue
99  {
100  std::atomic<bool> readFinished(false);
101  std::promise<void> prom;
102  std::future<void> fut = prom.get_future();
103  std::thread readAsync([&acc, &prom, &readFinished]() {
104  acc.read();
105  readFinished = true;
106  prom.set_value();
107  });
108  fut.wait_for(std::chrono::milliseconds(500));
109  BOOST_CHECK(readFinished == false);
110  DoocsServerTestHelper::runUpdate();
111  fut.wait_for(std::chrono::milliseconds(500));
112  BOOST_CHECK(readFinished == true);
113  readAsync.join();
114  }
115 
116  // test shutdown procedure
117  {
118  std::atomic<bool> threadInterrupted(false);
119  std::promise<void> prom;
120  std::future<void> fut = prom.get_future();
121  std::thread readAsync([&acc, &prom, &threadInterrupted]() {
122  try {
123  acc.read();
124  }
125  catch(boost::thread_interrupted&) {
126  // should end up here!
127  threadInterrupted = true;
128  prom.set_value();
129  }
130  });
131  fut.wait_for(std::chrono::milliseconds(500));
132  BOOST_CHECK(threadInterrupted == false);
133  acc.getHighLevelImplElement()->interrupt();
134  fut.wait_for(std::chrono::milliseconds(500));
135  BOOST_CHECK(threadInterrupted == true);
136  readAsync.join();
137  }
138 
139  device.close();
140 }
141 
142 /**********************************************************************************************************************/
DoocsLauncher::DoocsServer1
static std::string DoocsServer1
Definition: testDoocsBackend.cpp:69
BOOST_AUTO_TEST_CASE
BOOST_AUTO_TEST_CASE(testZeroMQ)
Definition: testZeroMQ.cpp:52
eq_dummy
Definition: eq_dummy.h:7
DoocsLauncher
Definition: testDoocsBackend.cpp:37
DoocsLauncher::DoocsLauncher
DoocsLauncher()
Definition: testZeroMQ.cpp:27
eq_dummy.h
BOOST_GLOBAL_FIXTURE
BOOST_GLOBAL_FIXTURE(DoocsLauncher)
DoocsLauncher::DoocsServer2
static std::string DoocsServer2
Definition: testDoocsBackend.cpp:73
ChimeraTK
Definition: spec_DoocsBackend.dox:2