diff --git a/Cargo.toml b/Cargo.toml index f394d0e..5e5df0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,14 +9,14 @@ edition = "2021" # path = "src/examples/test_sync.rs" -# [[bin]] -# name = "test_async" -# path = "src/examples/test_async.rs" - - [[bin]] name = "test_async" -path = "src/examples/test_cb.rs" +path = "src/examples/test_async.rs" + + +# [[bin]] +# name = "test_async" +# path = "src/examples/test_cb.rs" [[test]] @@ -37,6 +37,7 @@ panduza-proc = { path = "lib/panduza-proc" } tokio = { version = "1", features = ["full"] } futures = "0.3.30" +async-trait = "0.1.81" [dev-dependencies] diff --git a/src/asyncv.rs b/src/asyncv.rs index f65b58e..ecf318b 100644 --- a/src/asyncv.rs +++ b/src/asyncv.rs @@ -1,10 +1,17 @@ +pub mod attribute; +pub mod builder; + +pub use builder::AttributeBuilder; + +pub use attribute::message::MessageDispatcher; + /// This module manage the message attributes (MQTT/TCP) -pub mod msg; +// pub mod msg; +pub type MessageClient = rumqttc::AsyncClient; /// This module manage the stream attributes (CUSTOM/QUIC) -pub mod stream; +// pub mod stream; /// This module manage the reactor mod reactor; pub type Reactor = reactor::Reactor; -pub type ReactorData = reactor::ReactorData; diff --git a/src/asyncv/attribute.rs b/src/asyncv/attribute.rs new file mode 100644 index 0000000..4be1cbe --- /dev/null +++ b/src/asyncv/attribute.rs @@ -0,0 +1,5 @@ +pub mod message; + +pub use super::AttributeBuilder; +pub use super::MessageClient; +pub use super::MessageDispatcher; diff --git a/src/asyncv/attribute/message.rs b/src/asyncv/attribute/message.rs new file mode 100644 index 0000000..022ac5b --- /dev/null +++ b/src/asyncv/attribute/message.rs @@ -0,0 +1,20 @@ +pub mod boolean; +mod core_members; +mod dispatcher; + +use async_trait::async_trait; +use bytes::Bytes; + +pub use core_members::MessageCoreMembers; +pub use dispatcher::MessageDispatcher; + +pub use super::AttributeBuilder; + +pub type MessageClient = rumqttc::AsyncClient; + +/// Trait to manage an message attribute (MQTT) +/// Sync version +#[async_trait] +pub trait OnMessageHandler: Send + Sync { + async fn on_message(&mut self, data: &Bytes); +} diff --git a/src/asyncv/attribute/message/boolean.rs b/src/asyncv/attribute/message/boolean.rs new file mode 100644 index 0000000..b60d940 --- /dev/null +++ b/src/asyncv/attribute/message/boolean.rs @@ -0,0 +1,12 @@ +pub mod attribute; +pub mod builder; + +pub use attribute::AttributeBoolean; +pub use builder::BuilderBoolean; + +pub use super::AttributeBuilder; +pub use super::MessageClient; +pub use super::MessageDispatcher; + +pub use super::MessageCoreMembers; +pub use super::OnMessageHandler; diff --git a/src/asyncv/attribute/message/boolean/attribute.rs b/src/asyncv/attribute/message/boolean/attribute.rs new file mode 100644 index 0000000..dd92154 --- /dev/null +++ b/src/asyncv/attribute/message/boolean/attribute.rs @@ -0,0 +1,62 @@ +mod inner; +use inner::InnerBoolean; + +use std::future::Future; +use std::sync::Arc; +use tokio::sync::Mutex; + +use crate::AttributeError; +use async_trait::async_trait; + +pub use super::BuilderBoolean; +use super::MessageCoreMembers; + +pub use super::OnMessageHandler; + +// use super::att::Att; +// pub use super::CoreMembers; +// pub use super::OnMessageHandler; +// pub use super::ReactorData; + +// pub use inner_msg_att_bool::OnChangeHandlerFunction; + +/// Attribute to manage a boolean +pub struct AttributeBoolean { + /// + inner: Arc>, +} + +impl AttributeBoolean { + /// + pub fn new(builder: BuilderBoolean) -> AttributeBoolean { + AttributeBoolean { + inner: InnerBoolean::new(builder).to_arc_mutex(), + } + } + + // pub async fn on_change_handler(&self, handler: Box) { + // self.inner.lock().await.on_change_handler(handler); + // } + // pub async fn on_change(&self, handler: OnChangeHandlerFunction) { + // self.inner.lock().await.on_change(handler); + // } + + /// Set the value of the attribute + /// + // pub async fn set(&self, value: bool) -> Result<(), AttributeError> { + // self.inner.lock().await.set(value).await?; + // // let cv = self.inner.lock().await.set_ensure_lock_clone(); + // // cv.with_lock(|mut done| { + // // while !*done { + // // done.wait(); + // // } + // // }); + // // Ok(()) + // } + + /// Get the value of the attribute + /// + pub async fn get(&self) -> Option { + self.inner.lock().await.get() + } +} diff --git a/src/asyncv/attribute/message/boolean/attribute/inner.rs b/src/asyncv/attribute/message/boolean/attribute/inner.rs new file mode 100644 index 0000000..79a746f --- /dev/null +++ b/src/asyncv/attribute/message/boolean/attribute/inner.rs @@ -0,0 +1,178 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use futures::future::BoxFuture; +use futures::FutureExt; +use tokio::sync::Mutex; + +use bytes::Bytes; + +use crate::AttributeError; + +use super::MessageCoreMembers; +// use super::OnChangeHandler; +use super::OnMessageHandler; + +pub use super::BuilderBoolean; +use monitor::Monitor; +// pub type OnChangeHandlerFunction = Arc + Send + Sync>>; +// pub type OnChangeHandlerFunction = Pin + Send + Sync>>; +// pub type OnChangeHandlerFunction = BoxFuture<'static, ()>; + +// type OnChangeHandlerFunction = Arc BoxFuture<'static, ()>>; + +/// Inner implementation of the boolean message attribute +/// +pub struct InnerBoolean { + /// Members at the core of each attribute + core: MessageCoreMembers, + /// Current value of the attribute + value: Option, + /// Requested value of the attribute (set by the user) + requested_value: Option, + // / Handler to call when the value change + // on_change_handler: Option>, + // on_change_handler: Option, + // set_ensure_lock: Arc>, +} + +impl InnerBoolean { + /// + pub fn new(builder: BuilderBoolean) -> InnerBoolean { + InnerBoolean { + core: MessageCoreMembers::new( + builder.message_client, + builder.message_dispatcher, + builder.topic.unwrap(), + ), + value: None, + requested_value: None, + // set_ensure_lock: None, + } + } + + pub fn to_arc_mutex(self) -> Arc> { + Arc::new(Mutex::new(self)) + } + + // pub fn set_ensure_lock_clone(&mut self) -> Arc> { + // return self.set_ensure_lock.clone(); + // } + + // fn set_ensure_update(&mut self) { + // if self.set_ensure_ok() { + // self.set_ensure_lock.with_lock(|mut done| { + // *done = true; + // done.notify_one(); + // }); + // } + // } + + fn set_ensure_ok(&self) -> bool { + return self.requested_value == self.value; + } + + /// Set the value of the attribute + /// + pub async fn set(&mut self, new_value: bool) -> Result<(), AttributeError> { + // Do not go further if the value is already set + if let Some(current_value) = self.value { + if current_value == new_value { + return Ok(()); + } + } + + // Set the requested value and publish the request + self.requested_value = Some(new_value); + match self.requested_value { + Some(requested_value) => match requested_value { + true => self.core.publish("1").await, + false => self.core.publish("0").await, + }, + None => Err(AttributeError::Unkonwn), + } + } + + /// Get the value of the attribute + /// If None, the first value is not yet received + /// + pub fn get(&self) -> Option { + return self.value; + } + + // pub fn on_change_handler(&mut self, handler: Box) { + // self.on_change_handler = Some(handler); + // } + + // pub fn on_change(&mut self, handler: OnChangeHandlerFunction) { + // self.on_change_handler = Some(handler); + // } + + /// Register the attribute to the reactor + /// + pub async fn register( + &self, + attribute: Arc>, + ) -> Result<(), AttributeError> { + self.core.register(attribute).await + } +} + +// impl OnMessageHandler for InnerBoolean { +// fn on_message(&mut self, data: &Bytes) { +// println!("boolean"); + +// // OnChangeHandlerFunction + +// // let a = async { true }; +// // let b = || a; + +// // self.on_change_handler = Some(Arc::new(|| a)); +// // tokio::spawn(b.clone()()); +// // tokio::spawn(b()); +// // tokio::spawn(b()); +// // tokio::spawn(pp55); +// // tokio::spawn(pp55); + +// // let pp: Pin + Send>> = async move { true }.boxed(); +// // tokio::spawn(pp); +// // tokio::spawn(pp); +// // tokio::spawn(pp); + +// // if let Some(handler) = self.on_change_handler.as_ref() { +// // tokio::spawn(*(handler.clone())); +// // } +// if data.len() == 1 { +// match data[0] { +// b'1' => { +// self.value = Some(true); +// self.set_ensure_update(); +// } +// b'0' => { +// self.value = Some(false); +// self.set_ensure_update(); +// } +// _ => { +// println!("unexcpedted payload {:?}", data); +// return; +// } +// }; +// // Do something with the value +// } else { +// println!("wierd payload {:?}", data); +// } +// } +// } + +// impl From for InnerBoolean { +// fn from(core_data: CoreMembers) -> Self { +// return Self { +// core: core_data, +// value: None, +// requested_value: None, +// on_change_handler: None, +// set_ensure_lock: Arc::new(Monitor::new(false)), +// }; +// } +// } diff --git a/src/asyncv/attribute/message/boolean/builder.rs b/src/asyncv/attribute/message/boolean/builder.rs new file mode 100644 index 0000000..bcc7876 --- /dev/null +++ b/src/asyncv/attribute/message/boolean/builder.rs @@ -0,0 +1,36 @@ +use std::sync::Weak; + +use tokio::sync::Mutex; + +use super::AttributeBoolean; +use super::AttributeBuilder; + +pub use super::MessageClient; +pub use super::MessageDispatcher; + +pub struct BuilderBoolean { + /// The mqtt client + pub message_client: MessageClient, + + /// The Object that allow the reactor to dispatch + /// incoming messages on attributes + pub message_dispatcher: Weak>, + + /// Topic of the attribute + pub topic: Option, +} + +impl BuilderBoolean { + /// New boolean builder + pub fn new(parent_builder: AttributeBuilder) -> BuilderBoolean { + BuilderBoolean { + message_client: parent_builder.message_client, + message_dispatcher: parent_builder.message_dispatcher, + topic: parent_builder.topic, + } + } + + pub fn finish(self) -> AttributeBoolean { + AttributeBoolean::new(self) + } +} diff --git a/src/asyncv/msg/core_members.rs b/src/asyncv/attribute/message/core_members.rs similarity index 80% rename from src/asyncv/msg/core_members.rs rename to src/asyncv/attribute/message/core_members.rs index 6f9e4bc..feee6de 100644 --- a/src/asyncv/msg/core_members.rs +++ b/src/asyncv/attribute/message/core_members.rs @@ -1,23 +1,24 @@ -use rumqttc::{AsyncClient, QoS}; +use rumqttc::QoS; use std::sync::Arc; use std::sync::Weak; use tokio::sync::Mutex; use crate::AttributeError; +use super::MessageClient; +use super::MessageDispatcher; use super::OnMessageHandler; -use super::ReactorData; /// The core data of the attribute /// Those attributes will be moved between Att types #[derive(Clone)] -pub struct CoreMembers { +pub struct MessageCoreMembers { /// The data of the reactor, to be able to subscribe to the /// reactor and route messages to the attribute - reactor_data: Weak>, + message_dispatcher: Weak>, /// The mqtt client - mqtt_client: AsyncClient, + message_client: MessageClient, /// The topic of the attribute topic: String, @@ -26,16 +27,16 @@ pub struct CoreMembers { topic_cmd: String, } -impl CoreMembers { +impl MessageCoreMembers { /// Create a new core data pub fn new( - reactor_data: Weak>, + message_client: MessageClient, + message_dispatcher: Weak>, topic: String, - mqtt_client: AsyncClient, ) -> Self { Self { - reactor_data: reactor_data, - mqtt_client: mqtt_client, + message_dispatcher: message_dispatcher, + message_client: message_client, topic: topic.clone(), topic_cmd: format!("{}/cmd", topic), } @@ -57,7 +58,7 @@ impl CoreMembers { where V: Into>, { - self.mqtt_client + self.message_client .publish(&self.topic_cmd, QoS::AtMostOnce, true, value) .await .map_err(|e| AttributeError::Message(e)) @@ -68,7 +69,7 @@ impl CoreMembers { pub async fn subscribe(&self) -> Result<(), AttributeError> { // no need to store the att topic let topic_att = format!("{}/att", self.topic); - self.mqtt_client + self.message_client .subscribe(topic_att, QoS::AtMostOnce) .await .map_err(|e| AttributeError::Message(e)) @@ -82,7 +83,7 @@ impl CoreMembers { ) -> Result<(), AttributeError> { // no need to store the att topic let topic_att = format!("{}/att", self.topic); - self.reactor_data + self.message_dispatcher .upgrade() .ok_or(AttributeError::InternalPointerUpgrade)? .lock() diff --git a/src/asyncv/reactor/reactor_data.rs b/src/asyncv/attribute/message/dispatcher.rs similarity index 92% rename from src/asyncv/reactor/reactor_data.rs rename to src/asyncv/attribute/message/dispatcher.rs index 730738b..90c948c 100644 --- a/src/asyncv/reactor/reactor_data.rs +++ b/src/asyncv/attribute/message/dispatcher.rs @@ -8,13 +8,13 @@ use super::OnMessageHandler; /// Data used by the core the dispatch input data /// -pub struct ReactorData { +pub struct MessageDispatcher { /// List of attributes to trigger on message message_attributes: HashMap>>, } -impl ReactorData { - /// Create a new ReactorData +impl MessageDispatcher { + /// Create a new MessageDispatcher /// pub fn new() -> Self { Self { diff --git a/src/asyncv/builder.rs b/src/asyncv/builder.rs new file mode 100644 index 0000000..8beeb2e --- /dev/null +++ b/src/asyncv/builder.rs @@ -0,0 +1,45 @@ +use std::sync::Weak; + +use tokio::sync::Mutex; + +use super::attribute::message::boolean::BuilderBoolean; +pub use super::MessageClient; +pub use super::MessageDispatcher; + +/// Object that allow to build an generic attribute +/// +pub struct AttributeBuilder { + /// The mqtt client + pub message_client: MessageClient, + + /// The Object that allow the reactor to dispatch + /// incoming messages on attributes + pub message_dispatcher: Weak>, + + /// Topic of the attribute + pub topic: Option, +} + +impl AttributeBuilder { + /// Create a new builder + pub fn new( + message_client: MessageClient, + message_dispatcher: Weak>, + ) -> AttributeBuilder { + AttributeBuilder { + message_client, + message_dispatcher, + topic: None, + } + } + + /// Attach a topic + pub fn with_topic>(mut self, topic: T) -> Self { + self.topic = Some(topic.into()); + self + } + + pub fn with_type_boolean(self) -> BuilderBoolean { + BuilderBoolean::new(self) + } +} diff --git a/src/asyncv/msg.rs b/src/asyncv/msg.rs index 9f7a8e8..012220d 100644 --- a/src/asyncv/msg.rs +++ b/src/asyncv/msg.rs @@ -6,12 +6,7 @@ pub type CoreMembers = core_members::CoreMembers; /// pub mod att; -pub mod att_bool; +pub mod attribute_boolean; +pub use attribute_boolean::AttributeBoolean; pub use super::ReactorData; - -/// Trait to manage an message attribute (MQTT) -/// Sync version -pub trait OnMessageHandler: Send + Sync { - fn on_message(&mut self, data: &Bytes); -} diff --git a/src/asyncv/msg/att_bool.rs b/src/asyncv/msg/att_bool.rs deleted file mode 100644 index c565db5..0000000 --- a/src/asyncv/msg/att_bool.rs +++ /dev/null @@ -1,66 +0,0 @@ -mod inner_msg_att_bool; -use inner_msg_att_bool::InnerAttBool; - -use std::future::Future; -use std::sync::Arc; -use tokio::sync::Mutex; - -use crate::AttributeError; - -use super::att::Att; -pub use super::CoreMembers; -pub use super::OnMessageHandler; -pub use super::ReactorData; - -// pub trait OnChangeHandler: Send + Sync { -// fn on_change(&self, new_value: bool); -// } - -pub use inner_msg_att_bool::OnChangeHandlerFunction; - -pub struct AttBool { - inner: Arc>, -} - -impl AttBool { - // pub async fn set_on_change_handler(&self, handler: Box) { - // self.inner.lock().await.set_on_change_handler(handler); - // } - pub async fn on_change(&self, handler: OnChangeHandlerFunction) { - self.inner.lock().await.on_change(handler); - } - - /// Set the value of the attribute - /// - pub async fn set(&self, value: bool) -> Result<(), AttributeError> { - self.inner.lock().await.set(value).await?; - let cv = self.inner.lock().await.set_ensure_lock_clone(); - cv.with_lock(|mut done| { - while !*done { - done.wait(); - } - }); - Ok(()) - } - - /// Get the value of the attribute - /// - pub async fn get(&self) -> Option { - self.inner.lock().await.get() - } - - pub async fn from_core_members(core_data: CoreMembers) -> Self { - let inner = InnerAttBool::from(core_data).to_arc_mutex(); - inner.lock().await.register(inner.clone()).await.unwrap(); - Self { inner: inner } - } -} - -// impl Into for Att { -// fn into(self) -> AttBool { -// match self.take_core_members() { -// Ok(core_data) => AttBool::from(core_data), -// Err(_) => panic!("Error"), -// } -// } -// } diff --git a/src/asyncv/msg/att_bool/inner_msg_att_bool.rs b/src/asyncv/msg/att_bool/inner_msg_att_bool.rs deleted file mode 100644 index de6de79..0000000 --- a/src/asyncv/msg/att_bool/inner_msg_att_bool.rs +++ /dev/null @@ -1,160 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; - -use futures::future::BoxFuture; -use futures::FutureExt; -use tokio::sync::Mutex; - -use bytes::Bytes; - -use crate::AttributeError; - -use super::CoreMembers; -// use super::OnChangeHandler; -use super::OnMessageHandler; - -use monitor::Monitor; - -// pub type OnChangeHandlerFunction = Arc + Send + Sync>>; -pub type OnChangeHandlerFunction = Pin + Send + Sync>>; -// pub type OnChangeHandlerFunction = BoxFuture<'static, ()>; - -// type OnChangeHandlerFunction = Arc BoxFuture<'static, ()>>; - -/// Inner implementation of the boolean message attribute -/// -pub struct InnerAttBool { - /// Members at the core of each attribute - core: CoreMembers, - /// Current value of the attribute - value: Option, - /// Requested value of the attribute (set by the user) - requested_value: Option, - /// Handler to call when the value change - // on_change_handler: Option>, - on_change_handler: Option, - set_ensure_lock: Arc>, -} - -impl InnerAttBool { - pub fn to_arc_mutex(self) -> Arc> { - Arc::new(Mutex::new(self)) - } - - pub fn set_ensure_lock_clone(&mut self) -> Arc> { - return self.set_ensure_lock.clone(); - } - - fn set_ensure_update(&mut self) { - if self.set_ensure_ok() { - self.set_ensure_lock.with_lock(|mut done| { - *done = true; - done.notify_one(); - }); - } - } - - fn set_ensure_ok(&self) -> bool { - return self.requested_value == self.value; - } - - /// Set the value of the attribute - /// - pub async fn set(&mut self, new_value: bool) -> Result<(), AttributeError> { - // Do not go further if the value is already set - if let Some(current_value) = self.value { - if current_value == new_value { - return Ok(()); - } - } - - // Set the requested value and publish the request - self.requested_value = Some(new_value); - match self.requested_value { - Some(requested_value) => match requested_value { - true => self.core.publish("1").await, - false => self.core.publish("0").await, - }, - None => Err(AttributeError::Unkonwn), - } - } - - /// Get the value of the attribute - /// If None, the first value is not yet received - /// - pub fn get(&self) -> Option { - return self.value; - } - - pub fn on_change(&mut self, handler: OnChangeHandlerFunction) { - self.on_change_handler = Some(handler); - } - - /// Register the attribute to the reactor - /// - pub async fn register( - &self, - attribute: Arc>, - ) -> Result<(), AttributeError> { - self.core.register(attribute).await - } -} - -impl OnMessageHandler for InnerAttBool { - fn on_message(&mut self, data: &Bytes) { - println!("boolean"); - - // OnChangeHandlerFunction - - // let a = async { true }; - // let b = || a; - - // self.on_change_handler = Some(Arc::new(|| a)); - // tokio::spawn(b.clone()()); - // tokio::spawn(b()); - // tokio::spawn(b()); - // tokio::spawn(pp55); - // tokio::spawn(pp55); - - // let pp: Pin + Send>> = async move { true }.boxed(); - // tokio::spawn(pp); - // tokio::spawn(pp); - // tokio::spawn(pp); - - // if let Some(handler) = self.on_change_handler.as_ref() { - // tokio::spawn(*(handler.clone())); - // } - if data.len() == 1 { - match data[0] { - b'1' => { - self.value = Some(true); - self.set_ensure_update(); - } - b'0' => { - self.value = Some(false); - self.set_ensure_update(); - } - _ => { - println!("unexcpedted payload {:?}", data); - return; - } - }; - // Do something with the value - } else { - println!("wierd payload {:?}", data); - } - } -} - -impl From for InnerAttBool { - fn from(core_data: CoreMembers) -> Self { - return Self { - core: core_data, - value: None, - requested_value: None, - on_change_handler: None, - set_ensure_lock: Arc::new(Monitor::new(false)), - }; - } -} diff --git a/src/asyncv/reactor.rs b/src/asyncv/reactor.rs index 16a9c8c..ce3790a 100644 --- a/src/asyncv/reactor.rs +++ b/src/asyncv/reactor.rs @@ -1,21 +1,20 @@ -pub mod reactor_core; -pub mod reactor_data; +mod message_engine; +use message_engine::MessageEngine; use std::sync::Arc; use tokio::sync::Mutex; use crate::{AttributeError, ReactorSettings}; -use reactor_core::ReactorCore; -pub use reactor_data::ReactorData; - use rumqttc::AsyncClient; use rumqttc::{Client, MqttOptions, QoS}; use std::time::Duration; -use super::msg::att::Att; -use super::msg::OnMessageHandler; +use super::attribute::message::MessageDispatcher; + +use super::builder::AttributeBuilder; +use super::MessageClient; /// The reactor is the main structure that will handle the connections and the events /// @@ -23,10 +22,11 @@ use super::msg::OnMessageHandler; /// #[derive(Clone)] pub struct Reactor { - core: Arc>, - data: Arc>, + /// The mqtt client + message_client: Option, - mqtt_client: AsyncClient, + /// + message_dispatcher: Arc>, } impl Reactor { @@ -37,46 +37,40 @@ impl Reactor { /// * `core` - The core of the reactor /// pub fn new(settings: ReactorSettings) -> Self { - println!("ReactorCore is running"); - let mut mqttoptions = MqttOptions::new("rumqtt-sync", "localhost", 1883); - mqttoptions.set_keep_alive(Duration::from_secs(3)); + // println!("ReactorCore is running"); + // let mut mqttoptions = MqttOptions::new("rumqtt-sync", "localhost", 1883); + // mqttoptions.set_keep_alive(Duration::from_secs(3)); - let (client, connection) = AsyncClient::new(mqttoptions, 100); + // let (client, event_loop) = AsyncClient::new(mqttoptions, 100); - let data = Arc::new(Mutex::new(ReactorData::new())); + // let data = ; Reactor { - core: Arc::new(Mutex::new(ReactorCore::new(data.clone(), connection))), - data: data, - mqtt_client: client, + message_client: None, + message_dispatcher: Arc::new(Mutex::new(MessageDispatcher::new())), } } - pub fn run_in_thread(&self) { - let core = self.core.clone(); - tokio::spawn(async move { - core.lock().await.run().await; - println!("ReactorCore is not runiing !!!!!!!!!!!!!!!!!!!!!!"); - }); + pub fn start(&self) { + // let message_engine = MessageEngine() + // tokio::spawn(async move { + // core.lock().await.run().await; + // println!("ReactorCore is not runiing !!!!!!!!!!!!!!!!!!!!!!"); + // }); } - /// Create a new attribute from its topic - /// - pub async fn attribute_from_topic(&self, topic: &str) -> Result { - Att::new( - self.data.clone(), - topic.to_string(), - self.mqtt_client.clone(), + pub fn create_new_attribute(&self) -> AttributeBuilder { + AttributeBuilder::new( + self.message_client.as_ref().unwrap().clone(), + Arc::downgrade(&self.message_dispatcher), ) - .init() - .await } - pub async fn scan_platforms(&self) { - println!("publish"); - self.mqtt_client - .publish("pza", QoS::AtLeastOnce, true, "pok") - .await - .unwrap(); - } + // pub async fn scan_platforms(&self) { + // println!("publish"); + // self.message_client + // .publish("pza", QoS::AtLeastOnce, true, "pok") + // .await + // .unwrap(); + // } } diff --git a/src/asyncv/reactor/reactor_core.rs b/src/asyncv/reactor/message_engine.rs similarity index 82% rename from src/asyncv/reactor/reactor_core.rs rename to src/asyncv/reactor/message_engine.rs index 4607d2f..61902f4 100644 --- a/src/asyncv/reactor/reactor_core.rs +++ b/src/asyncv/reactor/message_engine.rs @@ -1,25 +1,29 @@ use std::sync::Arc; + use tokio::sync::Mutex; -use rumqttc::{Connection, EventLoop}; +use crate::asyncv::attribute::message::MessageDispatcher; -use super::reactor_data::ReactorData; +type MessageEventLoop = rumqttc::EventLoop; -pub struct ReactorCore { - data: Arc>, - mqtt_connection: EventLoop, +pub struct MessageEngine { + message_dispatcher: Arc>, + message_event_loop: MessageEventLoop, } -impl ReactorCore { - pub fn new(data: Arc>, mqtt_connection: EventLoop) -> Self { - ReactorCore { - data: data, - mqtt_connection: mqtt_connection, +impl MessageEngine { + pub fn new( + message_dispatcher: Arc>, + message_event_loop: MessageEventLoop, + ) -> MessageEngine { + MessageEngine { + message_dispatcher: message_dispatcher, + message_event_loop: message_event_loop, } } - pub async fn run(&mut self) { - while let Ok(event) = self.mqtt_connection.poll().await { + async fn task(&mut self) { + while let Ok(event) = self.message_event_loop.poll().await { // println!("Notification = {:?}", notification); // match notification { // Ok(event) => { @@ -35,7 +39,7 @@ impl ReactorCore { // let payload_str = std::str::from_utf8(&payload).unwrap(); // println!("Received = {:?} {:?}", payload_str, packet.topic); - self.data + self.message_dispatcher .lock() .await .trigger_on_change(&packet.topic, &packet.payload) diff --git a/src/asyncv/stream.rs b/src/asyncv/stream.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/examples/test_async.rs b/src/examples/test_async.rs index 44bb806..b582893 100644 --- a/src/examples/test_async.rs +++ b/src/examples/test_async.rs @@ -8,7 +8,7 @@ async fn main() { let settings = ReactorSettings::new("localhost", 1883); let reactor = Reactor::new(settings); - reactor.run_in_thread(); + // reactor.run_in_thread(); // // wait for connection @@ -19,13 +19,12 @@ async fn main() { // reactor.scan_platforms(); let pp = reactor - .attribute_from_topic("ooo") - .await - .unwrap() - .into_att_bool() - .await; + .create_new_attribute() + .with_topic("test") + .with_type_boolean() + .finish(); - pp.set(true).await.unwrap(); + // pp.set(true).await.unwrap(); sleep(Duration::from_secs(60)).await; } diff --git a/src/examples/test_cb.rs b/src/examples/test_cb.rs index 94a96bd..38246ab 100644 --- a/src/examples/test_cb.rs +++ b/src/examples/test_cb.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use tokio::time::sleep; use tokio::time::Duration; @@ -31,6 +33,10 @@ impl Test { { self.b = Some(Box::pin(function)); } + + // async fn fire(&self) { + // self.b.unwrap().await + // } } #[tokio::main] @@ -48,7 +54,7 @@ async fn main() { // cbb = Some(a); let mut abc = Test { b: None }; - abc.set_b(async move { + att.on_change(async move { println!("neww!! !!!!"); Ok(true) });