ChimeraTK-DeviceAccess 03.25.00
Loading...
Searching...
No Matches
testHistorizedDataMatching.cpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4#include <thread>
5#define BOOST_TEST_DYN_LINK
6#define BOOST_TEST_MODULE DataConsistencyGroupTest
7#include <boost/test/unit_test.hpp>
8using namespace boost::unit_test_framework;
9
11#include "Device.h"
12#include "ReadAnyGroup.h"
13
14using namespace ChimeraTK;
15
16BOOST_AUTO_TEST_SUITE(HistorizedDataMatchingTestSuite)
17// Note, test code testDataConsistencyGroup is unsuitable for testing extended data matching,
18// since it is based on explicitly provided changes in user-buffers.
19
21 // empty read queue before start of next test.
22 // this also gets rid of initial values
23 unsigned numDiscarded = 0;
25 while((id = rag.readAnyNonBlocking()).isValid()) {
26 if(dg) {
27 dg->update(id);
28 }
29 numDiscarded++;
30 }
31 return numDiscarded;
32};
33
34struct Fixture {
36 sem_init(&sem, 0, 0);
37
38 dev.open("(logicalNameMap?map=historizedDataMatching.xlmap)");
40 readAccA.replace(dev.getScalarRegisterAccessor<int32_t>("/A", 0, {AccessMode::wait_for_new_data}));
41 readAccB.replace(dev.getScalarRegisterAccessor<int32_t>("/B", 0, {AccessMode::wait_for_new_data}));
42 }
43
45 // we need a timeout since with HDataConsistencyGroup, we only get a chance to acknowledge consistent data updates
46 timespec timeout{};
47 timeval now{};
48 gettimeofday(&now, nullptr);
49 timeout.tv_sec = now.tv_sec + 1;
50 timeout.tv_nsec = now.tv_usec * 300;
51 sem_timedwait(&sem, &timeout);
52 };
53
54 // loop for updater thread: option to delay updates on A and B
55 // e.g. B has delay=2: A=v1, A=v2, A=v3, B=v1, A=v4, B=v2, ...
56 void updaterLoop(unsigned nLoops, unsigned delay, unsigned duplicateVns = 0, bool catchUp = false) {
57 std::cout << "updaterLoop: delay=" << delay << ", duplicateVns=" << duplicateVns << std::endl;
58 auto accA = dev.getScalarRegisterAccessor<int32_t>("/A");
59 auto accB = dev.getScalarRegisterAccessor<int32_t>("/B");
60 // note, the default-constructed VersionNumbers in here will not be used
61 std::vector<ChimeraTK::VersionNumber> vs(nLoops);
62 for(unsigned loopCount = 0; loopCount < nLoops; loopCount++) {
63 std::cout << "updaterLoop: writing value " << loopCount << std::endl;
64 accA = (int32_t)loopCount;
65 retryWrite:
66 // device might be in error state so write can throw
67 try {
68 if(loopCount > 0 && duplicateVns > 0) {
69 vs[loopCount] = vs[loopCount - 1]; // use last VersionNumber another time
70 duplicateVns--;
71 }
72 else {
73 vs[loopCount] = {}; // new VersionNumber
74 }
75 accA.write(vs[loopCount]);
76 if(loopCount >= delay) {
77 accB = (int32_t)(loopCount - delay);
78 accB.write(vs[loopCount - delay]);
79 }
80 if(loopCount == nLoops - 1 && delay > 0 && catchUp) {
81 // let variable B catch up with A
82 for(unsigned i = loopCount - delay + 1; i < nLoops; i++) {
83 accB = (int32_t)(i);
84 accB.write(vs[i]);
85 }
86 }
87 }
88 catch(ChimeraTK::runtime_error& e) {
89 std::cout << "updaterLoop: exception, wait and retry write.." << std::endl;
90 std::this_thread::sleep_for(std::chrono::milliseconds{500});
91 goto retryWrite;
92 }
93 // wait on data receive before writing next
95 }
96 // interrupt blocking reads in test thread so it can terminate. It should be sufficient to
97 // interrupt one of the read accessors.
99 };
100
104
105 // we use a semaphore to acknowlegde data received; like that we avoid future_queue overruns
106 sem_t sem;
107};
108
110 // minimal test code: we create two read accessors on variables defined in xlmap, put them into a ReadAnyGroup and a
111 // historized DataConsistencyGroup. We provide data from another thread, as in real use case. Then using different
112 // delay settings for updates of A and B, check expected number of consistent data updates.
113
114 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
115
116 std::cout << "test1: no history" << std::endl;
117 // without historized DataConsistencyGroup, check that we get consistent updates only if delay is 0
118 const unsigned nLoops = 4;
119 for(unsigned delay = 0; delay <= 2; delay++) {
120 emptyQueues(rag);
121 unsigned nUpdates = 0;
122 unsigned nConsistentUpdates = 0;
123 std::thread updaterThread1([&]() { updaterLoop(nLoops, delay); });
124 // test loop consuming data
125 try {
126 while(true) {
127 auto id = rag.readAny();
128 auto& acc = id == readAccA.getId() ? readAccA : readAccB;
129 std::cout << "readAny: seeing update for target " << acc.getName() << " vs " << acc.getVersionNumber()
130 << std::endl;
131
132 nUpdates++;
133 if(readAccA.getVersionNumber() == readAccB.getVersionNumber()) {
134 nConsistentUpdates++;
135 }
136 if(id == readAccA.getId()) {
137 sem_post(&sem);
138 }
139 }
140 }
141 catch(boost::thread_interrupted& e) {
142 std::cout << "thread interrupted" << std::endl;
143 }
144 updaterThread1.join();
145
146 unsigned nExpectedConsistentUpdates = (delay == 0) ? nUpdates / 2 : 0;
147 BOOST_TEST(nConsistentUpdates == nExpectedConsistentUpdates);
148 BOOST_TEST(nUpdates == nLoops + nLoops - delay);
149 }
150
151 std::cout << "test1: with history" << std::endl;
152 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
153
154 for(unsigned delay = 0; delay <= 2; delay++) {
155 // With HDataConsistencyGroup, check that we get N-delay consistent updates.
156 // Also check that we have consistent data (e.g. data=versionnumber counter)
157 emptyQueues(rag, &dg);
158 unsigned nUpdates = 0;
159 unsigned nConsistentUpdates = 0;
160 std::thread updaterThread1([&]() { updaterLoop(nLoops, delay); });
161 // test loop consuming data
162 try {
163 while(true) {
164 auto id = rag.readAny();
165 bool isConsistent = dg.update(id);
166 nUpdates++;
167 if(readAccA.getVersionNumber() == readAccB.getVersionNumber()) {
168 nConsistentUpdates++;
169 }
170 // check data consistency via VersionNumber and content
171 BOOST_TEST(isConsistent);
172 BOOST_TEST(readAccA.getVersionNumber() == readAccB.getVersionNumber());
173 BOOST_TEST(readAccA == readAccB);
174 // acknowledge data received: here updated id is irrelevant
175 sem_post(&sem);
176 }
177 }
178 catch(boost::thread_interrupted& e) {
179 std::cout << "thread interrupted" << std::endl;
180 }
181 updaterThread1.join();
182 BOOST_TEST(nConsistentUpdates == nLoops - delay);
183 BOOST_TEST(nConsistentUpdates == nUpdates);
184 }
185}
186
188 std::cout << "testDuplicateVns" << std::endl;
189
190 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
191
192 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
193
194 const unsigned nLoops = 4;
195 const unsigned nDuplicateVns = 1;
196
197 for(unsigned delay = 0; delay <= 2; delay++) {
198 // With MatchingMode::historized, check that we get N-delay consistent updates.
199 // Also check that we have consistent data (e.g. data=versionnumber counter)
200 emptyQueues(rag, &dg);
201 unsigned nUpdates = 0;
202 unsigned nConsistentUpdates = 0;
203 std::thread updaterThread1([&]() { updaterLoop(nLoops, delay, nDuplicateVns); });
204 // test loop consuming data
205 try {
206 while(true) {
207 auto id = rag.readAny();
208 nUpdates++;
209 bool isConsistent = dg.update(id);
210 if(readAccA.getVersionNumber() == readAccB.getVersionNumber()) {
211 nConsistentUpdates++;
212 }
213 // check data consistency via VersionNumber
214 BOOST_TEST(isConsistent);
215 BOOST_TEST(readAccA.getVersionNumber() == readAccB.getVersionNumber());
216 // acknowledge data received: here updated id is irrelevant
217 sem_post(&sem);
218 }
219 }
220 catch(boost::thread_interrupted& e) {
221 std::cout << "thread interrupted" << std::endl;
222 }
223 updaterThread1.join();
224 unsigned nExpectedUpdates = nLoops - delay;
225 if(delay == 0) {
226 // for each VersionNumber that is repeated for A, we should get one extra consistent update
227 nExpectedUpdates++;
228 }
229 BOOST_TEST(nConsistentUpdates == nExpectedUpdates);
230 BOOST_TEST(nConsistentUpdates == nUpdates);
231 }
232}
233
235 std::cout << "testExceptions" << std::endl;
236
237 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
238 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
239
240 const unsigned nLoops = 6;
241 const unsigned delay = 0;
242 {
243 // With HDataConsistencyGroup, check that we get N-delay consistent updates.
244 // Also check that we have consistent data (e.g. data=versionnumber counter)
245 emptyQueues(rag, &dg);
246 unsigned nUpdates = 0;
247 unsigned nConsistentUpdates = 0;
248 std::thread updaterThread1([&]() { updaterLoop(nLoops, delay); });
249 retry:
250 // test loop consuming data
251 try {
252 while(true) {
253 auto id = rag.readAny();
254 auto& acc = id == readAccA.getId() ? readAccA : readAccB;
255 std::cout << "readAny: seeing update for target " << acc.getName() << " vs " << acc.getVersionNumber()
256 << " values " << readAccA << "," << readAccB << std::endl;
257
258 bool isConsistent = dg.update(id);
259 nUpdates++;
260 if(readAccA.getVersionNumber() == readAccB.getVersionNumber()) {
261 nConsistentUpdates++;
262 }
263 // check data consistency via VersionNumber and content
264 BOOST_TEST(isConsistent);
265 BOOST_TEST(readAccA.getVersionNumber() == readAccB.getVersionNumber());
266 BOOST_TEST(readAccA == readAccB);
267
268 if(nUpdates == 2) {
269 // shortly put device into exception state and recover. When the exception is seen by the accessors,
270 // it should pop out from readAny but after that, we should be able to continue normal operation (at retry:)
271 dev.setException("exception");
272 dev.open();
273 dev.activateAsyncRead();
274 }
275
276 // acknowledge data received: here updated id is irrelevant
277 sem_post(&sem);
278 }
279 }
280 catch(boost::thread_interrupted& e) {
281 std::cout << "thread interrupted" << std::endl;
282 }
283 catch(ChimeraTK::runtime_error& e) {
284 std::cout << "runtime error: " << e.what() << std::endl;
285 goto retry;
286 }
287
288 updaterThread1.join();
289 // one more update since after exception, open, activateAsyncRead, we get another initial value
290 BOOST_TEST(nConsistentUpdates == nLoops - delay + 1);
291 BOOST_TEST(nConsistentUpdates == nUpdates);
292 }
293}
294
296 std::cout << "testCatchUp" << std::endl;
297
298 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
299 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
300
301 const unsigned nLoops = 6;
302 const unsigned delay = 2;
303
304 // With HDataConsistencyGroup, check that we get N consistent updates, even when we
305 // have update delay on second variable that appears and vanishes again.
306 emptyQueues(rag, &dg);
307 unsigned nUpdates = 0;
308 unsigned nConsistentUpdates = 0;
309 std::thread updaterThread1([&]() { updaterLoop(nLoops, delay, false, true); });
310
311 // test loop consuming data
312 try {
313 while(true) {
314 auto id = rag.readAny();
315 auto& acc = id == readAccA.getId() ? readAccA : readAccB;
316 std::cout << "readAny: seeing update for target " << acc.getName() << " vs " << acc.getVersionNumber()
317 << " values " << readAccA << "," << readAccB << std::endl;
318
319 bool isConsistent = dg.update(id);
320 nUpdates++;
321 if(readAccA.getVersionNumber() == readAccB.getVersionNumber()) {
322 nConsistentUpdates++;
323 }
324 // check data consistency via VersionNumber and content
325 BOOST_TEST(isConsistent);
326 BOOST_TEST(readAccA.getVersionNumber() == readAccB.getVersionNumber());
327 BOOST_TEST(readAccA == readAccB);
328
329 // acknowledge data received: here updated id is irrelevant
330 sem_post(&sem);
331 }
332 }
333 catch(boost::thread_interrupted& e) {
334 std::cout << "thread interrupted" << std::endl;
335 }
336
337 updaterThread1.join();
338 BOOST_TEST(nConsistentUpdates == nLoops);
339 BOOST_TEST(nConsistentUpdates == nUpdates);
340}
341
342BOOST_FIXTURE_TEST_CASE(testInitialValues, Fixture) {
343 std::cout << "testInitialValues" << std::endl;
344 // at start VersionNum(A)=0, since no read has yet occurred
345
346 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
347 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
348
349 unsigned nDiscarded = emptyQueues(rag);
350 // after read, VersionNumbers must be non-zero.
351 // Note, initial values here count as one consistent set.
352 BOOST_TEST(nDiscarded == 1);
353
354 BOOST_TEST(readAccA.getVersionNumber() != VersionNumber(nullptr));
355 BOOST_TEST(readAccB.getVersionNumber() != VersionNumber(nullptr));
356}
357
359 std::cout << "testIllegalUse" << std::endl;
360
361 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
362 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
363
364 BOOST_CHECK_THROW(
365 DataConsistencyGroup({readAccA}, DataConsistencyGroup::MatchingMode::historized), ChimeraTK::logic_error);
366}
367
368BOOST_AUTO_TEST_SUITE_END()
Group several registers (= TransferElement) which ensures data consistency across multiple variables ...
Class allows to read/write registers from device.
Definition Device.h:39
ScalarRegisterAccessor< UserType > getScalarRegisterAccessor(const RegisterPath &registerPathName, size_t wordOffsetInRegister=0, const AccessModeFlags &flags=AccessModeFlags({})) const
Get a ScalarRegisterObject object for the given register.
Definition Device.h:266
void activateAsyncRead() noexcept
Activate asyncronous read for all transfer elements where AccessMode::wait_for_new_data is set.
Definition Device.cc:91
void open(std::string const &aliasName)
Open a device by the given alias name from the DMAP file.
Definition Device.cc:58
void replace(const NDRegisterAccessorAbstractor< UserType > &newAccessor)
Assign a new accessor to this NDRegisterAccessorAbstractor.
Group several registers (= TransferElement) to allow waiting for an update of any of the registers.
Accessor class to read and write scalar registers transparently by using the accessor object like a v...
void interrupt()
Return from a blocking read immediately and throw boost::thread_interrupted.
Simple class holding a unique ID for a TransferElement.
Class for generating and holding version numbers without exposing a numeric representation.
Exception thrown when a logic error has occured.
Definition Exception.h:51
Exception thrown when a runtime error has occured.
Definition Exception.h:18
ScalarRegisterAccessor< int32_t > readAccB
ScalarRegisterAccessor< int32_t > readAccA
void updaterLoop(unsigned nLoops, unsigned delay, unsigned duplicateVns=0, bool catchUp=false)
BOOST_FIXTURE_TEST_CASE(test1, Fixture)
auto emptyQueues(ReadAnyGroup &rag, DataConsistencyGroup *dg=nullptr)