1 import lxml.etree
as ET
5 from typing
import List
11 self.exclude:bool|
None =
None
13 self.historizing:str|
None =
None
24 raise RuntimeError(
"Number of entries per response not set.")
26 raise RuntimeError(
"Sampling interval not set.")
28 raise RuntimeError(
"Buffer length not set.")
31 history.set(
"name", self.
name)
37 if 'name' in data.attrib:
39 if 'entries_per_response' in data.attrib:
41 if 'buffer_length' in data.attrib:
43 if 'interval' in data.attrib:
44 self.
interval = int(data.attrib[
'interval'])
47 element.set(
"name", self.
name)
55 self.encryptionEnabled:bool =
False
58 self.
key:str|
None =
None
65 Check if encryption settings are valid and include all required information.
67 @raise RuntimeError: In case the settings are not complete.
70 raise RuntimeError(
"Server key not set.")
72 raise RuntimeError(
"Server certificate not set.")
74 raise RuntimeError(
"No trust list or CA related configuration set.")
78 security.set(
"unsecure",
"True")
80 security.set(
"unsecure",
"False")
82 security.set(
"privatekey", self.
key)
93 if 'unsecure' in data.attrib
and data.attrib[
'unsecure'] ==
'True':
95 if 'privatekey' in data.attrib:
96 self.
key =
str(data.attrib[
'privatekey'])
97 if 'certificate' in data.attrib:
99 if 'trustlist' in data.attrib:
101 if 'issuerlist' in data.attrib:
103 if 'blocklist' in data.attrib:
115 self.
port: int|
None =
None
116 self.historySettings: List[HistorySetting] = []
121 Create server config XML section of the map file.
123 @raise RuntimeError: If configuration is not complete.
125 config = ET.SubElement(root,
"config")
129 server = ET.SubElement(config,
"server")
133 logging.info(
"Default application name will be used. It is the ChimeraTK application name.")
135 server.set(
"port",
str(self.
port))
137 logging.info(
"Default port of 16664 will be used.")
139 server.set(
"logLevel", self.
logLevel)
142 login = ET.SubElement(config,
"login")
144 login.set(
"username", self.
username)
146 raise RuntimeError(
"No username set!")
148 login.set(
"password", self.
password)
150 raise RuntimeError(
"No password set!")
157 if len(self.historySettings) > 0:
158 logging.info(
"Writing {} history settings.".
format(len(self.historySettings)))
159 historizing = ET.SubElement(config,
"historizing")
160 for setting
in self.historySettings:
161 settingElement = ET.SubElement(historizing,
"setup")
162 setting.writeSetting(settingElement)
164 logging.info(
"No history settings found.")
167 config = data.find(
'config',namespaces=data.nsmap)
168 if config
is not None:
169 if 'rootFolder' in config.attrib:
171 if 'rootFolder' in config.attrib:
174 server = config.find(
'server')
175 if server
is not None:
176 if 'applicationName' in server.attrib:
178 if 'port' in server.attrib:
179 self.
port = int(server.attrib[
'port'])
180 if 'logLevel' in server.attrib:
181 tmp =
str(server.attrib[
"logLevel"]).upper()
182 if tmp
in [
'TRACE',
'DEBUG',
'INFO',
'WARNING',
'ERROR',
'FATAL']:
185 logging.warning(
"Failed to read log level. {} is not a valid logLevel. Using INFO instead.".
format(tmp))
187 login = config.find(
'login')
188 if login
is not None:
190 if 'username' in login.attrib:
192 if 'password' in login.attrib:
198 security = config.find(
"security")
199 if security
is not None:
204 historizing = config.find(
'historizing')
205 if historizing
is not None:
206 for setup
in historizing.findall(
'setup', namespaces=data.nsmap):
209 foundSetting = next((x
for x
in self.historySettings
if x.name == h.name),
None)
210 if foundSetting ==
None:
211 self.historySettings.append(h)
213 logging.warning(
"Found history settings dublicate with name {}. Will not it again.".
format(h.name))
217 Class used to store all relevant information of process variables.
222 self.name:str =
str(var.attrib[
'name'])
225 self.valueType:str = var.find(
'value_type', namespaces=var.nsmap).text
226 self.numberOfElements:int = int(var.find(
'numberOfElements', namespaces=var.nsmap).text)
227 self.unit:str = var.find(
'unit', namespaces=var.nsmap).text
229 self.direction:str = var.find(
'direction', namespaces=var.nsmap).text
230 self.description:str = var.find(
'description', namespaces=var.nsmap).text
236 return "Variable ({}, {}): {}".
format(self.valueType, self.numberOfElements,self.name)
239 if isinstance(other, str):
241 elif isinstance(other, XMLVar):
242 return other.fullName == self.
fullName
244 return NotImplemented
246 def generateMapEntry(self, root:ET._Element, path:str, historizingActive:bool =
False, excludeActive:bool =
False):
247 if(self.exclude
and not excludeActive):
248 exclude = ET.SubElement(root,
"exclude")
249 exclude.set(
"sourceName", self.
fullName.removeprefix(
'/root/'))
251 logging.debug(
"Adding xml entry for process_variable for {}/{}".
format(path.removeprefix(
'/root/'),self.name))
252 pv = ET.SubElement(root,
"process_variable")
253 if path.removeprefix(
'/root/') ==
'':
254 pv.set(
"sourceName", self.name)
256 pv.set(
"sourceName", path.removeprefix(
'/root/') +
'/' + self.name)
257 if self.historizing
and not historizingActive:
258 pv.set(
"history", self.historizing)
260 dest = ET.SubElement(pv,
"description")
263 name = ET.SubElement(pv,
"name")
266 unit = ET.SubElement(pv,
"unit")
269 dest = ET.SubElement(pv,
"destination")
274 name = ET.SubElement(pv,
"name")
275 name.text = self.name
279 pv.set(
"copy",
"True")
281 pv.set(
"copy",
"False")
285 Resets all information related to updated directory and varaiable information, e.g. set newName, newDescription ect. to None.
294 Class used to store all relevant information of CS directories.
295 It includes sub directories and process variables.
297 def __init__(self, name: str, data:ET._Element, level: int, path:str =
''):
299 self.dirs: List[XMLDirectory] = []
300 self.vars: List[XMLVar] = []
312 if isinstance(other, str):
313 return other == self.
path
314 elif isinstance(dir, XMLDirectory):
315 return other.path == self.
path
317 return NotImplemented
320 if self == directory:
325 ret = d.findDir(directory)
342 Read directory information from XML directory entry.
343 Includes reading variables and sub directories.
345 for directory
in data.findall(
'directory',namespaces=data.nsmap):
347 for var
in data.findall(
'variable',namespaces=data.nsmap):
353 for directory
in self.dirs:
354 out = out +
str(directory)
355 for var
in self.vars:
356 out = out + space +
"\t" +
str(var) +
"\n"
360 def generateMapEntries(self, root:ET._Element, historiszingActive:bool =
False, excludeActive =
False):
362 d.generateMapEntries(root, self.historizing
is not None, self.exclude
is not None)
363 for var
in self.vars:
364 var.generateMapEntry(root, self.
path, self.historizing
is not None, self.exclude
is not None)
365 if self.exclude
and not excludeActive:
366 exclude = ET.SubElement(root,
"exclude")
367 exclude.set(
"sourceName", self.
path.removeprefix(
'/root/') +
'/*')
368 destText = self.
path.removeprefix(
'/root').removesuffix(self.
name)
380 descriptionAlreadySet =
False
382 logging.debug(
"Adding xml entry for folder {} that only changes the description of an existing folder".
format(self.
path.removeprefix(
'/root/')))
383 folder = ET.SubElement(root,
"folder")
384 description = ET.SubElement(folder,
"description")
386 name = ET.SubElement(folder,
"name")
387 name.text = self.
name
388 if not destText ==
'':
389 dest = ET.SubElement(folder,
"destination")
390 dest.text = destText.removeprefix(
'/').removesuffix(
'/')
391 descriptionAlreadySet =
True
394 logging.debug(
"Adding xml entry for folder {}".
format(self.
path.removeprefix(
'/root/')))
395 folder = ET.SubElement(root,
"folder")
396 folder.set(
"sourceName", self.
path.removeprefix(
'/root/'))
397 folder.set(
"copy",
"False")
398 if self.historizing
and not historiszingActive:
399 folder.set(
"history", self.historizing)
401 dest = ET.SubElement(folder,
"description")
404 name = ET.SubElement(folder,
"name")
407 name = ET.SubElement(folder,
"name")
408 name.text = self.
name
409 dest = ET.SubElement(folder,
"destination")
412 dest.text = self.
newDestination.removeprefix(
'/root').removeprefix(
'/')
414 dest.text = destText.removeprefix(
'/').removesuffix(
'/')
418 Resets all information related to updated directory and varaiable information, e.g. set newName, newDescription ect. to None.
423 for directory
in self.dirs:
425 for var
in self.vars:
431 self.inputFile: str|
None = inputFile
432 self.outputFile: str|
None =
None
434 self.
nsmap: Dict|
None =
None
435 self.
dir: XMLDirectory|
None =
None
441 Parse a ChiemraTK generated XML file using the XML generator of the application.
442 @raise RuntimeError: If the given input file is not a ChiemraTK XML file.
443 The ChimeraTK map file is identified by the root node called 'application'.
445 logging.debug(
"ChimeraTK XML file.")
447 if data.tag ==
'application' or data.tag.split(
'}')[1] ==
'application':
448 if 'name' in data.keys():
453 raise RuntimeError(
"Failed to find application tag. Not an XML file created by ChimeraTK's XML generator.")
455 if logging.root.level == logging.DEBUG:
460 Parse an existing map file.
461 @param inputFile: The map file name.
462 @return: list that includes the number of not found directories (index 0), pvs (index 1) and excludes (index 2).
463 In the map file the sourceName is given and it can happen that the source
464 given in the map file does not correspond to a PV or directory from the original
466 @raise RuntimeError: If the given input file is not a mapping XML file.
467 A mapping file is identified by the root node called 'uamapping'.
469 logging.debug(
"Parsing map file.")
473 if data.tag ==
'uamapping':
474 nsmap = {
'csa':
'https://github.com/ChimeraTK/ControlSystemAdapter-OPC-UA-Adapter'}
475 if nsmap != data.nsmap:
476 RuntimeError(
"Wrong name space ({}) used in mapping file.".
format(data.nsmap))
484 for folder
in data.findall(
'folder', namespaces=data.nsmap):
485 if "sourceName" in folder.attrib:
486 directory = self.
dir.findDir(
"/root/" +
str(folder.attrib[
"sourceName"]))
487 if directory !=
None:
492 if 'history' in folder.attrib:
493 directory.historizing =
str(folder.attrib[
"history"])
494 if folder.find(
'description', namespaces=folder.nsmap) !=
None and folder.find(
'description', namespaces=folder.nsmap).text !=
None:
495 directory.newDescription = folder.find(
'description', namespaces=folder.nsmap).text
496 if folder.find(
'destination', namespaces=folder.nsmap) !=
None and folder.find(
'destination', namespaces=folder.nsmap).text !=
None:
497 tmpDestination = directory.path.removeprefix(
"/root").removeprefix(
"/").removesuffix(directory.name).removesuffix(
"/")
498 tmpPath = tmpPath + folder.find(
'destination', namespaces=folder.nsmap).text
499 if tmpDestination != folder.find(
'destination', namespaces=folder.nsmap).text:
500 directory.newDestination =
"/root/"+folder.find(
'destination', namespaces=folder.nsmap).text
501 if folder.find(
'name', namespaces=folder.nsmap) !=
None and directory.name != folder.find(
'name', namespaces=folder.nsmap).text:
502 directory.newName = folder.find(
'name', namespaces=folder.nsmap).text
504 tmpPath = tmpPath.removesuffix(
"/") +
"/" + folder.find(
'name', namespaces=folder.nsmap).text
505 if directory.newDestination ==
None and tmpPath != directory.path:
506 directory.newDestination =
"/root"
508 logging.warning(
"Failed to find source folder path {} in the application variable tree!".
format(
"/root/" +
str(folder.attrib[
"sourceName"])))
509 nSkipped[0] = nSkipped[0] + 1
511 tmpDestination =
None
513 if folder.find(
'destination', namespaces=folder.nsmap) !=
None:
514 tmpDestination = folder.find(
'destination', namespaces=folder.nsmap).text
515 if folder.find(
'name', namespaces=folder.nsmap) !=
None:
516 tmpName = folder.find(
'name', namespaces=folder.nsmap).text
518 logging.warning(
"For folder entry in the map file with no sourceName no name is given. Will be skipped.")
519 tmpSourceName =
"/root/"
520 if tmpDestination ==
None:
521 tmpSourceName = tmpSourceName + tmpName
523 tmpSourceName = tmpSourceName + tmpDestination +
"/" + tmpName
524 directory = self.
dir.findDir(tmpSourceName)
525 if directory !=
None:
526 if folder.find(
'description', namespaces=folder.nsmap) !=
None:
527 directory.newDescription = folder.find(
'description', namespaces=folder.nsmap).text
529 logging.warning(
"Failed to find source folder path {} in the application variable tree!".
format(tmpSourceName))
530 nSkipped[0] = nSkipped[0] + 1
532 for pv
in data.findall(
'process_variable', namespaces=data.nsmap):
533 if "sourceName" in pv.attrib:
534 var = self.
dir.findVar(
"/root/" +
str(pv.attrib[
"sourceName"]))
536 if 'history' in pv.attrib:
537 var.historizing =
str(pv.attrib[
"history"])
538 if pv.find(
'description', namespaces=pv.nsmap) !=
None:
539 var.newDescription = pv.find(
'description', namespaces=pv.nsmap).text
540 if pv.find(
'unit', namespaces=pv.nsmap) !=
None:
541 var.newUnit = pv.find(
'unit', namespaces=pv.nsmap).text
542 if pv.find(
'name', namespaces=pv.nsmap) !=
None:
543 var.newName = pv.find(
'name', namespaces=pv.nsmap).text
544 if pv.find(
'destination', namespaces=pv.nsmap) !=
None:
545 var.newDestination = pv.find(
'destination', namespaces=pv.nsmap).text
546 if (var.newDestination ==
"" or var.newDestination ==
None)
and var.name == var.newName:
547 var.newDestination =
"/root"
550 logging.warning(
"Failed to find source pv path {} in the application variable tree!".
format(
"/root/" +
str(pv.attrib[
"sourceName"])))
551 nSkipped[1] = nSkipped[1] + 1
552 for exclude
in data.findall(
'exclude', namespaces=data.nsmap):
553 if "sourceName" in exclude.attrib:
554 if exclude.attrib[
"sourceName"].endswith(
"/*"):
555 directory = self.
dir.findDir(
"/root/" + exclude.attrib[
"sourceName"][0:-2])
556 if directory !=
None:
557 directory.exclude =
True
559 logging.warning(
"Failed to find source exclude path {} in the application variable tree!".
format(
"/root/" +
str(exclude.attrib[
"sourceName"][0:-2])))
560 nSkipped[2] = nSkipped[2] + 1
562 var = self.
dir.findVar(
"/root/" + exclude.attrib[
"sourceName"])
566 logging.warning(
"Failed to find source exclude path {} in the application variable tree!".
format(
"/root/" +
str(exclude.attrib[
"sourceName"])))
567 nSkipped[2] = nSkipped[2] + 1
570 raise RuntimeError(
"Failed to find uamapping tag. Not an ControlSystem-OPC-UA XML mapping file.")
573 '''Write the map file.
575 logging.info(
"Writing map file: {}".
format(self.outputFile))
576 root = ET.Element(
"uamapping", nsmap = {
'csa':
'https://github.com/ChimeraTK/ControlSystemAdapter-OPC-UA-Adapter'})
579 self.
dir.generateMapEntries(root)
580 tree = ET.ElementTree()
582 uglyXml = ET.tounicode(tree, pretty_print =
True)
583 text_re = re.compile(
'>\n\s+([^<>\s].*?)\n\s+</', re.DOTALL)
584 prettyXml = text_re.sub(
'>\g<1></', uglyXml)
585 with open(self.outputFile,
"w")
as text_file:
586 text_file.write(prettyXml)
588 def _openFile(self, inputFile: str) -> ET._Element:
592 logging.debug(
"analysing file {}".
format(inputFile))
593 data = ET.parse(inputFile).getroot()