ChimeraTK-ApplicationCore 04.06.00
Loading...
Searching...
No Matches
testPythonWithArray.py
Go to the documentation of this file.
1#!/usr/bin/python3
2
3import sys
4import os
5import os.path
6import numpy as np
7from typing import Callable
8import time
9import traceback
10
11# fmt: off
12# Hack to insert the python path for the locally compiled module in the
13# test script
14sys.path.insert(0, os.path.abspath(os.path.join(os.curdir, "..")))
15import PyApplicationCore as ac # NOQA
16# fmt: on
17
class MyMod(ac.ApplicationModule):
    """Application module that exercises the array accessors of the Python bindings.

    Any failed assertion inside mainLoop() is turned into a traceback string and
    published through the "TestError" output, so that the C++ BOOST test case can
    check it.
    """

    def __init__(self, owner, name, description):
        """Create the module and all array accessors used by the test."""
        super().__init__(owner, name, description)

        int32 = ac.DataType.int32

        # Open question kept from the original author: can the accessors be declared
        # in the class definition instead of being created in the constructor body?
        self.myOutput1 = ac.ArrayOutput(
            int32, self, "ArrayOut1", "SomeUnit", 10, "my fancy description"
        )
        self.myOutput2 = ac.ArrayOutput(
            int32, self, "ArrayOut2", "SomeUnit", 10, "my fancy description"
        )
        self.myOutputRB = ac.ArrayOutputPushRB(
            int32, self, "ArrayOutRB", "SomeUnit", 2, "my fancy description"
        )

        self.myInput1 = ac.ArrayPushInput(int32, self, "ArrayIn1", "unit", 2, "description")
        self.myInput2 = ac.ArrayPushInput(int32, self, "ArrayIn2", "unit", 5, "description")
        self.myInputPoll = ac.ArrayPollInput(int32, self, "ArrayInPOLL", "unit", 2, "description")
        self.myInputWB = ac.ArrayPushInputWB(int32, self, "ArrayInWB", "unit", 1, "description")

        # This output transports subject-to-test data to the C++ side, where it is
        # checked by the BOOST test case.
        self.testError = ac.ScalarOutput(ac.DataType.string, self, "TestError", "", "")

    def prepare(self):
        """Write the initial value [42, 43] to the output with return channel."""
        self.myOutputRB.setAndWrite([42, 43])

    def mainLoop(self):
        """Run the accessor checks; report a failed assertion through testError.

        The order of the read and write calls below is part of the test protocol
        shared with the peer side and must not be changed.
        """
        out1 = self.myOutput1
        in1 = self.myInput1

        try:
            # --- static properties of an output accessor ---
            assert out1.getName() == "/SomeName/ArrayOut1"
            assert out1.getUnit() == "SomeUnit"
            assert "Root for Python Modules - Module's description - my fancy description" in out1.getDescription()
            assert out1.getValueType() == ac.DataType.int32
            assert out1.getVersionNumber() == ac.VersionNumber(None)
            assert out1.isReadOnly() == False
            assert out1.isReadable() == False
            assert out1.isWriteable() == True
            assert str(out1.getId()).startswith('0x')
            assert out1.getId() != self.myOutput2.getId()
            assert out1.dataValidity() == ac.DataValidity.ok

            while True:
                # Answer the current content of ArrayIn1 on ArrayOut1: ten consecutive
                # numbers starting at the sum of the input elements.
                view = in1.get()
                total = sum(view)
                out1.setAndWrite(range(total, total + 10))

                # An independent numpy copy initially compares equal element-wise.
                snapshot = np.array(in1, copy=True)
                assert (in1 == snapshot).all()

                # Make the copy differ in its first element only.
                snapshot[0] = 3

                # Comparison operators with the accessor as left-hand operand ...
                assert ((in1 == snapshot) == [False, True]).all()
                assert ((in1 != snapshot) == [True, False]).all()
                assert ((in1 <= snapshot) == [False, True]).all()
                assert ((in1 < snapshot) == [False, False]).all()
                assert ((in1 >= snapshot) == [True, True]).all()
                assert ((in1 > snapshot) == [True, False]).all()

                # ... and with the accessor as right-hand operand.
                assert ((snapshot == in1) == [False, True]).all()
                assert ((snapshot != in1) == [True, False]).all()
                assert ((snapshot >= in1) == [False, True]).all()
                assert ((snapshot > in1) == [False, False]).all()
                assert ((snapshot <= in1) == [True, True]).all()
                assert ((snapshot < in1) == [True, False]).all()

                # Accessor compared against accessors.
                assert ((in1 == in1) == [True, True]).all()
                assert ((in1 == self.myInputPoll) == [False, False]).all()

                # set() replaces the content; element access and comparison see it.
                in1.set([123, 234])
                assert in1[0] == 123
                assert in1[1] == 234
                assert ((in1 == [123, 234]) == [True, True]).all()

                # Members of np.array can be called directly on the accessor.
                assert in1.shape == (2,)

                # The object returned by get() is not a deep copy: a write through
                # either side is visible on the other.
                assert view[0] == in1[0]
                view[0] = 4
                assert view[0] == in1[0]
                in1[0] = 3
                assert in1[0] == 3
                assert view[0] == in1[0]
                assert view[0] == 3

                # Same answer scheme for ArrayIn2 / ArrayOut2, this time using
                # readAndGet() and separate set() + write() calls.
                view = self.myInput2.readAndGet()
                total = sum(view)
                self.myOutput2.set(range(total, total + 10))
                self.myOutput2.write()

                for _ in range(5):
                    self.myInputPoll.read()  # non-blocking (poll)
                    assert (self.myInputPoll == [43, 2]).all()

                # Push input with write-back channel.
                self.myInputWB.read()
                assert self.myInputWB == 15
                self.myInputWB.setAndWrite([28])
                assert self.myInputWB == 28

                # Output with return channel: write, then read the reply.
                self.myOutputRB.setAndWrite([120, 121])
                self.myOutputRB.read()
                assert (self.myOutputRB == [130, 131]).all()

                # Fetch the next value of ArrayIn1 for the following iteration.
                in1.read()

        except AssertionError as err:
            # Show the failure on stdout and hand it to the C++ side.
            report = "\n".join(traceback.format_exception(err))
            print(report)
            sys.stdout.flush()
            self.testError.setAndWrite(report)
122
# The return channel of an ArrayOutputPushRB cannot be checked through the control
# system, so a second module is used for this purpose.
class MySecondMod(ac.ApplicationModule):
    """Counterpart of the "/SomeName/ArrayOutRB" output.

    It receives the values written to that variable and sends a reply back through
    its own write-back input. A failed assertion is published as a traceback string
    through the "TestError" output.
    """

    def __init__(self, owner, name, description):
        """Create the module, its write-back input and the error output."""
        super().__init__(owner, name, description)

        self.input = ac.ArrayPushInputWB(
            ac.DataType.int32, self, "/SomeName/ArrayOutRB", "", 2, ""
        )

        # This output transports subject-to-test data to the C++ side, where it is
        # checked by the BOOST test case.
        self.testError = ac.ScalarOutput(ac.DataType.string, self, "TestError", "", "")

    def mainLoop(self):
        """Check the two values received on the input, then send the reply."""
        peer = self.input
        try:
            # Initial value as set in prepare() of the module owning ArrayOutRB.
            assert (peer == [42, 43]).all()
            peer.read()
            assert (peer == [120, 121]).all()
            peer.setAndWrite([130, 131])
        except AssertionError as err:
            report = "\n".join(traceback.format_exception(err))
            self.testError.setAndWrite(report)
143
144
# Instantiate both test modules and attach them to the application object as
# attributes "mod" and "mod2".
ac.app.mod = MyMod(
    ac.app,
    "SomeName",
    "Module's description",
)
ac.app.mod2 = MySecondMod(
    ac.app,
    "Foo",
    "Bar",
)
__init__(self, owner, name, description)
__init__(self, owner, name, description)
void mainLoop() override