Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect to MQTT broker if connection is dropped #98

Merged
merged 5 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions homie-influx/debian-scripts/homie-influx.service
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Environment=RUST_BACKTRACE=1
Environment=RUST_LIB_BACKTRACE=1
ExecStart=/usr/bin/homie-influx
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
2 changes: 2 additions & 0 deletions homie-influx/homie-influx.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ client_prefix="homie-influx"
#password=""
# Whether to use TLS for the connection to the MQTT broker.
use_tls=false
# How long to wait between reconnection attempts if the MQTT connection is dropped.
reconnect_interval_seconds=5

[influxdb]
# The URL of the InfluxDB to which to connect.
Expand Down
16 changes: 16 additions & 0 deletions homie-influx/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ use influx_db_client::reqwest::Url;
use influx_db_client::Client;
use rumqttc::MqttOptions;
use rustls::ClientConfig;
use serde::{Deserialize as _, Deserializer};
use serde_derive::Deserialize;
use stable_eyre::eyre;
use stable_eyre::eyre::WrapErr;
use std::fs::read_to_string;
use std::sync::Arc;
use std::time::Duration;

const DEFAULT_MQTT_CLIENT_PREFIX: &str = "homie-influx";
const DEFAULT_MQTT_HOST: &str = "test.mosquitto.org";
const DEFAULT_MQTT_PORT: u16 = 1883;
const DEFAULT_MQTT_RECONNECT_INTERVAL: Duration = Duration::from_secs(5);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How long does it take to get synced up with all the mqtt state? If you were in a fresh loop, what fraction of your time would be spent hammering the server?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The state is preserved on the broker when clean_session is set to false, so the client just needs to authenticate and connect/conack. WIthout a delay it just seems to constantly try to reconnect. I didn't check CPU usage, but there was certainly a lot of log spam.

const DEFAULT_INFLUXDB_URL: &str = "http://localhost:8086";
const CONFIG_FILENAME: &str = "homie-influx.toml";
const DEFAULT_MAPPINGS_FILENAME: &str = "mappings.toml";
Expand Down Expand Up @@ -45,6 +48,17 @@ pub struct MqttConfig {
pub username: Option<String>,
pub password: Option<String>,
pub client_prefix: String,
#[serde(
deserialize_with = "de_duration_seconds",
rename = "reconnect_interval_seconds"
)]
pub reconnect_interval: Duration,
}

/// Deserialize an integer as a number of seconds.
fn de_duration_seconds<'de, D: Deserializer<'de>>(d: D) -> Result<Duration, D::Error> {
let seconds = u64::deserialize(d)?;
Ok(Duration::from_secs(seconds))
}

impl Default for MqttConfig {
Expand All @@ -56,6 +70,7 @@ impl Default for MqttConfig {
username: None,
password: None,
client_prefix: DEFAULT_MQTT_CLIENT_PREFIX.to_owned(),
reconnect_interval: DEFAULT_MQTT_RECONNECT_INTERVAL,
}
}
}
Expand Down Expand Up @@ -140,6 +155,7 @@ pub fn get_mqtt_options(config: &MqttConfig, client_name_suffix: &str) -> MqttOp
let client_name = format!("{}-{}", config.client_prefix, client_name_suffix);
let mut mqtt_options = MqttOptions::new(client_name, &config.host, config.port);
mqtt_options.set_keep_alive(5);
mqtt_options.set_clean_session(false);

if let (Some(username), Some(password)) = (&config.username, &config.password) {
mqtt_options.set_credentials(username, password);
Expand Down
109 changes: 64 additions & 45 deletions homie-influx/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ mod influx;
use crate::config::{get_influxdb_client, get_mqtt_options, read_mappings, Config};
use crate::influx::send_property_value;
use futures::future::try_join_all;
use futures::FutureExt;
use homie_controller::{Event, HomieController, HomieEventLoop};
use homie_controller::{Event, HomieController, HomieEventLoop, PollError};
use influx_db_client::Client;
use rumqttc::ConnectionError;
use stable_eyre::eyre;
use stable_eyre::eyre::WrapErr;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::{self, JoinHandle};
use tokio::time::delay_for;

#[tokio::main]
async fn main() -> Result<(), eyre::Report> {
Expand All @@ -31,64 +32,82 @@ async fn main() -> Result<(), eyre::Report> {

let influxdb_client = get_influxdb_client(&config.influxdb, &mapping.influxdb_database)?;

let handle = spawn_homie_poll_loop(event_loop, controller.clone(), influxdb_client);
let handle = spawn_homie_poll_loop(
event_loop,
controller.clone(),
influxdb_client,
config.mqtt.reconnect_interval,
);
controller.start().await?;
join_handles.push(handle.map(|res| Ok(res??)));
join_handles.push(handle);
}

simplify_unit_vec(try_join_all(join_handles).await)
try_join_all(join_handles).await?;
Ok(())
}

fn spawn_homie_poll_loop(
mut event_loop: HomieEventLoop,
controller: Arc<HomieController>,
influx_db_client: Client,
) -> JoinHandle<Result<(), eyre::Report>> {
reconnect_interval: Duration,
) -> JoinHandle<()> {
task::spawn(async move {
loop {
if let Some(event) = controller.poll(&mut event_loop).await.wrap_err_with(|| {
format!(
"Failed to poll HomieController for base topic '{}'.",
controller.base_topic()
)
})? {
match event {
Event::PropertyValueChanged {
device_id,
node_id,
property_id,
value,
fresh,
} => {
log::trace!(
"{}/{}/{}/{} = {} ({})",
controller.base_topic(),
device_id,
node_id,
property_id,
value,
fresh
);
if fresh {
send_property_value(
controller.as_ref(),
&influx_db_client,
device_id,
node_id,
property_id,
)
.await?;
}
}
_ => {
log::info!("{} Event: {:?}", controller.base_topic(), event);
match controller.poll(&mut event_loop).await {
Ok(Some(event)) => {
handle_event(controller.as_ref(), &influx_db_client, event).await;
}
Ok(None) => {}
Err(e) => {
log::error!(
"Failed to poll HomieController for base topic '{}': {}",
controller.base_topic(),
e
);
if let PollError::Connection(ConnectionError::Io(_)) = e {
delay_for(reconnect_interval).await;
}
}
}
}
})
}

fn simplify_unit_vec<E>(m: Result<Vec<()>, E>) -> Result<(), E> {
m.map(|_| ())
async fn handle_event(controller: &HomieController, influx_db_client: &Client, event: Event) {
match event {
Event::PropertyValueChanged {
device_id,
node_id,
property_id,
value,
fresh,
} => {
log::trace!(
"{}/{}/{}/{} = {} ({})",
controller.base_topic(),
device_id,
node_id,
property_id,
value,
fresh
);
if fresh {
if let Err(e) = send_property_value(
controller,
influx_db_client,
device_id,
node_id,
property_id,
)
.await
{
log::error!("{:?}", e);
}
}
}
_ => {
log::info!("{} Event: {:?}", controller.base_topic(), event);
}
}
}