From 4a31d806afed8bbcaf953603b9f3b147a1a51e62 Mon Sep 17 00:00:00 2001 From: deanlee Date: Fri, 9 Aug 2024 15:32:00 +0800 Subject: [PATCH] refactor parser --- opendbc/can/common.h | 10 ++-- opendbc/can/common.pxd | 10 +++- opendbc/can/common_dbc.h | 2 - opendbc/can/parser.cc | 100 +++++++++++++-------------------- opendbc/can/parser_pyx.pyx | 112 +++++++++++++++++++++---------------- 5 files changed, 114 insertions(+), 120 deletions(-) diff --git a/opendbc/can/common.h b/opendbc/can/common.h index 8b5b8d320d..5953bec2ac 100644 --- a/opendbc/can/common.h +++ b/opendbc/can/common.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -45,8 +46,7 @@ class MessageState { unsigned int size; std::vector parse_sigs; - std::vector vals; - std::vector> all_vals; + std::map values; uint64_t last_seen_nanos; uint64_t check_threshold; @@ -79,11 +79,11 @@ class CANParser { CANParser(int abus, const std::string& dbc_name, const std::vector> &messages); CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter); - void update(const std::vector &can_data, std::vector &vals); - void query_latest(std::vector &vals, uint64_t last_ts = 0); + MessageState *messageState(uint32_t address) { return &message_states.at(address); } + std::set update(const std::vector &can_data); protected: - void UpdateCans(const CanData &can); + void updateCans(const CanData &can, std::set &updated_addresses); void UpdateValid(uint64_t nanos); }; diff --git a/opendbc/can/common.pxd b/opendbc/can/common.pxd index 21e276fa07..802b81b809 100644 --- a/opendbc/can/common.pxd +++ b/opendbc/can/common.pxd @@ -3,7 +3,9 @@ from libc.stdint cimport uint8_t, uint32_t, uint64_t from libcpp cimport bool +from libcpp.map cimport map from libcpp.pair cimport pair +from libcpp.set cimport set from libcpp.string cimport string from libcpp.vector cimport vector from libcpp.unordered_map cimport unordered_map @@ -53,9 +55,7 @@ cdef extern from "common_dbc.h": unordered_map[string, const Msg*] name_to_msg cdef struct SignalValue: - uint32_t address uint64_t ts_nanos - string name double value vector[double] all_values @@ -67,6 +67,9 @@ cdef extern from "common_dbc.h": cdef extern from "common.h": cdef const DBC* dbc_lookup(const string) except + + cdef cppclass MessageState: + map[string, SignalValue] values + cdef struct CanFrame: long src uint32_t address @@ -80,7 +83,8 @@ cdef extern from "common.h": bool can_valid bool bus_timeout CANParser(int, string, vector[pair[uint32_t, int]]) except + - void update(vector[CanData]&, vector[SignalValue]&) except + + MessageState *messageState(uint32_t address) + set[uint32_t] update(vector[CanData]&) except + cdef cppclass CANPacker: CANPacker(string) diff --git a/opendbc/can/common_dbc.h b/opendbc/can/common_dbc.h index 19507ecd4e..94e974b671 100644 --- a/opendbc/can/common_dbc.h +++ b/opendbc/can/common_dbc.h @@ -11,9 +11,7 @@ struct SignalPackValue { }; struct SignalValue { - uint32_t address; uint64_t ts_nanos; - std::string name; double value; // latest value std::vector all_values; // all values from this cycle }; diff --git a/opendbc/can/parser.cc b/opendbc/can/parser.cc index a65a8ec2d6..cb9bcdfb47 100644 --- a/opendbc/can/parser.cc +++ b/opendbc/can/parser.cc @@ -5,11 +5,6 @@ #include #include -#include -#include -#include -#include - #include "opendbc/can/common.h" int64_t get_raw_value(const std::vector &msg, const Signal &sig) { @@ -31,7 +26,6 @@ int64_t get_raw_value(const std::vector &msg, const Signal &sig) { return ret; } - bool MessageState::parse(uint64_t nanos, const std::vector &dat) { std::vector tmp_vals(parse_sigs.size()); bool checksum_failed = false; @@ -69,8 +63,10 @@ bool MessageState::parse(uint64_t nanos, const std::vector &dat) { } for (int i = 0; i < parse_sigs.size(); i++) { - vals[i] = tmp_vals[i]; - all_vals[i].push_back(vals[i]); + auto &val = values[parse_sigs[i].name]; + val.value = tmp_vals[i]; + val.ts_nanos = nanos; + val.all_values.push_back(val.value); } last_seen_nanos = nanos; @@ -126,8 +122,9 @@ CANParser::CANParser(int abus, const std::string& dbc_name, const std::vectorsigs; - state.vals.resize(msg->sigs.size()); - state.all_vals.resize(msg->sigs.size()); + for (auto &sig : msg->sigs) { + state.values[sig.name] = {}; + } } } @@ -147,44 +144,49 @@ CANParser::CANParser(int abus, const std::string& dbc_name, bool ignore_checksum .ignore_counter = ignore_counter, }; - for (const auto& sig : msg.sigs) { - state.parse_sigs.push_back(sig); - state.vals.push_back(0); - state.all_vals.push_back({}); - } - + state.parse_sigs = msg.sigs; message_states[state.address] = state; + for (auto &sig : msg.sigs) { + message_states[state.address].values[sig.name] = {}; + } } } -void CANParser::update(const std::vector &can_data, std::vector &vals) { - uint64_t current_nanos = 0; - for (const auto &c : can_data) { - if (first_nanos == 0) { - first_nanos = c.nanos; - } - if (current_nanos == 0) { - current_nanos = c.nanos; +std::set CANParser::update(const std::vector &can_data) { + // Clear all_values + for (auto &state : message_states) { + for (auto &value : state.second.values) { + value.second.all_values.clear(); } - last_nanos = c.nanos; + } + + std::set updated_addresses; + if (can_data.empty()) { + return updated_addresses; + } - UpdateCans(c); - UpdateValid(last_nanos); + if (first_nanos == 0) { + first_nanos = can_data.front().nanos; + } + + for (const auto &c : can_data) { + last_nanos = c.nanos; + updateCans(c, updated_addresses); } - query_latest(vals, current_nanos); + UpdateValid(last_nanos); + + return updated_addresses; } -void CANParser::UpdateCans(const CanData &can) { +void CANParser::updateCans(const CanData &can, std::set &updated_addresses) { //DEBUG("got %zu messages\n", can.frames.size()); - bool bus_empty = true; - for (const auto &frame : can.frames) { if (frame.src != bus) { // DEBUG("skip %d: wrong bus\n", cmsg.getAddress()); continue; } - bus_empty = false; + last_nonempty_nanos = can.nanos; auto state_it = message_states.find(frame.address); if (state_it == message_states.end()) { @@ -202,14 +204,10 @@ void CANParser::UpdateCans(const CanData &can) { // continue; //} - state_it->second.parse(can.nanos, frame.dat); - } - - // update bus timeout - if (!bus_empty) { - last_nonempty_nanos = can.nanos; + if (state_it->second.parse(can.nanos, frame.dat)) { + updated_addresses.insert(frame.address); + } } - bus_timeout = (can.nanos - last_nonempty_nanos) > bus_timeout_threshold; } void CANParser::UpdateValid(uint64_t nanos) { @@ -239,27 +237,5 @@ void CANParser::UpdateValid(uint64_t nanos) { } can_invalid_cnt = _valid ? 0 : (can_invalid_cnt + 1); can_valid = (can_invalid_cnt < CAN_INVALID_CNT) && _counters_valid; -} - -void CANParser::query_latest(std::vector &vals, uint64_t last_ts) { - if (last_ts == 0) { - last_ts = last_nanos; - } - for (auto& kv : message_states) { - auto& state = kv.second; - if (last_ts != 0 && state.last_seen_nanos < last_ts) { - continue; - } - - for (int i = 0; i < state.parse_sigs.size(); i++) { - const Signal &sig = state.parse_sigs[i]; - SignalValue &v = vals.emplace_back(); - v.address = state.address; - v.ts_nanos = state.last_seen_nanos; - v.name = sig.name; - v.value = state.vals[i]; - v.all_values = state.all_vals[i]; - state.all_vals[i].clear(); - } - } + bus_timeout = (nanos - last_nonempty_nanos) > bus_timeout_threshold; } diff --git a/opendbc/can/parser_pyx.pyx b/opendbc/can/parser_pyx.pyx index 9fb0c9f021..c0dce351fd 100644 --- a/opendbc/can/parser_pyx.pyx +++ b/opendbc/can/parser_pyx.pyx @@ -1,32 +1,81 @@ # distutils: language = c++ # cython: c_string_encoding=ascii, language_level=3 -from cython.operator cimport dereference as deref, preincrement as preinc from libcpp.pair cimport pair from libcpp.string cimport string from libcpp.vector cimport vector from libc.stdint cimport uint32_t from .common cimport CANParser as cpp_CANParser -from .common cimport dbc_lookup, SignalValue, DBC, CanData, CanFrame +from .common cimport MessageState as cpp_MessageState +from .common cimport dbc_lookup, DBC, CanData, CanFrame import numbers from collections import defaultdict +from collections.abc import Mapping + +cdef class MessageState: + cdef cpp_MessageState *state + cdef list signal_names + + @property + def names(self): + return self.signal_names + + def value(self, name): + return self.state.values.at(name).value + + def all_values(self, name): + return self.state.values.at(name).all_values + + def ts_nanos(self, name): + return self.state.values.at(name).ts_nanos + + @staticmethod + cdef create(cpp_MessageState *s): + state = MessageState() + state.state = s + state.signal_names = [it.first.decode("utf-8") for it in s.values] + return state + + +class ValueDictBase(Mapping): + def __init__(self, state): + self.state = state + + def __iter__(self): + return iter(self.state.names) + + def __len__(self): + return len(self.state.names) + +class ValueDict(ValueDictBase): + def __getitem__(self, key): + return self.state.value(key) + +class AllValuesDict(ValueDictBase): + def __getitem__(self, key): + return self.state.all_values(key) + +class NanosDict(ValueDictBase): + def __getitem__(self, key): + return self.state.ts_nanos(key) cdef class CANParser: cdef: cpp_CANParser *can const DBC *dbc - vector[uint32_t] addresses cdef readonly: dict vl dict vl_all dict ts_nanos string dbc_name + int bus def __init__(self, dbc_name, messages, bus=0): + self.bus = bus self.dbc_name = dbc_name self.dbc = dbc_lookup(dbc_name) if not self.dbc: @@ -45,20 +94,18 @@ cdef class CANParser: except IndexError: raise RuntimeError(f"could not find message {repr(c[0])} in DBC {self.dbc_name}") - address = m.address - message_v.push_back((address, c[1])) - self.addresses.push_back(address) - - name = m.name.decode("utf8") - self.vl[address] = {} - self.vl[name] = self.vl[address] - self.vl_all[address] = defaultdict(list) - self.vl_all[name] = self.vl_all[address] - self.ts_nanos[address] = {} - self.ts_nanos[name] = self.ts_nanos[address] + message_v.push_back((m.address, c[1])) self.can = new cpp_CANParser(bus, dbc_name, message_v) - self.update_strings([]) + + # Populate dictionaries with ValueDict + for address, _ in message_v: + m = self.dbc.addr_to_msg.at(address) + name = m.name.decode("utf8") + state = MessageState.create(self.can.messageState(address)) + self.vl[name] = self.vl[address] = ValueDict(state) + self.vl_all[name] = self.vl_all[address] = AllValuesDict(state) + self.ts_nanos[name] = self.ts_nanos[address] = NanosDict(state) def __dealloc__(self): if self.can: @@ -68,16 +115,7 @@ cdef class CANParser: # input format: # [nanos, [[address, data, src], ...]] # [[nanos, [[address, data, src], ...], ...]] - for address in self.addresses: - self.vl_all[address].clear() - - cur_address = -1 - vl = {} - vl_all = {} - ts_nanos = {} - updated_addrs = set() - cdef vector[SignalValue] new_vals cdef CanFrame* frame cdef CanData* can_data cdef vector[CanData] can_data_array @@ -91,7 +129,7 @@ cdef class CANParser: can_data = &(can_data_array.emplace_back()) can_data.nanos = s[0] can_data.frames.reserve(len(s[1])) - for f in s[1]: + for f in (f for f in s[1] if f[2] == self.bus): frame = &(can_data.frames.emplace_back()) frame.address = f[0] frame.dat = f[1] @@ -99,29 +137,7 @@ cdef class CANParser: except TypeError: raise RuntimeError("invalid parameter") - self.can.update(can_data_array, new_vals) - - cdef vector[SignalValue].iterator it = new_vals.begin() - cdef SignalValue* cv - while it != new_vals.end(): - cv = &deref(it) - - # Check if the address has changed - if cv.address != cur_address: - cur_address = cv.address - vl = self.vl[cur_address] - vl_all = self.vl_all[cur_address] - ts_nanos = self.ts_nanos[cur_address] - updated_addrs.add(cur_address) - - # Cast char * directly to unicode - cv_name = cv.name - vl[cv_name] = cv.value - vl_all[cv_name] = cv.all_values - ts_nanos[cv_name] = cv.ts_nanos - preinc(it) - - return updated_addrs + return self.can.update(can_data_array) @property def can_valid(self):