Skip to content

Commit

Permalink
remove dep on cereal
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Sep 5, 2023
1 parent ef302f7 commit 97e3e8f
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 134 deletions.
4 changes: 2 additions & 2 deletions can/SConscript
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
Import('env', 'envCython', 'cereal', 'common')
Import('env', 'envCython', 'common')

import os

envDBC = env.Clone()
dbc_file_path = '-DDBC_FILE_PATH=\'"%s"\'' % (envDBC.Dir("..").abspath)
envDBC['CXXFLAGS'] += [dbc_file_path]
src = ["dbc.cc", "parser.cc", "packer.cc", "common.cc"]
libs = [common, "capnp", "kj", "zmq"]
libs = [common, "zmq"]

# shared library for openpilot
libdbc = envDBC.SharedLibrary('libdbc', src, LIBS=libs)
Expand Down
27 changes: 10 additions & 17 deletions can/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@
#include <unordered_map>
#include <vector>

#include <capnp/dynamic.h>
#include <capnp/serialize.h>

#ifndef DYNAMIC_CAPNP
#include "cereal/gen/cpp/log.capnp.h"
#endif

#include "opendbc/can/common_dbc.h"

#define INFO printf
Expand Down Expand Up @@ -58,34 +51,34 @@ class MessageState {
bool update_counter_generic(int64_t v, int cnt_size);
};

struct CanFrame {
uint64_t ts;
long src;
long address;
std::vector<uint8_t> dat;
};

class CANParser {
private:
const int bus;
kj::Array<capnp::word> aligned_buf;

const DBC *dbc = NULL;
std::unordered_map<uint32_t, MessageState> message_states;

public:
bool can_valid = false;
bool bus_timeout = false;
uint64_t first_sec = 0;
uint64_t last_sec = 0;
uint64_t last_nonempty_sec = 0;
uint64_t bus_timeout_threshold = 0;
uint64_t can_invalid_cnt = CAN_INVALID_CNT;

CANParser(int abus, const std::string& dbc_name,
const std::vector<std::pair<uint32_t, int>> &messages);
CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter);
#ifndef DYNAMIC_CAPNP
void update_string(const std::string &data, bool sendcan);
void update_strings(const std::vector<std::string> &data, std::vector<SignalValue> &vals, bool sendcan);
void UpdateCans(uint64_t sec, const capnp::List<cereal::CanData>::Reader& cans);
#endif
void UpdateCans(uint64_t sec, const capnp::DynamicStruct::Reader& cans);
void update_frames(uint64_t frame_fist_sec, const std::vector<CanFrame> &frames, std::vector<SignalValue> &vals);
bool updateFrame(const CanFrame &data);
void UpdateValid(uint64_t sec);
void query_latest(std::vector<SignalValue> &vals, uint64_t last_ts = 0);
void query_latest(std::vector<SignalValue> &vals, uint64_t frame_fist_sec = 0);
};

class CANPacker {
Expand Down
8 changes: 7 additions & 1 deletion can/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,17 @@ cdef extern from "common_dbc.h":
cdef extern from "common.h":
cdef const DBC* dbc_lookup(const string)

cdef struct CanFrame:
uint64_t ts
long src
long address
vector[uint8_t] dat

cdef cppclass CANParser:
bool can_valid
bool bus_timeout
CANParser(int, string, vector[pair[uint32_t, int]]) except +
void update_strings(vector[string]&, vector[SignalValue]&, bool) except +
void update_frames(uint64_t sec, vector[CanFrame]&, vector[SignalValue]&) except +

cdef cppclass CANPacker:
CANPacker(string)
Expand Down
144 changes: 36 additions & 108 deletions can/parser.cc
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
#include <algorithm>
#include <cassert>
#include <cstring>
#include <limits>
#include <stdexcept>
#include <sstream>

#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <stdexcept>

#include "cereal/logger/logger.h"
#include "opendbc/can/common.h"
Expand Down Expand Up @@ -91,8 +85,7 @@ bool MessageState::update_counter_generic(int64_t v, int cnt_size) {
}


CANParser::CANParser(int abus, const std::string& dbc_name, const std::vector<std::pair<uint32_t, int>> &messages)
: bus(abus), aligned_buf(kj::heapArray<capnp::word>(1024)) {
CANParser::CANParser(int abus, const std::string& dbc_name, const std::vector<std::pair<uint32_t, int>> &messages) : bus(abus) {
dbc = dbc_lookup(dbc_name);
assert(dbc);
init_crc_lookup_tables();
Expand All @@ -101,7 +94,7 @@ CANParser::CANParser(int abus, const std::string& dbc_name, const std::vector<st

for (const auto& [address, frequency] : messages) {
// disallow duplicate message checks
if (message_states.find(address) != message_states.end()) {
if (message_states.find(address) != message_states.end()) {
std::stringstream is;
is << "Duplicate Message Check: " << address;
throw std::runtime_error(is.str());
Expand Down Expand Up @@ -142,10 +135,8 @@ CANParser::CANParser(int abus, const std::string& dbc_name, const std::vector<st
}
}

CANParser::CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter)
: bus(abus) {
CANParser::CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter) : bus(abus) {
// Add all messages and signals

dbc = dbc_lookup(dbc_name);
assert(dbc);
init_crc_lookup_tables();
Expand All @@ -169,111 +160,51 @@ CANParser::CANParser(int abus, const std::string& dbc_name, bool ignore_checksum
}
}

#ifndef DYNAMIC_CAPNP
void CANParser::update_string(const std::string &data, bool sendcan) {
// format for board, make copy due to alignment issues.
const size_t buf_size = (data.length() / sizeof(capnp::word)) + 1;
if (aligned_buf.size() < buf_size) {
aligned_buf = kj::heapArray<capnp::word>(buf_size);
}
memcpy(aligned_buf.begin(), data.data(), data.length());

// extract the messages
capnp::FlatArrayMessageReader cmsg(aligned_buf.slice(0, buf_size));
cereal::Event::Reader event = cmsg.getRoot<cereal::Event>();

void CANParser::update_frames(uint64_t frame_first_sec, const std::vector<CanFrame> &frames, std::vector<SignalValue> &vals) {
if (first_sec == 0) {
first_sec = event.getLogMonoTime();
first_sec = frame_first_sec;
}
last_sec = event.getLogMonoTime();

auto cans = sendcan ? event.getSendcan() : event.getCan();
UpdateCans(last_sec, cans);

UpdateValid(last_sec);
}

void CANParser::update_strings(const std::vector<std::string> &data, std::vector<SignalValue> &vals, bool sendcan) {
uint64_t current_sec = 0;
for (const auto &d : data) {
update_string(d, sendcan);
if (current_sec == 0) {
current_sec = last_sec;
uint64_t last_sec = frame_first_sec;
for (const auto &f : frames) {
if (updateFrame(f)) {
last_sec = std::max(last_sec, f.ts);
last_nonempty_sec = f.ts;
}
}
query_latest(vals, current_sec);
}

void CANParser::UpdateCans(uint64_t sec, const capnp::List<cereal::CanData>::Reader& cans) {
//DEBUG("got %d messages\n", cans.size());

bool bus_empty = true;

// parse the messages
for (const auto cmsg : cans) {
if (cmsg.getSrc() != bus) {
// DEBUG("skip %d: wrong bus\n", cmsg.getAddress());
continue;
}
bus_empty = false;

auto state_it = message_states.find(cmsg.getAddress());
if (state_it == message_states.end()) {
// DEBUG("skip %d: not specified\n", cmsg.getAddress());
continue;
}

auto dat = cmsg.getDat();

if (dat.size() > 64) {
DEBUG("got message longer than 64 bytes: 0x%X %zu\n", cmsg.getAddress(), dat.size());
continue;
}

// TODO: this actually triggers for some cars. fix and enable this
//if (dat.size() != state_it->second.size) {
// DEBUG("got message with unexpected length: expected %d, got %zu for %d", state_it->second.size, dat.size(), cmsg.getAddress());
// continue;
//}

std::vector<uint8_t> data(dat.size(), 0);
memcpy(data.data(), dat.begin(), dat.size());
state_it->second.parse(sec, data);
}

// update bus timeout
if (!bus_empty) {
last_nonempty_sec = sec;
}
bus_timeout = (sec - last_nonempty_sec) > bus_timeout_threshold;
bus_timeout = (last_sec - last_nonempty_sec) > bus_timeout_threshold;
UpdateValid(last_sec);
query_latest(vals, frame_first_sec);
}
#endif

void CANParser::UpdateCans(uint64_t sec, const capnp::DynamicStruct::Reader& cmsg) {
// assume message struct is `cereal::CanData` and parse
assert(cmsg.has("address") && cmsg.has("src") && cmsg.has("dat") && cmsg.has("busTime"));

if (cmsg.get("src").as<uint8_t>() != bus) {
DEBUG("skip %d: wrong bus\n", cmsg.get("address").as<uint32_t>());
return;
bool CANParser::updateFrame(const CanFrame &frame) {
// DEBUG("got %d messages\n", cans.size());
if (frame.src != bus) {
// DEBUG("skip %d: wrong bus\n", cmsg.getAddress());
return false;
}

auto state_it = message_states.find(cmsg.get("address").as<uint32_t>());
auto state_it = message_states.find(frame.address);
if (state_it == message_states.end()) {
DEBUG("skip %d: not specified\n", cmsg.get("address").as<uint32_t>());
return;
// DEBUG("skip %d: not specified\n", cmsg.getAddress());
return false;
}

auto dat = cmsg.get("dat").as<capnp::Data>();
if (dat.size() > 64) return; // shouldn't ever happen
std::vector<uint8_t> data(dat.size(), 0);
memcpy(data.data(), dat.begin(), dat.size());
state_it->second.parse(sec, data);
if (frame.dat.size() > 64) {
DEBUG("got message longer than 64 bytes: 0x%X %zu\n", frame.address, frame.dat.size());
return false;
}

// TODO: this actually triggers for some cars. fix and enable this
// if (dat.size() != state_it->second.size) {
// DEBUG("got message with unexpected length: expected %d, got %zu for %d", state_it->second.size, dat.size(), cmsg.getAddress());
// continue;
//}
return state_it->second.parse(frame.ts, frame.dat);
}

void CANParser::UpdateValid(uint64_t sec) {
const bool show_missing = (last_sec - first_sec) > 8e9;

const bool show_missing = (sec - first_sec) > 8e9;
bool _valid = true;
bool _counters_valid = true;
for (const auto& kv : message_states) {
Expand All @@ -300,13 +231,10 @@ void CANParser::UpdateValid(uint64_t sec) {
can_valid = (can_invalid_cnt < CAN_INVALID_CNT) && _counters_valid;
}

void CANParser::query_latest(std::vector<SignalValue> &vals, uint64_t last_ts) {
if (last_ts == 0) {
last_ts = last_sec;
}
void CANParser::query_latest(std::vector<SignalValue> &vals, uint64_t frame_fist_sec) {
for (auto& kv : message_states) {
auto& state = kv.second;
if (last_ts != 0 && state.last_seen_nanos < last_ts) {
if (frame_fist_sec != 0 && state.last_seen_nanos < frame_fist_sec) {
continue;
}

Expand Down
34 changes: 28 additions & 6 deletions can/parser_pyx.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ from libcpp.pair cimport pair
from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp.unordered_set cimport unordered_set
from libc.stdint cimport uint32_t
from libc.stdint cimport uint32_t, uint64_t

from .common cimport CANParser as cpp_CANParser
from .common cimport dbc_lookup, SignalValue, DBC
from .common cimport dbc_lookup, SignalValue, DBC, CanFrame

import numbers
import capnp
from cereal import log
from collections import defaultdict


cdef class CANParser:
cdef:
cpp_CANParser *can
Expand Down Expand Up @@ -70,12 +71,33 @@ cdef class CANParser:
for l in v.values(): # no-cython-lint
l.clear()

cdef vector[CanFrame] can_frames
cdef CanFrame* frame
cdef uint64_t frame_fist_sec = 0
try:
for s in strings:
with log.Event.from_bytes(s) as msg:
if frame_fist_sec == 0:
frame_fist_sec = msg.logMonoTime
else:
frame_fist_sec = min(frame_fist_sec, msg.logMonoTime)

can = msg.sendCan if sendcan else msg.can
for c in can:
frame = &(can_frames.emplace_back())
frame.ts = msg.logMonoTime
frame.src = c.src
frame.address = c.address
frame.dat = c.dat
except capnp.lib.capnp.KjException as ex:
raise RuntimeError(str(ex))

cdef vector[SignalValue] new_vals
cdef unordered_set[uint32_t] updated_addrs
self.can.update_frames(frame_fist_sec, can_frames, new_vals)

self.can.update_strings(strings, new_vals, sendcan)
cdef vector[SignalValue].iterator it = new_vals.begin()
cdef unordered_set[uint32_t] updated_addrs
cdef SignalValue* cv
cdef vector[SignalValue].iterator it = new_vals.begin()
while it != new_vals.end():
cv = &deref(it)
# Cast char * directly to unicode
Expand Down

0 comments on commit 97e3e8f

Please sign in to comment.