ChimeraTK-ControlSystemAdapter-OPCUAAdapter 04.00.05
Loading...
Searching...
No Matches
generatorClass.py
Go to the documentation of this file.
1import lxml.etree as ET
2#import xml.etree.ElementTree as ET
3import logging
4import re
5from typing import List
6
7
8class MapOption():
9 def __init__(self):
10 # True/False in case it should be mapped
11 self.exclude:bool|None = None
12 # Name of the history setting in case history is enabled
13 self.historizing:str|None = None
14
16 def __init__(self, name:str):
17 self.name:str = name
18 self.entriesPerResponse:int = 1000
19 self.bufferLength:int = 100
20 self.interval:int = 1000
21
22 def checkSettings(self):
23 if self.entriesPerResponse == None:
24 raise RuntimeError("Number of entries per response not set.")
25 if self.interval == None:
26 raise RuntimeError("Sampling interval not set.")
27 if self.bufferLength == None:
28 raise RuntimeError("Buffer length not set.")
29
30 def createHistory(self, history: ET._Element):
31 history.set("name", self.name)
32 history.set("entries_per_response", str(self.entriesPerResponse))
33 history.set("buffer_length", str(self.bufferLength))
34 history.set("interval", str(self.interval))
35
36 def readHistory(self, data: ET._Element):
37 if 'name' in data.attrib:
38 self.name = str(data.attrib['name'])
39 if 'entries_per_response' in data.attrib:
40 self.entriesPerResponse = int(data.attrib['entries_per_response'])
41 if 'buffer_length' in data.attrib:
42 self.bufferLength = int(data.attrib['buffer_length'])
43 if 'interval' in data.attrib:
44 self.interval = int(data.attrib['interval'])
45
46 def writeSetting(self, element:ET._Element):
47 element.set("name", self.name)
48 element.set("entries_per_response",str(self.entriesPerResponse))
49 element.set("buffer_length",str(self.bufferLength))
50 element.set("interval",str(self.interval))
51
52
54 def __init__(self):
55 self.encryptionEnabled:bool = False
56 self.addUnsecureEndpoint:bool = False
57 self.certificate:str|None = None
58 self.key:str|None = None
59 self.blockList:str|None = None
60 self.trustList:str|None = None
61 self.issuerList:str|None = None
62
64 '''
65 Check if encryption settings are valid and include all required information.
66
67 @raise RuntimeError: In case the settings are not complete.
68 '''
69 if self.key == None:
70 raise RuntimeError("Server key not set.")
71 if self.certificate == None:
72 raise RuntimeError("Server certificate not set.")
73 if self.trustList == None or (self.issuerList == None or self.blockList == None):
74 raise RuntimeError("No trust list or CA related configuration set.")
75
76 def createEncryption(self, security: ET._Element):
77 if self.addUnsecureEndpoint:
78 security.set("unsecure", "True")
79 else:
80 security.set("unsecure", "False")
81 if self.key:
82 security.set("privatekey", self.key)
83 if self.certificate:
84 security.set("certificate", self.certificate)
85 if self.trustList:
86 security.set("trustlist", self.trustList)
87 if self.issuerList:
88 security.set("issuerlist", self.issuerList)
89 if self.blockList:
90 security.set("blocklist", self.blockList)
91
92 def readEncryption(self, data: ET._Element):
93 if 'unsecure' in data.attrib and data.attrib['unsecure'] == 'True':
95 if 'privatekey' in data.attrib:
96 self.key = str(data.attrib['privatekey'])
97 if 'certificate' in data.attrib:
98 self.certificate = str(data.attrib['certificate'])
99 if 'trustlist' in data.attrib:
100 self.trustList = str(data.attrib['trustlist'])
101 if 'issuerlist' in data.attrib:
102 self.issuerList = str(data.attrib['issuerlist'])
103 if 'blocklist' in data.attrib:
104 self.blockList = str(data.attrib['blocklist'])
105
107 def __init__(self):
108 super().__init__()
109 self.username: str|None = None
110 self.password: str|None = None
111 self.applicationName: str|None = None
112 self.rootFolder: str|None = None
114 self.enableLogin = False
115 self.port: int|None = None
116 self.historySettings: List[HistorySetting] = []
117 self.logLevel: str = "INFO"
118
119 def createConfig(self, root:ET._Element):
120 '''
121 Create server config XML section of the map file.
122
123 @raise RuntimeError: If configuration is not complete.
124 '''
125 config = ET.SubElement(root,"config")
126 if self.rootFolder:
127 config.set("rootFolder", self.rootFolder)
128 config.set("description", self.applicationDescription)
129 server = ET.SubElement(config, "server")
130 if self.applicationName:
131 server.set("applicationName", self.applicationName)
132 else:
133 logging.info("Default application name will be used. It is the ChimeraTK application name.")
134 if self.port:
135 server.set("port", str(self.port))
136 else:
137 logging.info("Default port of 16664 will be used.")
138
139 server.set("logLevel", self.logLevel)
140
141 if self.enableLogin == True:
142 login = ET.SubElement(config, "login")
143 if self.username:
144 login.set("username", self.username)
145 else:
146 raise RuntimeError("No username set!")
147 if self.password:
148 login.set("password", self.password)
149 else:
150 raise RuntimeError("No password set!")
151
152 if self.encryptionEnabled:
153 # raises in case of problems
155 self.createEncryption(ET.SubElement(config, "security"))
156
157 if len(self.historySettings) > 0:
158 logging.info("Writing {} history settings.".format(len(self.historySettings)))
159 historizing = ET.SubElement(config, "historizing")
160 for setting in self.historySettings:
161 settingElement = ET.SubElement(historizing,"setup")
162 setting.writeSetting(settingElement)
163 else:
164 logging.info("No history settings found.")
165
166 def readConfig(self, data: ET._Element):
167 config = data.find('config',namespaces=data.nsmap)
168 if config is not None:
169 if 'rootFolder' in config.attrib:
170 self.rootFolder = str(config.attrib['rootFolder'])
171 if 'rootFolder' in config.attrib:
172 self.applicationDescription = config.attrib["description"]
173 # server settings
174 server = config.find('server')
175 if server is not None:
176 if 'applicationName' in server.attrib:
177 self.applicationName = str(server.attrib["applicationName"])
178 if 'port' in server.attrib:
179 self.port = int(server.attrib['port'])
180 if 'logLevel' in server.attrib:
181 tmp = str(server.attrib["logLevel"]).upper()
182 if tmp in ['TRACE', 'DEBUG', 'INFO', 'WARNING','ERROR','FATAL']:
183 self.logLevel = str(server.attrib["logLevel"]).upper()
184 else:
185 logging.warning("Failed to read log level. {} is not a valid logLevel. Using INFO instead.".format(tmp))
186 # security
187 login = config.find('login')
188 if login is not None:
189 self.enableLogin = True
190 if 'username' in login.attrib:
191 self.username = str(login.attrib['username'])
192 if 'password' in login.attrib:
193 self.password = str(login.attrib['password'])
194 else:
195 self.enableLogin = False
196 self.username = None
197 self.password = None
198 security = config.find("security")
199 if security is not None:
201 self.readEncryption(security)
202 else:
203 super().__init__()
204 historizing = config.find('historizing')
205 if historizing is not None:
206 for setup in historizing.findall('setup', namespaces=data.nsmap):
207 h = HistorySetting('')
208 h.readHistory(setup)
209 foundSetting = next((x for x in self.historySettings if x.name == h.name), None)
210 if foundSetting == None:
211 self.historySettings.append(h)
212 else:
213 logging.warning("Found history settings dublicate with name {}. Will not it again.".format(h.name))
214
216 '''
217 Class used to store all relevant information of process variables.
218 '''
219 def __init__(self, var:ET._Element, path:str):
220 super().__init__()
221 self.element = var
222 self.name:str = str(var.attrib['name'])
223 # This name is set if the directory should be renamed
224 self.newName: str|None = None
225 self.valueType:str = var.find('value_type', namespaces=var.nsmap).text
226 self.numberOfElements:int = int(var.find('numberOfElements', namespaces=var.nsmap).text)
227 self.unit:str = var.find('unit', namespaces=var.nsmap).text
228 self.newUnit: str|None = None
229 self.direction:str = var.find('direction', namespaces=var.nsmap).text
230 self.description:str = var.find('description', namespaces=var.nsmap).text
231 self.newDescription: str|None = None
232 self.newDestination: str|None = None
233 self.fullName = path + "/" + self.name
234
235 def __str__(self) -> str:
236 return "Variable ({}, {}): {}".format(self.valueType, self.numberOfElements,self.name)
237
238 def __eq__(self, other: object) -> bool:
239 if isinstance(other, str):
240 return other == self.fullName
241 elif isinstance(other, XMLVar):
242 return other.fullName == self.fullName
243 else:
244 return NotImplemented
245
246 def generateMapEntry(self, root:ET._Element, path:str, historizingActive:bool = False, excludeActive:bool = False):
247 if(self.exclude and not excludeActive):
248 exclude = ET.SubElement(root, "exclude")
249 exclude.set("sourceName", self.fullName.removeprefix('/root/'))
250 if self.newName or self.newDescription or self.newUnit or self.newDestination or (self.historizing and not historizingActive):
251 logging.debug("Adding xml entry for process_variable for {}/{}".format(path.removeprefix('/root/'),self.name))
252 pv = ET.SubElement(root, "process_variable")
253 if path.removeprefix('/root/') == '':
254 pv.set("sourceName", self.name)
255 else:
256 pv.set("sourceName", path.removeprefix('/root/') + '/' + self.name)
257 if self.historizing and not historizingActive:
258 pv.set("history", self.historizing)
259 if self.newDescription:
260 dest = ET.SubElement(pv, "description")
261 dest.text = self.newDescription
262 if self.newName:
263 name = ET.SubElement(pv, "name")
264 name.text = self.newName
265 if self.newUnit:
266 unit = ET.SubElement(pv, "unit")
267 unit.text = self.newUnit
268 if self.newDestination:
269 dest = ET.SubElement(pv, "destination")
270 if self.newDestination == "/root":
271 # PV is moved to root -> No destination needs to be set in that case
272 # If no new name is assigned the original name needs to be set
273 if not self.newName:
274 name = ET.SubElement(pv, "name")
275 name.text = self.name
276 else:
277 dest.text = self.newDestination.removeprefix('/root/')
278 if self.newName:
279 pv.set("copy", "True")
280 else:
281 pv.set("copy", "False")
282
283 def reset(self):
284 '''
285 Resets all information related to updated directory and varaiable information, e.g. set newName, newDescription ect. to None.
286 '''
287 self.newName = None
288 self.newUnit = None
289 self.newDescription = None
290 self.newDestination = None
291
293 '''
294 Class used to store all relevant information of CS directories.
295 It includes sub directories and process variables.
296 '''
297 def __init__(self, name: str, data:ET._Element, level: int, path:str = ''):
298 super().__init__()
299 self.dirs: List[XMLDirectory] = []
300 self.vars: List[XMLVar] = []
301 self.element = data
302 self.name = name
303 self.path = path + "/" + name
304 # This name is set if the directory should be renamed
305 self.newName: str|None = None
306 self.newDescription: str|None = None
307 self.newDestination: str|None = None
308 self.hierarchyLevel = level
309 self.parseDir(data)
310
311 def __eq__(self, other: object) -> bool:
312 if isinstance(other, str):
313 return other == self.path
314 elif isinstance(dir, XMLDirectory):
315 return other.path == self.path
316 else:
317 return NotImplemented
318
319 def findDir(self, directory: str):
320 if self == directory:
321 return self
322 for d in self.dirs:
323 if d == directory:
324 return d
325 ret = d.findDir(directory)
326 if ret != None:
327 return ret
328 return None
329
330 def findVar(self, var: str):
331 for v in self.vars:
332 if v == var:
333 return v
334 for d in self.dirs:
335 ret = d.findVar(var)
336 if ret != None:
337 return ret
338 return None
339
340 def parseDir(self, data:ET._Element):
341 '''
342 Read directory information from XML directory entry.
343 Includes reading variables and sub directories.
344 '''
345 for directory in data.findall('directory',namespaces=data.nsmap):
346 self.dirs.append(XMLDirectory(name = str(directory.attrib['name']), data=directory, level=self.hierarchyLevel+1, path = self.path))
347 for var in data.findall('variable',namespaces=data.nsmap):
348 self.vars.append(XMLVar(var, self.path))
349
350 def __str__(self):
351 space = self.hierarchyLevel*"\t"
352 out = space + "Directory (level: {}): {} \n".format(self.hierarchyLevel,self.path)
353 for directory in self.dirs:
354 out = out + str(directory)
355 for var in self.vars:
356 out = out + space + "\t" + str(var) + "\n"
357
358 return out
359
360 def generateMapEntries(self, root:ET._Element, historiszingActive:bool = False, excludeActive = False):
361 for d in self.dirs:
362 d.generateMapEntries(root, self.historizing is not None, self.exclude is not None)
363 for var in self.vars:
364 var.generateMapEntry(root, self.path, self.historizing is not None, self.exclude is not None)
365 if self.exclude and not excludeActive:
366 exclude = ET.SubElement(root, "exclude")
367 exclude.set("sourceName", self.path.removeprefix('/root/') + '/*')
368 destText = self.path.removeprefix('/root').removesuffix(self.name)
369 # special case: If only the description is to be changed no sourceName has to be given put name and destination
370 # If a history is assigned and a description is given two entries have to be created!
371 #<folder sourceName="watchdog/logging" copy="False" history="short_history">
372 # <name>logging</name>
373 # <destination>watchdog</destination>
374 #</folder>
375 #<folder>
376 # <description>Test</description>
377 # <name>logging</name>
378 # <destination>watchdog</destination>
379 #</folder>
380 descriptionAlreadySet = False
381 if self.newDescription and not self.newName and not self.newDestination:
382 logging.debug("Adding xml entry for folder {} that only changes the description of an existing folder".format(self.path.removeprefix('/root/')))
383 folder = ET.SubElement(root, "folder")
384 description = ET.SubElement(folder, "description")
385 description.text = self.newDescription
386 name = ET.SubElement(folder, "name")
387 name.text = self.name
388 if not destText == '':
389 dest = ET.SubElement(folder, "destination")
390 dest.text = destText.removeprefix('/').removesuffix('/')
391 descriptionAlreadySet = True
392
393 if self.newName or self.newDestination or (self.historizing and not historiszingActive):
394 logging.debug("Adding xml entry for folder {}".format(self.path.removeprefix('/root/')))
395 folder = ET.SubElement(root, "folder")
396 folder.set("sourceName", self.path.removeprefix('/root/'))
397 folder.set("copy", "False")
398 if self.historizing and not historiszingActive:
399 folder.set("history", self.historizing)
400 if self.newDescription and not descriptionAlreadySet:
401 dest = ET.SubElement(folder, "description")
402 dest.text = self.newDescription
403 if self.newName:
404 name = ET.SubElement(folder, "name")
405 name.text = self.newName
406 else:
407 name = ET.SubElement(folder, "name")
408 name.text = self.name
409 dest = ET.SubElement(folder, "destination")
410 if self.newDestination:
411 # If folder is moved to root the new destination is '/root' -> that is why the '/' is removed in an extra call
412 dest.text = self.newDestination.removeprefix('/root').removeprefix('/')
413 else:
414 dest.text = destText.removeprefix('/').removesuffix('/')
415
416 def reset(self):
417 '''
418 Resets all information related to updated directory and varaiable information, e.g. set newName, newDescription ect. to None.
419 '''
420 self.newName = None
421 self.newDescription = None
422 self.newDestination = None
423 for directory in self.dirs:
424 directory.reset()
425 for var in self.vars:
426 var.reset()
427
429 def __init__(self, inputFile : str|None):
430 super().__init__()
431 self.inputFile: str|None = inputFile
432 self.outputFile: str|None = None
433 self.applicationNameapplicationName: str|None = None
434 self.nsmap: Dict|None = None
435 self.dir: XMLDirectory|None = None
436 if self.inputFile:
437 self.parseChimeraTK()
438
439 def parseChimeraTK(self):
440 '''
441 Parse a ChiemraTK generated XML file using the XML generator of the application.
442 @raise RuntimeError: If the given input file is not a ChiemraTK XML file.
443 The ChimeraTK map file is identified by the root node called 'application'.
444 '''
445 logging.debug("ChimeraTK XML file.")
446 data = self._openFile(self.inputFile)
447 if data.tag == 'application' or data.tag.split('}')[1] == 'application':
448 if 'name' in data.keys():
449 self.applicationNameapplicationName = data.attrib['name']
450 logging.info("Application name is: {}".format(self.applicationNameapplicationName))
451 self.nsmap = data.nsmap
452 else:
453 raise RuntimeError("Failed to find application tag. Not an XML file created by ChimeraTK's XML generator.")
454 self.dir = XMLDirectory(name='root', data=data, level=0)
455 if logging.root.level == logging.DEBUG:
456 print(self.dir)
457
458 def parseMapFile(self, inputFile:str) -> List[int]:
459 '''
460 Parse an existing map file.
461 @param inputFile: The map file name.
462 @return: list that includes the number of not found directories (index 0), pvs (index 1) and excludes (index 2).
463 In the map file the sourceName is given and it can happen that the source
464 given in the map file does not correspond to a PV or directory from the original
465 variable tree.
466 @raise RuntimeError: If the given input file is not a mapping XML file.
467 A mapping file is identified by the root node called 'uamapping'.
468 '''
469 logging.debug("Parsing map file.")
470 if self.dir:
471 self.dir.reset()
472 data = self._openFile(inputFile)
473 if data.tag == 'uamapping':
474 nsmap = {'csa': 'https://github.com/ChimeraTK/ControlSystemAdapter-OPC-UA-Adapter'}
475 if nsmap != data.nsmap:
476 RuntimeError("Wrong name space ({}) used in mapping file.".format(data.nsmap))
477 self.readConfig(data)
478
479 nSkipped = [0,0,0]
480 if self.dir == None:
481 # No xml input was given so we do not parse any further
482 return nSkipped
483 # read folder information
484 for folder in data.findall('folder', namespaces=data.nsmap):
485 if "sourceName" in folder.attrib:
486 directory = self.dir.findDir("/root/" + str(folder.attrib["sourceName"]))
487 if directory != None:
488 # construct test path from name and destination to compare to the directory path
489 # if both match only the history is added
490 # else the folder is moved
491 tmpPath = "/root/"
492 if 'history' in folder.attrib:
493 directory.historizing = str(folder.attrib["history"])
494 if folder.find('description', namespaces=folder.nsmap) != None and folder.find('description', namespaces=folder.nsmap).text != None:
495 directory.newDescription = folder.find('description', namespaces=folder.nsmap).text
496 if folder.find('destination', namespaces=folder.nsmap) != None and folder.find('destination', namespaces=folder.nsmap).text != None:
497 tmpDestination = directory.path.removeprefix("/root").removeprefix("/").removesuffix(directory.name).removesuffix("/")
498 tmpPath = tmpPath + folder.find('destination', namespaces=folder.nsmap).text
499 if tmpDestination != folder.find('destination', namespaces=folder.nsmap).text:
500 directory.newDestination = "/root/"+folder.find('destination', namespaces=folder.nsmap).text
501 if folder.find('name', namespaces=folder.nsmap) != None and directory.name != folder.find('name', namespaces=folder.nsmap).text:
502 directory.newName = folder.find('name', namespaces=folder.nsmap).text
503 # first remove "/" avoids resulting "//" when no destination was added
504 tmpPath = tmpPath.removesuffix("/") + "/" + folder.find('name', namespaces=folder.nsmap).text
505 if directory.newDestination == None and tmpPath != directory.path:
506 directory.newDestination = "/root"
507 else:
508 logging.warning("Failed to find source folder path {} in the application variable tree!".format("/root/" + str(folder.attrib["sourceName"])))
509 nSkipped[0] = nSkipped[0] + 1
510 else:
511 tmpDestination = None
512 tmpName = None
513 if folder.find('destination', namespaces=folder.nsmap) != None:
514 tmpDestination = folder.find('destination', namespaces=folder.nsmap).text
515 if folder.find('name', namespaces=folder.nsmap) != None:
516 tmpName = folder.find('name', namespaces=folder.nsmap).text
517 if tmpName == None:
518 logging.warning("For folder entry in the map file with no sourceName no name is given. Will be skipped.")
519 tmpSourceName = "/root/"
520 if tmpDestination == None:
521 tmpSourceName = tmpSourceName + tmpName
522 else:
523 tmpSourceName = tmpSourceName + tmpDestination + "/" + tmpName
524 directory = self.dir.findDir(tmpSourceName)
525 if directory != None:
526 if folder.find('description', namespaces=folder.nsmap) != None:
527 directory.newDescription = folder.find('description', namespaces=folder.nsmap).text
528 else:
529 logging.warning("Failed to find source folder path {} in the application variable tree!".format(tmpSourceName))
530 nSkipped[0] = nSkipped[0] + 1
531 # read process_variable information
532 for pv in data.findall('process_variable', namespaces=data.nsmap):
533 if "sourceName" in pv.attrib:
534 var = self.dir.findVar("/root/" + str(pv.attrib["sourceName"]))
535 if var != None:
536 if 'history' in pv.attrib:
537 var.historizing = str(pv.attrib["history"])
538 if pv.find('description', namespaces=pv.nsmap) != None:
539 var.newDescription = pv.find('description', namespaces=pv.nsmap).text
540 if pv.find('unit', namespaces=pv.nsmap) != None:
541 var.newUnit = pv.find('unit', namespaces=pv.nsmap).text
542 if pv.find('name', namespaces=pv.nsmap) != None:
543 var.newName = pv.find('name', namespaces=pv.nsmap).text
544 if pv.find('destination', namespaces=pv.nsmap) != None:
545 var.newDestination = pv.find('destination', namespaces=pv.nsmap).text
546 if (var.newDestination == "" or var.newDestination == None) and var.name == var.newName:
547 var.newDestination = "/root"
548 var.newName = None
549 else:
550 logging.warning("Failed to find source pv path {} in the application variable tree!".format("/root/" + str(pv.attrib["sourceName"])))
551 nSkipped[1] = nSkipped[1] + 1
552 for exclude in data.findall('exclude', namespaces=data.nsmap):
553 if "sourceName" in exclude.attrib:
554 if exclude.attrib["sourceName"].endswith("/*"):
555 directory = self.dir.findDir("/root/" + exclude.attrib["sourceName"][0:-2])
556 if directory != None:
557 directory.exclude = True
558 else:
559 logging.warning("Failed to find source exclude path {} in the application variable tree!".format("/root/" + str(exclude.attrib["sourceName"][0:-2])))
560 nSkipped[2] = nSkipped[2] + 1
561 else:
562 var = self.dir.findVar("/root/" + exclude.attrib["sourceName"])
563 if var != None:
564 var.exclude = True
565 else:
566 logging.warning("Failed to find source exclude path {} in the application variable tree!".format("/root/" + str(exclude.attrib["sourceName"])))
567 nSkipped[2] = nSkipped[2] + 1
568 return nSkipped
569 else:
570 raise RuntimeError("Failed to find uamapping tag. Not an ControlSystem-OPC-UA XML mapping file.")
571
572 def save(self):
573 '''Write the map file.
574 '''
575 logging.info("Writing map file: {}".format(self.outputFile))
576 root = ET.Element("uamapping", nsmap = {'csa': 'https://github.com/ChimeraTK/ControlSystemAdapter-OPC-UA-Adapter'})
577 self.createConfig(root)
578 if self.dir:
579 self.dir.generateMapEntries(root)
580 tree = ET.ElementTree()
581 tree._setroot(root)
582 uglyXml = ET.tounicode(tree, pretty_print = True)
583 text_re = re.compile('>\n\s+([^<>\s].*?)\n\s+</', re.DOTALL)
584 prettyXml = text_re.sub('>\g<1></', uglyXml)
585 with open(self.outputFile, "w") as text_file:
586 text_file.write(prettyXml)
587
588 def _openFile(self, inputFile: str) -> ET._Element:
589 '''
590 Open an xml file.
591 '''
592 logging.debug("analysing file {}".format(inputFile))
593 data = ET.parse(inputFile).getroot()
594 return data
__init__(self, str name, ET._Element data, int level, str path='')
generateMapEntries(self, ET._Element root, bool historiszingActive=False, excludeActive=False)
generateMapEntry(self, ET._Element root, str path, bool historizingActive=False, bool excludeActive=False)
__init__(self, ET._Element var, str path)
#define str(a)