Skip to content

Commit

Permalink
First attempt at reconnection for mijia-homie.
Browse files Browse the repository at this point in the history
  • Loading branch information
qwandor committed Dec 30, 2020
1 parent 2d82f1f commit a46ceaf
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 10 deletions.
63 changes: 53 additions & 10 deletions homie-device/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const HOMIE_IMPLEMENTATION: &str = "homie-rs";
const STATS_INTERVAL: Duration = Duration::from_secs(60);
const REQUESTS_CAP: usize = 10;

/// The default duration to wait between attempts to reconnect to the MQTT broker if the connection
/// is lost.
pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::from_secs(5);

/// Error type for futures representing tasks spawned by this crate.
#[derive(Error, Debug)]
pub enum SpawnError {
Expand Down Expand Up @@ -100,6 +104,7 @@ pub struct HomieDeviceBuilder {
firmware_version: Option<String>,
mqtt_options: MqttOptions,
update_callback: Option<UpdateCallback>,
reconnect_interval: Duration,
}

impl Debug for HomieDeviceBuilder {
Expand Down Expand Up @@ -127,6 +132,14 @@ impl HomieDeviceBuilder {
self.firmware_version = Some(firmware_version.to_string());
}

/// Set the duration to wait between attempts to reconnect to the MQTT broker if the connection
/// is lost.
///
/// If this is not set it will default to `DEFAULT_RECONNECT_INTERVAL`.
pub fn set_reconnect_interval(&mut self, reconnect_interval: Duration) {
self.reconnect_interval = reconnect_interval;
}

pub fn set_update_callback<F, Fut>(&mut self, mut update_callback: F)
where
F: (FnMut(String, String, String) -> Fut) + Send + Sync + 'static,
Expand Down Expand Up @@ -182,6 +195,8 @@ impl HomieDeviceBuilder {
);
last_will.retain = true;
mqtt_options.set_last_will(last_will);
// Setting this to false means that our subscriptions will be kept when we reconnect.
mqtt_options.set_clean_session(false);
let (client, event_loop) = AsyncClient::new(mqtt_options, REQUESTS_CAP);

let publisher = DevicePublisher::new(client, self.device_base);
Expand All @@ -201,7 +216,12 @@ impl HomieDeviceBuilder {
None
};

let homie = HomieDevice::new(publisher, self.device_name, &extension_ids);
let homie = HomieDevice::new(
publisher,
self.device_name,
&extension_ids,
self.reconnect_interval,
);

(event_loop, homie, stats, firmware, self.update_callback)
}
Expand All @@ -216,6 +236,7 @@ pub struct HomieDevice {
nodes: Vec<Node>,
state: State,
extension_ids: String,
reconnect_interval: Duration,
}

impl HomieDevice {
Expand All @@ -240,16 +261,23 @@ impl HomieDevice {
firmware_version: None,
mqtt_options,
update_callback: None,
reconnect_interval: DEFAULT_RECONNECT_INTERVAL,
}
}

fn new(publisher: DevicePublisher, device_name: String, extension_ids: &[&str]) -> HomieDevice {
fn new(
publisher: DevicePublisher,
device_name: String,
extension_ids: &[&str],
reconnect_interval: Duration,
) -> HomieDevice {
HomieDevice {
publisher,
device_name,
nodes: vec![],
state: State::Disconnected,
extension_ids: extension_ids.join(","),
reconnect_interval,
}
}

Expand Down Expand Up @@ -280,15 +308,25 @@ impl HomieDevice {
let device_base = format!("{}/", self.publisher.device_base);
let (incoming_tx, incoming_rx) = async_channel::unbounded();

let reconnect_interval = self.reconnect_interval;
let mqtt_task = task::spawn(async move {
loop {
let notification = event_loop.poll().await?;
log::trace!("Notification = {:?}", notification);

if let Event::Incoming(incoming) = notification {
incoming_tx.send(incoming).await.map_err(|_| {
SpawnError::Internal("Incoming event channel receiver closed.")
})?;
match event_loop.poll().await {
Ok(notification) => {
log::trace!("Notification = {:?}", notification);

if let Event::Incoming(incoming) = notification {
incoming_tx.send(incoming).await.map_err(|_| {
SpawnError::Internal("Incoming event channel receiver closed.")
})?;
}
}
Err(e) => {
log::error!("Failed to poll EventLoop: {}", e);
if let ConnectionError::Io(_) = e {
sleep(reconnect_interval).await;
}
}
}
}
});
Expand Down Expand Up @@ -636,7 +674,12 @@ mod tests {
let (cancel_tx, _cancel_rx) = async_channel::unbounded();
let client = AsyncClient::from_senders(requests_tx, cancel_tx);
let publisher = DevicePublisher::new(client, "homie/test-device".to_string());
let device = HomieDevice::new(publisher, "Test device".to_string(), &[]);
let device = HomieDevice::new(
publisher,
"Test device".to_string(),
&[],
DEFAULT_RECONNECT_INTERVAL,
);
(device, requests_rx)
}

Expand Down
7 changes: 7 additions & 0 deletions mijia-homie/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use eyre::Report;
use homie_device::DEFAULT_RECONNECT_INTERVAL;
use mijia::bluetooth::{MacAddress, ParseMacAddressError};
use rumqttc::{MqttOptions, Transport};
use rustls::ClientConfig;
Expand Down Expand Up @@ -45,6 +46,11 @@ pub struct MqttConfig {
pub username: Option<String>,
pub password: Option<String>,
pub client_name: Option<String>,
#[serde(
deserialize_with = "de_duration_seconds",
rename = "reconnect_interval_seconds"
)]
pub reconnect_interval: Duration,
}

impl Default for MqttConfig {
Expand All @@ -56,6 +62,7 @@ impl Default for MqttConfig {
username: None,
password: None,
client_name: None,
reconnect_interval: DEFAULT_RECONNECT_INTERVAL,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions mijia-homie/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ async fn main() -> Result<(), eyre::Report> {
let config = Config::from_file()?;
let sensor_names = read_sensor_names(&config.homie.sensor_names_filename)?;

let reconnect_interval = config.mqtt.reconnect_interval;
let mqtt_options = get_mqtt_options(config.mqtt, &config.homie.device_id);
let device_base = format!("{}/{}", config.homie.prefix, config.homie.device_id);
let mut homie_builder =
HomieDevice::builder(&device_base, &config.homie.device_name, mqtt_options);
homie_builder.set_firmware(env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
homie_builder.set_reconnect_interval(reconnect_interval);
let (homie, homie_handle) = homie_builder.spawn().await?;

// Connect a Bluetooth session.
Expand Down

0 comments on commit a46ceaf

Please sign in to comment.