Skip to content

Commit

Permalink
Convert tests to pytests
Browse files Browse the repository at this point in the history
  • Loading branch information
UkuLoskit committed Jul 9, 2024
1 parent de39b14 commit 019df98
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 71 deletions.
11 changes: 3 additions & 8 deletions can/tests/test_checksums.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#!/usr/bin/env python3
import unittest

from opendbc.can.parser import CANParser
from opendbc.can.packer import CANPacker
from opendbc.can.tests.test_packer_parser import can_list_to_can_capnp


class TestCanChecksums(unittest.TestCase):
class TestCanChecksums:

def test_honda_checksum(self):
"""Test checksums for Honda standard and extended CAN ids"""
Expand Down Expand Up @@ -34,9 +33,5 @@ def test_honda_checksum(self):
can_strings = [can_list_to_can_capnp(msgs), ]
parser.update_strings(can_strings)

self.assertEqual(parser.vl['LKAS_HUD']['CHECKSUM'], std)
self.assertEqual(parser.vl['LKAS_HUD_A']['CHECKSUM'], ext)


if __name__ == "__main__":
unittest.main()
assert parser.vl['LKAS_HUD']['CHECKSUM'] == std
assert parser.vl['LKAS_HUD_A']['CHECKSUM'] == ext
13 changes: 7 additions & 6 deletions can/tests/test_dbc_exceptions.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
#!/usr/bin/env python3

import pytest
import unittest

from opendbc.can.parser import CANParser, CANDefine
from opendbc.can.packer import CANPacker
from opendbc.can.tests import TEST_DBC


class TestCanParserPackerExceptions(unittest.TestCase):
class TestCanParserPackerExceptions:
def test_civic_exceptions(self):
dbc_file = "honda_civic_touring_2016_can_generated"
dbc_invalid = dbc_file + "abcdef"
msgs = [("STEERING_CONTROL", 50)]
with self.assertRaises(RuntimeError):
with pytest.raises(RuntimeError):
CANParser(dbc_invalid, msgs, 0)
with self.assertRaises(RuntimeError):
with pytest.raises(RuntimeError):
CANPacker(dbc_invalid)
with self.assertRaises(RuntimeError):
with pytest.raises(RuntimeError):
CANDefine(dbc_invalid)
with self.assertRaises(KeyError):
with pytest.raises(KeyError):
CANDefine(TEST_DBC)

parser = CANParser(dbc_file, msgs, 0)
with self.assertRaises(RuntimeError):
with pytest.raises(RuntimeError):
parser.update_strings([b''])

# Everything is supposed to work below
Expand Down
4 changes: 2 additions & 2 deletions can/tests/test_dbc_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from opendbc.can.tests import ALL_DBCS


class TestDBCParser(unittest.TestCase):
class TestDBCParser:
def test_enough_dbcs(self):
# sanity check that we're running on the real DBCs
self.assertGreater(len(ALL_DBCS), 20)
assert len(ALL_DBCS) > 20

def test_parse_all_dbcs(self):
"""
Expand Down
11 changes: 5 additions & 6 deletions can/tests/test_define.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,23 @@
from opendbc.can.tests import ALL_DBCS


class TestCADNDefine(unittest.TestCase):
class TestCADNDefine:
def test_civic(self):

dbc_file = "honda_civic_touring_2016_can_generated"
defs = CANDefine(dbc_file)

self.assertDictEqual(defs.dv[399], defs.dv['STEER_STATUS'])
self.assertDictEqual(defs.dv[399],
{'STEER_STATUS':
assert defs.dv[399] == defs.dv['STEER_STATUS']
assert defs.dv[399] == \
{'STEER_STATUS': \
{7: 'PERMANENT_FAULT',
6: 'TMP_FAULT',
5: 'FAULT_1',
4: 'NO_TORQUE_ALERT_2',
3: 'LOW_SPEED_LOCKOUT',
2: 'NO_TORQUE_ALERT_1',
0: 'NORMAL'}
0: 'NORMAL'} \
}
)

def test_all_dbcs(self):
# Asserts no exceptions on all DBCs
Expand Down
91 changes: 46 additions & 45 deletions can/tests/test_packer_parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import pytest
import unittest
import random

Expand Down Expand Up @@ -31,17 +32,17 @@ def can_list_to_can_capnp(can_msgs, msgtype='can', logMonoTime=None):
return dat.to_bytes()


class TestCanParserPacker(unittest.TestCase):
class TestCanParserPacker:
def test_packer(self):
packer = CANPacker(TEST_DBC)

for b in range(6):
for i in range(256):
values = {"COUNTER": i}
addr, _, dat, bus = packer.make_can_msg("CAN_FD_MESSAGE", b, values)
self.assertEqual(addr, 245)
self.assertEqual(bus, b)
self.assertEqual(dat[0], i)
assert addr == 245
assert bus == b
assert dat[0] == i

def test_packer_counter(self):
msgs = [("CAN_FD_MESSAGE", 0), ]
Expand All @@ -53,7 +54,7 @@ def test_packer_counter(self):
msg = packer.make_can_msg("CAN_FD_MESSAGE", 0, {})
dat = can_list_to_can_capnp([msg, ])
parser.update_strings([dat])
self.assertEqual(parser.vl["CAN_FD_MESSAGE"]["COUNTER"], i % 256)
assert parser.vl["CAN_FD_MESSAGE"]["COUNTER"] == i % 256

# setting COUNTER should override
for _ in range(100):
Expand All @@ -64,37 +65,37 @@ def test_packer_counter(self):
})
dat = can_list_to_can_capnp([msg, ])
parser.update_strings([dat])
self.assertEqual(parser.vl["CAN_FD_MESSAGE"]["COUNTER"], cnt)
assert parser.vl["CAN_FD_MESSAGE"]["COUNTER"] == cnt

# then, should resume counting from the override value
cnt = parser.vl["CAN_FD_MESSAGE"]["COUNTER"]
for i in range(100):
msg = packer.make_can_msg("CAN_FD_MESSAGE", 0, {})
dat = can_list_to_can_capnp([msg, ])
parser.update_strings([dat])
self.assertEqual(parser.vl["CAN_FD_MESSAGE"]["COUNTER"], (cnt + i) % 256)
assert parser.vl["CAN_FD_MESSAGE"]["COUNTER"] == (cnt + i) % 256

def test_parser_can_valid(self):
msgs = [("CAN_FD_MESSAGE", 10), ]
packer = CANPacker(TEST_DBC)
parser = CANParser(TEST_DBC, msgs, 0)

# shouldn't be valid initially
self.assertFalse(parser.can_valid)
assert not parser.can_valid

# not valid until the message is seen
for _ in range(100):
dat = can_list_to_can_capnp([])
parser.update_strings([dat])
self.assertFalse(parser.can_valid)
assert not parser.can_valid

# valid once seen
for i in range(1, 100):
t = int(0.01 * i * 1e9)
msg = packer.make_can_msg("CAN_FD_MESSAGE", 0, {})
dat = can_list_to_can_capnp([msg, ], logMonoTime=t)
parser.update_strings([dat])
self.assertTrue(parser.can_valid)
assert parser.can_valid

def test_parser_counter_can_valid(self):
"""
Expand All @@ -113,13 +114,13 @@ def test_parser_counter_can_valid(self):
# bad static counter, invalid once it's seen MAX_BAD_COUNTER messages
for idx in range(0x1000):
parser.update_strings([bts])
self.assertEqual((idx + 1) < MAX_BAD_COUNTER, parser.can_valid)
assert (idx + 1) < MAX_BAD_COUNTER == parser.can_valid

# one to recover
msg = packer.make_can_msg("STEERING_CONTROL", 0, {"COUNTER": 1})
bts = can_list_to_can_capnp([msg])
parser.update_strings([bts])
self.assertTrue(parser.can_valid)
assert parser.can_valid

def test_parser_no_partial_update(self):
"""
Expand All @@ -144,18 +145,18 @@ def rx_steering_msg(values, bad_checksum=False):
parser.update_strings([bts])

rx_steering_msg({"STEER_TORQUE": 100}, bad_checksum=False)
self.assertEqual(parser.vl["STEERING_CONTROL"]["STEER_TORQUE"], 100)
self.assertEqual(parser.vl_all["STEERING_CONTROL"]["STEER_TORQUE"], [100])
assert parser.vl["STEERING_CONTROL"]["STEER_TORQUE"] == 100
assert parser.vl_all["STEERING_CONTROL"]["STEER_TORQUE"] == [100]

for _ in range(5):
rx_steering_msg({"STEER_TORQUE": 200}, bad_checksum=True)
self.assertEqual(parser.vl["STEERING_CONTROL"]["STEER_TORQUE"], 100)
self.assertEqual(parser.vl_all["STEERING_CONTROL"]["STEER_TORQUE"], [])
assert parser.vl["STEERING_CONTROL"]["STEER_TORQUE"] == 100
assert parser.vl_all["STEERING_CONTROL"]["STEER_TORQUE"] == []

# Even if CANParser doesn't update instantaneous vl, make sure it didn't add invalid values to vl_all
rx_steering_msg({"STEER_TORQUE": 300}, bad_checksum=False)
self.assertEqual(parser.vl["STEERING_CONTROL"]["STEER_TORQUE"], 300)
self.assertEqual(parser.vl_all["STEERING_CONTROL"]["STEER_TORQUE"], [300])
assert parser.vl["STEERING_CONTROL"]["STEER_TORQUE"] == 300
assert parser.vl_all["STEERING_CONTROL"]["STEER_TORQUE"] == [300]

def test_packer_parser(self):
msgs = [
Expand Down Expand Up @@ -189,11 +190,11 @@ def test_packer_parser(self):

for k, v in values.items():
for key, val in v.items():
self.assertAlmostEqual(parser.vl[k][key], val)
assert parser.vl[k][key] == pytest.approx(val)

# also check address
for sig in ("STEER_TORQUE", "STEER_TORQUE_REQUEST", "COUNTER", "CHECKSUM"):
self.assertEqual(parser.vl["STEERING_CONTROL"][sig], parser.vl[228][sig])
assert parser.vl["STEERING_CONTROL"][sig] == parser.vl[228][sig]

def test_scale_offset(self):
"""Test that both scale and offset are correctly preserved"""
Expand All @@ -209,7 +210,7 @@ def test_scale_offset(self):

parser.update_strings([bts])

self.assertAlmostEqual(parser.vl["VSA_STATUS"]["USER_BRAKE"], brake)
assert parser.vl["VSA_STATUS"]["USER_BRAKE"] == pytest.approx(brake)

def test_subaru(self):
# Subaru is little endian
Expand All @@ -234,10 +235,10 @@ def test_subaru(self):
bts = can_list_to_can_capnp([msgs])
parser.update_strings([bts])

self.assertAlmostEqual(parser.vl["ES_LKAS"]["LKAS_Output"], steer)
self.assertAlmostEqual(parser.vl["ES_LKAS"]["LKAS_Request"], active)
self.assertAlmostEqual(parser.vl["ES_LKAS"]["SET_1"], 1)
self.assertAlmostEqual(parser.vl["ES_LKAS"]["COUNTER"], idx % 16)
assert parser.vl["ES_LKAS"]["LKAS_Output"] == pytest.approx(steer)
assert parser.vl["ES_LKAS"]["LKAS_Request"] == pytest.approx(active)
assert parser.vl["ES_LKAS"]["SET_1"] == pytest.approx(1)
assert parser.vl["ES_LKAS"]["COUNTER"] == pytest.approx(idx % 16)
idx += 1

def test_bus_timeout(self):
Expand Down Expand Up @@ -267,16 +268,16 @@ def send_msg(blank=False):
# all good, no timeout
for _ in range(1000):
send_msg()
self.assertFalse(parser.bus_timeout, str(_))
assert not parser.bus_timeout, str(_)

# timeout after 10 blank msgs
for n in range(200):
send_msg(blank=True)
self.assertEqual(n >= 10, parser.bus_timeout)
assert n >= 10 == parser.bus_timeout

# no timeout immediately after seen again
send_msg()
self.assertFalse(parser.bus_timeout)
assert not parser.bus_timeout

def test_updated(self):
"""Test updated value dict"""
Expand All @@ -286,7 +287,7 @@ def test_updated(self):
packer = CANPacker(dbc_file)

# Make sure nothing is updated
self.assertEqual(len(parser.vl_all["VSA_STATUS"]["USER_BRAKE"]), 0)
assert len(parser.vl_all["VSA_STATUS"]["USER_BRAKE"]) == 0

idx = 0
for _ in range(10):
Expand All @@ -304,9 +305,9 @@ def test_updated(self):
parser.update_strings(can_strings)
vl_all = parser.vl_all["VSA_STATUS"]["USER_BRAKE"]

self.assertEqual(vl_all, user_brake_vals)
assert vl_all == user_brake_vals
if len(user_brake_vals):
self.assertEqual(vl_all[-1], parser.vl["VSA_STATUS"]["USER_BRAKE"])
assert vl_all[-1] == parser.vl["VSA_STATUS"]["USER_BRAKE"]

def test_timestamp_nanos(self):
"""Test message timestamp dict"""
Expand All @@ -323,7 +324,7 @@ def test_timestamp_nanos(self):
# Check the default timestamp is zero
for msg in ("VSA_STATUS", "POWERTRAIN_DATA"):
ts_nanos = parser.ts_nanos[msg].values()
self.assertEqual(set(ts_nanos), {0})
assert set(ts_nanos) == {0}

# Check:
# - timestamp is only updated for correct messages
Expand All @@ -339,23 +340,23 @@ def test_timestamp_nanos(self):
parser.update_strings(can_strings)

ts_nanos = parser.ts_nanos["VSA_STATUS"].values()
self.assertEqual(set(ts_nanos), {log_mono_time})
assert set(ts_nanos) == {log_mono_time}
ts_nanos = parser.ts_nanos["POWERTRAIN_DATA"].values()
self.assertEqual(set(ts_nanos), {0})
assert set(ts_nanos) == {0}

def test_nonexistent_messages(self):
# Ensure we don't allow messages not in the DBC
existing_messages = ("STEERING_CONTROL", 228, "CAN_FD_MESSAGE", 245)

for msg in existing_messages:
CANParser(TEST_DBC, [(msg, 0)])
with self.assertRaises(RuntimeError):
with pytest.raises(RuntimeError):
new_msg = msg + "1" if isinstance(msg, str) else msg + 1
CANParser(TEST_DBC, [(new_msg, 0)])

def test_track_all_signals(self):
parser = CANParser("toyota_nodsu_pt_generated", [("ACC_CONTROL", 0)])
self.assertEqual(parser.vl["ACC_CONTROL"], {
assert parser.vl["ACC_CONTROL"] == {
"ACCEL_CMD": 0,
"ALLOW_LONG_PRESS": 0,
"ACC_MALFUNCTION": 0,
Expand All @@ -371,28 +372,28 @@ def test_track_all_signals(self):
"ITS_CONNECT_LEAD": 0,
"ACCEL_CMD_ALT": 0,
"CHECKSUM": 0,
})
}

def test_disallow_duplicate_messages(self):
CANParser("toyota_nodsu_pt_generated", [("ACC_CONTROL", 5)])

with self.assertRaises(RuntimeError):
with pytest.raises(RuntimeError):
CANParser("toyota_nodsu_pt_generated", [("ACC_CONTROL", 5), ("ACC_CONTROL", 10)])

with self.assertRaises(RuntimeError):
with pytest.raises(RuntimeError):
CANParser("toyota_nodsu_pt_generated", [("ACC_CONTROL", 10), ("ACC_CONTROL", 10)])

def test_allow_undefined_msgs(self):
# TODO: we should throw an exception for these, but we need good
# discovery tests in openpilot first
packer = CANPacker("toyota_nodsu_pt_generated")

self.assertEqual(packer.make_can_msg("ACC_CONTROL", 0, {"UNKNOWN_SIGNAL": 0}),
[835, 0, b'\x00\x00\x00\x00\x00\x00\x00N', 0])
self.assertEqual(packer.make_can_msg("UNKNOWN_MESSAGE", 0, {"UNKNOWN_SIGNAL": 0}),
[0, 0, b'', 0])
self.assertEqual(packer.make_can_msg(0, 0, {"UNKNOWN_SIGNAL": 0}),
[0, 0, b'', 0])
assert packer.make_can_msg("ACC_CONTROL", 0, {"UNKNOWN_SIGNAL": 0}) == \
[835, 0, b'\x00\x00\x00\x00\x00\x00\x00N', 0]
assert packer.make_can_msg("UNKNOWN_MESSAGE", 0, {"UNKNOWN_SIGNAL": 0}) == \
[0, 0, b'', 0]
assert packer.make_can_msg(0, 0, {"UNKNOWN_SIGNAL": 0}) == \
[0, 0, b'', 0]


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 019df98

Please sign in to comment.