ChimeraTK-DeviceAccess-DoocsBackend 01.12.00
Loading...
Searching...
No Matches
testZeroMQ.cpp
Go to the documentation of this file.
1
2#include "eq_dummy.h"
3
4#include <random>
5#include <thread>
6
7#define BOOST_TEST_MODULE testZeroMQ
8#include <ChimeraTK/Device.h>
9#include <ChimeraTK/TransferGroup.h>
10
11#include <doocs-server-test-helper/doocsServerTestHelper.h>
12
13#include <boost/test/included/unit_test.hpp>
14// For CHECK_TIMEOUT
15#include <ChimeraTK/UnifiedBackendTest.h>
16
17#include <doocs-server-test-helper/ThreadedDoocsServer.h>
18#include <doocs/EqCall.h>
19
20using namespace boost::unit_test_framework;
21using namespace ChimeraTK;
22
23/**********************************************************************************************************************/
24
25class DoocsLauncher : public ThreadedDoocsServer {
26 public:
28 : ThreadedDoocsServer("testZeroMQ.conf", boost::unit_test::framework::master_test_suite().argc,
29 boost::unit_test::framework::master_test_suite().argv, eq_dummy::createServer()) {
30 // set CDDs for the two doocs addresses used in the test
31 DoocsServer1 = "(doocs:doocs://localhost:" + rpcNo() + "/F/D)";
32 DoocsServer2 = "(doocs:doocs://localhost:" + rpcNo() + "/F/D/MYDUMMY)";
33 // wait until server has started (both the update thread and the rpc thread)
34 doocs::EqCall eq;
35 doocs::EqAdr ea;
36 doocs::EqData src, dst;
37 ea.adr("doocs://localhost:" + rpcNo() + "/F/D/MYDUMMY/SOME_ZMQINT");
38 while(eq.get(&ea, &src, &dst)) usleep(100000);
39 }
40
41 static std::string DoocsServer1;
42 static std::string DoocsServer2;
43};
44
47
49
50/**********************************************************************************************************************/
51
53 ChimeraTK::Device device;
54 device.open(DoocsLauncher::DoocsServer1);
55 device.activateAsyncRead();
56
57 ScalarRegisterAccessor<int32_t> acc(
58 device.getScalarRegisterAccessor<int32_t>("MYDUMMY/SOME_ZMQINT", 0, {AccessMode::wait_for_new_data}));
59
60 BOOST_CHECK(acc.readNonBlocking() == false);
61
62 // Send updates until the ZMQ interface is initialised (this is done in the background unfortunately).
63 // For some reason, only one single successful update does not necessarily mean the connection is working
64 // reliably already, so we make sure that we got 10 successful updates (note that ic is incremented and
65 // checked only if readNonBlocking() was successful). Without this trick, the test is failing almost always
66 // when compiling with TSAN.
67 size_t ic = 0;
68 while(!acc.readNonBlocking() || ++ic < 10) {
69 DoocsServerTestHelper::runUpdate();
70 }
71 // empty the queue
72 usleep(100000);
73 acc.readLatest();
74 usleep(100000);
75
76 BOOST_CHECK(acc.readNonBlocking() == false);
77
78 // check a simple read
79 DoocsServerTestHelper::doocsSet("//MYDUMMY/SOME_ZMQINT", 1);
80 DoocsServerTestHelper::runUpdate();
81
82 CHECK_TIMEOUT(acc.readNonBlocking() == true, 300000);
83 BOOST_CHECK_EQUAL(acc, 2);
84 BOOST_CHECK(acc.readNonBlocking() == false);
85
86 // test having 3 updates in the queue
87 DoocsServerTestHelper::runUpdate();
88 DoocsServerTestHelper::runUpdate();
89 DoocsServerTestHelper::runUpdate();
90
91 acc.read();
92 BOOST_CHECK_EQUAL(acc, 3);
93 CHECK_TIMEOUT(acc.readNonBlocking() == true, 30000);
94 BOOST_CHECK_EQUAL(acc, 4);
95 acc.read();
96 BOOST_CHECK_EQUAL(acc, 5);
97
98 // test if read really blocks when having no update in the queue
99 {
100 std::atomic<bool> readFinished(false);
101 std::promise<void> prom;
102 std::future<void> fut = prom.get_future();
103 std::thread readAsync([&acc, &prom, &readFinished]() {
104 acc.read();
105 readFinished = true;
106 prom.set_value();
107 });
108 fut.wait_for(std::chrono::milliseconds(500));
109 BOOST_CHECK(readFinished == false);
110 DoocsServerTestHelper::runUpdate();
111 fut.wait_for(std::chrono::milliseconds(500));
112 BOOST_CHECK(readFinished == true);
113 readAsync.join();
114 }
115
116 // test shutdown procedure
117 {
118 std::atomic<bool> threadInterrupted(false);
119 std::promise<void> prom;
120 std::future<void> fut = prom.get_future();
121 std::thread readAsync([&acc, &prom, &threadInterrupted]() {
122 try {
123 acc.read();
124 }
125 catch(boost::thread_interrupted&) {
126 // should end up here!
127 threadInterrupted = true;
128 prom.set_value();
129 }
130 });
131 fut.wait_for(std::chrono::milliseconds(500));
132 BOOST_CHECK(threadInterrupted == false);
133 acc.getHighLevelImplElement()->interrupt();
134 fut.wait_for(std::chrono::milliseconds(500));
135 BOOST_CHECK(threadInterrupted == true);
136 readAsync.join();
137 }
138
139 device.close();
140}
141
142/**********************************************************************************************************************/
static std::string DoocsServer1
static std::string DoocsServer2
BOOST_GLOBAL_FIXTURE(DoocsLauncher)
BOOST_AUTO_TEST_CASE(testZeroMQ)