diff --git a/src/asyncv/attribute/message/boolean/attribute/inner.rs b/src/asyncv/attribute/message/boolean/attribute/inner.rs index 0bb92e5..b3e433a 100644 --- a/src/asyncv/attribute/message/boolean/attribute/inner.rs +++ b/src/asyncv/attribute/message/boolean/attribute/inner.rs @@ -1,17 +1,24 @@ +use rumqttc::QoS; +use std::sync::Arc; +use std::sync::Weak; +use tokio::sync::Mutex; + +use crate::AttributeError; + +use super::MessageClient; +use super::OnMessageHandler; + use std::future::Future; use std::pin::Pin; -use std::sync::Arc; use futures::future::BoxFuture; use futures::FutureExt; -use tokio::sync::Mutex; use bytes::Bytes; use async_trait::async_trait; use crate::asyncv::AttributeBuilder; -use crate::AttributeError; use super::MessageCoreMembers; // use super::OnChangeHandler; @@ -24,8 +31,19 @@ use super::AttributePayloadManager; /// Inner implementation of the boolean message attribute /// pub struct AttributeInner { - /// Members at the core of each attribute - core: MessageCoreMembers, + /// The data of the reactor, to be able to subscribe to the + /// reactor and route messages to the attribute + message_dispatcher: Weak>, + + /// The mqtt client + message_client: MessageClient, + + /// The topic of the attribute + topic: String, + + /// The topic for commands + topic_cmd: String, + /// Current value of the attribute value: Option, /// Requested value of the attribute (set by the user) @@ -37,11 +55,10 @@ impl AttributeInner { /// pub fn new(builder: AttributeBuilder) -> AttributeInner { AttributeInner { - core: MessageCoreMembers::new( - builder.message_client, - builder.message_dispatcher, - builder.topic.unwrap(), - ), + message_dispatcher: builder.message_dispatcher, + message_client: builder.message_client, + topic: builder.topic.as_ref().unwrap().clone(), + topic_cmd: format!("{}/cmd", builder.topic.as_ref().unwrap()), value: None, requested_value: None, } @@ -70,7 +87,8 @@ impl AttributeInner { &self, attribute: Arc>, ) -> Result<(), AttributeError> { - self.core.init(attribute).await + self.register(attribute).await?; + self.subscribe().await } // fn set_ensure_ok(&self) -> bool { @@ -91,7 +109,7 @@ impl AttributeInner { self.requested_value = Some(new_value); match self.requested_value { Some(requested_value) => { - self.core.publish(requested_value.into()).await; + self.publish(requested_value.into()).await; Ok(()) } None => Err(AttributeError::Unkonwn), @@ -105,9 +123,28 @@ impl AttributeInner { return self.value; } - // pub fn on_change(&mut self, handler: OnChangeHandlerFunction) { - // self.on_change_handler = Some(handler); - // } + /// Publish a command + /// + pub async fn publish(&self, value: V) -> Result<(), AttributeError> + where + V: Into>, + { + self.message_client + .publish(&self.topic_cmd, QoS::AtMostOnce, true, value) + .await + .map_err(|e| AttributeError::Message(e)) + } + + /// Subscribe to the topic + /// + pub async fn subscribe(&self) -> Result<(), AttributeError> { + // no need to store the att topic + let topic_att = format!("{}/att", self.topic); + self.message_client + .subscribe(topic_att, QoS::AtMostOnce) + .await + .map_err(|e| AttributeError::Message(e)) + } /// Register the attribute to the reactor /// @@ -115,7 +152,16 @@ impl AttributeInner { &self, attribute: Arc>, ) -> Result<(), AttributeError> { - self.core.register(attribute).await + // no need to store the att topic + let topic_att = format!("{}/att", self.topic); + self.message_dispatcher + .upgrade() + .ok_or(AttributeError::InternalPointerUpgrade)? + .lock() + .await + // .map_err(|e| AttributeError::InternalMutex(e.to_string()))? + .register_message_attribute(topic_att, attribute); + Ok(()) } } diff --git a/src/asyncv/attribute/message/core_members.rs b/src/asyncv/attribute/message/core_members.rs deleted file mode 100644 index feee6de..0000000 --- a/src/asyncv/attribute/message/core_members.rs +++ /dev/null @@ -1,95 +0,0 @@ -use rumqttc::QoS; -use std::sync::Arc; -use std::sync::Weak; -use tokio::sync::Mutex; - -use crate::AttributeError; - -use super::MessageClient; -use super::MessageDispatcher; -use super::OnMessageHandler; - -/// The core data of the attribute -/// Those attributes will be moved between Att types -#[derive(Clone)] -pub struct MessageCoreMembers { - /// The data of the reactor, to be able to subscribe to the - /// reactor and route messages to the attribute - message_dispatcher: Weak>, - - /// The mqtt client - message_client: MessageClient, - - /// The topic of the attribute - topic: String, - - /// The topic for commands - topic_cmd: String, -} - -impl MessageCoreMembers { - /// Create a new core data - pub fn new( - message_client: MessageClient, - message_dispatcher: Weak>, - topic: String, - ) -> Self { - Self { - message_dispatcher: message_dispatcher, - message_client: message_client, - topic: topic.clone(), - topic_cmd: format!("{}/cmd", topic), - } - } - - /// Initialize the attribute - /// - pub async fn init( - &self, - attribute: Arc>, - ) -> Result<(), AttributeError> { - self.register(attribute).await?; - self.subscribe().await - } - - /// Publish a command - /// - pub async fn publish(&self, value: V) -> Result<(), AttributeError> - where - V: Into>, - { - self.message_client - .publish(&self.topic_cmd, QoS::AtMostOnce, true, value) - .await - .map_err(|e| AttributeError::Message(e)) - } - - /// Subscribe to the topic - /// - pub async fn subscribe(&self) -> Result<(), AttributeError> { - // no need to store the att topic - let topic_att = format!("{}/att", self.topic); - self.message_client - .subscribe(topic_att, QoS::AtMostOnce) - .await - .map_err(|e| AttributeError::Message(e)) - } - - /// Register the attribute to the reactor - /// - pub async fn register( - &self, - attribute: Arc>, - ) -> Result<(), AttributeError> { - // no need to store the att topic - let topic_att = format!("{}/att", self.topic); - self.message_dispatcher - .upgrade() - .ok_or(AttributeError::InternalPointerUpgrade)? - .lock() - .await - // .map_err(|e| AttributeError::InternalMutex(e.to_string()))? - .register_message_attribute(topic_att, attribute); - Ok(()) - } -}