diff --git a/opendbc/car/ford/tests/test_ford.py b/opendbc/car/ford/tests/test_ford.py index 5e3cc420b4f..c72d43cda9e 100644 --- a/opendbc/car/ford/tests/test_ford.py +++ b/opendbc/car/ford/tests/test_ford.py @@ -1,13 +1,6 @@ import random -from collections.abc import Iterable - -from hypothesis import settings, given, strategies as st -from parameterized import parameterized from opendbc.car.structs import CarParams -from opendbc.car.fw_versions import build_fw_dict -from opendbc.car.ford.values import CAR, FW_QUERY_CONFIG, FW_PATTERN, get_platform_codes -from opendbc.car.ford.fingerprints import FW_VERSIONS Ecu = CarParams.Ecu @@ -42,39 +35,50 @@ class TestFordFW: def test_fw_query_config(self): + from opendbc.car.ford.values import FW_QUERY_CONFIG for (ecu, addr, subaddr) in FW_QUERY_CONFIG.extra_ecus: assert ecu in ECU_ADDRESSES, "Unknown ECU" assert addr == ECU_ADDRESSES[ecu], "ECU address mismatch" assert subaddr is None, "Unexpected ECU subaddress" - @parameterized.expand(FW_VERSIONS.items()) - def test_fw_versions(self, car_model: str, fw_versions: dict[tuple[int, int, int | None], Iterable[bytes]]): - for (ecu, addr, subaddr), fws in fw_versions.items(): - assert ecu in ECU_PART_NUMBER, "Unexpected ECU" - assert addr == ECU_ADDRESSES[ecu], "ECU address mismatch" - assert subaddr is None, "Unexpected ECU subaddress" + def test_fw_versions(self): + from opendbc.car.ford.values import FW_PATTERN, get_platform_codes + from opendbc.car.ford.fingerprints import FW_VERSIONS + + for _car_model, fw_versions in FW_VERSIONS.items(): + for (ecu, addr, subaddr), fws in fw_versions.items(): + assert ecu in ECU_PART_NUMBER, "Unexpected ECU" + assert addr == ECU_ADDRESSES[ecu], "ECU address mismatch" + assert subaddr is None, "Unexpected ECU subaddress" - for fw in fws: - assert len(fw) == 24, "Expected ECU response to be 24 bytes" + for fw in fws: + assert len(fw) == 24, "Expected ECU response to be 24 bytes" - match = FW_PATTERN.match(fw) - assert match is not None, f"Unable to parse FW: {fw!r}" - if match: - part_number = match.group("part_number") - assert part_number in ECU_PART_NUMBER[ecu], f"Unexpected part number for {fw!r}" + match = FW_PATTERN.match(fw) + assert match is not None, f"Unable to parse FW: {fw!r}" + if match: + part_number = match.group("part_number") + assert part_number in ECU_PART_NUMBER[ecu], f"Unexpected part number for {fw!r}" - codes = get_platform_codes([fw]) - assert 1 == len(codes), f"Unable to parse FW: {fw!r}" + codes = get_platform_codes([fw]) + assert 1 == len(codes), f"Unable to parse FW: {fw!r}" - @settings(max_examples=100) - @given(data=st.data()) - def test_platform_codes_fuzzy_fw(self, data): + def test_platform_codes_fuzzy_fw(self): """Ensure function doesn't raise an exception""" - fw_strategy = st.lists(st.binary()) - fws = data.draw(fw_strategy) - get_platform_codes(fws) + from hypothesis import settings, given, strategies as st + + @settings(max_examples=100) + @given(data=st.data()) + def _test_impl(data): + from opendbc.car.ford.values import get_platform_codes + fw_strategy = st.lists(st.binary()) + fws = data.draw(fw_strategy) + get_platform_codes(fws) + + _test_impl() def test_platform_codes_spot_check(self): + from opendbc.car.ford.values import get_platform_codes # Asserts basic platform code parsing behavior for a few cases results = get_platform_codes([ b"JX6A-14C204-BPL\x00\x00\x00\x00\x00\x00\x00\x00\x00", @@ -85,6 +89,10 @@ def test_platform_codes_spot_check(self): assert results == {(b"X6A", b"J"), (b"Z6T", b"N"), (b"J6T", b"P"), (b"B5A", b"L")} def test_fuzzy_match(self): + from opendbc.car.ford.values import FW_QUERY_CONFIG + from opendbc.car.fw_versions import build_fw_dict + from opendbc.car.ford.fingerprints import FW_VERSIONS + for platform, fw_by_addr in FW_VERSIONS.items(): # Ensure there's no overlaps in platform codes for _ in range(20): @@ -100,6 +108,7 @@ def test_fuzzy_match(self): assert matches == {platform} def test_match_fw_fuzzy(self): + from opendbc.car.ford.values import CAR, FW_QUERY_CONFIG offline_fw = { (Ecu.eps, 0x730, None): [ b"L1MC-14D003-AJ\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", diff --git a/opendbc/car/hyundai/tests/test_hyundai.py b/opendbc/car/hyundai/tests/test_hyundai.py index b798acaa1d0..7fde55f311c 100644 --- a/opendbc/car/hyundai/tests/test_hyundai.py +++ b/opendbc/car/hyundai/tests/test_hyundai.py @@ -1,18 +1,12 @@ -from hypothesis import settings, given, strategies as st - import pytest from opendbc.car import gen_empty_fingerprint from opendbc.car.structs import CarParams -from opendbc.car.fw_versions import build_fw_dict -from opendbc.car.hyundai.interface import CarInterface from opendbc.car.hyundai.hyundaicanfd import CanBus -from opendbc.car.hyundai.radar_interface import RADAR_START_ADDR from opendbc.car.hyundai.values import CAMERA_SCC_CAR, CANFD_CAR, CAN_GEARS, CAR, CHECKSUM, DATE_FW_ECUS, \ HYBRID_CAR, EV_CAR, FW_QUERY_CONFIG, LEGACY_SAFETY_MODE_CAR, CANFD_FUZZY_WHITELIST, \ UNSUPPORTED_LONGITUDINAL_CAR, PLATFORM_CODE_ECUS, HYUNDAI_VERSION_REQUEST_LONG, \ HyundaiFlags, get_platform_codes, HyundaiSafetyFlags -from opendbc.car.hyundai.fingerprints import FW_VERSIONS Ecu = CarParams.Ecu @@ -45,6 +39,8 @@ class TestHyundaiFingerprint: def test_feature_detection(self): + from opendbc.car.hyundai.interface import CarInterface + from opendbc.car.hyundai.radar_interface import RADAR_START_ADDR # LKA steering for lka_steering in (True, False): fingerprint = gen_empty_fingerprint() @@ -63,6 +59,8 @@ def test_feature_detection(self): assert CP.radarUnavailable != radar def test_alternate_limits(self): + from opendbc.car.hyundai.interface import CarInterface + # Alternate lateral control limits, for high torque cars, verify Panda safety mode flag is set fingerprint = gen_empty_fingerprint() for car_model in CAR: @@ -83,6 +81,8 @@ def test_hybrid_ev_sets(self): assert CANFD_CAR & HYBRID_CAR == set(), "Hard coding CAN FD cars as hybrid is no longer supported" def test_canfd_ecu_whitelist(self): + from opendbc.car.hyundai.fingerprints import FW_VERSIONS + # Asserts only expected Ecus can exist in database for CAN-FD cars for car_model in CANFD_CAR: ecus = {fw[0] for fw in FW_VERSIONS[car_model].keys()} @@ -92,6 +92,8 @@ def test_canfd_ecu_whitelist(self): f"{car_model}: Car model has unexpected ECUs: {ecu_strings}" def test_blacklisted_parts(self, subtests): + from opendbc.car.hyundai.fingerprints import FW_VERSIONS + # Asserts no ECUs known to be shared across platforms exist in the database. # Tucson having Santa Cruz camera and EPS for example for car_model, ecus in FW_VERSIONS.items(): @@ -106,6 +108,8 @@ def test_blacklisted_parts(self, subtests): assert not part.startswith(b'CW'), "Car has bad part number" def test_correct_ecu_response_database(self, subtests): + from opendbc.car.hyundai.fingerprints import FW_VERSIONS + """ Assert standard responses for certain ECUs, since they can respond to multiple queries with different data @@ -117,15 +121,22 @@ def test_correct_ecu_response_database(self, subtests): assert all(fw.startswith(expected_fw_prefix) for fw in fws), \ f"FW from unexpected request in database: {(ecu, fws)}" - @settings(max_examples=100) - @given(data=st.data()) - def test_platform_codes_fuzzy_fw(self, data): + def test_platform_codes_fuzzy_fw(self): + from hypothesis import settings, given, strategies as st + """Ensure function doesn't raise an exception""" - fw_strategy = st.lists(st.binary()) - fws = data.draw(fw_strategy) - get_platform_codes(fws) + @settings(max_examples=100) + @given(data=st.data()) + def _test_impl(data): + fw_strategy = st.lists(st.binary()) + fws = data.draw(fw_strategy) + get_platform_codes(fws) + + _test_impl() def test_expected_platform_codes(self, subtests): + from opendbc.car.hyundai.fingerprints import FW_VERSIONS + # Ensures we don't accidentally add multiple platform codes for a car unless it is intentional for car_model, ecus in FW_VERSIONS.items(): with subtests.test(car_model=car_model.value): @@ -145,6 +156,8 @@ def test_expected_platform_codes(self, subtests): # Tests for platform codes, part numbers, and FW dates which Hyundai will use to fuzzy # fingerprint in the absence of full FW matches: def test_platform_code_ecus_available(self, subtests): + from opendbc.car.hyundai.fingerprints import FW_VERSIONS + # TODO: add queries for these non-CAN FD cars to get EPS no_eps_platforms = CANFD_CAR | {CAR.KIA_SORENTO, CAR.KIA_OPTIMA_G4, CAR.KIA_OPTIMA_G4_FL, CAR.KIA_OPTIMA_H, CAR.KIA_OPTIMA_H_G4_FL, CAR.HYUNDAI_SONATA_LF, CAR.HYUNDAI_TUCSON, CAR.GENESIS_G90, CAR.GENESIS_G80, CAR.HYUNDAI_ELANTRA} @@ -160,6 +173,8 @@ def test_platform_code_ecus_available(self, subtests): assert platform_code_ecu in [e[0] for e in ecus] def test_fw_format(self, subtests): + from opendbc.car.hyundai.fingerprints import FW_VERSIONS + # Asserts: # - every supported ECU FW version returns one platform code # - every supported ECU FW version has a part number @@ -219,6 +234,9 @@ def test_platform_codes_spot_check(self): (b"ON-S9100", b"190405"), (b"ON-S9100", b"190720")} def test_fuzzy_excluded_platforms(self): + from opendbc.car.hyundai.fingerprints import FW_VERSIONS + from opendbc.car.fw_versions import build_fw_dict + # Asserts a list of platforms that will not fuzzy fingerprint with platform codes due to them being shared. # This list can be shrunk as we combine platforms and detect features excluded_platforms = { diff --git a/opendbc/car/tests/test_can_fingerprint.py b/opendbc/car/tests/test_can_fingerprint.py index 30dba600081..48789baf493 100644 --- a/opendbc/car/tests/test_can_fingerprint.py +++ b/opendbc/car/tests/test_can_fingerprint.py @@ -1,27 +1,28 @@ -import pytest from opendbc.car.can_definitions import CanData -from opendbc.car.car_helpers import FRAME_FINGERPRINT, can_fingerprint -from opendbc.car.fingerprints import _FINGERPRINTS as FINGERPRINTS class TestCanFingerprint: - @pytest.mark.parametrize("car_model, fingerprints", FINGERPRINTS.items()) - def test_can_fingerprint(self, car_model, fingerprints): + def test_can_fingerprint(self): """Tests online fingerprinting function on offline fingerprints""" + from opendbc.car.car_helpers import can_fingerprint + from opendbc.car.fingerprints import _FINGERPRINTS as FINGERPRINTS - for fingerprint in fingerprints: # can have multiple fingerprints for each platform - can = [CanData(address=address, dat=b'\x00' * length, src=src) - for address, length in fingerprint.items() for src in (0, 1)] + for car_model, fingerprints in FINGERPRINTS.items(): + for fingerprint in fingerprints: # can have multiple fingerprints for each platform + can = [CanData(address=address, dat=b'\x00' * length, src=src) + for address, length in fingerprint.items() for src in (0, 1)] - fingerprint_iter = iter([can]) - car_fingerprint, finger = can_fingerprint(lambda **kwargs: [next(fingerprint_iter, [])]) # noqa: B023 + fingerprint_iter = iter([can]) + car_fingerprint, finger = can_fingerprint(lambda **kwargs: [next(fingerprint_iter, [])]) # noqa: B023 - assert car_fingerprint == car_model - assert finger[0] == fingerprint - assert finger[1] == fingerprint - assert finger[2] == {} + assert car_fingerprint == car_model + assert finger[0] == fingerprint + assert finger[1] == fingerprint + assert finger[2] == {} def test_timing(self, subtests): + from opendbc.car.car_helpers import FRAME_FINGERPRINT, can_fingerprint + from opendbc.car.fingerprints import _FINGERPRINTS as FINGERPRINTS # just pick any CAN fingerprinting car car_model = "CHEVROLET_BOLT_EUV" fingerprint = FINGERPRINTS[car_model][0] diff --git a/opendbc/car/tests/test_car_interfaces.py b/opendbc/car/tests/test_car_interfaces.py index 63bedf0bb59..124a9ea5fdc 100644 --- a/opendbc/car/tests/test_car_interfaces.py +++ b/opendbc/car/tests/test_car_interfaces.py @@ -1,132 +1,168 @@ import os import math -import hypothesis.strategies as st import pytest -from hypothesis import Phase, given, settings from collections.abc import Callable from typing import Any +from functools import lru_cache from opendbc.car import DT_CTRL, CanData, structs -from opendbc.car.car_helpers import interfaces -from opendbc.car.fingerprints import FW_VERSIONS -from opendbc.car.fw_versions import FW_QUERY_CONFIGS -from opendbc.car.interfaces import CarInterfaceBase, get_interface_attr from opendbc.car.mock.values import CAR as MOCK -from opendbc.car.values import PLATFORMS - -DrawType = Callable[[st.SearchStrategy], Any] - -ALL_ECUS = {ecu for ecus in FW_VERSIONS.values() for ecu in ecus.keys()} -ALL_ECUS |= {ecu for config in FW_QUERY_CONFIGS.values() for ecu in config.extra_ecus} - -ALL_REQUESTS = {tuple(r.request) for config in FW_QUERY_CONFIGS.values() for r in config.requests} # From panda/python/__init__.py DLC_TO_LEN = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64] MAX_EXAMPLES = int(os.environ.get('MAX_EXAMPLES', '15')) +CAR_CHUNK_SIZE = int(os.environ.get('CAR_CHUNK_SIZE', '4')) +CAR_BATCHES = int(os.environ.get('CAR_BATCHES', '128')) # overprovisioned + + +@lru_cache(maxsize=1) +def _get_all_ecus(): + """Lazy loader for ALL_ECUS""" + from opendbc.car.fingerprints import FW_VERSIONS + from opendbc.car.fw_versions import FW_QUERY_CONFIGS -def get_fuzzy_car_interface(car_name: str, draw: DrawType) -> CarInterfaceBase: - # Fuzzy CAN fingerprints and FW versions to test more states of the CarInterface - fingerprint_strategy = st.fixed_dictionaries({0: st.dictionaries(st.integers(min_value=0, max_value=0x800), - st.sampled_from(DLC_TO_LEN))}) + all_ecus = {ecu for ecus in FW_VERSIONS.values() for ecu in ecus.keys()} + all_ecus |= {ecu for config in FW_QUERY_CONFIGS.values() for ecu in config.extra_ecus} + return all_ecus - # only pick from possible ecus to reduce search space - car_fw_strategy = st.lists(st.builds( - lambda fw, req: structs.CarParams.CarFw(ecu=fw[0], address=fw[1], subAddress=fw[2] or 0, request=req), - st.sampled_from(sorted(ALL_ECUS)), - st.sampled_from(sorted(ALL_REQUESTS)), - )) - params_strategy = st.fixed_dictionaries({ - 'fingerprints': fingerprint_strategy, - 'car_fw': car_fw_strategy, - 'alpha_long': st.booleans(), - }) +@lru_cache(maxsize=1) +def _get_all_requests(): + """Lazy loader for ALL_REQUESTS""" + from opendbc.car.fw_versions import FW_QUERY_CONFIGS + return {tuple(r.request) for config in FW_QUERY_CONFIGS.values() for r in config.requests} - params: dict = draw(params_strategy) - # reduce search space by duplicating CAN fingerprints across all buses - params['fingerprints'] |= {key + 1: params['fingerprints'][0] for key in range(6)} - # initialize car interface - CarInterface = interfaces[car_name] - car_params = CarInterface.get_params(car_name, params['fingerprints'], params['car_fw'], - alpha_long=params['alpha_long'], is_release=False, docs=False) - return CarInterface(car_params) +@pytest.fixture(scope="session") +def _platform_list(): + from opendbc.car.values import PLATFORMS + return sorted(PLATFORMS) class TestCarInterfaces: - # FIXME: Due to the lists used in carParams, Phase.target is very slow and will cause - # many generated examples to overrun when max_examples > ~20, don't use it - @pytest.mark.parametrize("car_name", sorted(PLATFORMS)) - @settings(max_examples=MAX_EXAMPLES, deadline=None, - phases=(Phase.reuse, Phase.generate, Phase.shrink)) - @given(data=st.data()) - def test_car_interfaces(self, car_name, data): - car_interface = get_fuzzy_car_interface(car_name, data.draw) - car_params = car_interface.CP.as_reader() - - assert car_params.mass > 1 - assert car_params.wheelbase > 0 - # centerToFront is center of gravity to front wheels, assert a reasonable range - assert car_params.wheelbase * 0.3 < car_params.centerToFront < car_params.wheelbase * 0.7 - assert car_params.maxLateralAccel > 0 - - # Longitudinal sanity checks - assert len(car_params.longitudinalTuning.kpV) == len(car_params.longitudinalTuning.kpBP) - assert len(car_params.longitudinalTuning.kiV) == len(car_params.longitudinalTuning.kiBP) - - # Lateral sanity checks - if car_params.steerControlType != structs.CarParams.SteerControlType.angle: - tune = car_params.lateralTuning - if tune.which() == 'pid': - if car_name != MOCK.MOCK: - assert not math.isnan(tune.pid.kf) and tune.pid.kf > 0 - assert len(tune.pid.kpV) > 0 and len(tune.pid.kpV) == len(tune.pid.kpBP) - assert len(tune.pid.kiV) > 0 and len(tune.pid.kiV) == len(tune.pid.kiBP) - - elif tune.which() == 'torque': - assert not math.isnan(tune.torque.kf) and tune.torque.kf > 0 - assert not math.isnan(tune.torque.friction) and tune.torque.friction > 0 - - # Run car interface - # TODO: use hypothesis to generate random messages - now_nanos = 0 - CC = structs.CarControl().as_reader() - for _ in range(10): - car_interface.update([]) - car_interface.apply(CC, now_nanos) - now_nanos += DT_CTRL * 1e9 # 10 ms - - CC = structs.CarControl() - CC.enabled = True - CC.latActive = True - CC.longActive = True - CC = CC.as_reader() - for _ in range(10): - car_interface.update([]) - car_interface.apply(CC, now_nanos) - now_nanos += DT_CTRL * 1e9 # 10ms - - # Test radar interface - radar_interface = car_interface.RadarInterface(car_params) - assert radar_interface - - # Run radar interface once - radar_interface.update([]) - if not car_params.radarUnavailable and radar_interface.rcp is not None and \ - hasattr(radar_interface, '_update') and hasattr(radar_interface, 'trigger_msg'): - radar_interface._update([radar_interface.trigger_msg]) - - # Test radar fault - if not car_params.radarUnavailable and radar_interface.rcp is not None: - cans = [(0, [CanData(0, b'', 0) for _ in range(5)])] - rr = radar_interface.update(cans) - assert rr is None or len(rr.errors) > 0 + _PARAM_RANGE = range(CAR_BATCHES) + + @pytest.mark.parametrize("car_batch_idx", _PARAM_RANGE) + def test_car_interfaces(self, car_batch_idx, _platform_list, subtests): + import hypothesis.strategies as st + from hypothesis import Phase, given, settings + from opendbc.car.interfaces import CarInterfaceBase + + DrawType = Callable[[st.SearchStrategy], Any] + + def get_fuzzy_car_interface(car_name: str, draw: DrawType) -> CarInterfaceBase: + from opendbc.car.car_helpers import interfaces + # Fuzzy CAN fingerprints and FW versions to test more states of the CarInterface + fingerprint_strategy = st.fixed_dictionaries({0: st.dictionaries(st.integers(min_value=0, max_value=0x800), + st.sampled_from(DLC_TO_LEN))}) + all_ecus = _get_all_ecus() + all_requests = _get_all_requests() + + # only pick from possible ecus to reduce search space + car_fw_strategy = st.lists(st.builds( + lambda fw, req: structs.CarParams.CarFw(ecu=fw[0], address=fw[1], subAddress=fw[2] or 0, request=req), + st.sampled_from(sorted(all_ecus)), + st.sampled_from(sorted(all_requests)), + )) + + params_strategy = st.fixed_dictionaries({ + 'fingerprints': fingerprint_strategy, + 'car_fw': car_fw_strategy, + 'alpha_long': st.booleans(), + }) + + params: dict = draw(params_strategy) + # reduce search space by duplicating CAN fingerprints across all buses + params['fingerprints'] |= {key + 1: params['fingerprints'][0] for key in range(6)} + + # initialize car interface + CarInterface = interfaces[car_name] + car_params = CarInterface.get_params(car_name, params['fingerprints'], + params['car_fw'],alpha_long=params['alpha_long'], is_release=False, docs=False) + return CarInterface(car_params) + + # FIXME: Due to the lists used in carParams, Phase.target is very slow and will cause + # many generated examples to overrun when max_examples > ~20, don't use it + @settings(max_examples=MAX_EXAMPLES, deadline=None, + phases=(Phase.reuse, Phase.generate, Phase.shrink)) + @given(data=st.data()) + def run_car_interface_test(data, car_name: str): + car_interface = get_fuzzy_car_interface(car_name, data.draw) + car_params = car_interface.CP.as_reader() + + assert car_params.mass > 1 + assert car_params.wheelbase > 0 + # centerToFront is center of gravity to front wheels, assert a reasonable range + assert car_params.wheelbase * 0.3 < car_params.centerToFront < car_params.wheelbase * 0.7 + assert car_params.maxLateralAccel > 0 + + # Longitudinal sanity checks + assert len(car_params.longitudinalTuning.kpV) == len(car_params.longitudinalTuning.kpBP) + assert len(car_params.longitudinalTuning.kiV) == len(car_params.longitudinalTuning.kiBP) + + # Lateral sanity checks + if car_params.steerControlType != structs.CarParams.SteerControlType.angle: + tune = car_params.lateralTuning + if tune.which() == 'pid': + if car_name != MOCK.MOCK: + assert not math.isnan(tune.pid.kf) and tune.pid.kf > 0 + assert len(tune.pid.kpV) > 0 and len(tune.pid.kpV) == len(tune.pid.kpBP) + assert len(tune.pid.kiV) > 0 and len(tune.pid.kiV) == len(tune.pid.kiBP) + + elif tune.which() == 'torque': + assert not math.isnan(tune.torque.kf) and tune.torque.kf > 0 + assert not math.isnan(tune.torque.friction) and tune.torque.friction > 0 + + # Run car interface + # TODO: use hypothesis to generate random messages + now_nanos = 0 + CC = structs.CarControl().as_reader() + for _ in range(10): + car_interface.update([]) + car_interface.apply(CC, now_nanos) + now_nanos += DT_CTRL * 1e9 # 10 ms + + CC = structs.CarControl() + CC.enabled = True + CC.latActive = True + CC.longActive = True + CC = CC.as_reader() + for _ in range(10): + car_interface.update([]) + car_interface.apply(CC, now_nanos) + now_nanos += DT_CTRL * 1e9 # 10ms + + # Test radar interface + radar_interface = car_interface.RadarInterface(car_params) + assert radar_interface + + # Run radar interface once + radar_interface.update([]) + if not car_params.radarUnavailable and radar_interface.rcp is not None and \ + hasattr(radar_interface, '_update') and hasattr(radar_interface, 'trigger_msg'): + radar_interface._update([radar_interface.trigger_msg]) + + # Test radar fault + if not car_params.radarUnavailable and radar_interface.rcp is not None: + cans = [(0, [CanData(0, b'', 0) for _ in range(5)])] + rr = radar_interface.update(cans) + assert rr is None or len(rr.errors) > 0 + + start = car_batch_idx * CAR_CHUNK_SIZE + end = start + CAR_CHUNK_SIZE + car_batch = _platform_list[start:end] + if not car_batch: + pytest.skip(f"No cars in batch {car_batch_idx}") + + for car_name in car_batch: + with subtests.test(car_name=car_name): + run_car_interface_test(car_name=car_name) def test_interface_attrs(self): """Asserts basic behavior of interface attribute getter""" + from opendbc.car.interfaces import get_interface_attr num_brands = len(get_interface_attr('CAR')) assert num_brands >= 12 diff --git a/opendbc/car/tests/test_docs.py b/opendbc/car/tests/test_docs.py index 8a4ae09b5ad..6b19bee4053 100644 --- a/opendbc/car/tests/test_docs.py +++ b/opendbc/car/tests/test_docs.py @@ -1,16 +1,13 @@ from collections import defaultdict import pytest -from opendbc.car.car_helpers import interfaces -from opendbc.car.docs import get_all_car_docs from opendbc.car.docs_definitions import Cable, Column, PartType, Star, SupportType -from opendbc.car.honda.values import CAR as HONDA -from opendbc.car.values import PLATFORMS class TestCarDocs: @classmethod def setup_class(cls): + from opendbc.car.docs import get_all_car_docs cls.all_cars = get_all_car_docs() def test_duplicate_years(self, subtests): @@ -26,6 +23,8 @@ def test_duplicate_years(self, subtests): make_model_years[make_model].append(year) def test_missing_car_docs(self, subtests): + from opendbc.car.values import PLATFORMS + from opendbc.car.car_helpers import interfaces all_car_docs_platforms = [name for name, config in PLATFORMS.items()] for platform in sorted(interfaces.keys()): with subtests.test(platform=platform): @@ -48,6 +47,7 @@ def test_naming_conventions(self, subtests): assert "RAV4" in car.model, "Use correct capitalization" def test_torque_star(self, subtests): + from opendbc.car.honda.values import CAR as HONDA # Asserts brand-specific assumptions around steering torque star for car in self.all_cars: with subtests.test(car=car.name): diff --git a/opendbc/car/tests/test_fw_fingerprint.py b/opendbc/car/tests/test_fw_fingerprint.py index 08e8cf85440..13a19ef1ee9 100644 --- a/opendbc/car/tests/test_fw_fingerprint.py +++ b/opendbc/car/tests/test_fw_fingerprint.py @@ -4,12 +4,7 @@ from collections import defaultdict from opendbc.car.can_definitions import CanData -from opendbc.car.car_helpers import interfaces from opendbc.car.structs import CarParams -from opendbc.car.fingerprints import FW_VERSIONS -from opendbc.car.fw_versions import FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, \ - match_fw_to_car, get_brand_ecu_matches, get_fw_versions, get_present_ecus -from opendbc.car.vin import get_vin CarFw = CarParams.CarFw Ecu = CarParams.Ecu @@ -23,79 +18,90 @@ def assertFingerprints(self, candidates, expected): assert len(candidates) == 1, f"got more than one candidate: {candidates}" assert candidates[0] == expected - @pytest.mark.parametrize("brand, car_model, ecus, test_non_essential", - [(b, c, e[c], n) for b, e in VERSIONS.items() for c in e for n in (True, False)]) - def test_exact_match(self, brand, car_model, ecus, test_non_essential): - config = FW_QUERY_CONFIGS[brand] - CP = CarParams() - for _ in range(20): - fw = [] - for ecu, fw_versions in ecus.items(): - # Assume non-essential ECUs apply to all cars, so we catch cases where Car A with - # missing ECUs won't match to Car B where only Car B has labeled non-essential ECUs - if ecu[0] in config.non_essential_ecus and test_non_essential: - continue - - ecu_name, addr, sub_addr = ecu - fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(fw_versions), brand=brand, - address=addr, subAddress=0 if sub_addr is None else sub_addr)) - CP.carFw = fw - _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_fuzzy=False) - if not test_non_essential: - self.assertFingerprints(matches, car_model) - else: - # if we're removing ECUs we expect some match loss, but it shouldn't mismatch - if len(matches) != 0: - self.assertFingerprints(matches, car_model) - - @pytest.mark.parametrize("brand, car_model, ecus", [(b, c, e[c]) for b, e in VERSIONS.items() for c in e]) - def test_custom_fuzzy_match(self, brand, car_model, ecus): - # Assert brand-specific fuzzy fingerprinting function doesn't disagree with standard fuzzy function - config = FW_QUERY_CONFIGS[brand] - if config.match_fw_to_car_fuzzy is None: - pytest.skip("Brand does not implement custom fuzzy fingerprinting function") - - CP = CarParams() - for _ in range(5): - fw = [] - for ecu, fw_versions in ecus.items(): - ecu_name, addr, sub_addr = ecu - fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(fw_versions), brand=brand, - address=addr, subAddress=0 if sub_addr is None else sub_addr)) - CP.carFw = fw - _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_exact=False, log=False) - brand_matches = config.match_fw_to_car_fuzzy(build_fw_dict(CP.carFw), CP.carVin, VERSIONS[brand]) - - # If both have matches, they must agree - if len(matches) == 1 and len(brand_matches) == 1: - assert matches == brand_matches - - @pytest.mark.parametrize("brand, car_model, ecus", [(b, c, e[c]) for b, e in VERSIONS.items() for c in e]) - def test_fuzzy_match_ecu_count(self, brand, car_model, ecus): - # Asserts that fuzzy matching does not count matching FW, but ECU address keys - valid_ecus = [e for e in ecus if e[0] not in FUZZY_EXCLUDE_ECUS] - if not len(valid_ecus): - pytest.skip("Car model has no compatible ECUs for fuzzy matching") - - fw = [] - for ecu in valid_ecus: - ecu_name, addr, sub_addr = ecu - for _ in range(5): - # Add multiple FW versions to simulate ECU returning to multiple queries in a brand - fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(ecus[ecu]), brand=brand, - address=addr, subAddress=0 if sub_addr is None else sub_addr)) - CP = CarParams(carFw=fw) - _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_exact=False, log=False) - - # Assert no match if there are not enough unique ECUs - unique_ecus = {(f.address, f.subAddress) for f in fw} - if len(unique_ecus) < 2: - assert len(matches) == 0, car_model - # There won't always be a match due to shared FW, but if there is it should be correct - elif len(matches): - self.assertFingerprints(matches, car_model) + def test_exact_match(self): + from opendbc.car.fw_versions import FW_QUERY_CONFIGS, VERSIONS, match_fw_to_car + + for brand, ecus_dict in VERSIONS.items(): + for car_model, ecus in ecus_dict.items(): + for test_non_essential in (True, False): + config = FW_QUERY_CONFIGS[brand] + CP = CarParams() + for _ in range(20): + fw = [] + for ecu, fw_versions in ecus.items(): + # Assume non-essential ECUs apply to all cars, so we catch cases where Car A with + # missing ECUs won't match to Car B where only Car B has labeled non-essential ECUs + if ecu[0] in config.non_essential_ecus and test_non_essential: + continue + + ecu_name, addr, sub_addr = ecu + fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(fw_versions), brand=brand, + address=addr, subAddress=0 if sub_addr is None else sub_addr)) + CP.carFw = fw + _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_fuzzy=False) + if not test_non_essential: + self.assertFingerprints(matches, car_model) + else: + # if we're removing ECUs we expect some match loss, but it shouldn't mismatch + if len(matches) != 0: + self.assertFingerprints(matches, car_model) + + def test_custom_fuzzy_match(self): + from opendbc.car.fw_versions import FW_QUERY_CONFIGS, VERSIONS, match_fw_to_car, build_fw_dict + + for brand, ecus_dict in VERSIONS.items(): + for _car_model, ecus in ecus_dict.items(): + # Assert brand-specific fuzzy fingerprinting function doesn't disagree with standard fuzzy function + config = FW_QUERY_CONFIGS[brand] + if config.match_fw_to_car_fuzzy is None: + continue # Skip if brand does not implement custom fuzzy fingerprinting function + + CP = CarParams() + for _ in range(5): + fw = [] + for ecu, fw_versions in ecus.items(): + ecu_name, addr, sub_addr = ecu + fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(fw_versions), brand=brand, + address=addr, subAddress=0 if sub_addr is None else sub_addr)) + CP.carFw = fw + _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_exact=False, log=False) + brand_matches = config.match_fw_to_car_fuzzy(build_fw_dict(CP.carFw), CP.carVin, VERSIONS[brand]) + + # If both have matches, they must agree + if len(matches) == 1 and len(brand_matches) == 1: + assert matches == brand_matches + + def test_fuzzy_match_ecu_count(self): + from opendbc.car.fw_versions import FUZZY_EXCLUDE_ECUS, VERSIONS, match_fw_to_car + + for brand, ecus_dict in VERSIONS.items(): + for car_model, ecus in ecus_dict.items(): + # Asserts that fuzzy matching does not count matching FW, but ECU address keys + valid_ecus = [e for e in ecus if e[0] not in FUZZY_EXCLUDE_ECUS] + if not len(valid_ecus): + continue # Skip if car model has no compatible ECUs for fuzzy matching + + fw = [] + for ecu in valid_ecus: + ecu_name, addr, sub_addr = ecu + for _ in range(5): + # Add multiple FW versions to simulate ECU returning to multiple queries in a brand + fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(ecus[ecu]), brand=brand, + address=addr, subAddress=0 if sub_addr is None else sub_addr)) + CP = CarParams(carFw=fw) + _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_exact=False, log=False) + + # Assert no match if there are not enough unique ECUs + unique_ecus = {(f.address, f.subAddress) for f in fw} + if len(unique_ecus) < 2: + assert len(matches) == 0, car_model + # There won't always be a match due to shared FW, but if there is it should be correct + elif len(matches): + self.assertFingerprints(matches, car_model) def test_fw_version_lists(self, subtests): + from opendbc.car.fingerprints import FW_VERSIONS + for car_model, ecus in FW_VERSIONS.items(): with subtests.test(car_model=car_model.value): for ecu, ecu_fw in ecus.items(): @@ -105,6 +111,8 @@ def test_fw_version_lists(self, subtests): assert len(ecu_fw) > 0, f'{car_model}: No FW versions: Ecu.{ecu[0]}' def test_all_addrs_map_to_one_ecu(self): + from opendbc.car.fw_versions import VERSIONS + for brand, cars in VERSIONS.items(): addr_to_ecu = defaultdict(set) for ecus in cars.values(): @@ -115,6 +123,8 @@ def test_all_addrs_map_to_one_ecu(self): assert len(ecus_for_addr) <= 1, f"{brand} has multiple ECUs that map to one address: {ecu_strings} -> ({hex(addr)}, {sub_addr})" def test_data_collection_ecus(self, subtests): + from opendbc.car.fw_versions import FW_QUERY_CONFIGS, VERSIONS + # Asserts no extra ECUs are in the fingerprinting database for brand, config in FW_QUERY_CONFIGS.items(): for car_model, ecus in VERSIONS[brand].items(): @@ -123,6 +133,9 @@ def test_data_collection_ecus(self, subtests): assert not len(bad_ecus), f'{car_model}: Fingerprints contain ECUs added for data collection: {bad_ecus}' def test_blacklisted_ecus(self, subtests): + from opendbc.car.fingerprints import FW_VERSIONS + from opendbc.car.car_helpers import interfaces + blacklisted_addrs = (0x7c4, 0x7d0) # includes A/C ecu and an unknown ecu for car_model, ecus in FW_VERSIONS.items(): with subtests.test(car_model=car_model.value): @@ -138,6 +151,8 @@ def test_blacklisted_ecus(self, subtests): assert ecu[0] != Ecu.transmission, f"{car_model}: Blacklisted ecu: (Ecu.{ecu[0]}, {hex(ecu[1])})" def test_missing_versions_and_configs(self, subtests): + from opendbc.car.fw_versions import FW_QUERY_CONFIGS, VERSIONS + brand_versions = set(VERSIONS.keys()) brand_configs = set(FW_QUERY_CONFIGS.keys()) if len(brand_configs - brand_versions): @@ -156,6 +171,8 @@ def test_missing_versions_and_configs(self, subtests): assert len(config.get_all_ecus(VERSIONS[brand])) > 0 def test_fw_request_ecu_whitelist(self, subtests): + from opendbc.car.fw_versions import FW_QUERY_CONFIGS, VERSIONS + for brand, config in FW_QUERY_CONFIGS.items(): with subtests.test(brand=brand): whitelisted_ecus = {ecu for r in config.requests for ecu in r.whitelist_ecus} @@ -170,6 +187,8 @@ def test_fw_request_ecu_whitelist(self, subtests): f'{brand.title()}: ECUs not in any FW query whitelists: {ecu_strings}' def test_request_ecus_in_versions(self): + from opendbc.car.fw_versions import FW_QUERY_CONFIGS, VERSIONS + # All ECUs in requests should be in the brand's FW versions for brand, config in FW_QUERY_CONFIGS.items(): request_ecus = {ecu for r in config.requests for ecu in r.whitelist_ecus} - {ecu[0] for ecu in config.extra_ecus} @@ -179,6 +198,8 @@ def test_request_ecus_in_versions(self): assert request_ecu in {e for e, _, _ in version_ecus}, f"Ecu.{ECU_NAME[request_ecu]} not in {brand} FW versions" def test_brand_ecu_matches(self): + from opendbc.car.fw_versions import get_brand_ecu_matches + brand_matches = get_brand_ecu_matches(set()) assert len(brand_matches) > 0 assert all(len(e) and not any(e) for e in brand_matches.values()) @@ -217,6 +238,8 @@ def fake_get_data(self, timeout): return {} def _benchmark_brand(self, brand, num_pandas, mocker): + from opendbc.car.fw_versions import get_fw_versions + self.total_time = 0 mocker.patch("opendbc.car.isotp_parallel_query.IsoTpParallelQuery.get_data", self.fake_get_data) for _ in range(self.N): @@ -234,6 +257,9 @@ def _assert_timing(self, avg_time, ref_time): assert avg_time > ref_time - self.TOL, "Performance seems to have improved, update test refs." def test_startup_timing(self, subtests, mocker): + from opendbc.car.fw_versions import get_present_ecus + from opendbc.car.vin import get_vin + # Tests worse-case VIN query time and typical present ECU query time vin_ref_times = {'worst': 1.6, 'best': 0.8} # best assumes we go through all queries to get a match present_ecu_ref_time = 0.45 @@ -260,6 +286,8 @@ def fake_get_ecu_addrs(*_, timeout): print(f'get_vin {name} case, query time={self.total_time / self.N} seconds') def test_fw_query_timing(self, subtests, mocker): + from opendbc.car.fw_versions import FW_QUERY_CONFIGS + total_ref_time = {1: 7.4, 2: 8.0} brand_ref_times = { 1: { @@ -307,6 +335,8 @@ def test_fw_query_timing(self, subtests, mocker): print(f'all brands, total FW query time={total_time} seconds') def test_get_fw_versions(self, subtests, mocker): + from opendbc.car.fw_versions import FW_QUERY_CONFIGS, get_fw_versions + # some coverage on IsoTpParallelQuery and panda UDS library # TODO: replace this with full fingerprint simulation testing # https://github.com/commaai/panda/pull/1329 diff --git a/opendbc/car/tests/test_lateral_limits.py b/opendbc/car/tests/test_lateral_limits.py index 070da5d7420..16204980b13 100755 --- a/opendbc/car/tests/test_lateral_limits.py +++ b/opendbc/car/tests/test_lateral_limits.py @@ -1,15 +1,10 @@ #!/usr/bin/env python3 from collections import defaultdict import importlib -from parameterized import parameterized_class import pytest import sys from opendbc.car import DT_CTRL -from opendbc.car.car_helpers import interfaces -from opendbc.car.interfaces import get_torque_params -from opendbc.car.lateral import ISO_LATERAL_ACCEL -from opendbc.car.values import PLATFORMS # ISO 11270 - allowed up jerk is strictly lower than recommended limits MAX_LAT_JERK_UP = 2.5 # m/s^3 @@ -20,76 +15,93 @@ JERK_MEAS_T = 0.5 -@parameterized_class('car_model', [(c,) for c in sorted(PLATFORMS)]) -class TestLateralLimits: - car_model: str +def get_car_models(): + """Get PLATFORMS with local import""" + from opendbc.car.values import PLATFORMS + return sorted(PLATFORMS) + + +def get_car_setup(car_model): + """Setup car parameters for a given model""" + from opendbc.car.car_helpers import interfaces + from opendbc.car.interfaces import get_torque_params + + CarInterface = interfaces[car_model] + CP = CarInterface.get_non_essential_params(car_model) + + if car_model == 'MOCK': + pytest.skip('Mock car') + + # TODO: test all platforms + if CP.steerControlType != 'torque': + pytest.skip() - @classmethod - def setup_class(cls): - CarInterface = interfaces[cls.car_model] - CP = CarInterface.get_non_essential_params(cls.car_model) + if CP.notCar: + pytest.skip() - if cls.car_model == 'MOCK': - pytest.skip('Mock car') + CarControllerParams = importlib.import_module(f'opendbc.car.{CP.brand}.values').CarControllerParams + control_params = CarControllerParams(CP) + torque_params = get_torque_params()[car_model] - # TODO: test all platforms - if CP.steerControlType != 'torque': - pytest.skip() + return control_params, torque_params - if CP.notCar: - pytest.skip() - CarControllerParams = importlib.import_module(f'opendbc.car.{CP.brand}.values').CarControllerParams - cls.control_params = CarControllerParams(CP) - cls.torque_params = get_torque_params()[cls.car_model] +def calculate_0_5s_jerk(control_params, torque_params): + steer_step = control_params.STEER_STEP + max_lat_accel = torque_params['MAX_LAT_ACCEL_MEASURED'] - @staticmethod - def calculate_0_5s_jerk(control_params, torque_params): - steer_step = control_params.STEER_STEP - max_lat_accel = torque_params['MAX_LAT_ACCEL_MEASURED'] + # Steer up/down delta per 10ms frame, in percentage of max torque + steer_up_per_frame = control_params.STEER_DELTA_UP / control_params.STEER_MAX / steer_step + steer_down_per_frame = control_params.STEER_DELTA_DOWN / control_params.STEER_MAX / steer_step - # Steer up/down delta per 10ms frame, in percentage of max torque - steer_up_per_frame = control_params.STEER_DELTA_UP / control_params.STEER_MAX / steer_step - steer_down_per_frame = control_params.STEER_DELTA_DOWN / control_params.STEER_MAX / steer_step + # Lateral acceleration reached in 0.5 seconds, clipping to max torque + accel_up_0_5_sec = min(steer_up_per_frame * JERK_MEAS_T / DT_CTRL, 1.0) * max_lat_accel + accel_down_0_5_sec = min(steer_down_per_frame * JERK_MEAS_T / DT_CTRL, 1.0) * max_lat_accel - # Lateral acceleration reached in 0.5 seconds, clipping to max torque - accel_up_0_5_sec = min(steer_up_per_frame * JERK_MEAS_T / DT_CTRL, 1.0) * max_lat_accel - accel_down_0_5_sec = min(steer_down_per_frame * JERK_MEAS_T / DT_CTRL, 1.0) * max_lat_accel + # Convert to m/s^3 + return accel_up_0_5_sec / JERK_MEAS_T, accel_down_0_5_sec / JERK_MEAS_T - # Convert to m/s^3 - return accel_up_0_5_sec / JERK_MEAS_T, accel_down_0_5_sec / JERK_MEAS_T - def test_jerk_limits(self): - up_jerk, down_jerk = self.calculate_0_5s_jerk(self.control_params, self.torque_params) - assert up_jerk <= MAX_LAT_JERK_UP + MAX_LAT_JERK_UP_TOLERANCE - assert down_jerk <= MAX_LAT_JERK_DOWN +class TestLateralLimits: + def test_jerk_limits(self, subtests): + car_models = get_car_models() + + for car_model in car_models: + with subtests.test(car_model=car_model): + control_params, torque_params = get_car_setup(car_model) + up_jerk, down_jerk = calculate_0_5s_jerk(control_params, torque_params) + + # Store results for reporting + LatAccelReport.car_model_jerks[car_model] = {"up_jerk": up_jerk, "down_jerk": down_jerk} - def test_max_lateral_accel(self): - assert self.torque_params["MAX_LAT_ACCEL_MEASURED"] <= ISO_LATERAL_ACCEL + assert up_jerk <= MAX_LAT_JERK_UP + MAX_LAT_JERK_UP_TOLERANCE + assert down_jerk <= MAX_LAT_JERK_DOWN + + def test_max_lateral_accel(self, subtests): + from opendbc.car.lateral import ISO_LATERAL_ACCEL + car_models = get_car_models() + + for car_model in car_models: + with subtests.test(car_model=car_model): + _control_params, torque_params = get_car_setup(car_model) + assert torque_params["MAX_LAT_ACCEL_MEASURED"] <= ISO_LATERAL_ACCEL class LatAccelReport: car_model_jerks: defaultdict[str, dict[str, float]] = defaultdict(dict) def pytest_sessionfinish(self): + from opendbc.car.values import PLATFORMS print(f"\n\n---- Lateral limit report ({len(PLATFORMS)} cars) ----\n") max_car_model_len = max([len(car_model) for car_model in self.car_model_jerks]) - for car_model, _jerks in sorted(self.car_model_jerks.items(), key=lambda i: i[1]['up_jerk'], reverse=True): - violation = _jerks["up_jerk"] > MAX_LAT_JERK_UP + MAX_LAT_JERK_UP_TOLERANCE or \ - _jerks["down_jerk"] > MAX_LAT_JERK_DOWN + for car_model, jerks in sorted(self.car_model_jerks.items(), key=lambda i: i[1]['up_jerk'], reverse=True): + violation = jerks["up_jerk"] > MAX_LAT_JERK_UP + MAX_LAT_JERK_UP_TOLERANCE or \ + jerks["down_jerk"] > MAX_LAT_JERK_DOWN violation_str = " - VIOLATION" if violation else "" - print(f"{car_model:{max_car_model_len}} - up jerk: {round(_jerks['up_jerk'], 2):5} " + - f"m/s^3, down jerk: {round(_jerks['down_jerk'], 2):5} m/s^3{violation_str}") - - @pytest.fixture(scope="class", autouse=True) - def class_setup(self, request): - yield - cls = request.cls - if hasattr(cls, "control_params"): - up_jerk, down_jerk = TestLateralLimits.calculate_0_5s_jerk(cls.control_params, cls.torque_params) - self.car_model_jerks[cls.car_model] = {"up_jerk": up_jerk, "down_jerk": down_jerk} + print(f"{car_model:{max_car_model_len}} - up jerk: {round(jerks['up_jerk'], 2):5} " + + f"m/s^3, down jerk: {round(jerks['down_jerk'], 2):5} m/s^3{violation_str}") if __name__ == '__main__': diff --git a/opendbc/car/tests/test_platform_configs.py b/opendbc/car/tests/test_platform_configs.py index 1704621ddda..f67adffe8df 100644 --- a/opendbc/car/tests/test_platform_configs.py +++ b/opendbc/car/tests/test_platform_configs.py @@ -1,8 +1,6 @@ -from opendbc.car.values import PLATFORMS - - class TestPlatformConfigs: def test_configs(self, subtests): + from opendbc.car.values import PLATFORMS for name, platform in PLATFORMS.items(): with subtests.test(platform=str(platform)): diff --git a/opendbc/car/tests/test_routes.py b/opendbc/car/tests/test_routes.py index 92176909db5..1948343a659 100644 --- a/opendbc/car/tests/test_routes.py +++ b/opendbc/car/tests/test_routes.py @@ -1,11 +1,10 @@ -import pytest +def test_test_route_present(subtests): + from opendbc.car.values import PLATFORMS + from opendbc.car.tests.routes import non_tested_cars, routes -from opendbc.car.values import PLATFORMS -from opendbc.car.tests.routes import non_tested_cars, routes - - -@pytest.mark.parametrize("platform", PLATFORMS.keys()) -def test_test_route_present(platform): tested_platforms = [r.car_model for r in routes] - assert platform in set(tested_platforms) | set(non_tested_cars), \ - f"Missing test route for {platform}. Add a route to opendbc/car/tests/routes.py" + tested_platforms_set = set(tested_platforms) | set(non_tested_cars) + + for platform in PLATFORMS.keys(): + with subtests.test(msg=f"test for {platform}"): + assert platform in tested_platforms_set, f"Missing test route for {platform}. Add a route to opendbc/car/tests/routes.py" diff --git a/opendbc/car/tests/test_vehicle_model.py b/opendbc/car/tests/test_vehicle_model.py index b88a8dab4e0..209019026f7 100644 --- a/opendbc/car/tests/test_vehicle_model.py +++ b/opendbc/car/tests/test_vehicle_model.py @@ -3,13 +3,12 @@ import numpy as np -from opendbc.car.honda.interface import CarInterface -from opendbc.car.honda.values import CAR -from opendbc.car.vehicle_model import VehicleModel, dyn_ss_sol, create_dyn_state_matrices - class TestVehicleModel: def setup_method(self): + from opendbc.car.honda.values import CAR + from opendbc.car.honda.interface import CarInterface + from opendbc.car.vehicle_model import VehicleModel CP = CarInterface.get_non_essential_params(CAR.HONDA_CIVIC) self.VM = VehicleModel(CP) @@ -27,6 +26,7 @@ def test_dyn_ss_sol_against_yaw_rate(self): """Verify that the yaw_rate helper function matches the results from the state space model.""" + from opendbc.car.vehicle_model import dyn_ss_sol for roll in np.linspace(math.radians(-20), math.radians(20), num=11): for u in np.linspace(1, 30, num=10): for sa in np.linspace(math.radians(-20), math.radians(20), num=11): @@ -41,6 +41,7 @@ def test_dyn_ss_sol_against_yaw_rate(self): def test_syn_ss_sol_simulate(self): """Verifies that dyn_ss_sol matches a simulation""" + from opendbc.car.vehicle_model import dyn_ss_sol, create_dyn_state_matrices for roll in np.linspace(math.radians(-20), math.radians(20), num=11): for u in np.linspace(1, 30, num=10): A, B = create_dyn_state_matrices(u, self.VM) diff --git a/opendbc/car/toyota/tests/test_toyota.py b/opendbc/car/toyota/tests/test_toyota.py index 5ab47f35651..657620dca8d 100644 --- a/opendbc/car/toyota/tests/test_toyota.py +++ b/opendbc/car/toyota/tests/test_toyota.py @@ -1,12 +1,5 @@ -from hypothesis import given, settings, strategies as st - from opendbc.car import Bus from opendbc.car.structs import CarParams -from opendbc.car.fw_versions import build_fw_dict -from opendbc.car.toyota.fingerprints import FW_VERSIONS -from opendbc.car.toyota.values import CAR, DBC, TSS2_CAR, ANGLE_CONTROL_CAR, RADAR_ACC_CAR, SECOC_CAR, \ - FW_QUERY_CONFIG, PLATFORM_CODE_ECUS, FUZZY_EXCLUDED_PLATFORMS, \ - get_platform_codes Ecu = CarParams.Ecu @@ -18,22 +11,28 @@ def check_fw_version(fw_version: bytes) -> bool: class TestToyotaInterfaces: def test_car_sets(self): + from opendbc.car.toyota.values import TSS2_CAR, ANGLE_CONTROL_CAR, RADAR_ACC_CAR assert len(ANGLE_CONTROL_CAR - TSS2_CAR) == 0 assert len(RADAR_ACC_CAR - TSS2_CAR) == 0 def test_lta_platforms(self): # At this time, only RAV4 2023 is expected to use LTA/angle control + from opendbc.car.toyota.values import CAR, ANGLE_CONTROL_CAR assert ANGLE_CONTROL_CAR == {CAR.TOYOTA_RAV4_TSS2_2023} def test_tss2_dbc(self): # We make some assumptions about TSS2 platforms, # like looking up certain signals only in this DBC + from opendbc.car.toyota.values import DBC, TSS2_CAR, SECOC_CAR for car_model, dbc in DBC.items(): if car_model in TSS2_CAR and car_model not in SECOC_CAR: assert dbc[Bus.pt] == "toyota_nodsu_pt_generated" def test_essential_ecus(self, subtests): # Asserts standard ECUs exist for each platform + from opendbc.car.toyota.values import CAR + from opendbc.car.toyota.fingerprints import FW_VERSIONS + common_ecus = {Ecu.fwdRadar, Ecu.fwdCamera} for car_model, ecus in FW_VERSIONS.items(): with subtests.test(car_model=car_model.value): @@ -55,6 +54,9 @@ def test_essential_ecus(self, subtests): class TestToyotaFingerprint: def test_non_essential_ecus(self, subtests): # Ensures only the cars that have multiple engine ECUs are in the engine non-essential ECU list + from opendbc.car.toyota.fingerprints import FW_VERSIONS + from opendbc.car.toyota.values import FW_QUERY_CONFIG + for car_model, ecus in FW_VERSIONS.items(): with subtests.test(car_model=car_model.value): engine_ecus = {ecu for ecu in ecus if ecu[0] == Ecu.engine} @@ -63,6 +65,8 @@ def test_non_essential_ecus(self, subtests): def test_valid_fw_versions(self, subtests): # Asserts all FW versions are valid + from opendbc.car.toyota.fingerprints import FW_VERSIONS + for car_model, ecus in FW_VERSIONS.items(): with subtests.test(car_model=car_model.value): for fws in ecus.values(): @@ -71,15 +75,24 @@ def test_valid_fw_versions(self, subtests): # Tests for part numbers, platform codes, and sub-versions which Toyota will use to fuzzy # fingerprint in the absence of full FW matches: - @settings(max_examples=100) - @given(data=st.data()) - def test_platform_codes_fuzzy_fw(self, data): - fw_strategy = st.lists(st.binary()) - fws = data.draw(fw_strategy) - get_platform_codes(fws) + def test_platform_codes_fuzzy_fw(self): + from hypothesis import given, settings, strategies as st + from opendbc.car.toyota.values import get_platform_codes + + @settings(max_examples=100) + @given(data=st.data()) + def _test_impl(data): + fw_strategy = st.lists(st.binary()) + fws = data.draw(fw_strategy) + get_platform_codes(fws) + + _test_impl() def test_platform_code_ecus_available(self, subtests): # Asserts ECU keys essential for fuzzy fingerprinting are available on all platforms + from opendbc.car.toyota.values import CAR, PLATFORM_CODE_ECUS + from opendbc.car.toyota.fingerprints import FW_VERSIONS + for car_model, ecus in FW_VERSIONS.items(): with subtests.test(car_model=car_model.value): for platform_code_ecu in PLATFORM_CODE_ECUS: @@ -94,6 +107,8 @@ def test_fw_format(self, subtests): # - every supported ECU FW version returns one platform code # - every supported ECU FW version has a part number # - expected parsing of ECU sub-versions + from opendbc.car.toyota.values import PLATFORM_CODE_ECUS, get_platform_codes + from opendbc.car.toyota.fingerprints import FW_VERSIONS for car_model, ecus in FW_VERSIONS.items(): with subtests.test(car_model=car_model.value): @@ -116,6 +131,8 @@ def test_fw_format(self, subtests): def test_platform_codes_spot_check(self): # Asserts basic platform code parsing behavior for a few cases + from opendbc.car.toyota.values import get_platform_codes + results = get_platform_codes([ b"F152607140\x00\x00\x00\x00\x00\x00", b"F152607171\x00\x00\x00\x00\x00\x00", @@ -147,6 +164,10 @@ def test_platform_codes_spot_check(self): def test_fuzzy_excluded_platforms(self): # Asserts a list of platforms that will not fuzzy fingerprint with platform codes due to them being shared. + from opendbc.car.fw_versions import build_fw_dict + from opendbc.car.toyota.fingerprints import FW_VERSIONS + from opendbc.car.toyota.values import FW_QUERY_CONFIG, FUZZY_EXCLUDED_PLATFORMS + platforms_with_shared_codes = set() for platform, fw_by_addr in FW_VERSIONS.items(): car_fw = [] diff --git a/opendbc/safety/tests/libsafety/libsafety_py.py b/opendbc/safety/tests/libsafety/libsafety_py.py index bae99fc0a41..1aa090ab0ba 100644 --- a/opendbc/safety/tests/libsafety/libsafety_py.py +++ b/opendbc/safety/tests/libsafety/libsafety_py.py @@ -1,6 +1,5 @@ import os -from cffi import FFI -from typing import Protocol +from typing import Protocol, Any from opendbc.safety import LEN_TO_DLC from opendbc.safety.tests.libsafety.safety_helpers import PandaSafety, setup_safety_helpers @@ -8,9 +7,8 @@ libsafety_dir = os.path.dirname(os.path.abspath(__file__)) libsafety_fn = os.path.join(libsafety_dir, "libsafety.so") -ffi = FFI() - -ffi.cdef(""" +_CDEFS: list[tuple[str, dict]] = [ + (""" typedef struct { unsigned char fd : 1; unsigned char bus : 3; @@ -22,20 +20,64 @@ unsigned char checksum; unsigned char data[64]; } CANPacket_t; -""", packed=True) - -ffi.cdef(""" +""", {"packed": True}), + (""" bool safety_rx_hook(CANPacket_t *msg); bool safety_tx_hook(CANPacket_t *msg); int safety_fwd_hook(int bus_num, int addr); int set_safety_hooks(uint16_t mode, uint16_t param); -""") - -ffi.cdef(""" +""", {}), + (""" void can_set_checksum(CANPacket_t *packet); -""") +""", {}), +] + + +# lazy ffi proxy +class _LazyFFI: + def __init__(self): + self._real = None + + def _ensure(self): + if self._real is None: + from cffi import FFI + real = FFI() + for src, kwargs in _CDEFS: + if kwargs: + real.cdef(src, **kwargs) + else: + real.cdef(src) + setup_safety_helpers(real) + self._real = real + return self._real + + def __getattr__(self, name: str) -> Any: + return getattr(self._ensure(), name) + + def __repr__(self) -> str: + return "" if self._real else "" -setup_safety_helpers(ffi) + +ffi = _LazyFFI() + + +# lazy libsafety proxy +class _LazyLib: + def __init__(self, path: str): + self._path = path + self._real = None + + def _ensure(self): + if self._real is None: + real_ffi = ffi._ensure() + self._real = real_ffi.dlopen(self._path) + return self._real + + def __getattr__(self, name: str) -> Any: + return getattr(self._ensure(), name) + + def __repr__(self) -> str: + return f"" if self._real else f"" class CANPacket: @@ -60,11 +102,11 @@ def safety_fwd_hook(self, bus_num: int, addr: int) -> int: ... def set_safety_hooks(self, mode: int, param: int) -> int: ... -libsafety: Panda = ffi.dlopen(libsafety_fn) - +libsafety: Panda = _LazyLib(libsafety_fn) # helpers + def make_CANPacket(addr: int, bus: int, dat): ret = ffi.new('CANPacket_t *') ret[0].extended = 1 if addr >= 0x800 else 0 diff --git a/opendbc/safety/tests/misra/test_mutation.py b/opendbc/safety/tests/misra/test_mutation.py index 7c25ad862a4..0852b26eec7 100644 --- a/opendbc/safety/tests/misra/test_mutation.py +++ b/opendbc/safety/tests/misra/test_mutation.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import os import glob -import pytest import shutil import subprocess import tempfile @@ -16,12 +15,7 @@ 'opendbc/safety/board/', ) -mutations = [ - # no mutation, should pass - (None, None, lambda s: s, False), -] - -patterns = [ +PATTERNS = [ ("misra-c2012-10.3", lambda s: s + "\nvoid test(float len) { for (float j = 0; j < len; j++) {;} }\n"), ("misra-c2012-13.3", lambda s: s + "\nvoid test(int tmp) { int tmp2 = tmp++ + 2; if (tmp2) {;}}\n"), ("misra-c2012-13.4", lambda s: s + "\nint test(int x, int y) { return (x=2) && (y=2); }\n"), @@ -34,18 +28,9 @@ ("misra-c2012-20.5", lambda s: s + "\n#define TEST 1\n#undef TEST\n"), ] -all_files = glob.glob('opendbc/safety/**', root_dir=ROOT, recursive=True) -files = [f for f in all_files if f.endswith(('.c', '.h')) and not f.startswith(IGNORED_PATHS)] -assert len(files) > 20, files - -for p in patterns: - mutations.append((random.choice(files), *p, True)) -mutations = random.sample(mutations, 2) # can remove this once cppcheck is faster - - -@pytest.mark.parametrize("fn, rule, transform, should_fail", mutations) -def test_misra_mutation(fn, rule, transform, should_fail): +def run_misra_test(fn, rule, transform, should_fail): + """Helper function to run a single MISRA test mutation.""" with tempfile.TemporaryDirectory() as tmp: shutil.copytree(ROOT, tmp, dirs_exist_ok=True, ignore=shutil.ignore_patterns('.venv', 'cppcheck', '.git', '*.ctu-info', '.hypothesis')) @@ -64,4 +49,26 @@ def test_misra_mutation(fn, rule, transform, should_fail): failed = r.returncode != 0 assert failed == should_fail if should_fail: - assert rule in r.stdout, "MISRA test failed but not for the correct violation" \ No newline at end of file + assert rule in r.stdout, "MISRA test failed but not for the correct violation" + + +def test_misra_mutation(): + """Test MISRA violations using mutations.""" + mutations = [ + # no mutation, should pass + (None, None, lambda s: s, False), + ] + + all_files = glob.glob('opendbc/safety/**', root_dir=ROOT, recursive=True) + files = [f for f in all_files if f.endswith(('.c', '.h')) and not f.startswith(IGNORED_PATHS)] + assert len(files) > 20, files + + for p in PATTERNS: + mutations.append((random.choice(files), *p, True)) + + # can remove this once cppcheck is faster + mutations = random.sample(mutations, 2) + + for fn, rule, transform, should_fail in mutations: + print(f"Testing mutation: {rule if rule else 'no mutation'} on file: {fn if fn else 'none'}") + run_misra_test(fn, rule, transform, should_fail)