ChimeraTK-DeviceAccess 03.26.00
Loading...
Searching...
No Matches
DataConsistencyGroupHistorizedMatcher.cc
Go to the documentation of this file.
1// SPDX-FileCopyrightText: Deutsches Elektronen-Synchrotron DESY, MSK, ChimeraTK Project <chimeratk-support@desy.de>
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
5
7#include "ReadAnyGroup.h"
9
11
12 /********************************************************************************************************************/
13
14 boost::shared_ptr<TransferElement> HistorizedMatcher::decorateAccessor(TransferElementAbstractor& acc) {
15 boost::shared_ptr<TransferElement> dataConsistencyDecorator;
16 callForType(acc.getValueType(), [&](auto t) {
17 using UserType = decltype(t);
18
19 // check if accessor already is in another DataConsistencyGroup
20 for(auto& e : acc.getInternalElements()) {
21 auto dec = boost::dynamic_pointer_cast<DataConsistencyDecorator<UserType>>(e);
22 if(dec) {
23 throw ChimeraTK::logic_error(
24 "accessor is already in historized DataConsistencyGroup, cannot add it to another one: " + acc.getName());
25 }
26 }
27 auto accImpl = boost::dynamic_pointer_cast<NDRegisterAccessor<UserType>>(acc.getHighLevelImplElement());
28 assert(accImpl);
29 // factory function which creates our DataConsistencyDecorator
30 auto factoryF = [&](const boost::shared_ptr<NDRegisterAccessor<UserType>>& toBeDecorated) {
31 return boost::make_shared<DataConsistencyDecorator<UserType>>(toBeDecorated, this);
32 };
33 // in case accImpl is ApplicationCore's MetaDataPropagatingRegisterDecorator we need to "go inside" and
34 // replace its target by our DataConsistencyDecorator
35 dataConsistencyDecorator = accImpl->decorateDeepInside(factoryF);
36 if(!dataConsistencyDecorator) {
37 // accImpl is not itself a decorator, so decorateDeepInside did not do anything.
38 dataConsistencyDecorator = factoryF(accImpl);
39 acc.replace(dataConsistencyDecorator);
40 }
41 });
42 return dataConsistencyDecorator;
43 }
44
45 /********************************************************************************************************************/
46
47 bool HistorizedMatcher::checkUpdate(const TransferElementID& transferElementID) {
48 auto it = _targetElements.find(transferElementID);
49 assert(it != _targetElements.end());
50
51 auto& pElem = it->second;
52 auto vn = pElem.acc.getVersionNumber();
53 if(pElem.histLen > 0 && vn == pElem.versionNumbers[0]) {
54 // take special care for duplicate VersionNumbers. We want VersionNumbers only once in history.
55 // So in case of a duplicate VersionNumber, we mark the previous historized value as invalid
56 pElem.versionNumbers[0] = VersionNumber(nullptr);
57 }
58
59 bool consistent = findMatch(transferElementID);
60 return consistent;
61 }
62
63 /********************************************************************************************************************/
64
65 void HistorizedMatcher::handleMissingPreReads(TransferElementID callerId) {
66 // prevent recursion by setting flag
67 if(_handleMissingPreReadsCalled) {
68 return;
69 }
70 _handleMissingPreReadsCalled = true;
71 auto resetFlag = cppext::finally([this] { _handleMissingPreReadsCalled = false; });
72
73 // just a check for right usage: check that DataConsistencyGroup::update was called on updates from ReadAnyGroup.
74 if(_decoratorsNeedPreRead) {
75 // we know there was already a consistent data update handed out from ReadAnyGroup
76 if(lastUpdateCall() != callerId) {
77 throw ChimeraTK::logic_error("updates from ReadAnyGroup must be processed by DataConsistencyGroup::update");
78 }
79 }
80
81 for(auto& e : _pushElements) {
82 if(e.first != callerId) {
83 auto& acc = e.second;
84 acc.getHighLevelImplElement()->preRead(TransferType::read);
85 }
86 }
87 _decoratorsNeedPreRead = false;
88 }
89
90 /********************************************************************************************************************/
91
92 void HistorizedMatcher::handleMissingPostReads(TransferElementID callerId, bool updateBuffer) {
93 // prevent recursion by setting flag
94 if(_handleMissingPostReadsCalled) {
95 return;
96 }
97 _handleMissingPostReadsCalled = true;
98 auto resetFlag = cppext::finally([this] { _handleMissingPostReadsCalled = false; });
99
100 // To update other user buffers, call postRead on the other involved decorators.
101 // This concerns all pushElements, except when a push element was already on right version num and received
102 // another update, which can only happen if the new datum has the same version number (handled as special case).
103 // The other exception is when the initial value of some pushElements already had the matching version number.
104 // Note, in case of an exception thrown by some postRead, it might happen that postRead is
105 // called more than once in a row, for the other elements. This is allowed.
106 for(auto& e : _pushElements) {
107 if(e.first == callerId) {
108 continue;
109 }
110 auto& acc = e.second;
111 // in case the inital value of the accessor already had the matching version number, we must skip the postRead
112 // since otherwise it would swap away the value
113 if(lastMatchingVersionNumber() > acc.getVersionNumber()) {
114 acc.getHighLevelImplElement()->postRead(TransferType::read, updateBuffer);
115 }
116 }
117 _decoratorsNeedPreRead = true;
118 }
119
120 /********************************************************************************************************************/
121
122 void HistorizedMatcher::add(TransferElementAbstractor& acc, unsigned histLen) {
123 auto* rag = acc.getReadAnyGroup();
124 auto id = acc.getId();
125
126 auto dataConsistencyDecorator = decorateAccessor(acc);
127 auto target = dataConsistencyDecorator->getInternalElements().front();
128 setupHistory(TransferElementAbstractor{target}, histLen);
129 // add decorated access to our elements map (key = Id remains unchanged by decoration)
130 _pushElements[id] = acc;
131 if(rag) {
132 // also find the copy of accessor abstractor in ReadAnyGroup and decorate it in there
133 for(auto& pe : rag->push_elements) {
134 if(pe.getId() == id) {
135 pe.replace(acc);
136 }
137 }
138 }
139 }
140
141 /********************************************************************************************************************/
142
143 void HistorizedMatcher::setupHistory(const TransferElementAbstractor& acc, unsigned histLen) {
144 TransferElementID id = acc.getId();
145 if(_targetElements.find(id) != _targetElements.end()) {
146 // was alread set up
147 return;
148 }
149
150 callForType(acc.getValueType(), [&](auto argForType) {
151 // set up ring buffer for element's user type
152 using UserType = decltype(argForType);
153 using UserBufferType = std::vector<std::vector<UserType>>;
154 // prepare and insert PushElement not yet having memory (because getUserBuffer requires registered accessor)
155 TargetElement element0 = {acc, histLen, nullptr, typeid(UserType), {}, {}};
156 _targetElements.insert({id, element0});
157 if(histLen > 0) {
158 auto* mem = new std::vector<UserBufferType>(histLen);
159 // get user buffer just to find out it's shape
160 auto& buf = getUserBuffer<UserType>(id);
161 unsigned nChannels = buf.size();
162 assert(nChannels > 0);
163 unsigned nSamples = buf[0].size();
164 for(UserBufferType& historyElement : *mem) {
165 historyElement.resize(nChannels);
166 for(auto& historyElementChannel : historyElement) {
167 historyElementChannel.resize(nSamples);
168 }
169 }
170 // continue setup: make buffer known
171 TargetElement& element = _targetElements.at(id);
172 element.histBuffer = mem;
173 element.versionNumbers.resize(histLen);
174 std::fill(element.versionNumbers.begin(), element.versionNumbers.end(), VersionNumber{nullptr});
175 element.dataValidities.resize(histLen);
176 }
177 });
178 }
179
180 /********************************************************************************************************************/
181
182 HistorizedMatcher::~HistorizedMatcher() {
183 for(auto& x : _targetElements) {
184 TransferElementID id = x.first;
185 TargetElement& element = x.second;
186 if(element.histLen > 0) {
187 try {
188 callForType(element.histBufferType, [&](auto arg) {
189 using UserType = decltype(arg);
190 using UserBufferType = std::vector<std::vector<UserType>>;
191 delete getBufferVector<UserBufferType>(id);
192 });
193 }
194 catch(std::bad_cast& e) {
195 // catch + assert in order to satisfy linter
196 assert(false);
197 }
198 }
199 }
200 }
201
202 /********************************************************************************************************************/
203
204 bool HistorizedMatcher::isConsistent() {
205 // we cannot always assume that user buffers are consistent:
206 // - before receiving data updates through ReadAnyGroup, state of initial values is unknown
207 // - also, user might have added extra elements to DataConsistencyGroup
208 if(_pushElements.size() == 0) {
209 return true;
210 }
211 auto firstVn = _pushElements.begin()->second.getVersionNumber();
212 for(auto x : _pushElements) {
213 if(x.second.getVersionNumber() != firstVn) {
214 return false;
215 }
216 }
217 return true;
218 }
219
220 /********************************************************************************************************************/
221
222 bool HistorizedMatcher::findMatch(TransferElementID transferElementID) {
223 auto it = _targetElements.find(transferElementID);
224 if(it == _targetElements.end()) {
225 // ignore unknown transfer elements
226 return false;
227 }
228 TargetElement& theElement = it->second;
229 auto vn = theElement.acc.getVersionNumber();
230
231 for(auto& pair : _targetElements) {
232 if(pair.first == transferElementID) {
233 continue;
234 }
235 TargetElement& element = pair.second;
236 // first consider accessor's user buffer and version number
237 if(element.acc.getVersionNumber() == vn) {
238 element.lastMatchingIndex = 0;
239 }
240 else if(element.histLen > 0) {
241 auto versionNumVector = element.versionNumbers;
242
243 auto pos = std::find(versionNumVector.begin(), versionNumVector.end(), vn);
244 if(pos == versionNumVector.end()) {
245 return false;
246 }
247 element.lastMatchingIndex = pos - versionNumVector.begin() + 1;
248 }
249 else {
250 // no direct match and no history
251 return false;
252 }
253 }
254 theElement.lastMatchingIndex = 0;
255 _lastMatchingVersionNumber = vn;
256 return true;
257 }
258
259 /********************************************************************************************************************/
260
261 void HistorizedMatcher::updateHistory(TransferElementID transferElementID) {
262 TargetElement& element = _targetElements.at(transferElementID);
263 if(element.histLen == 0) {
264 // exit early if history not availabe for this element
265 return;
266 }
267
268 VersionNumber vn = element.acc.getVersionNumber();
269 DataValidity dv = element.acc.dataValidity();
270
271 callForType(element.histBufferType, [&](auto arg) {
272 using UserType = decltype(arg);
273 using UserBufferType = std::vector<std::vector<UserType>>;
274 auto& buf = getUserBuffer<UserType>(transferElementID);
275
276 auto& bufferVector = *getBufferVector<UserBufferType>(transferElementID);
277 unsigned histLen = bufferVector.size();
278 auto& versionNumVector = element.versionNumbers;
279 auto& datavalidityVector = element.dataValidities;
280
281 // swap data into history
282 // after all swaps, to be erased data is in user buffer. Usually this is the oldest data element,
283 // except in special case where first history element is invalid.
284 if(versionNumVector[0] > VersionNumber(nullptr)) {
285 for(unsigned i = histLen - 1; i > 0; i--) {
286 bufferVector[i].swap(bufferVector[i - 1]);
287 versionNumVector[i] = versionNumVector[i - 1];
288 datavalidityVector[i] = datavalidityVector[i - 1];
289 }
290 }
291 bufferVector[0].swap(buf);
292 versionNumVector[0] = vn;
293 datavalidityVector[0] = dv;
294 });
295 }
296
297 /********************************************************************************************************************/
298
299 void HistorizedMatcher::getMatchingInfo(TransferElementID id, VersionNumber& vs, DataValidity& dv) {
300 TargetElement& pe = _targetElements.at(id);
301
302 if(pe.lastMatchingIndex > 0) {
303 vs = pe.versionNumbers[pe.lastMatchingIndex - 1];
304 dv = pe.dataValidities[pe.lastMatchingIndex - 1];
305 return;
306 }
307 vs = pe.acc.getVersionNumber();
308 dv = pe.acc.dataValidity();
309 }
310
311 /********************************************************************************************************************/
312
313} // namespace ChimeraTK::DataConsistencyGroupDetail
boost::shared_ptr< TransferElement > decorateAccessor(TransferElementAbstractor &acc)
decorate accessor by replacing its target => DataConsistencyDecorator(target), possibly at an inner l...
N-dimensional register accessor.
Base class for register accessors abstractors independent of the UserType.
ChimeraTK::VersionNumber getVersionNumber() const
Returns the version number that is associated with the last transfer (i.e.
const boost::shared_ptr< TransferElement > & getHighLevelImplElement()
Obtain the highest level implementation TransferElement.
const std::type_info & getValueType() const
Returns the std::type_info for the value type of this transfer element.
ReadAnyGroup * getReadAnyGroup() const
Obtain the ReadAnyGroup this TransferElement is part of, or nullptr if not in a ReadAnyGroup.
DataValidity dataValidity() const
Return current validity of the data.
void replace(const TransferElementAbstractor &newAccessor)
Assign a new accessor to this TransferElementAbstractor.
TransferElementID getId() const
Obtain unique ID for the actual implementation of this TransferElement.
Simple class holding a unique ID for a TransferElement.
Class for generating and holding version numbers without exposing a numeric representation.
Exception thrown when a logic error has occurred.
Definition Exception.h:51
void callForType(const std::type_info &type, LAMBDATYPE lambda)
Helper function for running code which uses some compile-time type that is specified at runtime as a ...
DataValidity
The current state of the data.
unsigned lastMatchingIndex
match indices set by findMatch() in case it returns true.