Skip to content

Commit

Permalink
Add MQTT connection callback config.
Browse files Browse the repository at this point in the history
This can be used for example to control a gateway LED.
  • Loading branch information
brocaar committed Dec 12, 2024
1 parent e42d47b commit c2a94e8
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 2 deletions.
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[toolchain]
channel = "1.80.1"
channel = "1.83.0"
components = ["rustfmt", "clippy"]
profile = "default"
16 changes: 16 additions & 0 deletions src/cmd/configfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,22 @@ pub fn run(config: &Configuration) {
{{/each}}
]
{{/each}}
# Callback commands.
#
# These are commands that are triggered by certain events (e.g. MQTT connected
# or error). These commands are intended to e.g. trigger a LED of a gateway.
# Commands are configured as an array, where the first item is the path to the
# command, and the (optional) remaining elements are the arguments. An empty
# array disables the callback.
[callbacks]
# On MQTT connected.
on_mqtt_connected=[]
# On MQTT connection error.
on_mqtt_connection_error=[]
"#;

let reg = Handlebars::new();
Expand Down
46 changes: 45 additions & 1 deletion src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::process::Stdio;

use anyhow::Result;
use chirpstack_api::gw;
use log::info;
use log::{error, info};
use once_cell::sync::OnceCell;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
Expand Down Expand Up @@ -74,9 +74,53 @@ pub async fn exec(pl: &gw::GatewayCommandExecRequest) -> Result<gw::GatewayComma
})
}

pub async fn exec_callback(cmd_args: &[String]) {
tokio::spawn({
let cmd_args = cmd_args.to_vec();

async move {
if cmd_args.is_empty() {
return;
}

info!("Executing callback, callback: {:?}", cmd_args);

let mut cmd = Command::new(&cmd_args[0]);
if cmd_args.len() > 1 {
cmd.args(&cmd_args[1..]);
}

if let Err(e) = cmd.output().await {
error!(
"Execute callback error, callback: {:?}, error: {}",
cmd_args, e
);
}
}
});
}

#[cfg(test)]
mod test {
use super::*;
use std::{env, fs};

#[tokio::test]
async fn test_exec_callback() {
let temp_file = env::temp_dir().join("test.txt");
fs::write(&temp_file, vec![]).unwrap();
assert!(fs::exists(&temp_file).unwrap());

exec_callback(&[
"rm".into(),
temp_file.clone().into_os_string().into_string().unwrap(),
])
.await;

tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

assert_eq!(false, fs::exists(&temp_file).unwrap());
}

#[tokio::test]
async fn test_commands() {
Expand Down
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct Configuration {
pub backend: Backend,
pub metadata: Metadata,
pub commands: HashMap<String, Vec<String>>,
pub callbacks: Callbacks,
}

impl Configuration {
Expand Down Expand Up @@ -168,3 +169,10 @@ pub struct Metadata {
pub r#static: HashMap<String, String>,
pub commands: HashMap<String, Vec<String>>,
}

#[derive(Serialize, Deserialize, Default)]
#[serde(default)]
pub struct Callbacks {
pub on_mqtt_connected: Vec<String>,
pub on_mqtt_connection_error: Vec<String>,
}
7 changes: 7 additions & 0 deletions src/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ pub async fn setup(conf: &Configuration) -> Result<()> {

// Eventloop
tokio::spawn({
let on_mqtt_connected = conf.callbacks.on_mqtt_connected.clone();
let on_mqtt_connection_error = conf.callbacks.on_mqtt_connection_error.clone();

async move {
info!("Starting MQTT event loop");

Expand All @@ -228,6 +231,8 @@ pub async fn setup(conf: &Configuration) -> Result<()> {
}
Event::Incoming(Incoming::ConnAck(v)) => {
if v.code == ConnectReturnCode::Success {
commands::exec_callback(&on_mqtt_connected).await;

if let Err(e) = connect_tx.try_send(()) {
error!("Send to subscribe channel error, error: {}", e);
}
Expand All @@ -240,6 +245,8 @@ pub async fn setup(conf: &Configuration) -> Result<()> {
}
}
Err(e) => {
commands::exec_callback(&on_mqtt_connection_error).await;

error!("MQTT error, error: {}", e);
sleep(Duration::from_secs(1)).await
}
Expand Down

0 comments on commit c2a94e8

Please sign in to comment.