diff --git a/homie-influx/debian-scripts/homie-influx.service b/homie-influx/debian-scripts/homie-influx.service index 271ccac8..3fd74679 100644 --- a/homie-influx/debian-scripts/homie-influx.service +++ b/homie-influx/debian-scripts/homie-influx.service @@ -10,6 +10,7 @@ Environment=RUST_BACKTRACE=1 Environment=RUST_LIB_BACKTRACE=1 ExecStart=/usr/bin/homie-influx Restart=always +RestartSec=5 [Install] WantedBy=multi-user.target diff --git a/homie-influx/homie-influx.example.toml b/homie-influx/homie-influx.example.toml index 782634c3..20e64e0f 100644 --- a/homie-influx/homie-influx.example.toml +++ b/homie-influx/homie-influx.example.toml @@ -15,6 +15,8 @@ client_prefix="homie-influx" #password="" # Whether to use TLS for the connection to the MQTT broker. use_tls=false +# How long to wait between reconnection attempts if the MQTT connection is dropped. +reconnect_interval_seconds=5 [influxdb] # The URL of the InfluxDB to which to connect. diff --git a/homie-influx/src/config.rs b/homie-influx/src/config.rs index 01f805ae..a0808806 100644 --- a/homie-influx/src/config.rs +++ b/homie-influx/src/config.rs @@ -3,15 +3,18 @@ use influx_db_client::reqwest::Url; use influx_db_client::Client; use rumqttc::MqttOptions; use rustls::ClientConfig; +use serde::{Deserialize as _, Deserializer}; use serde_derive::Deserialize; use stable_eyre::eyre; use stable_eyre::eyre::WrapErr; use std::fs::read_to_string; use std::sync::Arc; +use std::time::Duration; const DEFAULT_MQTT_CLIENT_PREFIX: &str = "homie-influx"; const DEFAULT_MQTT_HOST: &str = "test.mosquitto.org"; const DEFAULT_MQTT_PORT: u16 = 1883; +const DEFAULT_MQTT_RECONNECT_INTERVAL: Duration = Duration::from_secs(5); const DEFAULT_INFLUXDB_URL: &str = "http://localhost:8086"; const CONFIG_FILENAME: &str = "homie-influx.toml"; const DEFAULT_MAPPINGS_FILENAME: &str = "mappings.toml"; @@ -45,6 +48,17 @@ pub struct MqttConfig { pub username: Option, pub password: Option, pub client_prefix: String, + #[serde( + deserialize_with = "de_duration_seconds", + rename = "reconnect_interval_seconds" + )] + pub reconnect_interval: Duration, +} + +/// Deserialize an integer as a number of seconds. +fn de_duration_seconds<'de, D: Deserializer<'de>>(d: D) -> Result { + let seconds = u64::deserialize(d)?; + Ok(Duration::from_secs(seconds)) } impl Default for MqttConfig { @@ -56,6 +70,7 @@ impl Default for MqttConfig { username: None, password: None, client_prefix: DEFAULT_MQTT_CLIENT_PREFIX.to_owned(), + reconnect_interval: DEFAULT_MQTT_RECONNECT_INTERVAL, } } } @@ -140,6 +155,7 @@ pub fn get_mqtt_options(config: &MqttConfig, client_name_suffix: &str) -> MqttOp let client_name = format!("{}-{}", config.client_prefix, client_name_suffix); let mut mqtt_options = MqttOptions::new(client_name, &config.host, config.port); mqtt_options.set_keep_alive(5); + mqtt_options.set_clean_session(false); if let (Some(username), Some(password)) = (&config.username, &config.password) { mqtt_options.set_credentials(username, password); diff --git a/homie-influx/src/main.rs b/homie-influx/src/main.rs index 0ce6a8b2..13c9dbf9 100644 --- a/homie-influx/src/main.rs +++ b/homie-influx/src/main.rs @@ -4,13 +4,14 @@ mod influx; use crate::config::{get_influxdb_client, get_mqtt_options, read_mappings, Config}; use crate::influx::send_property_value; use futures::future::try_join_all; -use futures::FutureExt; -use homie_controller::{Event, HomieController, HomieEventLoop}; +use homie_controller::{Event, HomieController, HomieEventLoop, PollError}; use influx_db_client::Client; +use rumqttc::ConnectionError; use stable_eyre::eyre; -use stable_eyre::eyre::WrapErr; use std::sync::Arc; +use std::time::Duration; use tokio::task::{self, JoinHandle}; +use tokio::time::delay_for; #[tokio::main] async fn main() -> Result<(), eyre::Report> { @@ -31,57 +32,41 @@ async fn main() -> Result<(), eyre::Report> { let influxdb_client = get_influxdb_client(&config.influxdb, &mapping.influxdb_database)?; - let handle = spawn_homie_poll_loop(event_loop, controller.clone(), influxdb_client); + let handle = spawn_homie_poll_loop( + event_loop, + controller.clone(), + influxdb_client, + config.mqtt.reconnect_interval, + ); controller.start().await?; - join_handles.push(handle.map(|res| Ok(res??))); + join_handles.push(handle); } - simplify_unit_vec(try_join_all(join_handles).await) + try_join_all(join_handles).await?; + Ok(()) } fn spawn_homie_poll_loop( mut event_loop: HomieEventLoop, controller: Arc, influx_db_client: Client, -) -> JoinHandle> { + reconnect_interval: Duration, +) -> JoinHandle<()> { task::spawn(async move { loop { - if let Some(event) = controller.poll(&mut event_loop).await.wrap_err_with(|| { - format!( - "Failed to poll HomieController for base topic '{}'.", - controller.base_topic() - ) - })? { - match event { - Event::PropertyValueChanged { - device_id, - node_id, - property_id, - value, - fresh, - } => { - log::trace!( - "{}/{}/{}/{} = {} ({})", - controller.base_topic(), - device_id, - node_id, - property_id, - value, - fresh - ); - if fresh { - send_property_value( - controller.as_ref(), - &influx_db_client, - device_id, - node_id, - property_id, - ) - .await?; - } - } - _ => { - log::info!("{} Event: {:?}", controller.base_topic(), event); + match controller.poll(&mut event_loop).await { + Ok(Some(event)) => { + handle_event(controller.as_ref(), &influx_db_client, event).await; + } + Ok(None) => {} + Err(e) => { + log::error!( + "Failed to poll HomieController for base topic '{}': {}", + controller.base_topic(), + e + ); + if let PollError::Connection(ConnectionError::Io(_)) = e { + delay_for(reconnect_interval).await; } } } @@ -89,6 +74,40 @@ fn spawn_homie_poll_loop( }) } -fn simplify_unit_vec(m: Result, E>) -> Result<(), E> { - m.map(|_| ()) +async fn handle_event(controller: &HomieController, influx_db_client: &Client, event: Event) { + match event { + Event::PropertyValueChanged { + device_id, + node_id, + property_id, + value, + fresh, + } => { + log::trace!( + "{}/{}/{}/{} = {} ({})", + controller.base_topic(), + device_id, + node_id, + property_id, + value, + fresh + ); + if fresh { + if let Err(e) = send_property_value( + controller, + influx_db_client, + device_id, + node_id, + property_id, + ) + .await + { + log::error!("{:?}", e); + } + } + } + _ => { + log::info!("{} Event: {:?}", controller.base_topic(), event); + } + } }