diff --git a/homie-device/src/lib.rs b/homie-device/src/lib.rs index eca245a4..f3e3e2b9 100644 --- a/homie-device/src/lib.rs +++ b/homie-device/src/lib.rs @@ -9,14 +9,16 @@ use futures::FutureExt; use mac_address::get_mac_address; use rumqttc::{ self, AsyncClient, ClientError, ConnectionError, Event, EventLoop, Incoming, LastWill, - MqttOptions, QoS, + MqttOptions, Outgoing, QoS, StateError, }; use std::fmt::{self, Debug, Display, Formatter}; use std::future::Future; use std::pin::Pin; use std::str; +use std::sync::Arc; use std::time::{Duration, Instant}; use thiserror::Error; +use tokio::sync::Mutex; use tokio::task::{self, JoinError, JoinHandle}; use tokio::time::sleep; @@ -231,11 +233,7 @@ impl HomieDeviceBuilder { /// single MQTT connection. #[derive(Debug)] pub struct HomieDevice { - publisher: DevicePublisher, - device_name: String, - nodes: Vec, - state: State, - extension_ids: String, + state: Arc>, reconnect_interval: Duration, } @@ -272,30 +270,37 @@ impl HomieDevice { reconnect_interval: Duration, ) -> HomieDevice { HomieDevice { - publisher, - device_name, - nodes: vec![], - state: State::Disconnected, - extension_ids: extension_ids.join(","), + state: Arc::new(Mutex::new(DeviceState { + publisher, + device_name, + nodes: vec![], + state: State::Disconnected, + extension_ids: extension_ids.join(","), + })), reconnect_interval, } } async fn start(&mut self) -> Result<(), ClientError> { - assert_eq!(self.state, State::Disconnected); - self.publisher + let mut state = self.state.lock().await; + assert_eq!(state.state, State::Disconnected); + state + .publisher .publish_retained("$homie", HOMIE_VERSION) .await?; - self.publisher - .publish_retained("$extensions", self.extension_ids.as_str()) + state + .publisher + .publish_retained("$extensions", state.extension_ids.as_str()) .await?; - self.publisher + state + .publisher .publish_retained("$implementation", HOMIE_IMPLEMENTATION) .await?; - self.publisher - .publish_retained("$name", self.device_name.as_str()) + state + .publisher + .publish_retained("$name", state.device_name.as_str()) .await?; - self.set_state(State::Init).await?; + state.set_state(State::Init).await?; Ok(()) } @@ -305,39 +310,61 @@ impl HomieDevice { mut event_loop: EventLoop, mut update_callback: Option, ) -> impl Future> { - let device_base = format!("{}/", self.publisher.device_base); let (incoming_tx, incoming_rx) = async_channel::unbounded(); let reconnect_interval = self.reconnect_interval; let mqtt_task = task::spawn(async move { + let mut disconnect_requested = false; loop { match event_loop.poll().await { Ok(notification) => { log::trace!("Notification = {:?}", notification); - if let Event::Incoming(incoming) = notification { - incoming_tx.send(incoming).await.map_err(|_| { - SpawnError::Internal("Incoming event channel receiver closed.") - })?; + match notification { + Event::Incoming(incoming) => { + incoming_tx.send(incoming).await.map_err(|_| { + SpawnError::Internal("Incoming event channel receiver closed.") + })?; + } + Event::Outgoing(Outgoing::Disconnect) => { + // Flag that we have tried to disconnect intentionally, but keep + // polling until we get an error implying that we have actually disconnected. + disconnect_requested = true; + } + Event::Outgoing(_) => {} } } Err(e) => { + if disconnect_requested { + log::trace!("Disconnected as requested."); + return Ok(()); + } log::error!("Failed to poll EventLoop: {}", e); - if let ConnectionError::Io(_) = e { - sleep(reconnect_interval).await; + match e { + ConnectionError::Io(_) => { + sleep(reconnect_interval).await; + } + ConnectionError::MqttState(StateError::AwaitPingResp) => {} + _ => return Err(e.into()), } } } } }); - let publisher = self.publisher.clone(); - let incoming_task: JoinHandle> = - task::spawn(async move { - loop { - if let Incoming::Publish(publish) = incoming_rx.recv().await.map_err(|_| { - SpawnError::Internal("Incoming event channel sender closed.") - })? { + let state = self.state.clone(); + let incoming_task: JoinHandle> = task::spawn(async move { + let publisher = state.lock().await.publisher.clone(); + let device_base = format!("{}/", publisher.device_base); + loop { + match incoming_rx.recv().await { + Err(_) => { + // This happens when the task above exits either because of an error or + // because we disconnected intentionally. + log::trace!("Incoming event channel sender closed."); + return Ok(()); + } + Ok(Incoming::Publish(publish)) => { if let Some(rest) = publish.topic.strip_prefix(&device_base) { if let ([node_id, property_id, "set"], Ok(payload)) = ( rest.split('/').collect::>().as_slice(), @@ -370,8 +397,14 @@ impl HomieDevice { log::warn!("Unexpected publish: {:?}", publish); } } + Ok(Incoming::ConnAck(_)) => { + // TODO: Republish nodes and everything, unless this is the initial connection. + state.lock().await.send_state().await?; + } + _ => {} } - }); + } + }); try_join_unit_handles(mqtt_task, incoming_task) } @@ -380,135 +413,46 @@ impl HomieDevice { /// This will panic if you attempt to add a node with the same ID as a node which was previously /// added. pub async fn add_node(&mut self, node: Node) -> Result<(), ClientError> { - // First check that there isn't already a node with the same ID. - if self.nodes.iter().any(|n| n.id == node.id) { - panic!("Tried to add node with duplicate ID: {:?}", node); - } - self.nodes.push(node); - // `node` was moved into the `nodes` vector, but we can safely get a reference to it because - // nothing else can modify `nodes` in the meantime. - let node = &self.nodes[self.nodes.len() - 1]; - - self.publish_node(&node).await?; - self.publish_nodes().await + self.state.lock().await.add_node(node).await } /// Remove the node with the given ID. pub async fn remove_node(&mut self, node_id: &str) -> Result<(), ClientError> { - // Panic on attempt to remove a node which was never added. - let index = self.nodes.iter().position(|n| n.id == node_id).unwrap(); - self.unpublish_node(&self.nodes[index]).await?; - self.nodes.remove(index); - self.publish_nodes().await - } - - async fn publish_node(&self, node: &Node) -> Result<(), ClientError> { - self.publisher - .publish_retained(&format!("{}/$name", node.id), node.name.as_str()) - .await?; - self.publisher - .publish_retained(&format!("{}/$type", node.id), node.node_type.as_str()) - .await?; - let mut property_ids: Vec<&str> = vec![]; - for property in &node.properties { - property_ids.push(&property.id); - self.publisher - .publish_retained( - &format!("{}/{}/$name", node.id, property.id), - property.name.as_str(), - ) - .await?; - self.publisher - .publish_retained( - &format!("{}/{}/$datatype", node.id, property.id), - property.datatype, - ) - .await?; - self.publisher - .publish_retained( - &format!("{}/{}/$settable", node.id, property.id), - if property.settable { "true" } else { "false" }, - ) - .await?; - if let Some(unit) = &property.unit { - self.publisher - .publish_retained(&format!("{}/{}/$unit", node.id, property.id), unit.as_str()) - .await?; - } - if let Some(format) = &property.format { - self.publisher - .publish_retained( - &format!("{}/{}/$format", node.id, property.id), - format.as_str(), - ) - .await?; - } - if property.settable { - self.publisher - .subscribe(&format!("{}/{}/set", node.id, property.id)) - .await?; - } - } - self.publisher - .publish_retained(&format!("{}/$properties", node.id), property_ids.join(",")) - .await?; - Ok(()) - } - - async fn unpublish_node(&self, node: &Node) -> Result<(), ClientError> { - for property in &node.properties { - if property.settable { - self.publisher - .unsubscribe(&format!("{}/{}/set", node.id, property.id)) - .await?; - } - } - Ok(()) - } - - async fn publish_nodes(&mut self) -> Result<(), ClientError> { - let node_ids = self - .nodes - .iter() - .map(|node| node.id.as_str()) - .collect::>() - .join(","); - self.publisher.publish_retained("$nodes", node_ids).await - } - - async fn set_state(&mut self, state: State) -> Result<(), ClientError> { - self.state = state; - self.publisher.publish_retained("$state", self.state).await + self.state.lock().await.remove_node(node_id).await } /// Update the [state](https://homieiot.github.io/specification/#device-lifecycle) of the Homie /// device to 'ready'. This should be called once it is ready to begin normal operation, or to /// return to normal operation after calling `sleep()` or `alert()`. pub async fn ready(&mut self) -> Result<(), ClientError> { - assert!(&[State::Init, State::Sleeping, State::Alert].contains(&self.state)); - self.set_state(State::Ready).await + let mut state = self.state.lock().await; + assert!(&[State::Init, State::Sleeping, State::Alert].contains(&state.state)); + state.set_state(State::Ready).await } /// Update the [state](https://homieiot.github.io/specification/#device-lifecycle) of the Homie /// device to 'sleeping'. This should be only be called after `ready()`, otherwise it will panic. pub async fn sleep(&mut self) -> Result<(), ClientError> { - assert_eq!(self.state, State::Ready); - self.set_state(State::Sleeping).await + let mut state = self.state.lock().await; + assert_eq!(state.state, State::Ready); + state.set_state(State::Sleeping).await } /// Update the [state](https://homieiot.github.io/specification/#device-lifecycle) of the Homie /// device to 'alert', to indicate that something wrong is happening and manual intervention may /// be required. This should be only be called after `ready()`, otherwise it will panic. pub async fn alert(&mut self) -> Result<(), ClientError> { - assert_eq!(self.state, State::Ready); - self.set_state(State::Alert).await + let mut state = self.state.lock().await; + assert_eq!(state.state, State::Ready); + state.set_state(State::Alert).await } /// Disconnect cleanly from the MQTT broker, after updating the state of the Homie device to // 'disconnected'. - pub async fn disconnect(mut self) -> Result<(), ClientError> { - self.set_state(State::Disconnected).await?; - self.publisher.client.disconnect().await + pub async fn disconnect(self) -> Result<(), ClientError> { + let mut state = self.state.lock().await; + state.set_state(State::Disconnected).await?; + state.publisher.client.disconnect().await } /// Publish a new value for the given property of the given node of this device. The caller is @@ -519,12 +463,73 @@ impl HomieDevice { property_id: &str, value: impl ToString, ) -> Result<(), ClientError> { - self.publisher + self.state + .lock() + .await + .publisher .publish_retained(&format!("{}/{}", node_id, property_id), value.to_string()) .await } } +/// The internal state of a `HomieDevice`, so it can be shared between threads. +#[derive(Debug)] +struct DeviceState { + publisher: DevicePublisher, + device_name: String, + nodes: Vec, + state: State, + extension_ids: String, +} + +impl DeviceState { + /// Publish the current list of node IDs. + async fn publish_nodes(&self) -> Result<(), ClientError> { + let node_ids = self + .nodes + .iter() + .map(|node| node.id.as_str()) + .collect::>() + .join(","); + self.publisher.publish_retained("$nodes", node_ids).await + } + + /// Set the state to the given value, and publish it. + async fn set_state(&mut self, state: State) -> Result<(), ClientError> { + self.state = state; + self.send_state().await + } + + /// Publish the current state. + async fn send_state(&self) -> Result<(), ClientError> { + self.publisher.publish_retained("$state", self.state).await + } + + /// Add a node to the Homie device and publish it. + async fn add_node(&mut self, node: Node) -> Result<(), ClientError> { + // First check that there isn't already a node with the same ID. + if self.nodes.iter().any(|n| n.id == node.id) { + panic!("Tried to add node with duplicate ID: {:?}", node); + } + self.nodes.push(node); + // `node` was moved into the `nodes` vector, but we can safely get a reference to it because + // nothing else can modify `nodes` in the meantime. + let node = &self.nodes[self.nodes.len() - 1]; + + self.publisher.publish_node(&node).await?; + self.publish_nodes().await + } + + /// Remove the node with the given ID. + async fn remove_node(&mut self, node_id: &str) -> Result<(), ClientError> { + // Panic on attempt to remove a node which was never added. + let index = self.nodes.iter().position(|n| n.id == node_id).unwrap(); + self.publisher.unpublish_node(&self.nodes[index]).await?; + self.nodes.remove(index); + self.publish_nodes().await + } +} + #[derive(Clone, Debug)] struct DevicePublisher { pub client: AsyncClient, @@ -559,6 +564,61 @@ impl DevicePublisher { let topic = format!("{}/{}", self.device_base, subtopic); self.client.unsubscribe(topic).await } + + /// Publish metadata about the given node and its properties. + async fn publish_node(&self, node: &Node) -> Result<(), ClientError> { + self.publish_retained(&format!("{}/$name", node.id), node.name.as_str()) + .await?; + self.publish_retained(&format!("{}/$type", node.id), node.node_type.as_str()) + .await?; + let mut property_ids: Vec<&str> = vec![]; + for property in &node.properties { + property_ids.push(&property.id); + self.publish_retained( + &format!("{}/{}/$name", node.id, property.id), + property.name.as_str(), + ) + .await?; + self.publish_retained( + &format!("{}/{}/$datatype", node.id, property.id), + property.datatype, + ) + .await?; + self.publish_retained( + &format!("{}/{}/$settable", node.id, property.id), + if property.settable { "true" } else { "false" }, + ) + .await?; + if let Some(unit) = &property.unit { + self.publish_retained(&format!("{}/{}/$unit", node.id, property.id), unit.as_str()) + .await?; + } + if let Some(format) = &property.format { + self.publish_retained( + &format!("{}/{}/$format", node.id, property.id), + format.as_str(), + ) + .await?; + } + if property.settable { + self.subscribe(&format!("{}/{}/set", node.id, property.id)) + .await?; + } + } + self.publish_retained(&format!("{}/$properties", node.id), property_ids.join(",")) + .await?; + Ok(()) + } + + async fn unpublish_node(&self, node: &Node) -> Result<(), ClientError> { + for property in &node.properties { + if property.settable { + self.unsubscribe(&format!("{}/{}/set", node.id, property.id)) + .await?; + } + } + Ok(()) + } } /// Legacy stats extension. @@ -590,6 +650,7 @@ impl HomieStats { fn spawn(self) -> impl Future> { let task: JoinHandle> = task::spawn(async move { loop { + // TODO: Break out of this loop if disconnection is requested. let uptime = Instant::now() - self.start_time; self.publisher .publish_retained("$stats/uptime", uptime.as_secs().to_string()) @@ -787,8 +848,9 @@ mod tests { let (_event_loop, homie, _stats, firmware, _callback) = builder.build(); - assert_eq!(homie.device_name, "Test device"); - assert_eq!(homie.publisher.device_base, "homie/test-device"); + let state = homie.state.lock().await; + assert_eq!(state.device_name, "Test device"); + assert_eq!(state.publisher.device_base, "homie/test-device"); assert!(firmware.is_none()); Ok(()) @@ -806,8 +868,9 @@ mod tests { let (_event_loop, homie, _stats, firmware, _callback) = builder.build(); - assert_eq!(homie.device_name, "Test device"); - assert_eq!(homie.publisher.device_base, "homie/test-device"); + let state = homie.state.lock().await; + assert_eq!(state.device_name, "Test device"); + assert_eq!(state.publisher.device_base, "homie/test-device"); let firmware = firmware.unwrap(); assert_eq!(firmware.firmware_name, "firmware_name"); assert_eq!(firmware.firmware_version, "firmware_version");