ChimeraTK-DeviceAccess 03.26.00
Loading...
Searching...
No Matches
testHistorizedDataMatching.cpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4#include <thread>
5#define BOOST_TEST_DYN_LINK
6#define BOOST_TEST_MODULE DataConsistencyGroupTest
7#include <boost/test/unit_test.hpp>
8using namespace boost::unit_test_framework;
9
11#include "Device.h"
13#include "ReadAnyGroup.h"
14
15using namespace ChimeraTK;
16
17BOOST_AUTO_TEST_SUITE(HistorizedDataMatchingTestSuite)
18// Note, test code testDataConsistencyGroup is unsuitable for testing extended data matching,
19// since it is based on explicitly provided changes in user-buffers.
20
22 // empty read queue before start of next test.
23 // this also gets rid of initial values
24 unsigned numDiscarded = 0;
26 while((id = rag.readAnyNonBlocking()).isValid()) {
27 if(dg) {
28 dg->update(id);
29 }
30 numDiscarded++;
31 }
32 return numDiscarded;
33};
34
35struct Fixture {
37 sem_init(&sem, 0, 0);
38
39 dev.open("(logicalNameMap?map=historizedDataMatching.xlmap)");
41 readAccA.replace(dev.getScalarRegisterAccessor<int32_t>("/A", 0, {AccessMode::wait_for_new_data}));
42 readAccB.replace(dev.getScalarRegisterAccessor<int32_t>("/B", 0, {AccessMode::wait_for_new_data}));
43 }
44
46 // we need a timeout since with HDataConsistencyGroup, we only get a chance to acknowledge consistent data updates
47 timespec timeout{};
48 timeval now{};
49 gettimeofday(&now, nullptr);
50 timeout.tv_sec = now.tv_sec + 1;
51 timeout.tv_nsec = now.tv_usec * 300;
52 sem_timedwait(&sem, &timeout);
53 };
54
55 // loop for updater thread: option to delay updates on A and B
56 // e.g. B has delay=2: A=v1, A=v2, A=v3, B=v1, A=v4, B=v2, ...
57 void updaterLoop(unsigned nLoops, unsigned delay, unsigned duplicateVns = 0, bool catchUp = false) {
58 std::cout << "updaterLoop: delay=" << delay << ", duplicateVns=" << duplicateVns << std::endl;
59 auto accA = dev.getScalarRegisterAccessor<int32_t>("/A");
60 auto accB = dev.getScalarRegisterAccessor<int32_t>("/B");
61 // note, the default-constructed VersionNumbers in here will not be used
62 std::vector<ChimeraTK::VersionNumber> vs(nLoops);
63 for(unsigned loopCount = 0; loopCount < nLoops; loopCount++) {
64 std::cout << "updaterLoop: writing value " << loopCount << std::endl;
65 accA = (int32_t)loopCount;
66 retryWrite:
67 // device might be in error state so write can throw
68 try {
69 if(loopCount > 0 && duplicateVns > 0) {
70 vs[loopCount] = vs[loopCount - 1]; // use last VersionNumber another time
71 duplicateVns--;
72 }
73 else {
74 vs[loopCount] = {}; // new VersionNumber
75 }
76 accA.write(vs[loopCount]);
77 if(loopCount >= delay) {
78 accB = (int32_t)(loopCount - delay);
79 accB.write(vs[loopCount - delay]);
80 }
81 if(loopCount == nLoops - 1 && delay > 0 && catchUp) {
82 // let variable B catch up with A
83 for(unsigned i = loopCount - delay + 1; i < nLoops; i++) {
84 accB = (int32_t)(i);
85 accB.write(vs[i]);
86 }
87 }
88 }
89 catch(ChimeraTK::runtime_error& e) {
90 std::cout << "updaterLoop: exception, wait and retry write.." << std::endl;
91 std::this_thread::sleep_for(std::chrono::milliseconds{500});
92 goto retryWrite;
93 }
94 // wait on data receive before writing next
96 }
97 // interrupt blocking reads in test thread so it can terminate. It should be sufficient to
98 // interrupt one of the read accessors.
100 };
101
105
106 // we use a semaphore to acknowlegde data received; like that we avoid future_queue overruns
107 sem_t sem;
108};
109
111 // minimal test code: we create two read accessors on variables defined in xlmap, put them into a ReadAnyGroup and a
112 // historized DataConsistencyGroup. We provide data from another thread, as in real use case. Then using different
113 // delay settings for updates of A and B, check expected number of consistent data updates.
114
115 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
116
117 std::cout << "test1: no history" << std::endl;
118 // without historized DataConsistencyGroup, check that we get consistent updates only if delay is 0
119 const unsigned nLoops = 4;
120 for(unsigned delay = 0; delay <= 2; delay++) {
121 emptyQueues(rag);
122 unsigned nUpdates = 0;
123 unsigned nConsistentUpdates = 0;
124 std::thread updaterThread1([&]() { updaterLoop(nLoops, delay); });
125 // test loop consuming data
126 try {
127 while(true) {
128 auto id = rag.readAny();
129 auto& acc = id == readAccA.getId() ? readAccA : readAccB;
130 std::cout << "readAny: seeing update for target " << acc.getName() << " vs " << acc.getVersionNumber()
131 << std::endl;
132
133 nUpdates++;
134 if(readAccA.getVersionNumber() == readAccB.getVersionNumber()) {
135 nConsistentUpdates++;
136 }
137 if(id == readAccA.getId()) {
138 sem_post(&sem);
139 }
140 }
141 }
142 catch(boost::thread_interrupted& e) {
143 std::cout << "thread interrupted" << std::endl;
144 }
145 updaterThread1.join();
146
147 unsigned nExpectedConsistentUpdates = (delay == 0) ? nUpdates / 2 : 0;
148 BOOST_TEST(nConsistentUpdates == nExpectedConsistentUpdates);
149 BOOST_TEST(nUpdates == nLoops + nLoops - delay);
150 }
151
152 std::cout << "test1: with history" << std::endl;
153 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
154
155 for(unsigned delay = 0; delay <= 2; delay++) {
156 // With HDataConsistencyGroup, check that we get N-delay consistent updates.
157 // Also check that we have consistent data (e.g. data=versionnumber counter)
158 emptyQueues(rag, &dg);
159 unsigned nUpdates = 0;
160 unsigned nConsistentUpdates = 0;
161 std::thread updaterThread1([&]() { updaterLoop(nLoops, delay); });
162 // test loop consuming data
163 try {
164 while(true) {
165 auto id = rag.readAny();
166 bool isConsistent = dg.update(id);
167 nUpdates++;
168 if(readAccA.getVersionNumber() == readAccB.getVersionNumber()) {
169 nConsistentUpdates++;
170 }
171 // check data consistency via VersionNumber and content
172 BOOST_TEST(isConsistent);
173 BOOST_TEST(readAccA.getVersionNumber() == readAccB.getVersionNumber());
174 BOOST_TEST(readAccA == readAccB);
175 // acknowledge data received: here updated id is irrelevant
176 sem_post(&sem);
177 }
178 }
179 catch(boost::thread_interrupted& e) {
180 std::cout << "thread interrupted" << std::endl;
181 }
182 updaterThread1.join();
183 BOOST_TEST(nConsistentUpdates == nLoops - delay);
184 BOOST_TEST(nConsistentUpdates == nUpdates);
185 }
186}
187
189 std::cout << "testDuplicateVns" << std::endl;
190
191 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
192
193 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
194
195 const unsigned nLoops = 4;
196 const unsigned nDuplicateVns = 1;
197
198 for(unsigned delay = 0; delay <= 2; delay++) {
199 // With MatchingMode::historized, check that we get N-delay consistent updates.
200 // Also check that we have consistent data (e.g. data=versionnumber counter)
201 emptyQueues(rag, &dg);
202 unsigned nUpdates = 0;
203 unsigned nConsistentUpdates = 0;
204 std::thread updaterThread1([&]() { updaterLoop(nLoops, delay, nDuplicateVns); });
205 // test loop consuming data
206 try {
207 while(true) {
208 auto id = rag.readAny();
209 nUpdates++;
210 bool isConsistent = dg.update(id);
211 if(readAccA.getVersionNumber() == readAccB.getVersionNumber()) {
212 nConsistentUpdates++;
213 }
214 // check data consistency via VersionNumber
215 BOOST_TEST(isConsistent);
216 BOOST_TEST(readAccA.getVersionNumber() == readAccB.getVersionNumber());
217 // acknowledge data received: here updated id is irrelevant
218 sem_post(&sem);
219 }
220 }
221 catch(boost::thread_interrupted& e) {
222 std::cout << "thread interrupted" << std::endl;
223 }
224 updaterThread1.join();
225 unsigned nExpectedUpdates = nLoops - delay;
226 if(delay == 0) {
227 // for each VersionNumber that is repeated for A, we should get one extra consistent update
228 nExpectedUpdates++;
229 }
230 BOOST_TEST(nConsistentUpdates == nExpectedUpdates);
231 BOOST_TEST(nConsistentUpdates == nUpdates);
232 }
233}
234
236 std::cout << "testExceptions" << std::endl;
237
238 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
239 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
240
241 const unsigned nLoops = 6;
242 const unsigned delay = 0;
243 {
244 // With HDataConsistencyGroup, check that we get N-delay consistent updates.
245 // Also check that we have consistent data (e.g. data=versionnumber counter)
246 emptyQueues(rag, &dg);
247 unsigned nUpdates = 0;
248 unsigned nConsistentUpdates = 0;
249 std::thread updaterThread1([&]() { updaterLoop(nLoops, delay); });
250 retry:
251 // test loop consuming data
252 try {
253 while(true) {
254 auto id = rag.readAny();
255 auto& acc = id == readAccA.getId() ? readAccA : readAccB;
256 std::cout << "readAny: seeing update for target " << acc.getName() << " vs " << acc.getVersionNumber()
257 << " values " << readAccA << "," << readAccB << std::endl;
258
259 bool isConsistent = dg.update(id);
260 nUpdates++;
261 if(readAccA.getVersionNumber() == readAccB.getVersionNumber()) {
262 nConsistentUpdates++;
263 }
264 // check data consistency via VersionNumber and content
265 BOOST_TEST(isConsistent);
266 BOOST_TEST(readAccA.getVersionNumber() == readAccB.getVersionNumber());
267 BOOST_TEST(readAccA == readAccB);
268
269 if(nUpdates == 2) {
270 // shortly put device into exception state and recover. When the exception is seen by the accessors,
271 // it should pop out from readAny but after that, we should be able to continue normal operation (at retry:)
272 dev.setException("exception");
273 dev.open();
274 dev.activateAsyncRead();
275 }
276
277 // acknowledge data received: here updated id is irrelevant
278 sem_post(&sem);
279 }
280 }
281 catch(boost::thread_interrupted& e) {
282 std::cout << "thread interrupted" << std::endl;
283 }
284 catch(ChimeraTK::runtime_error& e) {
285 std::cout << "runtime error: " << e.what() << std::endl;
286 goto retry;
287 }
288
289 updaterThread1.join();
290 // one more update since after exception, open, activateAsyncRead, we get another initial value
291 BOOST_TEST(nConsistentUpdates == nLoops - delay + 1);
292 BOOST_TEST(nConsistentUpdates == nUpdates);
293 }
294}
295
297 std::cout << "testCatchUp" << std::endl;
298
299 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
300 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
301
302 const unsigned nLoops = 6;
303 const unsigned delay = 2;
304
305 // With HDataConsistencyGroup, check that we get N consistent updates, even when we
306 // have update delay on second variable that appears and vanishes again.
307 emptyQueues(rag, &dg);
308 unsigned nUpdates = 0;
309 unsigned nConsistentUpdates = 0;
310 std::thread updaterThread1([&]() { updaterLoop(nLoops, delay, false, true); });
311
312 // test loop consuming data
313 try {
314 while(true) {
315 auto id = rag.readAny();
316 auto& acc = id == readAccA.getId() ? readAccA : readAccB;
317 std::cout << "readAny: seeing update for target " << acc.getName() << " vs " << acc.getVersionNumber()
318 << " values " << readAccA << "," << readAccB << std::endl;
319
320 bool isConsistent = dg.update(id);
321 nUpdates++;
322 if(readAccA.getVersionNumber() == readAccB.getVersionNumber()) {
323 nConsistentUpdates++;
324 }
325 // check data consistency via VersionNumber and content
326 BOOST_TEST(isConsistent);
327 BOOST_TEST(readAccA.getVersionNumber() == readAccB.getVersionNumber());
328 BOOST_TEST(readAccA == readAccB);
329
330 // acknowledge data received: here updated id is irrelevant
331 sem_post(&sem);
332 }
333 }
334 catch(boost::thread_interrupted& e) {
335 std::cout << "thread interrupted" << std::endl;
336 }
337
338 updaterThread1.join();
339 BOOST_TEST(nConsistentUpdates == nLoops);
340 BOOST_TEST(nConsistentUpdates == nUpdates);
341}
342
343BOOST_FIXTURE_TEST_CASE(testInitialValues, Fixture) {
344 std::cout << "testInitialValues" << std::endl;
345 // at start VersionNum(A)=0, since no read has yet occurred
346
347 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
348 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
349
350 unsigned nDiscarded = emptyQueues(rag);
351 // after read, VersionNumbers must be non-zero.
352 // Note, initial values here count as one consistent set.
353 BOOST_TEST(nDiscarded == 1);
354
355 BOOST_TEST(readAccA.getVersionNumber() != VersionNumber(nullptr));
356 BOOST_TEST(readAccB.getVersionNumber() != VersionNumber(nullptr));
357}
358
359BOOST_AUTO_TEST_CASE(testInitialValuesConsistency) {
360 // in all the previous tests, we simply discarded the initial values.
361 // However in real use, e.g. with ApplicationCore, it often makes sense to keep the initial values and complete
362 // them with some data update that turns them into a consistent set.
363 // Test that MatchingMode::historized supports this use case.
364
365 Device dev;
366 dev.open("(logicalNameMap?map=historizedDataMatching.xlmap)");
367 dev.activateAsyncRead();
368
369 for(bool extraDecorators : {false, true}) {
370 // prepare initial values
371 VersionNumber vs0;
372 VersionNumber vs1;
373 auto accA = dev.getScalarRegisterAccessor<int32_t>("/A");
374 auto accB = dev.getScalarRegisterAccessor<int32_t>("/B");
375 accA.setAndWrite(100, vs1);
376 accB.setAndWrite(99, vs0);
377
378 // use 'fresh' read accessors not yet tainted by ReadAnyGroup or DataConsistencyGroup
379 auto readAccA = dev.getScalarRegisterAccessor<int32_t>("/A", 0, {AccessMode::wait_for_new_data});
380 auto readAccB = dev.getScalarRegisterAccessor<int32_t>("/B", 0, {AccessMode::wait_for_new_data});
381
382 if(extraDecorators) {
383 // In order to mimic ApplicationCore behaviour, where a MetaDataPropagatingRegisterDecorator is placed around
384 // every accessor, we add a decoration layer via otherwise useless NDRegisterAccessorDecorator.
385 // We want to test that DataConsistencyDecorator swaps the right buffers even then.
386 auto da = boost::make_shared<NDRegisterAccessorDecorator<int32_t>>(readAccA.getImpl());
387 readAccA.replace(da);
388 auto db = boost::make_shared<NDRegisterAccessorDecorator<int32_t>>(readAccB.getImpl());
389 readAccB.replace(db);
390 }
391
392 // read and check them
393 readAccA.readLatest();
394 readAccB.readLatest();
395 BOOST_TEST(readAccA == 100);
396 BOOST_TEST(readAccA.getVersionNumber() == vs1);
397 BOOST_TEST(readAccB == 99);
398 BOOST_TEST(readAccB.getVersionNumber() == vs0);
399
400 ReadAnyGroup rag{readAccA, readAccB};
401 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
402 // check user buffer again - because of DataConsistencyDecorator
403 BOOST_TEST(readAccA.getVersionNumber() == vs1);
404 BOOST_TEST(readAccB.getVersionNumber() == vs0);
405 BOOST_TEST(dg.isConsistent() == false);
406 // note, following 2 tests fail unless DataConsistencyDecorator takes over initial data on construction
407 BOOST_TEST(readAccA == 100);
408 BOOST_TEST(readAccB == 99);
409
410 // provide data update for B that completes consistent set
411 accB.setAndWrite(100, vs1);
412 auto id = rag.readAny();
413 BOOST_TEST(id == readAccB.getId());
414 BOOST_TEST(readAccA == 100);
415 BOOST_TEST(readAccB == 100);
416 BOOST_TEST(readAccA.getVersionNumber() == vs1);
417 BOOST_TEST(readAccB.getVersionNumber() == vs1);
418 BOOST_TEST(dg.isConsistent() == true);
419 }
420}
421
423 std::cout << "testIllegalUse" << std::endl;
424
425 ChimeraTK::ReadAnyGroup rag{readAccA, readAccB};
426 DataConsistencyGroup dg({readAccA, readAccB}, DataConsistencyGroup::MatchingMode::historized);
427
428 BOOST_CHECK_THROW(
429 DataConsistencyGroup({readAccA}, DataConsistencyGroup::MatchingMode::historized), ChimeraTK::logic_error);
430}
431
432BOOST_AUTO_TEST_SUITE_END()
Group several registers (= TransferElement) which ensures data consistency across multiple variables ...
Class allows to read/write registers from device.
Definition Device.h:39
ScalarRegisterAccessor< UserType > getScalarRegisterAccessor(const RegisterPath &registerPathName, size_t wordOffsetInRegister=0, const AccessModeFlags &flags=AccessModeFlags({})) const
Get a ScalarRegisterObject object for the given register.
Definition Device.h:266
void activateAsyncRead() noexcept
Activate asyncronous read for all transfer elements where AccessMode::wait_for_new_data is set.
Definition Device.cc:91
void open(std::string const &aliasName)
Open a device by the given alias name from the DMAP file.
Definition Device.cc:58
void replace(const NDRegisterAccessorAbstractor< UserType > &newAccessor)
Assign a new accessor to this NDRegisterAccessorAbstractor.
Group several registers (= TransferElement) to allow waiting for an update of any of the registers.
Accessor class to read and write scalar registers transparently by using the accessor object like a v...
void interrupt()
Return from a blocking read immediately and throw boost::thread_interrupted.
Simple class holding a unique ID for a TransferElement.
Class for generating and holding version numbers without exposing a numeric representation.
Exception thrown when a logic error has occured.
Definition Exception.h:51
Exception thrown when a runtime error has occured.
Definition Exception.h:18
ScalarRegisterAccessor< int32_t > readAccB
ScalarRegisterAccessor< int32_t > readAccA
void updaterLoop(unsigned nLoops, unsigned delay, unsigned duplicateVns=0, bool catchUp=false)
BOOST_FIXTURE_TEST_CASE(test1, Fixture)
BOOST_AUTO_TEST_CASE(testInitialValuesConsistency)
auto emptyQueues(ReadAnyGroup &rag, DataConsistencyGroup *dg=nullptr)