diff --git a/src/asyncv/attribute.rs b/src/asyncv/attribute.rs index 49f43c2..b2f4f84 100644 --- a/src/asyncv/attribute.rs +++ b/src/asyncv/attribute.rs @@ -1,6 +1,7 @@ pub mod message; -pub use message::MessageAttribute; +pub use message::MessageAttributeRo; +pub use message::MessageAttributeRw; pub use super::AttributeBuilder; pub use super::MessageClient; diff --git a/src/asyncv/attribute/message.rs b/src/asyncv/attribute/message.rs index cc1e45b..3dc9087 100644 --- a/src/asyncv/attribute/message.rs +++ b/src/asyncv/attribute/message.rs @@ -1,10 +1,17 @@ -mod attribute; mod dispatcher; +mod ro_attribute; +mod ro_inner; +mod rw_attribute; +mod rw_inner; use async_trait::async_trait; use bytes::Bytes; -pub type MessageAttribute = attribute::Attribute; +pub use ro_inner::MessageAttributeRoInner; +pub use rw_inner::MessageAttributeRwInner; + +pub use ro_attribute::MessageAttributeRo; +pub use rw_attribute::MessageAttributeRw; pub use dispatcher::MessageDispatcher; diff --git a/src/asyncv/attribute/message/attribute/inner.rs b/src/asyncv/attribute/message/attribute/inner.rs deleted file mode 100644 index 91eea2c..0000000 --- a/src/asyncv/attribute/message/attribute/inner.rs +++ /dev/null @@ -1,184 +0,0 @@ -use rumqttc::QoS; -use std::sync::Arc; -use std::sync::Weak; -use tokio::sync::Mutex; - -use crate::asyncv::MessageDispatcher; -use crate::AttributeError; - -use super::MessageClient; -use super::OnMessageHandler; - -use bytes::Bytes; - -use async_trait::async_trait; - -use crate::asyncv::AttributeBuilder; - -use tokio::sync::Notify; - -use super::AttributePayloadManager; - -/// Inner implementation of the message attribute -/// This inner implementation allow the public part to be cloneable easly -/// -pub struct AttributeInner { - /// Reactor message dispatcher - /// (to attach this attribute to the incoming messages) - message_dispatcher: Weak>, - /// The message client (MQTT) - message_client: MessageClient, - - /// The topic of the attribute - topic: String, - /// The topic for commands - topic_cmd: String, - - /// Current value of the attribute - value: Option, - /// Requested value of the attribute (set by the user) - requested_value: Option, -} - -impl AttributeInner { - /// - pub fn new(builder: AttributeBuilder) -> AttributeInner { - AttributeInner { - message_dispatcher: builder.message_dispatcher, - message_client: builder.message_client, - topic: builder.topic.as_ref().unwrap().clone(), - topic_cmd: format!("{}/cmd", builder.topic.as_ref().unwrap()), - value: None, - requested_value: None, - } - } - - pub fn to_arc_mutex(self) -> Arc> { - Arc::new(Mutex::new(self)) - } - - // pub fn set_ensure_lock_clone(&mut self) -> Arc> { - // return self.set_ensure_lock.clone(); - // } - - // fn set_ensure_update(&mut self) { - // if self.set_ensure_ok() { - // self.set_ensure_lock.with_lock(|mut done| { - // *done = true; - // done.notify_one(); - // }); - // } - // } - - /// Initialize the attribute - /// - pub async fn init( - &self, - attribute: Arc>, - ) -> Result<(), AttributeError> { - self.register(attribute).await?; - self.subscribe().await - } - - // fn set_ensure_ok(&self) -> bool { - // return self.requested_value == self.value; - // } - - /// Set the value of the attribute - /// - pub async fn set(&mut self, new_value: TYPE) -> Result<(), AttributeError> { - // Do not go further if the value is already set - if let Some(current_value) = self.value { - if current_value == new_value { - return Ok(()); - } - } - - // Set the requested value and publish the request - self.requested_value = Some(new_value); - match self.requested_value { - Some(requested_value) => { - self.publish(requested_value.into()).await; - Ok(()) - } - None => Err(AttributeError::Unkonwn), - } - } - - /// Get the value of the attribute - /// If None, the first value is not yet received - /// - pub fn get(&self) -> Option { - return self.value; - } - - /// Publish a command - /// - pub async fn publish(&self, value: V) -> Result<(), AttributeError> - where - V: Into>, - { - self.message_client - .publish(&self.topic_cmd, QoS::AtMostOnce, true, value) - .await - .map_err(|e| AttributeError::Message(e)) - } - - /// Subscribe to the topic - /// - pub async fn subscribe(&self) -> Result<(), AttributeError> { - // no need to store the att topic - let topic_att = format!("{}/att", self.topic); - self.message_client - .subscribe(topic_att, QoS::AtMostOnce) - .await - .map_err(|e| AttributeError::Message(e)) - } - - /// Register the attribute to the reactor - /// - pub async fn register( - &self, - attribute: Arc>, - ) -> Result<(), AttributeError> { - // no need to store the att topic - let topic_att = format!("{}/att", self.topic); - self.message_dispatcher - .upgrade() - .ok_or(AttributeError::InternalPointerUpgrade)? - .lock() - .await - // .map_err(|e| AttributeError::InternalMutex(e.to_string()))? - .register_message_attribute(topic_att, attribute); - Ok(()) - } -} - -#[async_trait] -impl OnMessageHandler for AttributeInner { - async fn on_message(&mut self, data: &Bytes) { - println!("boolean"); - - // OnChangeHandlerFunction - - // if data.len() == 1 { - // match data[0] { - // b'1' => { - // self.value = Some(true); - // // self.set_ensure_update(); - // } - // b'0' => { - // self.value = Some(false); - // // self.set_ensure_update(); - // } - // _ => { - // println!("unexcpedted payload {:?}", data); - // return; - // } - // }; - // // Do something with the value - // } else { - // println!("wierd payload {:?}", data); - // } - } -} diff --git a/src/asyncv/attribute/message/ro_attribute.rs b/src/asyncv/attribute/message/ro_attribute.rs new file mode 100644 index 0000000..8747f69 --- /dev/null +++ b/src/asyncv/attribute/message/ro_attribute.rs @@ -0,0 +1,55 @@ +use std::future::Future; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; +use tokio::time::sleep; + +use crate::AttributeError; +use crate::MessagePayloadManager; + +use super::AttributeBuilder; +use super::MessageAttributeRoInner; + +/// Attribute to manage a boolean +#[derive(Clone)] +pub struct MessageAttributeRo { + /// Inner implementation + inner: Arc>>, +} + +impl MessageAttributeRo { + /// Initialize the attribute + pub async fn init(self) -> Result { + self.inner.lock().await.init(self.inner.clone()).await?; + Ok(self) + } + + pub async fn wait_change(&self) { + let change_notifier = self.inner.lock().await.clone_change_notifier(); + change_notifier.notified().await + } + + pub async fn wait_change_then(&self, function: F) + where + F: Future + Send + 'static, + { + let change_notifier = self.inner.lock().await.clone_change_notifier(); + change_notifier.notified().await; + function.await; + } + + /// Get the value of the attribute + /// + pub async fn get(&self) -> Option { + self.inner.lock().await.get() + } +} + +/// Allow creation from the builder +impl From for MessageAttributeRo { + fn from(builder: AttributeBuilder) -> Self { + MessageAttributeRo { + inner: MessageAttributeRoInner::from(builder).into(), + } + } +} diff --git a/src/asyncv/attribute/message/ro_inner.rs b/src/asyncv/attribute/message/ro_inner.rs new file mode 100644 index 0000000..fefa3aa --- /dev/null +++ b/src/asyncv/attribute/message/ro_inner.rs @@ -0,0 +1,123 @@ +use rumqttc::QoS; +use std::sync::Arc; +use std::sync::Weak; +use tokio::sync::Mutex; + +use crate::asyncv::MessageDispatcher; +use crate::AttributeError; + +use super::MessageClient; +use super::OnMessageHandler; + +use bytes::Bytes; + +use async_trait::async_trait; + +use crate::asyncv::AttributeBuilder; + +use tokio::sync::Notify; + +use crate::MessagePayloadManager; + +/// Read Only Inner implementation of the message attribute +/// This inner implementation allow the public part to be cloneable easly +pub struct MessageAttributeRoInner { + /// Reactor message dispatcher + /// (to attach this attribute to the incoming messages) + message_dispatcher: Weak>, + /// The message client (MQTT) + pub message_client: MessageClient, + + /// The topic of the attribute + topic: String, + + /// Current value of the attribute + pub value: Option, + + /// + change_notifier: Arc, +} + +impl MessageAttributeRoInner { + /// Initialize the attribute + /// Register the attribute on the message dispatcher then subscribe to att topic + pub async fn init( + &self, + attribute: Arc>, + ) -> Result<(), AttributeError> { + self.register(attribute).await?; + self.subscribe().await + } + + /// Get the value of the attribute + /// If None, the first value is not yet received + pub fn get(&self) -> Option { + return self.value; + } + + /// Subscribe to the topic + pub async fn subscribe(&self) -> Result<(), AttributeError> { + // no need to store the att topic + let topic_att = format!("{}/att", self.topic); + self.message_client + .subscribe(topic_att, QoS::AtMostOnce) + .await + .map_err(|e| AttributeError::Message(e)) + } + + /// Register the attribute to the reactor + /// + pub async fn register( + &self, + attribute: Arc>, + ) -> Result<(), AttributeError> { + // no need to store the att topic + let topic_att = format!("{}/att", self.topic); + self.message_dispatcher + .upgrade() + .ok_or(AttributeError::InternalPointerUpgrade)? + .lock() + .await + // .map_err(|e| AttributeError::InternalMutex(e.to_string()))? + .register_message_attribute(topic_att, attribute); + Ok(()) + } + + /// + /// + pub fn clone_change_notifier(&self) -> Arc { + self.change_notifier.clone() + } +} + +/// Allow creation from the builder +impl From for MessageAttributeRoInner { + fn from(builder: AttributeBuilder) -> Self { + MessageAttributeRoInner { + message_dispatcher: builder.message_dispatcher, + message_client: builder.message_client, + topic: builder.topic.as_ref().unwrap().clone(), + value: None, + change_notifier: Arc::new(Notify::new()), + } + } +} + +/// Allow mutation into Arc pointer +impl Into>>> + for MessageAttributeRoInner +{ + fn into(self) -> Arc>> { + Arc::new(Mutex::new(self)) + } +} + +#[async_trait] +impl OnMessageHandler for MessageAttributeRoInner { + async fn on_message(&mut self, data: &Bytes) { + println!("ro_inner::on_message"); + let new_value = TYPE::from(data.to_vec()); + self.value = Some(new_value); + self.change_notifier.notify_waiters(); + } +} diff --git a/src/asyncv/attribute/message/attribute.rs b/src/asyncv/attribute/message/rw_attribute.rs similarity index 78% rename from src/asyncv/attribute/message/attribute.rs rename to src/asyncv/attribute/message/rw_attribute.rs index 5854a88..4eb441d 100644 --- a/src/asyncv/attribute/message/attribute.rs +++ b/src/asyncv/attribute/message/rw_attribute.rs @@ -1,5 +1,3 @@ -mod inner; -use inner::AttributeInner; use tokio::time::sleep; use std::future::Future; @@ -7,9 +5,10 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; +use super::MessageAttributeRwInner; pub use super::MessageClient; use crate::AttributeError; -use crate::AttributePayloadManager; +use crate::MessagePayloadManager; use super::AttributeBuilder; @@ -24,19 +23,12 @@ use super::OnMessageHandler; /// Attribute to manage a boolean #[derive(Clone)] -pub struct Attribute { +pub struct MessageAttributeRw { /// - inner: Arc>>, + inner: Arc>>, } -impl Attribute { - /// - pub fn new(builder: AttributeBuilder) -> Attribute { - Attribute { - inner: AttributeInner::new(builder).to_arc_mutex(), - } - } - +impl MessageAttributeRw { /// Initialize the attribute /// pub async fn init(self) -> Result { @@ -84,3 +76,12 @@ impl Attribute { self.inner.lock().await.get() } } + +/// Allow creation from the builder +impl From for MessageAttributeRw { + fn from(builder: AttributeBuilder) -> Self { + MessageAttributeRw { + inner: MessageAttributeRwInner::from(builder).into(), + } + } +} diff --git a/src/asyncv/attribute/message/rw_inner.rs b/src/asyncv/attribute/message/rw_inner.rs new file mode 100644 index 0000000..ebd09e4 --- /dev/null +++ b/src/asyncv/attribute/message/rw_inner.rs @@ -0,0 +1,136 @@ +use rumqttc::QoS; +use std::sync::Arc; +use std::sync::Weak; +use tokio::sync::Mutex; + +use crate::asyncv::MessageDispatcher; +use crate::AttributeError; +use crate::MessagePayloadManager; + +use super::MessageAttributeRoInner; +use super::MessageClient; +use super::OnMessageHandler; + +use bytes::Bytes; + +use async_trait::async_trait; + +use crate::asyncv::AttributeBuilder; + +use tokio::sync::Notify; + +/// Read Only Inner implementation of the message attribute +/// This inner implementation allow the public part to be cloneable easly +pub struct MessageAttributeRwInner { + /// Rw is based on Ro + base: MessageAttributeRoInner, + + /// The topic for commands + topic_cmd: String, + + /// Requested value of the attribute (set by the user) + requested_value: Option, +} + +impl MessageAttributeRwInner { + /// Initialize the attribute + pub async fn init( + &self, + attribute: Arc>, + ) -> Result<(), AttributeError> { + self.base.init(attribute).await + } + + /// Get the value of the attribute + /// If None, the first value is not yet received + /// + pub fn get(&self) -> Option { + return self.base.get(); + } + + /// Set the value of the attribute + /// + pub async fn set(&mut self, new_value: TYPE) -> Result<(), AttributeError> { + // // Do not go further if the value is already set + // if let Some(current_value) = self.value { + // if current_value == new_value { + // return Ok(()); + // } + // } + + // // Set the requested value and publish the request + // self.requested_value = Some(new_value); + // match self.requested_value { + // Some(requested_value) => { + // self.publish(requested_value.into()).await; + // Ok(()) + // } + // None => Err(AttributeError::Unkonwn), + // } + + Ok(()) + } + + /// Publish a command + /// + pub async fn publish(&self, value: V) -> Result<(), AttributeError> + where + V: Into>, + { + self.base + .message_client + .publish(&self.topic_cmd, QoS::AtMostOnce, true, value) + .await + .map_err(|e| AttributeError::Message(e)) + } +} + +/// Allow creation from the builder +impl From for MessageAttributeRwInner { + fn from(builder: AttributeBuilder) -> Self { + let topic_cmd = format!("{}/cmd", builder.topic.as_ref().unwrap()); + MessageAttributeRwInner { + base: MessageAttributeRoInner::from(builder), + topic_cmd: topic_cmd, + requested_value: None, + } + } +} + +/// Allow mutation into Arc pointer +impl Into>>> + for MessageAttributeRwInner +{ + fn into(self) -> Arc>> { + Arc::new(Mutex::new(self)) + } +} + +#[async_trait] +impl OnMessageHandler for MessageAttributeRwInner { + async fn on_message(&mut self, data: &Bytes) { + println!("boolean"); + + // OnChangeHandlerFunction + + // if data.len() == 1 { + // match data[0] { + // b'1' => { + // self.value = Some(true); + // // self.set_ensure_update(); + // } + // b'0' => { + // self.value = Some(false); + // // self.set_ensure_update(); + // } + // _ => { + // println!("unexcpedted payload {:?}", data); + // return; + // } + // }; + // // Do something with the value + // } else { + // println!("wierd payload {:?}", data); + // } + } +} diff --git a/src/asyncv/builder.rs b/src/asyncv/builder.rs index 50475f0..3c86e2e 100644 --- a/src/asyncv/builder.rs +++ b/src/asyncv/builder.rs @@ -2,11 +2,11 @@ use std::sync::Weak; use tokio::sync::Mutex; -use super::attribute::MessageAttribute; - +use super::attribute::MessageAttributeRo; +use super::attribute::MessageAttributeRw; pub use super::MessageClient; pub use super::MessageDispatcher; -use crate::AttributePayloadManager; +use crate::MessagePayloadManager; /// Object that allow to build an generic attribute /// @@ -20,11 +20,22 @@ pub struct AttributeBuilder { /// Topic of the attribute pub topic: Option, +} + +/// Builder specialisation for Ro Attribute +pub struct AttributeRoBuilder { + base: AttributeBuilder, +} - /// True if the attribute is readonly - pub is_read_only: bool, +/// Builder specialisation for Rw Attribute +pub struct AttributeRwBuilder { + base: AttributeBuilder, } +// ---------------------------------------------------------------------------- +// ---------------------------------------------------------------------------- +// ---------------------------------------------------------------------------- + impl AttributeBuilder { /// Create a new builder pub fn new( @@ -35,7 +46,6 @@ impl AttributeBuilder { message_client, message_dispatcher, topic: None, - is_read_only: true, } } @@ -45,12 +55,35 @@ impl AttributeBuilder { self } - pub fn as_read_write(mut self) -> Self { - self.is_read_only = false; - self + pub fn with_ro_access(self) -> AttributeRoBuilder { + AttributeRoBuilder { base: self } + } + + pub fn with_rw_access(self) -> AttributeRwBuilder { + AttributeRwBuilder { base: self } } +} + +// ---------------------------------------------------------------------------- +// ---------------------------------------------------------------------------- +// ---------------------------------------------------------------------------- + +impl AttributeRoBuilder { + pub async fn finish_with_message_type( + self, + ) -> MessageAttributeRo { + MessageAttributeRo::from(self.base).init().await.unwrap() + } +} + +// ---------------------------------------------------------------------------- +// ---------------------------------------------------------------------------- +// ---------------------------------------------------------------------------- - pub fn build_with_message_type(self) -> MessageAttribute { - MessageAttribute::new(self) +impl AttributeRwBuilder { + pub async fn finish_with_message_type( + self, + ) -> MessageAttributeRw { + MessageAttributeRw::from(self.base).init().await.unwrap() } } diff --git a/src/common/message/boolean.rs b/src/common/message/boolean.rs index 209bad9..e196dc8 100644 --- a/src/common/message/boolean.rs +++ b/src/common/message/boolean.rs @@ -1,4 +1,6 @@ -use crate::AttributePayloadManager; +use std::fmt::Display; + +use crate::MessagePayloadManager; #[derive(Copy, Clone, PartialEq)] pub struct BooleanMessage { @@ -21,4 +23,11 @@ impl Into> for BooleanMessage { return vec![1]; } } -impl AttributePayloadManager for BooleanMessage {} + +impl Display for BooleanMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}", self.value)) + } +} + +impl MessagePayloadManager for BooleanMessage {} diff --git a/src/examples/test_async.rs b/src/examples/test_async.rs index bfee1c9..4845467 100644 --- a/src/examples/test_async.rs +++ b/src/examples/test_async.rs @@ -11,20 +11,35 @@ async fn main() { reactor.start(); - let pp = reactor + let ro_bool = reactor .create_new_attribute() .with_topic("test") - // .control_config (exemple pour la suite) - .build_with_message_type::(); + .with_ro_access() + .finish_with_message_type::() + .await; - println!("send data"); - pp.set(true).await.unwrap(); + // Wait then execute the function once + let ro_bool_bis = ro_bool.clone(); + ro_bool + .wait_change_then(async move { + println!("cooucou"); + let _dat = ro_bool_bis.get().await.unwrap(); + println!("cooucou {} ", _dat); + }) + .await; - let pp2 = pp.clone(); - pp.when_change(async move { - println!("cooucou"); - let _dat = pp2.get().await.unwrap(); - println!("cooucou"); + // Task that run an action every time the value of the attribute change + tokio::spawn(async move { + loop { + let ro_bool_bis = ro_bool.clone(); + ro_bool + .wait_change_then(async move { + println!("cooucou"); + let _dat = ro_bool_bis.get().await.unwrap(); + println!("cooucou {} ", _dat); + }) + .await; + } }); sleep(Duration::from_secs(60)).await; diff --git a/src/lib.rs b/src/lib.rs index 2af030b..5efae34 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,7 +18,7 @@ pub trait SyncMessageAttribute: Send + Sync { pub use common::BooleanMessage; /// Trait for type that wan manage an attribute payload /// -pub trait AttributePayloadManager: +pub trait MessagePayloadManager: Into> + From> + PartialEq + Copy + Sync + Send + 'static { }