Skip to content

Commit

Permalink
Implement stats message.
Browse files Browse the repository at this point in the history
This message is used to indicate that a Relay Gateway is still alive and
to collect the path between Relay Gateway and Border Gateway (in case
there are multiple Relay Gateways in-between).
  • Loading branch information
brocaar committed Jun 10, 2024
1 parent eff371f commit e988aa3
Show file tree
Hide file tree
Showing 15 changed files with 587 additions and 7 deletions.
21 changes: 19 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
"usage",
"derive",
] }
chirpstack_api = { version = "4.8", default-features = false }
chirpstack_api = { version = "4.9.0-test.1", default-features = false }
lrwn_filters = { version = "4.7", features = ["serde"] }
log = "0.4"
simple_logger = "5.0"
syslog = "6.1"
toml = "0.8"
handlebars = "5.1"
anyhow = "1.0"
humantime-serde = "1.1"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.37", features = [
"macros",
Expand Down
14 changes: 14 additions & 0 deletions src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::UNIX_EPOCH;
use std::{collections::VecDeque, usize};

use crate::packets;
Expand Down Expand Up @@ -37,6 +38,7 @@ impl<T> Cache<T> {
pub struct PayloadCache {
p_type: packets::PayloadType,
uplink_id: u16,
timestamp: u32,
relay_id: [u8; 4],
}

Expand All @@ -49,11 +51,23 @@ impl From<&packets::MeshPacket> for PayloadCache {
p_type,
uplink_id: v.metadata.uplink_id,
relay_id: v.relay_id,
timestamp: 0,
},
packets::Payload::Downlink(v) => PayloadCache {
p_type,
uplink_id: v.metadata.uplink_id,
relay_id: v.relay_id,
timestamp: 0,
},
packets::Payload::Stats(v) => PayloadCache {
p_type,
uplink_id: 0,
relay_id: v.relay_id,
timestamp: v
.timestamp
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as u32,
},
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/cmd/configfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ pub fn run() {
# uplinks and forward these to the proxy API, rather than relaying these.
border_gateway={{ mesh.border_gateway }}
# Stats interval (Relay Gateway only).
#
# This defines the interval in which a Relay Gateway (border_gateway=false)
# will emit stats messages.
stats_interval="{{ mesh.stats_interval }}"
# Max hop count.
#
# This defines the maximum number of hops a relayed payload will pass.
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals;

use crate::config::Configuration;
use crate::{backend, proxy};
use crate::{backend, proxy, stats};

pub async fn run(conf: &Configuration) -> Result<()> {
proxy::setup(conf).await?;
backend::setup(conf).await?;
stats::setup(conf).await?;

let mut signals = Signals::new([SIGINT, SIGTERM])?;
let handle = signals.handle();
Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fs;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::Result;
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -51,6 +52,8 @@ impl Default for Logging {
#[serde(default)]
pub struct Mesh {
pub signing_key: Aes128Key,
#[serde(with = "humantime_serde")]
pub stats_interval: Duration,
pub frequencies: Vec<u32>,
pub data_rate: DataRate,
pub tx_power: i32,
Expand All @@ -65,6 +68,7 @@ impl Default for Mesh {
fn default() -> Self {
Mesh {
signing_key: Aes128Key::null(),
stats_interval: Duration::from_secs(300),
frequencies: vec![868100000, 868300000, 868500000],
data_rate: DataRate {
modulation: Modulation::LORA,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ pub mod logging;
pub mod mesh;
pub mod packets;
pub mod proxy;
pub mod stats;
45 changes: 42 additions & 3 deletions src/mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ pub async fn handle_mesh(border_gateway: bool, pl: gw::UplinkFrame) -> Result<()
};

match border_gateway {
// In this case we only care about proxy-ing relayed uplinks
// Proxy relayed uplink
true => match packet.mhdr.payload_type {
PayloadType::Uplink => proxy_uplink_mesh_packet(&pl, packet).await,
PayloadType::Stats => proxy_stats_mesh_packet(&pl, packet).await,
_ => Ok(()),
},
false => relay_mesh_packet(&pl, packet).await,
Expand Down Expand Up @@ -151,11 +152,35 @@ async fn proxy_uplink_mesh_packet(pl: &gw::UplinkFrame, packet: MeshPacket) -> R
proxy::send_uplink(&pl).await
}

async fn proxy_stats_mesh_packet(pl: &gw::UplinkFrame, packet: MeshPacket) -> Result<()> {
let mesh_pl = match &packet.payload {
Payload::Stats(v) => v,
_ => {
return Err(anyhow!("Expected Stats payload"));
}
};

info!(
"Unwrapping relay stats packet, uplink_id: {}, mesh_packet: {}",
pl.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default(),
packet
);

let stats_pl = gw::MeshStats {
gateway_id: hex::encode(backend::get_gateway_id().await?),
relay_id: hex::encode(mesh_pl.relay_id),
relay_path: mesh_pl.relay_path.iter().map(hex::encode).collect(),
time: Some(mesh_pl.timestamp.into()),
};

proxy::send_mesh_stats(&stats_pl).await
}

async fn relay_mesh_packet(_: &gw::UplinkFrame, mut packet: MeshPacket) -> Result<()> {
let conf = config::get();
let relay_id = backend::get_relay_id().await?;

match &packet.payload {
match &mut packet.payload {
packets::Payload::Uplink(pl) => {
if pl.relay_id == relay_id {
trace!("Dropping packet as this relay was the sender");
Expand Down Expand Up @@ -203,13 +228,27 @@ async fn relay_mesh_packet(_: &gw::UplinkFrame, mut packet: MeshPacket) -> Resul
return helpers::tx_ack_to_err(&backend::send_downlink(&pl).await?);
}
}
packets::Payload::Stats(pl) => {
if pl.relay_id == relay_id {
trace!("Dropping packet as this relay was the sender");

// Drop the packet, as we are the sender.
return Ok(());
}

// Add our Relay ID to the path.
pl.relay_path.push(relay_id);
}
}

// In any other case, we increment the hop_count and re-transmit the mesh encapsulated
// packet.

// Increment hop count.
packet.mhdr.hop_count += 1;

// We need to re-set the MIC as we have changed the payload by incrementing
// the hop count (and in casee of stats, we have modified the Relay path).
packet.set_mic(conf.mesh.signing_key)?;

if packet.mhdr.hop_count > conf.mesh.max_hop_count {
Expand Down Expand Up @@ -429,7 +468,7 @@ async fn relay_downlink_lora_packet(pl: &gw::DownlinkFrame) -> Result<gw::Downli
})
}

fn get_mesh_frequency(conf: &Configuration) -> Result<u32> {
pub fn get_mesh_frequency(conf: &Configuration) -> Result<u32> {
if conf.mesh.frequencies.is_empty() {
return Err(anyhow!("No mesh frequencies are configured"));
}
Expand Down
Loading

0 comments on commit e988aa3

Please sign in to comment.