diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d37c5d15818..db9f88bdb3e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,6 +13,6 @@ repos: additional_dependencies: ['git+https://github.com/numpy/numpy-stubs', 'types-requests', 'types-atomicwrites', 'types-pycurl'] - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.0.285 + rev: v0.0.286 hooks: - id: ruff diff --git a/board/safety/safety_hyundai_canfd.h b/board/safety/safety_hyundai_canfd.h index fa6bf0b611f..3ea7a486a54 100644 --- a/board/safety/safety_hyundai_canfd.h +++ b/board/safety/safety_hyundai_canfd.h @@ -24,6 +24,12 @@ const CanMsg HYUNDAI_CANFD_HDA2_TX_MSGS[] = { {0x2A4, 0, 24}, // CAM_0x2A4 }; +const CanMsg HYUNDAI_CANFD_HDA2_ALT_STEERING_TX_MSGS[] = { + {0x110, 0, 32}, // LKAS_ALT + {0x1CF, 1, 8}, // CRUISE_BUTTON + {0x362, 0, 32}, // CAM_0x362 +}; + const CanMsg HYUNDAI_CANFD_HDA2_LONG_TX_MSGS[] = { {0x50, 0, 16}, // LKAS {0x1CF, 1, 8}, // CRUISE_BUTTON @@ -116,10 +122,16 @@ uint16_t hyundai_canfd_crc_lut[256]; const int HYUNDAI_PARAM_CANFD_HDA2 = 16; const int HYUNDAI_PARAM_CANFD_ALT_BUTTONS = 32; +const int HYUNDAI_PARAM_CANFD_HDA2_ALT_STEERING = 128; bool hyundai_canfd_hda2 = false; bool hyundai_canfd_alt_buttons = false; +bool hyundai_canfd_hda2_alt_steering = false; +int hyundai_canfd_hda2_get_lkas_addr(void) { + return hyundai_canfd_hda2_alt_steering ? 0x110 : 0x50; +} + static uint8_t hyundai_canfd_get_counter(CANPacket_t *to_push) { uint8_t ret = 0; if (GET_LEN(to_push) == 8U) { @@ -231,7 +243,7 @@ static int hyundai_canfd_rx_hook(CANPacket_t *to_push) { } } - const int steer_addr = hyundai_canfd_hda2 ? 0x50 : 0x12a; + const int steer_addr = hyundai_canfd_hda2 ? hyundai_canfd_hda2_get_lkas_addr() : 0x12a; bool stock_ecu_detected = (addr == steer_addr) && (bus == 0); if (hyundai_longitudinal) { // on HDA2, ensure ADRV ECU is still knocked out @@ -250,7 +262,11 @@ static int hyundai_canfd_tx_hook(CANPacket_t *to_send) { int addr = GET_ADDR(to_send); if (hyundai_canfd_hda2 && !hyundai_longitudinal) { - tx = msg_allowed(to_send, HYUNDAI_CANFD_HDA2_TX_MSGS, sizeof(HYUNDAI_CANFD_HDA2_TX_MSGS)/sizeof(HYUNDAI_CANFD_HDA2_TX_MSGS[0])); + if (hyundai_canfd_hda2_alt_steering) { + tx = msg_allowed(to_send, HYUNDAI_CANFD_HDA2_ALT_STEERING_TX_MSGS, sizeof(HYUNDAI_CANFD_HDA2_ALT_STEERING_TX_MSGS)/sizeof(HYUNDAI_CANFD_HDA2_ALT_STEERING_TX_MSGS[0])); + } else { + tx = msg_allowed(to_send, HYUNDAI_CANFD_HDA2_TX_MSGS, sizeof(HYUNDAI_CANFD_HDA2_TX_MSGS)/sizeof(HYUNDAI_CANFD_HDA2_TX_MSGS[0])); + } } else if (hyundai_canfd_hda2 && hyundai_longitudinal) { tx = msg_allowed(to_send, HYUNDAI_CANFD_HDA2_LONG_TX_MSGS, sizeof(HYUNDAI_CANFD_HDA2_LONG_TX_MSGS)/sizeof(HYUNDAI_CANFD_HDA2_LONG_TX_MSGS[0])); } else { @@ -258,7 +274,7 @@ static int hyundai_canfd_tx_hook(CANPacket_t *to_send) { } // steering - const int steer_addr = (hyundai_canfd_hda2 && !hyundai_longitudinal) ? 0x50 : 0x12a; + const int steer_addr = (hyundai_canfd_hda2 && !hyundai_longitudinal) ? hyundai_canfd_hda2_get_lkas_addr() : 0x12a; if (addr == steer_addr) { int desired_torque = (((GET_BYTE(to_send, 6) & 0xFU) << 7U) | (GET_BYTE(to_send, 5) >> 1U)) - 1024U; bool steer_req = GET_BIT(to_send, 52U) != 0U; @@ -320,16 +336,17 @@ static int hyundai_canfd_fwd_hook(int bus_num, int addr) { } if (bus_num == 2) { // LKAS for HDA2, LFA for HDA1 - int is_lkas_msg = (((addr == 0x50) || (addr == 0x2a4)) && hyundai_canfd_hda2); - int is_lfa_msg = ((addr == 0x12a) && !hyundai_canfd_hda2); + int hda2_lfa_block_addr = hyundai_canfd_hda2_alt_steering ? 0x362 : 0x2a4; + bool is_lkas_msg = ((addr == hyundai_canfd_hda2_get_lkas_addr()) || (addr == hda2_lfa_block_addr)) && hyundai_canfd_hda2; + bool is_lfa_msg = ((addr == 0x12a) && !hyundai_canfd_hda2); // HUD icons - int is_lfahda_msg = ((addr == 0x1e0) && !hyundai_canfd_hda2); + bool is_lfahda_msg = ((addr == 0x1e0) && !hyundai_canfd_hda2); // CRUISE_INFO for non-HDA2, we send our own longitudinal commands - int is_scc_msg = ((addr == 0x1a0) && hyundai_longitudinal && !hyundai_canfd_hda2); + bool is_scc_msg = ((addr == 0x1a0) && hyundai_longitudinal && !hyundai_canfd_hda2); - int block_msg = is_lkas_msg || is_lfa_msg || is_lfahda_msg || is_scc_msg; + bool block_msg = is_lkas_msg || is_lfa_msg || is_lfahda_msg || is_scc_msg; if (!block_msg) { bus_fwd = 0; } @@ -344,6 +361,7 @@ static const addr_checks* hyundai_canfd_init(uint16_t param) { gen_crc_lookup_table_16(0x1021, hyundai_canfd_crc_lut); hyundai_canfd_hda2 = GET_FLAG(param, HYUNDAI_PARAM_CANFD_HDA2); hyundai_canfd_alt_buttons = GET_FLAG(param, HYUNDAI_PARAM_CANFD_ALT_BUTTONS); + hyundai_canfd_hda2_alt_steering = GET_FLAG(param, HYUNDAI_PARAM_CANFD_HDA2_ALT_STEERING); // no long for ICE yet if (!hyundai_ev_gas_signal && !hyundai_hybrid_gas_signal) { diff --git a/board/safety/safety_subaru.h b/board/safety/safety_subaru.h index 9d31ff6ed4c..da282e90d63 100644 --- a/board/safety/safety_subaru.h +++ b/board/safety/safety_subaru.h @@ -188,8 +188,10 @@ static int subaru_tx_hook(CANPacket_t *to_send) { int desired_torque = ((GET_BYTES(to_send, 0, 4) >> 16) & 0x1FFFU); desired_torque = -1 * to_signed(desired_torque, 13); + bool steer_req = GET_BIT(to_send, 29U) != 0U; + const SteeringLimits limits = subaru_gen2 ? SUBARU_GEN2_STEERING_LIMITS : SUBARU_STEERING_LIMITS; - violation |= steer_torque_cmd_checks(desired_torque, -1, limits); + violation |= steer_torque_cmd_checks(desired_torque, steer_req, limits); } // check es_brake brake_pressure limits diff --git a/python/__init__.py b/python/__init__.py index 56900d26b63..a475915f328 100644 --- a/python/__init__.py +++ b/python/__init__.py @@ -238,6 +238,7 @@ class Panda: FLAG_HYUNDAI_CANFD_HDA2 = 16 FLAG_HYUNDAI_CANFD_ALT_BUTTONS = 32 FLAG_HYUNDAI_ALT_LIMITS = 64 + FLAG_HYUNDAI_CANFD_HDA2_ALT_STEERING = 128 FLAG_TESLA_POWERTRAIN = 1 FLAG_TESLA_LONG_CONTROL = 2 @@ -469,7 +470,7 @@ def reconnect(self): success = False # wait up to 15 seconds - for _ in range(0, 15*10): + for _ in range(15*10): try: self.connect() success = True @@ -510,7 +511,7 @@ def flash_static(handle, code, mcu_type): # flash over EP2 STEP = 0x10 logging.warning("flash: flashing") - for i in range(0, len(code), STEP): + for i in range(len(code), STEP): handle.bulkWrite(2, code[i:i + STEP]) # reset @@ -886,7 +887,7 @@ def serial_write(self, port_number, ln): ret = 0 if isinstance(ln, str): ln = bytes(ln, 'utf-8') - for i in range(0, len(ln), 0x20): + for i in range(len(ln), 0x20): ret += self._handle.bulkWrite(2, struct.pack("B", port_number) + ln[i:i + 0x20]) return ret @@ -939,7 +940,7 @@ def kline_send(self, x, bus=2, checksum=True): self.kline_drain(bus=bus) if checksum: x += bytes([sum(x) % 0x100]) - for i in range(0, len(x), 0xf): + for i in range(len(x), 0xf): ts = x[i:i + 0xf] logging.debug(f"kline send: 0x{ts.hex()}") self._handle.bulkWrite(2, bytes([bus]) + ts) diff --git a/python/spi.py b/python/spi.py index 0be28f49c1e..faec497c6f8 100644 --- a/python/spi.py +++ b/python/spi.py @@ -421,7 +421,7 @@ def close(self): def program(self, address, dat): bs = 256 # max block size for writing to flash over SPI dat += b"\xFF" * ((bs - len(dat)) % bs) - for i in range(0, len(dat) // bs): + for i in range(len(dat) // bs): block = dat[i * bs:(i + 1) * bs] self._cmd(0x31, data=[ struct.pack('>I', address + i*bs), diff --git a/python/usb.py b/python/usb.py index 0f1bff7c168..e6b807ca6fe 100644 --- a/python/usb.py +++ b/python/usb.py @@ -78,7 +78,7 @@ def program(self, address, dat): # Program bs = min(len(dat), self._mcu_type.config.block_size) dat += b"\xFF" * ((bs - len(dat)) % bs) - for i in range(0, len(dat) // bs): + for i in range(len(dat) // bs): ldat = dat[i * bs:(i + 1) * bs] print("programming %d with length %d" % (i, len(ldat))) self._libusb_handle.controlWrite(0x21, self.DFU_DNLOAD, 2 + i, 0, ldat) diff --git a/tests/safety/common.py b/tests/safety/common.py index 36df296eaca..6104c9b18c0 100644 --- a/tests/safety/common.py +++ b/tests/safety/common.py @@ -126,7 +126,7 @@ def test_prev_gas_interceptor(self): self.safety.set_gas_interceptor_detected(False) def test_disengage_on_gas_interceptor(self): - for g in range(0, 0x1000): + for g in range(0x1000): self._rx(self._interceptor_user_gas(0)) self.safety.set_controls_allowed(True) self._rx(self._interceptor_user_gas(g)) @@ -138,7 +138,7 @@ def test_disengage_on_gas_interceptor(self): def test_alternative_experience_no_disengage_on_gas_interceptor(self): self.safety.set_controls_allowed(True) self.safety.set_alternative_experience(ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS) - for g in range(0, 0x1000): + for g in range(0x1000): self._rx(self._interceptor_user_gas(g)) # Test we allow lateral, but not longitudinal self.assertTrue(self.safety.get_controls_allowed()) @@ -576,13 +576,13 @@ def test_torque_measurements(self): self.assertTrue(self.safety.get_torque_meas_min() in min_range) self.assertTrue(self.safety.get_torque_meas_max() in max_range) - max_range = range(0, self.TORQUE_MEAS_TOLERANCE + 1) + max_range = range(self.TORQUE_MEAS_TOLERANCE + 1) min_range = range(-(trq + self.TORQUE_MEAS_TOLERANCE), -trq + 1) self._rx(self._torque_meas_msg(0)) self.assertTrue(self.safety.get_torque_meas_min() in min_range) self.assertTrue(self.safety.get_torque_meas_max() in max_range) - max_range = range(0, self.TORQUE_MEAS_TOLERANCE + 1) + max_range = range(self.TORQUE_MEAS_TOLERANCE + 1) min_range = range(-self.TORQUE_MEAS_TOLERANCE, 0 + 1) self._rx(self._torque_meas_msg(0)) self.assertTrue(self.safety.get_torque_meas_min() in min_range) @@ -733,7 +733,7 @@ def test_angle_cmd_when_disabled(self): @add_regen_tests class PandaSafetyTest(PandaSafetyTestBase): TX_MSGS: Optional[List[List[int]]] = None - SCANNED_ADDRS = [*range(0x0, 0x800), # Entire 11-bit CAN address space + SCANNED_ADDRS = [*range(0x800), # Entire 11-bit CAN address space *range(0x18DA00F1, 0x18DB00F1, 0x100), # 29-bit UDS physical addressing *range(0x18DB00F1, 0x18DC00F1, 0x100), # 29-bit UDS functional addressing *range(0x3300, 0x3400), # Honda @@ -791,14 +791,14 @@ def test_relay_malfunction(self): self.assertFalse(self.safety.get_relay_malfunction()) self._rx(make_msg(self.RELAY_MALFUNCTION_BUS, self.RELAY_MALFUNCTION_ADDR, 8)) self.assertTrue(self.safety.get_relay_malfunction()) - for bus in range(0, 3): + for bus in range(3): for addr in self.SCANNED_ADDRS: self.assertEqual(-1, self._tx(make_msg(bus, addr, 8))) self.assertEqual(-1, self.safety.safety_fwd_hook(bus, addr)) def test_fwd_hook(self): # some safety modes don't forward anything, while others blacklist msgs - for bus in range(0, 3): + for bus in range(3): for addr in self.SCANNED_ADDRS: # assume len 8 fwd_bus = self.FWD_BUS_LOOKUP.get(bus, -1) @@ -807,7 +807,7 @@ def test_fwd_hook(self): self.assertEqual(fwd_bus, self.safety.safety_fwd_hook(bus, addr), f"{addr=:#x} from {bus=} to {fwd_bus=}") def test_spam_can_buses(self): - for bus in range(0, 4): + for bus in range(4): for addr in self.SCANNED_ADDRS: if all(addr != m[0] or bus != m[1] for m in self.TX_MSGS): self.assertFalse(self._tx(make_msg(bus, addr, 8)), f"allowed TX {addr=} {bus=}") diff --git a/tests/safety/test_honda.py b/tests/safety/test_honda.py index 0be536cee83..6164189ccc5 100755 --- a/tests/safety/test_honda.py +++ b/tests/safety/test_honda.py @@ -300,8 +300,8 @@ def _send_acc_hud_msg(self, pcm_gas, pcm_speed): def test_acc_hud_safety_check(self): for controls_allowed in [True, False]: self.safety.set_controls_allowed(controls_allowed) - for pcm_gas in range(0, 255): - for pcm_speed in range(0, 100): + for pcm_gas in range(255): + for pcm_speed in range(100): send = (controls_allowed and pcm_gas <= self.MAX_GAS) or (pcm_gas == 0 and pcm_speed == 0) self.assertEqual(send, self._tx(self._send_acc_hud_msg(pcm_gas, pcm_speed))) diff --git a/tests/safety/test_hyundai_canfd.py b/tests/safety/test_hyundai_canfd.py index 60f6667f3e5..b1f4f895cb6 100755 --- a/tests/safety/test_hyundai_canfd.py +++ b/tests/safety/test_hyundai_canfd.py @@ -180,6 +180,28 @@ def setUp(self): self.safety.init_tests() +# TODO: Handle ICE and HEV configurations once we see cars that use the new messages +class TestHyundaiCanfdHDA2EVAltSteering(TestHyundaiCanfdBase): + + TX_MSGS = [[0x110, 0], [0x1CF, 1], [0x362, 0]] + RELAY_MALFUNCTION_ADDR = 0x110 + RELAY_MALFUNCTION_BUS = 0 + FWD_BLACKLISTED_ADDRS = {2: [0x110, 0x362]} + FWD_BUS_LOOKUP = {0: 2, 2: 0} + + PT_BUS = 1 + SCC_BUS = 1 + STEER_MSG = "LKAS_ALT" + GAS_MSG = ("ACCELERATOR", "ACCELERATOR_PEDAL") + + def setUp(self): + self.packer = CANPackerPanda("hyundai_canfd") + self.safety = libpanda_py.libpanda + self.safety.set_safety_hooks(Panda.SAFETY_HYUNDAI_CANFD, Panda.FLAG_HYUNDAI_CANFD_HDA2 | Panda.FLAG_HYUNDAI_EV_GAS | + Panda.FLAG_HYUNDAI_CANFD_HDA2_ALT_STEERING) + self.safety.init_tests() + + class TestHyundaiCanfdHDA2LongEV(HyundaiLongitudinalBase, TestHyundaiCanfdHDA2EV): TX_MSGS = [[0x50, 0], [0x1CF, 1], [0x2A4, 0], [0x51, 0], [0x730, 1], [0x12a, 1], [0x160, 1], diff --git a/tests/safety/test_subaru.py b/tests/safety/test_subaru.py index 9131322c1a0..e89cda80dc3 100755 --- a/tests/safety/test_subaru.py +++ b/tests/safety/test_subaru.py @@ -148,7 +148,7 @@ class TestSubaruTorqueSafetyBase(TestSubaruSafetyBase, common.DriverTorqueSteeri MAX_TORQUE = 2047 def _torque_cmd_msg(self, torque, steer_req=1): - values = {"LKAS_Output": torque} + values = {"LKAS_Output": torque, "LKAS_Request": steer_req} return self.packer.make_can_msg_panda("ES_LKAS", SUBARU_MAIN_BUS, values)