Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CAN Parser #1046

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions opendbc/can/common.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <map>
#include <set>
#include <string>
#include <utility>
#include <unordered_map>
Expand Down Expand Up @@ -46,8 +47,7 @@ class MessageState {
unsigned int size;

std::vector<Signal> parse_sigs;
std::vector<double> vals;
std::vector<std::vector<double>> all_vals;
std::map<std::string, SignalValue> values;

uint64_t last_seen_nanos;
uint64_t check_threshold;
Expand Down Expand Up @@ -80,11 +80,11 @@ class CANParser {
CANParser(int abus, const std::string& dbc_name,
const std::vector<std::pair<uint32_t, int>> &messages);
CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter);
void update(const std::vector<CanData> &can_data, std::vector<SignalValue> &vals);
void query_latest(std::vector<SignalValue> &vals, uint64_t last_ts = 0);
MessageState *messageState(uint32_t address) { return &message_states.at(address); }
std::set<uint32_t> update(const std::vector<CanData> &can_data, bool sendcan);

protected:
void UpdateCans(const CanData &can);
void UpdateCans(const CanData &can, std::set<uint32_t> &updated_addresses);
void UpdateValid(uint64_t nanos);
};

Expand Down
10 changes: 7 additions & 3 deletions opendbc/can/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from libc.stdint cimport uint8_t, uint32_t, uint64_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.pair cimport pair
from libcpp.set cimport set
from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
Expand Down Expand Up @@ -54,9 +56,7 @@ cdef extern from "common_dbc.h":
unordered_map[string, const Msg*] name_to_msg

cdef struct SignalValue:
uint32_t address
uint64_t ts_nanos
string name
double value
vector[double] all_values

Expand All @@ -68,6 +68,9 @@ cdef extern from "common_dbc.h":
cdef extern from "common.h":
cdef const DBC* dbc_lookup(const string) except +

cdef cppclass MessageState:
map[string, SignalValue] values

cdef struct CanFrame:
long src
uint32_t address
Expand All @@ -81,7 +84,8 @@ cdef extern from "common.h":
bool can_valid
bool bus_timeout
CANParser(int, string, vector[pair[uint32_t, int]]) except +
void update(vector[CanData]&, vector[SignalValue]&) except +
MessageState *messageState(uint32_t address)
set[uint32_t] update(vector[CanData]&, bool) except +

cdef cppclass CANPacker:
CANPacker(string)
Expand Down
2 changes: 0 additions & 2 deletions opendbc/can/common_dbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ struct SignalPackValue {
};

struct SignalValue {
uint32_t address;
uint64_t ts_nanos;
std::string name;
double value; // latest value
std::vector<double> all_values; // all values from this cycle
};
Expand Down
75 changes: 29 additions & 46 deletions opendbc/can/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
#include <stdexcept>
#include <sstream>

#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/mman.h>

#include "opendbc/can/common.h"

int64_t get_raw_value(const std::vector<uint8_t> &msg, const Signal &sig) {
Expand All @@ -31,13 +26,12 @@ int64_t get_raw_value(const std::vector<uint8_t> &msg, const Signal &sig) {
return ret;
}


bool MessageState::parse(uint64_t nanos, const std::vector<uint8_t> &dat) {
std::vector<double> tmp_vals(parse_sigs.size());
bool checksum_failed = false;
bool counter_failed = false;

for (int i = 0; i < parse_sigs.size(); i++) {
for (int i = 0; i < parse_sigs.size(); ++i) {
const auto &sig = parse_sigs[i];

int64_t tmp = get_raw_value(dat, sig);
Expand Down Expand Up @@ -68,9 +62,11 @@ bool MessageState::parse(uint64_t nanos, const std::vector<uint8_t> &dat) {
return false;
}

for (int i = 0; i < parse_sigs.size(); i++) {
vals[i] = tmp_vals[i];
all_vals[i].push_back(vals[i]);
for (int i = 0; i < parse_sigs.size(); ++i) {
auto &val = values[parse_sigs[i].name];
val.value = tmp_vals[i];
val.ts_nanos = nanos;
val.all_values.push_back(val.value);
}
last_seen_nanos = nanos;

Expand Down Expand Up @@ -126,8 +122,9 @@ CANParser::CANParser(int abus, const std::string& dbc_name, const std::vector<st

// track all signals for this message
state.parse_sigs = msg->sigs;
state.vals.resize(msg->sigs.size());
state.all_vals.resize(msg->sigs.size());
for (auto &sig : msg->sigs) {
state.values[sig.name] = {};
}
}
}

Expand All @@ -147,17 +144,24 @@ CANParser::CANParser(int abus, const std::string& dbc_name, bool ignore_checksum
.ignore_counter = ignore_counter,
};

for (const auto& sig : msg.sigs) {
state.parse_sigs.push_back(sig);
state.vals.push_back(0);
state.all_vals.push_back({});
}

state.parse_sigs = msg.sigs;
message_states[state.address] = state;
// Initialize value entries for each signal in the message
for (auto &sig : msg.sigs) {
message_states[state.address].values[sig.name] = {};
}
}
}

void CANParser::update(const std::vector<CanData> &can_data, std::vector<SignalValue> &vals) {
std::set<uint32_t> CANParser::update(const std::vector<CanData> &can_data, bool sendcan) {
// Clear all_values
for (auto &state : message_states) {
for (auto &value : state.second.values) {
value.second.all_values.clear();
}
}

std::set<uint32_t> updated_addresses;
uint64_t current_nanos = 0;
for (const auto &c : can_data) {
if (first_nanos == 0) {
Expand All @@ -168,13 +172,13 @@ void CANParser::update(const std::vector<CanData> &can_data, std::vector<SignalV
}
last_nanos = c.nanos;

UpdateCans(c);
UpdateCans(c, updated_addresses);
UpdateValid(last_nanos);
}
query_latest(vals, current_nanos);
return updated_addresses;
}

void CANParser::UpdateCans(const CanData &can) {
void CANParser::UpdateCans(const CanData &can, std::set<uint32_t> &updated_addresses) {
//DEBUG("got %zu messages\n", can.frames.size());

bool bus_empty = true;
Expand Down Expand Up @@ -202,7 +206,9 @@ void CANParser::UpdateCans(const CanData &can) {
// continue;
//}

state_it->second.parse(can.nanos, frame.dat);
if (state_it->second.parse(can.nanos, frame.dat)) {
updated_addresses.insert(state_it->first);
}
}

// update bus timeout
Expand Down Expand Up @@ -240,26 +246,3 @@ void CANParser::UpdateValid(uint64_t nanos) {
can_invalid_cnt = _valid ? 0 : (can_invalid_cnt + 1);
can_valid = (can_invalid_cnt < CAN_INVALID_CNT) && _counters_valid;
}

void CANParser::query_latest(std::vector<SignalValue> &vals, uint64_t last_ts) {
if (last_ts == 0) {
last_ts = last_nanos;
}
for (auto& kv : message_states) {
auto& state = kv.second;
if (last_ts != 0 && state.last_seen_nanos < last_ts) {
continue;
}

for (int i = 0; i < state.parse_sigs.size(); i++) {
const Signal &sig = state.parse_sigs[i];
SignalValue &v = vals.emplace_back();
v.address = state.address;
v.ts_nanos = state.last_seen_nanos;
v.name = sig.name;
v.value = state.vals[i];
v.all_values = state.all_vals[i];
state.all_vals[i].clear();
}
}
}
100 changes: 53 additions & 47 deletions opendbc/can/parser_pyx.pyx
Original file line number Diff line number Diff line change
@@ -1,24 +1,63 @@
# distutils: language = c++
# cython: c_string_encoding=ascii, language_level=3

from cython.operator cimport dereference as deref, preincrement as preinc
from libcpp.pair cimport pair
from libcpp.string cimport string
from libcpp.vector cimport vector
from libc.stdint cimport uint32_t

from .common cimport CANParser as cpp_CANParser
from .common cimport dbc_lookup, SignalValue, DBC, CanData, CanFrame
from .common cimport MessageState as cpp_MessageState
from .common cimport dbc_lookup, DBC, CanData, CanFrame

import numbers
from collections import defaultdict
from collections.abc import Mapping

cdef class MessageState:
cdef cpp_MessageState *state
cdef list signal_names

@property
def names(self):
return self.signal_names

def value(self, name):
return self.state.values.at(name).value

def all_values(self, name):
return self.state.values.at(name).all_values

def ts_nanos(self, name):
return self.state.values.at(name).ts_nanos

@staticmethod
cdef create(cpp_MessageState *s):
state = MessageState()
state.state = s
state.signal_names = [it.first.decode("utf-8") for it in s.values]
return state


class ValueDict(Mapping):
def __init__(self, MessageState state, fetch_func):
self.state = state
self.fetch_func = fetch_func

def __getitem__(self, key):
return self.fetch_func(self.state, key)

def __iter__(self):
return iter(self.state.names)

def __len__(self):
return len(self.state.names)


cdef class CANParser:
cdef:
cpp_CANParser *can
const DBC *dbc
vector[uint32_t] addresses

cdef readonly:
dict vl
Expand All @@ -45,20 +84,18 @@ cdef class CANParser:
except IndexError:
raise RuntimeError(f"could not find message {repr(c[0])} in DBC {self.dbc_name}")

address = m.address
message_v.push_back((address, c[1]))
self.addresses.push_back(address)

name = m.name.decode("utf8")
self.vl[address] = {}
self.vl[name] = self.vl[address]
self.vl_all[address] = defaultdict(list)
self.vl_all[name] = self.vl_all[address]
self.ts_nanos[address] = {}
self.ts_nanos[name] = self.ts_nanos[address]
message_v.push_back((m.address, c[1]))

self.can = new cpp_CANParser(bus, dbc_name, message_v)
self.update_strings([])

# Populate dictionaries with ValueDict
for address, _ in message_v:
m = self.dbc.addr_to_msg.at(address)
name = m.name.decode("utf8")
state = MessageState.create(self.can.messageState(address))
self.vl[name] = self.vl[address] = ValueDict(state, MessageState.value)
self.vl_all[name] = self.vl_all[address] = ValueDict(state, MessageState.all_values)
self.ts_nanos[name] = self.ts_nanos[address] = ValueDict(state, MessageState.ts_nanos)

def __dealloc__(self):
if self.can:
Expand All @@ -68,16 +105,7 @@ cdef class CANParser:
# input format:
# [nanos, [[address, data, src], ...]]
# [[nanos, [[address, data, src], ...], ...]]
for address in self.addresses:
self.vl_all[address].clear()

cur_address = -1
vl = {}
vl_all = {}
ts_nanos = {}
updated_addrs = set()

cdef vector[SignalValue] new_vals
cdef CanFrame* frame
cdef CanData* can_data
cdef vector[CanData] can_data_array
Expand All @@ -99,29 +127,7 @@ cdef class CANParser:
except TypeError:
raise RuntimeError("invalid parameter")

self.can.update(can_data_array, new_vals)

cdef vector[SignalValue].iterator it = new_vals.begin()
cdef SignalValue* cv
while it != new_vals.end():
cv = &deref(it)

# Check if the address has changed
if cv.address != cur_address:
cur_address = cv.address
vl = self.vl[cur_address]
vl_all = self.vl_all[cur_address]
ts_nanos = self.ts_nanos[cur_address]
updated_addrs.add(cur_address)

# Cast char * directly to unicode
cv_name = <unicode>cv.name
vl[cv_name] = cv.value
vl_all[cv_name] = cv.all_values
ts_nanos[cv_name] = cv.ts_nanos
preinc(it)

return updated_addrs
return self.can.update(can_data_array, sendcan)

@property
def can_valid(self):
Expand Down
Loading