|
1 | 1 | import os |
2 | 2 | import math |
3 | | -import hypothesis.strategies as st |
4 | | -import pytest |
5 | | -from hypothesis import Phase, given, settings |
6 | 3 | from collections.abc import Callable |
7 | 4 | from typing import Any |
| 5 | +from functools import lru_cache |
8 | 6 |
|
9 | 7 | from opendbc.car import DT_CTRL, CanData, structs |
10 | | -from opendbc.car.car_helpers import interfaces |
11 | | -from opendbc.car.fingerprints import FW_VERSIONS |
12 | | -from opendbc.car.fw_versions import FW_QUERY_CONFIGS |
13 | | -from opendbc.car.interfaces import CarInterfaceBase, get_interface_attr |
14 | 8 | from opendbc.car.mock.values import CAR as MOCK |
15 | | -from opendbc.car.values import PLATFORMS |
16 | | - |
17 | | -DrawType = Callable[[st.SearchStrategy], Any] |
18 | | - |
19 | | -ALL_ECUS = {ecu for ecus in FW_VERSIONS.values() for ecu in ecus.keys()} |
20 | | -ALL_ECUS |= {ecu for config in FW_QUERY_CONFIGS.values() for ecu in config.extra_ecus} |
21 | | - |
22 | | -ALL_REQUESTS = {tuple(r.request) for config in FW_QUERY_CONFIGS.values() for r in config.requests} |
23 | 9 |
|
24 | 10 | # From panda/python/__init__.py |
25 | 11 | DLC_TO_LEN = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64] |
26 | 12 |
|
27 | 13 | MAX_EXAMPLES = int(os.environ.get('MAX_EXAMPLES', '15')) |
28 | 14 |
|
29 | 15 |
|
30 | | -def get_fuzzy_car_interface(car_name: str, draw: DrawType) -> CarInterfaceBase: |
31 | | - # Fuzzy CAN fingerprints and FW versions to test more states of the CarInterface |
32 | | - fingerprint_strategy = st.fixed_dictionaries({0: st.dictionaries(st.integers(min_value=0, max_value=0x800), |
33 | | - st.sampled_from(DLC_TO_LEN))}) |
34 | | - |
35 | | - # only pick from possible ecus to reduce search space |
36 | | - car_fw_strategy = st.lists(st.builds( |
37 | | - lambda fw, req: structs.CarParams.CarFw(ecu=fw[0], address=fw[1], subAddress=fw[2] or 0, request=req), |
38 | | - st.sampled_from(sorted(ALL_ECUS)), |
39 | | - st.sampled_from(sorted(ALL_REQUESTS)), |
40 | | - )) |
| 16 | +@lru_cache(maxsize=1) |
| 17 | +def _get_all_ecus(): |
| 18 | + """Lazy loader for ALL_ECUS""" |
| 19 | + from opendbc.car.fingerprints import FW_VERSIONS |
| 20 | + from opendbc.car.fw_versions import FW_QUERY_CONFIGS |
41 | 21 |
|
42 | | - params_strategy = st.fixed_dictionaries({ |
43 | | - 'fingerprints': fingerprint_strategy, |
44 | | - 'car_fw': car_fw_strategy, |
45 | | - 'alpha_long': st.booleans(), |
46 | | - }) |
| 22 | + all_ecus = {ecu for ecus in FW_VERSIONS.values() for ecu in ecus.keys()} |
| 23 | + all_ecus |= {ecu for config in FW_QUERY_CONFIGS.values() for ecu in config.extra_ecus} |
| 24 | + return all_ecus |
47 | 25 |
|
48 | | - params: dict = draw(params_strategy) |
49 | | - # reduce search space by duplicating CAN fingerprints across all buses |
50 | | - params['fingerprints'] |= {key + 1: params['fingerprints'][0] for key in range(6)} |
51 | 26 |
|
52 | | - # initialize car interface |
53 | | - CarInterface = interfaces[car_name] |
54 | | - car_params = CarInterface.get_params(car_name, params['fingerprints'], params['car_fw'], |
55 | | - alpha_long=params['alpha_long'], is_release=False, docs=False) |
56 | | - return CarInterface(car_params) |
| 27 | +@lru_cache(maxsize=1) |
| 28 | +def _get_all_requests(): |
| 29 | + """Lazy loader for ALL_REQUESTS""" |
| 30 | + from opendbc.car.fw_versions import FW_QUERY_CONFIGS |
| 31 | + return {tuple(r.request) for config in FW_QUERY_CONFIGS.values() for r in config.requests} |
57 | 32 |
|
58 | 33 |
|
59 | 34 | class TestCarInterfaces: |
60 | | - # FIXME: Due to the lists used in carParams, Phase.target is very slow and will cause |
61 | | - # many generated examples to overrun when max_examples > ~20, don't use it |
62 | | - @pytest.mark.parametrize("car_name", sorted(PLATFORMS)) |
63 | | - @settings(max_examples=MAX_EXAMPLES, deadline=None, |
64 | | - phases=(Phase.reuse, Phase.generate, Phase.shrink)) |
65 | | - @given(data=st.data()) |
66 | | - def test_car_interfaces(self, car_name, data): |
67 | | - car_interface = get_fuzzy_car_interface(car_name, data.draw) |
68 | | - car_params = car_interface.CP.as_reader() |
69 | | - |
70 | | - assert car_params.mass > 1 |
71 | | - assert car_params.wheelbase > 0 |
72 | | - # centerToFront is center of gravity to front wheels, assert a reasonable range |
73 | | - assert car_params.wheelbase * 0.3 < car_params.centerToFront < car_params.wheelbase * 0.7 |
74 | | - assert car_params.maxLateralAccel > 0 |
75 | | - |
76 | | - # Longitudinal sanity checks |
77 | | - assert len(car_params.longitudinalTuning.kpV) == len(car_params.longitudinalTuning.kpBP) |
78 | | - assert len(car_params.longitudinalTuning.kiV) == len(car_params.longitudinalTuning.kiBP) |
79 | | - |
80 | | - # Lateral sanity checks |
81 | | - if car_params.steerControlType != structs.CarParams.SteerControlType.angle: |
82 | | - tune = car_params.lateralTuning |
83 | | - if tune.which() == 'pid': |
84 | | - if car_name != MOCK.MOCK: |
85 | | - assert not math.isnan(tune.pid.kf) and tune.pid.kf > 0 |
86 | | - assert len(tune.pid.kpV) > 0 and len(tune.pid.kpV) == len(tune.pid.kpBP) |
87 | | - assert len(tune.pid.kiV) > 0 and len(tune.pid.kiV) == len(tune.pid.kiBP) |
88 | | - |
89 | | - elif tune.which() == 'torque': |
90 | | - assert not math.isnan(tune.torque.kf) and tune.torque.kf > 0 |
91 | | - assert not math.isnan(tune.torque.friction) and tune.torque.friction > 0 |
92 | | - |
93 | | - # Run car interface |
94 | | - # TODO: use hypothesis to generate random messages |
95 | | - now_nanos = 0 |
96 | | - CC = structs.CarControl().as_reader() |
97 | | - for _ in range(10): |
98 | | - car_interface.update([]) |
99 | | - car_interface.apply(CC, now_nanos) |
100 | | - now_nanos += DT_CTRL * 1e9 # 10 ms |
101 | | - |
102 | | - CC = structs.CarControl() |
103 | | - CC.enabled = True |
104 | | - CC.latActive = True |
105 | | - CC.longActive = True |
106 | | - CC = CC.as_reader() |
107 | | - for _ in range(10): |
108 | | - car_interface.update([]) |
109 | | - car_interface.apply(CC, now_nanos) |
110 | | - now_nanos += DT_CTRL * 1e9 # 10ms |
111 | | - |
112 | | - # Test radar interface |
113 | | - radar_interface = car_interface.RadarInterface(car_params) |
114 | | - assert radar_interface |
115 | | - |
116 | | - # Run radar interface once |
117 | | - radar_interface.update([]) |
118 | | - if not car_params.radarUnavailable and radar_interface.rcp is not None and \ |
119 | | - hasattr(radar_interface, '_update') and hasattr(radar_interface, 'trigger_msg'): |
120 | | - radar_interface._update([radar_interface.trigger_msg]) |
121 | | - |
122 | | - # Test radar fault |
123 | | - if not car_params.radarUnavailable and radar_interface.rcp is not None: |
124 | | - cans = [(0, [CanData(0, b'', 0) for _ in range(5)])] |
125 | | - rr = radar_interface.update(cans) |
126 | | - assert rr is None or len(rr.errors) > 0 |
| 35 | + def test_car_interfaces(self, subtests): |
| 36 | + import hypothesis.strategies as st |
| 37 | + from hypothesis import Phase, given, settings |
| 38 | + from opendbc.car.interfaces import CarInterfaceBase |
| 39 | + |
| 40 | + DrawType = Callable[[st.SearchStrategy], Any] |
| 41 | + |
| 42 | + def get_fuzzy_car_interface(car_name: str, draw: DrawType) -> CarInterfaceBase: |
| 43 | + from opendbc.car.car_helpers import interfaces |
| 44 | + # Fuzzy CAN fingerprints and FW versions to test more states of the CarInterface |
| 45 | + fingerprint_strategy = st.fixed_dictionaries({0: st.dictionaries(st.integers(min_value=0, max_value=0x800), |
| 46 | + st.sampled_from(DLC_TO_LEN))}) |
| 47 | + all_ecus = _get_all_ecus() |
| 48 | + all_requests = _get_all_requests() |
| 49 | + |
| 50 | + # only pick from possible ecus to reduce search space |
| 51 | + car_fw_strategy = st.lists(st.builds( |
| 52 | + lambda fw, req: structs.CarParams.CarFw(ecu=fw[0], address=fw[1], subAddress=fw[2] or 0, request=req), |
| 53 | + st.sampled_from(sorted(all_ecus)), |
| 54 | + st.sampled_from(sorted(all_requests)), |
| 55 | + )) |
| 56 | + |
| 57 | + params_strategy = st.fixed_dictionaries({ |
| 58 | + 'fingerprints': fingerprint_strategy, |
| 59 | + 'car_fw': car_fw_strategy, |
| 60 | + 'alpha_long': st.booleans(), |
| 61 | + }) |
| 62 | + |
| 63 | + params: dict = draw(params_strategy) |
| 64 | + # reduce search space by duplicating CAN fingerprints across all buses |
| 65 | + params['fingerprints'] |= {key + 1: params['fingerprints'][0] for key in range(6)} |
| 66 | + |
| 67 | + # initialize car interface |
| 68 | + CarInterface = interfaces[car_name] |
| 69 | + car_params = CarInterface.get_params(car_name, params['fingerprints'], |
| 70 | + params['car_fw'],alpha_long=params['alpha_long'], is_release=False, docs=False) |
| 71 | + return CarInterface(car_params) |
| 72 | + |
| 73 | + # FIXME: Due to the lists used in carParams, Phase.target is very slow and will cause |
| 74 | + # many generated examples to overrun when max_examples > ~20, don't use it |
| 75 | + @settings(max_examples=MAX_EXAMPLES, deadline=None, |
| 76 | + phases=(Phase.reuse, Phase.generate, Phase.shrink)) |
| 77 | + @given(data=st.data()) |
| 78 | + def run_car_interface_test(data, car_name: str): |
| 79 | + car_interface = get_fuzzy_car_interface(car_name, data.draw) |
| 80 | + car_params = car_interface.CP.as_reader() |
| 81 | + |
| 82 | + assert car_params.mass > 1 |
| 83 | + assert car_params.wheelbase > 0 |
| 84 | + # centerToFront is center of gravity to front wheels, assert a reasonable range |
| 85 | + assert car_params.wheelbase * 0.3 < car_params.centerToFront < car_params.wheelbase * 0.7 |
| 86 | + assert car_params.maxLateralAccel > 0 |
| 87 | + |
| 88 | + # Longitudinal sanity checks |
| 89 | + assert len(car_params.longitudinalTuning.kpV) == len(car_params.longitudinalTuning.kpBP) |
| 90 | + assert len(car_params.longitudinalTuning.kiV) == len(car_params.longitudinalTuning.kiBP) |
| 91 | + |
| 92 | + # Lateral sanity checks |
| 93 | + if car_params.steerControlType != structs.CarParams.SteerControlType.angle: |
| 94 | + tune = car_params.lateralTuning |
| 95 | + if tune.which() == 'pid': |
| 96 | + if car_name != MOCK.MOCK: |
| 97 | + assert not math.isnan(tune.pid.kf) and tune.pid.kf > 0 |
| 98 | + assert len(tune.pid.kpV) > 0 and len(tune.pid.kpV) == len(tune.pid.kpBP) |
| 99 | + assert len(tune.pid.kiV) > 0 and len(tune.pid.kiV) == len(tune.pid.kiBP) |
| 100 | + |
| 101 | + elif tune.which() == 'torque': |
| 102 | + assert not math.isnan(tune.torque.kf) and tune.torque.kf > 0 |
| 103 | + assert not math.isnan(tune.torque.friction) and tune.torque.friction > 0 |
| 104 | + |
| 105 | + # Run car interface |
| 106 | + # TODO: use hypothesis to generate random messages |
| 107 | + now_nanos = 0 |
| 108 | + CC = structs.CarControl().as_reader() |
| 109 | + for _ in range(10): |
| 110 | + car_interface.update([]) |
| 111 | + car_interface.apply(CC, now_nanos) |
| 112 | + now_nanos += DT_CTRL * 1e9 # 10 ms |
| 113 | + |
| 114 | + CC = structs.CarControl() |
| 115 | + CC.enabled = True |
| 116 | + CC.latActive = True |
| 117 | + CC.longActive = True |
| 118 | + CC = CC.as_reader() |
| 119 | + for _ in range(10): |
| 120 | + car_interface.update([]) |
| 121 | + car_interface.apply(CC, now_nanos) |
| 122 | + now_nanos += DT_CTRL * 1e9 # 10ms |
| 123 | + |
| 124 | + # Test radar interface |
| 125 | + radar_interface = car_interface.RadarInterface(car_params) |
| 126 | + assert radar_interface |
| 127 | + |
| 128 | + # Run radar interface once |
| 129 | + radar_interface.update([]) |
| 130 | + if not car_params.radarUnavailable and radar_interface.rcp is not None and \ |
| 131 | + hasattr(radar_interface, '_update') and hasattr(radar_interface, 'trigger_msg'): |
| 132 | + radar_interface._update([radar_interface.trigger_msg]) |
| 133 | + |
| 134 | + # Test radar fault |
| 135 | + if not car_params.radarUnavailable and radar_interface.rcp is not None: |
| 136 | + cans = [(0, [CanData(0, b'', 0) for _ in range(5)])] |
| 137 | + rr = radar_interface.update(cans) |
| 138 | + assert rr is None or len(rr.errors) > 0 |
| 139 | + |
| 140 | + from opendbc.car.values import PLATFORMS |
| 141 | + for car_name in sorted(PLATFORMS): |
| 142 | + with subtests.test(car_name=car_name): |
| 143 | + run_car_interface_test(car_name=car_name) |
127 | 144 |
|
128 | 145 | def test_interface_attrs(self): |
129 | 146 | """Asserts basic behavior of interface attribute getter""" |
| 147 | + from opendbc.car.interfaces import get_interface_attr |
130 | 148 | num_brands = len(get_interface_attr('CAR')) |
131 | 149 | assert num_brands >= 12 |
132 | 150 |
|
|
0 commit comments