ChimeraTK-ApplicationCore 04.08.00
Loading...
Searching...
No Matches
testPythonWithArray.py
Go to the documentation of this file.
1#!/usr/bin/python3
2
3import sys
4import os
5import os.path
6import numpy as np
7from typing import Callable
8import time
9import traceback
10
11# fmt: off
12# Hack to insert the python path for the locally compiled module in the
13# test script
14sys.path.insert(0, os.path.abspath(os.path.join(os.curdir, "..")))
15import PyApplicationCore as ac # NOQA
16# fmt: on
17
18
class MyMod(ac.ApplicationModule):
    """Test module exercising the Python bindings of the array accessors.

    The module declares array outputs and inputs of several flavours and
    checks their behaviour in mainLoop(). A failed assertion is reported
    through the "TestError" output so it can be checked on the C++ side in the
    BOOST test case.
    """

    def __init__(self, owner, name, description):
        """Create the module and declare all accessors used by the test.

        The three arguments are passed on unchanged to the base class
        constructor.
        """
        super().__init__(owner, name, description)

        # can we do this somehow in the class definition instead of the constructor body?
        self.myOutput1 = ac.ArrayOutput(ac.DataType.int32, self, "ArrayOut1", "SomeUnit", 10, "my fancy description")
        self.myOutput2 = ac.ArrayOutput(ac.DataType.int32, self, "ArrayOut2", "SomeUnit", 10, "my fancy description")
        self.myOutputRB = ac.ArrayOutputPushRB(
            ac.DataType.int32,
            self,
            "ArrayOutRB",
            "SomeUnit",
            2,
            "my fancy description")

        self.myInput1 = ac.ArrayPushInput(ac.DataType.int32, self, "ArrayIn1", "unit", 2, "description")
        self.myInput2 = ac.ArrayPushInput(ac.DataType.int32, self, "ArrayIn2", "unit", 5, "description")
        self.myInputPoll = ac.ArrayPollInput(ac.DataType.int32, self, "ArrayInPOLL", "unit", 2, "description")
        self.myInputWB = ac.ArrayPushInputWB(ac.DataType.int32, self, "ArrayInWB", "unit", 1, "description")

        # these outputs are used to transport subject-to-test data to the C++ side for checking in the BOOST test case
        self.testError = ac.ScalarOutput(ac.DataType.string, self, "TestError", "", "")

    def prepare(self):
        """Write [42, 43] to the ArrayOutRB output (checked by MySecondMod as initial value)."""
        self.myOutputRB.setAndWrite([42, 43])

    def mainLoop(self):
        """Run the accessor checks.

        The meta data of myOutput1 is checked once, then an endless loop
        exercises reading, writing, comparing and element access of the
        accessors. The first failing assertion ends the loop: its traceback is
        printed and written to the testError output.
        """
        try:
            # One-time checks of the accessor meta data.
            assert self.myOutput1.getName() == "/SomeName/ArrayOut1"
            assert self.myOutput1.getUnit() == "SomeUnit"
            assert "Root for Python Modules - Module's description - my fancy description" in self.myOutput1.getDescription()
            assert self.myOutput1.getValueType() == ac.DataType.int32
            assert self.myOutput1.getVersionNumber() == ac.VersionNumber(None)
            # NOTE(review): kept as "== False" on purpose; "not ..." would also accept a None return value.
            assert self.myOutput1.isReadOnly() == False
            assert self.myOutput1.isReadable() == False
            assert self.myOutput1.isWriteable()
            assert str(self.myOutput1.getId()).startswith('0x')
            assert self.myOutput1.getId() != self.myOutput2.getId()
            assert self.myOutput1.dataValidity() == ac.DataValidity.ok

            while True:
                # Send ten consecutive numbers starting at the sum of the current input values.
                val = self.myInput1.get()
                valSum = sum(val)
                self.myOutput1.setAndWrite(range(valSum, valSum + 10))

                # An explicit copy compares equal element-wise ...
                valcopy = np.array(self.myInput1, copy=True)
                assert (self.myInput1 == valcopy).all()

                # ... until its first element is changed.
                valcopy[0] = 3

                # Comparison operators with the accessor on the left-hand side
                assert ((self.myInput1 == valcopy) == [False, True]).all()
                assert ((self.myInput1 != valcopy) == [True, False]).all()
                assert ((self.myInput1 <= valcopy) == [False, True]).all()
                assert ((self.myInput1 < valcopy) == [False, False]).all()
                assert ((self.myInput1 >= valcopy) == [True, True]).all()
                assert ((self.myInput1 > valcopy) == [True, False]).all()

                # The same comparisons with the accessor on the right-hand side
                assert ((valcopy == self.myInput1) == [False, True]).all()
                assert ((valcopy != self.myInput1) == [True, False]).all()
                assert ((valcopy >= self.myInput1) == [False, True]).all()
                assert ((valcopy > self.myInput1) == [False, False]).all()
                assert ((valcopy <= self.myInput1) == [True, True]).all()
                assert ((valcopy < self.myInput1) == [True, False]).all()

                # Comparison between accessors
                assert ((self.myInput1 == self.myInput1) == [True, True]).all()
                assert ((self.myInput1 == self.myInputPoll) == [False, False]).all()

                # Setting the whole array and reading back single elements
                self.myInput1.set([123, 234])
                assert self.myInput1[0] == 123
                assert self.myInput1[1] == 234
                assert ((self.myInput1 == [123, 234]) == [True, True]).all()

                # calling members of np.array is directly possible
                assert self.myInput1.shape == (2,)

                # val is not a deep copy
                assert val[0] == self.myInput1[0]
                val[0] = 4
                assert val[0] == self.myInput1[0]
                self.myInput1[0] = 3
                assert self.myInput1[0] == 3
                assert val[0] == self.myInput1[0]
                assert val[0] == 3

                # Same pattern as above for the second input, using separate set() and write() calls.
                val = self.myInput2.readAndGet()
                valSum = sum(val)
                self.myOutput2.set(range(valSum, valSum + 10))
                self.myOutput2.write()

                for _ in range(5):
                    self.myInputPoll.read()  # non-blocking (poll)
                    assert (self.myInputPoll == [43, 2]).all()

                # Input with write-back channel: read a value, then write one back.
                self.myInputWB.read()
                assert self.myInputWB == 15
                self.myInputWB.setAndWrite([28])
                assert self.myInputWB == 28

                # Output with read-back channel: the values read back are sent by MySecondMod.
                self.myOutputRB.setAndWrite([120, 121])
                self.myOutputRB.read()
                assert (self.myOutputRB == [130, 131]).all()

                # Read the input for the next loop iteration.
                self.myInput1.read()

        except AssertionError:
            # format_exc() returns the complete traceback text of the exception being handled. Unlike
            # "\n".join(traceback.format_exception(e)) it does not need Python 3.10 and does not insert an
            # extra blank line after every traceback line.
            report = traceback.format_exc()
            print(report)
            sys.stdout.flush()
            self.testError.setAndWrite(report)
129# We cannot check the return channel of an ArrayOutputPushRB through the control system, so we use a second
130# module for this purpose
131
132
class MySecondMod(ac.ApplicationModule):
    """Counterpart module checking the return channel of MyMod's ArrayOutRB output.

    Its input is declared with the absolute name "/SomeName/ArrayOutRB". A
    failed assertion is reported through the "TestError" output of this module.
    """

    def __init__(self, owner, name, description):
        """Create the module and declare its input and its error output.

        The three arguments are passed on unchanged to the base class
        constructor.
        """
        super().__init__(owner, name, description)

        self.input = ac.ArrayPushInputWB(ac.DataType.int32, self, "/SomeName/ArrayOutRB", "", 2, "")

        # these outputs are used to transport subject-to-test data to the C++ side for checking in the BOOST test case
        self.testError = ac.ScalarOutput(ac.DataType.string, self, "TestError", "", "")

    def mainLoop(self):
        """Check the initial and the updated input value, then write [130, 131] back.

        A failing assertion is printed and written to the testError output.
        """
        try:
            assert (self.input == [42, 43]).all()  # initial value as set in MyMod.prepare()
            self.input.read()
            assert (self.input == [120, 121]).all()
            # Reply through the write-back channel; MyMod asserts this value after reading ArrayOutRB.
            self.input.setAndWrite([130, 131])
        except AssertionError:
            # format_exc() returns the complete traceback text of the exception being handled. Unlike
            # "\n".join(traceback.format_exception(e)) it does not need Python 3.10 and does not insert an
            # extra blank line after every traceback line.
            report = traceback.format_exc()
            # Also print the report, as MyMod does, so a failure is visible in the test's console output.
            print(report)
            sys.stdout.flush()
            self.testError.setAndWrite(report)
151
# Create both test modules and store them as attributes of the application object. MyMod is named "SomeName",
# which is the prefix its mainLoop() expects in accessor names such as "/SomeName/ArrayOut1".
ac.app.mod = MyMod(owner=ac.app, name="SomeName", description="Module's description")
ac.app.mod2 = MySecondMod(owner=ac.app, name="Foo", description="Bar")
__init__(self, owner, name, description)
__init__(self, owner, name, description)
void mainLoop() override