-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcanary_tests.py
executable file
·147 lines (119 loc) · 5.61 KB
/
canary_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env python3
import asyncio
from parameterized import parameterized
import subprocess
import time
import unittest
from viam.components.board import Board
from viam.robot.client import RobotClient
from viam.rpc.dial import DialOptions
import canary_config as conf
import stack_printing # Set up the ability to print stack traces on SIGUSR1
async def connect(address, opts):
    """
    Connect to the robot at `address`, retrying once after a ConnectionError.

    Args:
        address: forwarded to RobotClient.at_address as the robot address.
        opts: forwarded to RobotClient.at_address as the connection options.

    Returns:
        The connected robot client returned by RobotClient.at_address.

    Raises:
        Whatever the second connection attempt raises; only the first
        attempt's ConnectionError is caught.
    """
    try:
        robot = await RobotClient.at_address(address, opts)
    except ConnectionError:
        # There's some race condition in the Python SDK that causes
        # reconnection to fail sometimes. See if we can figure out what
        # the RDK server is doing that causes this trouble. Send a SIGUSR1
        # (signal 10), which should log stack traces from all goroutines.
        # The tricky part here is that there might be 2 RDK servers
        # running: the "normal" one on port 8080 and the board canary one
        # on port 9090. We want to only send the signal to the latter.
        #
        # The argument list must be run WITHOUT a shell: with shell=True
        # and a list, only "pkill" is the command and the remaining items
        # become arguments to the shell itself, so no signal is ever sent.
        subprocess.run(["pkill", "-10", "-f", "viam-canary.json"])
        print("Connection error during canary tests. Retrying...")
        # Use the asyncio sleep so the event loop is not blocked while we
        # give the server time to recover.
        await asyncio.sleep(5)
        robot = await RobotClient.at_address(address, opts)
    return robot
class PinTests(unittest.IsolatedAsyncioTestCase):
    """Tests of GPIO, PWM and digital interrupt pins on the board."""

    async def asyncSetUp(self):
        """Connect to the robot and look up every pin the tests use."""
        self.robot = await connect(conf.address, conf.opts)
        self.board = Board.from_robot(self.robot, "board")

        self.input_pin = await self.board.gpio_pin_by_name(conf.INPUT_PIN)
        self.output_pin = await self.board.gpio_pin_by_name(conf.OUTPUT_PIN)

        # Most boards have combination GPIO/PWM/interrupt pins. However, rarely
        # they are separated to different pins (e.g., the Beaglebone AI-64 does
        # not have GPIO functionality on the PWM pins or vice versa). We need
        # two pairs of pins: GPIO (output) pairing with GPIO (input), and PWM
        # (output) pairing with digital interrupt (input). For boards whose
        # pins can have multiple functions, you can just define the first pair
        # and we'll reuse it for both, but if you want to define the two pairs
        # separately, you can.
        hw_interrupt_name = getattr(conf, "HW_INTERRUPT_PIN", conf.INPUT_PIN)
        hw_pwm_name = getattr(conf, "HW_PWM_PIN", conf.OUTPUT_PIN)

        self.hw_pwm_pin = await self.board.gpio_pin_by_name(hw_pwm_name)
        self.hw_interrupt = await self.board.digital_interrupt_by_name(
            hw_interrupt_name)

        # We also need a software pwm pin and interrupt pin pair to test
        # software pwm.
        self.sw_pwm_pin = await self.board.gpio_pin_by_name(conf.SW_PWM_PIN)
        self.sw_interrupt = await self.board.digital_interrupt_by_name(
            conf.SW_INTERRUPT_PIN)

    async def asyncTearDown(self):
        """Drive the output pin low and close the robot connection."""
        try:
            await self.output_pin.set(False)
        finally:
            # Close the connection even if resetting the pin failed, so a
            # hardware error does not leak the client.
            await self.robot.close()

    @parameterized.expand(((True,), (False,)))
    async def test_gpios(self, value):
        """Set the output pin to `value` and expect the input pin to match."""
        await self.output_pin.set(value)
        result = await self.input_pin.get()
        self.assertEqual(result, value)

    @parameterized.expand((("hw",), ("sw",)))
    async def test_interrupts_and_pwm(self, pwm):
        """
        Run PWM on a pin and check the paired interrupt counts its cycles.

        `pwm` is "hw" to use the hardware PWM/interrupt pair; any other value
        selects the software pair, which is allowed a larger error.
        """
        FREQUENCY = 40  # Hertz
        DURATION = 2  # seconds

        if pwm == "hw":
            pwm_pin = self.hw_pwm_pin
            interrupt = self.hw_interrupt
            error_factor = 0.05
        else:
            pwm_pin = self.sw_pwm_pin
            interrupt = self.sw_interrupt
            error_factor = 0.10  # Software PWM can get really inaccurate

        # In order to diagnose a flaky test, we're going to record all
        # tick-related data.
        should_stop = asyncio.Event()
        ticks = []
        tick_stream = await self.board.stream_ticks([interrupt])
        # If we are not getting any ticks in tick_stream the task will wait
        # 5 secs for record_tick_data to return, then timeout.
        counter_task = asyncio.create_task(asyncio.wait_for(
            self.record_tick_data(tick_stream, ticks, should_stop), 5))

        try:
            await pwm_pin.set_pwm_frequency(FREQUENCY)
            await pwm_pin.set_pwm(0.5)  # Duty cycle fraction: 0 to 1
            starting_count = await interrupt.value()
            await asyncio.sleep(DURATION)

            should_stop.set()
            await counter_task
        finally:
            # Whatever happened above (including the recorder timing out),
            # stop the recorder and turn the output off again. Cancelling a
            # task that has already finished is a no-op.
            counter_task.cancel()
            await pwm_pin.set(False)

        print("data from tick stream:")
        for tick in ticks:
            print(tick)

        ending_count = await interrupt.value()
        total_count = ending_count - starting_count
        expected_count = FREQUENCY * DURATION
        allowable_error = expected_count * error_factor
        self.assertAlmostEqual(
            total_count, expected_count, delta=allowable_error)

    @staticmethod
    async def record_tick_data(tick_stream, data, should_stop):
        """
        Append a (time, high, pin_name) tuple to `data` for each tick.

        Returns after recording the first tick that arrives once
        `should_stop` is set, or when `tick_stream` is exhausted. The event
        is only checked after a tick arrives, so a silent stream keeps this
        coroutine waiting.
        """
        async for tick in tick_stream:
            data.append((tick.time, tick.high, tick.pin_name))
            if should_stop.is_set():
                return
class PingMonitorTest(unittest.IsolatedAsyncioTestCase):
    """Connectivity check against the canary monitor board."""

    async def test_monitor_is_online(self):
        """
        Confirm that the "canary monitor" can be reached.

        The canary monitor is the one board canary responsible for checking
        that all the other canaries are online. A different canary is
        responsible for checking on the monitor itself, and this test is that
        check. When no monitor is configured there is nothing to connect to
        and the test passes without doing anything.
        """
        monitor = conf.board_monitor
        if monitor is not None:
            monitor_address, monitor_creds = monitor
            connection = await connect(monitor_address, monitor_creds)
            await connection.close()
# When this file is executed directly as a script (rather than imported),
# hand control to unittest's command-line runner.
if __name__ == "__main__":
    unittest.main()