diff --git a/Cargo.toml b/Cargo.toml index 04cc0dc..3227001 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,9 +4,14 @@ version = "0.0.0" edition = "2021" +# [[bin]] +# name = "test_sync" +# path = "src/examples/test_sync.rs" + + [[bin]] -name = "test_sync" -path = "src/examples/test_sync.rs" +name = "test_async" +path = "src/examples/test_async.rs" [[test]] @@ -24,6 +29,8 @@ bytes = "1.0.1" thiserror = "1.0.63" panduza-proc = { path = "lib/panduza-proc" } +tokio = { version = "1", features = ["full"] } + [dev-dependencies] diff --git a/src/asyncv.rs b/src/asyncv.rs new file mode 100644 index 0000000..f65b58e --- /dev/null +++ b/src/asyncv.rs @@ -0,0 +1,10 @@ +/// This module manage the message attributes (MQTT/TCP) +pub mod msg; + +/// This module manage the stream attributes (CUSTOM/QUIC) +pub mod stream; + +/// This module manage the reactor +mod reactor; +pub type Reactor = reactor::Reactor; +pub type ReactorData = reactor::ReactorData; diff --git a/src/asyncv/msg.rs b/src/asyncv/msg.rs new file mode 100644 index 0000000..9f7a8e8 --- /dev/null +++ b/src/asyncv/msg.rs @@ -0,0 +1,17 @@ +use bytes::Bytes; + +/// Members shared by all attributes +mod core_members; +pub type CoreMembers = core_members::CoreMembers; + +/// +pub mod att; +pub mod att_bool; + +pub use super::ReactorData; + +/// Trait to manage an message attribute (MQTT) +/// Sync version +pub trait OnMessageHandler: Send + Sync { + fn on_message(&mut self, data: &Bytes); +} diff --git a/src/asyncv/msg/att.rs b/src/asyncv/msg/att.rs new file mode 100644 index 0000000..4f57507 --- /dev/null +++ b/src/asyncv/msg/att.rs @@ -0,0 +1,60 @@ +mod inner_msg_att; +pub type InnerAtt = inner_msg_att::InnerAtt; + +use rumqttc::AsyncClient; +use std::sync::Arc; +use tokio::sync::Mutex; + +use crate::AttributeError; + +use super::att_bool::AttBool; +pub use super::CoreMembers; +pub use super::OnMessageHandler; +pub use super::ReactorData; + +/// Generic Message Attribute +pub struct Att { + /// Attribute are mainly a wrapper for the inner manager + inner: Arc>, +} + +impl Att { + /// Create a new Message Attribute + pub fn new( + reactor_data: Arc>, + topic: String, + mqtt_client: AsyncClient, + ) -> Self { + // Create a new inner manager + let inner = InnerAtt::new( + Arc::downgrade(&reactor_data), + topic.clone(), + mqtt_client.clone(), + ) + .into_arc_mutex(); + + Self { inner: inner } + } + + /// Initialize the attribute + /// + pub async fn init(self) -> Result { + self.inner.lock().await.init(self.inner.clone()).await?; + Ok(self) + } + + /// Take the inner core data + /// + pub async fn take_core_members(self) -> Result { + Ok(self.inner.lock().await.clone_core()) + } + + /// Easy conversion to AttBool + /// + pub async fn into_att_bool(self) -> AttBool { + match self.take_core_members().await { + Ok(core_data) => AttBool::from_core_members(core_data).await, + Err(_) => panic!("Error"), + } + } +} diff --git a/src/asyncv/msg/att/inner_msg_att.rs b/src/asyncv/msg/att/inner_msg_att.rs new file mode 100644 index 0000000..7f96fd1 --- /dev/null +++ b/src/asyncv/msg/att/inner_msg_att.rs @@ -0,0 +1,56 @@ +pub use super::CoreMembers; +use super::OnMessageHandler; +use super::ReactorData; +use crate::AttributeError; +use bytes::Bytes; +use rumqttc::AsyncClient; +use std::sync::{Arc, Weak}; +use tokio::sync::Mutex; + +/// Inner implementation of the generic message attribute +/// +pub struct InnerAtt { + /// Members at the core of each attribute + core: CoreMembers, +} + +impl InnerAtt { + /// Create a new InnerAtt + /// + pub fn new( + reactor_data: Weak>, + topic: String, + mqtt_client: AsyncClient, + ) -> Self { + Self { + core: CoreMembers::new(reactor_data, topic, mqtt_client), + } + } + + /// Convert the InnerAtt to an Arc> + /// + pub fn into_arc_mutex(self) -> Arc> { + Arc::new(Mutex::new(self)) + } + + /// Initialize the attribute + /// + pub async fn init( + &self, + attribute: Arc>, + ) -> Result<(), AttributeError> { + self.core.init(attribute).await + } + + /// Clone the core members (to mutate into an other type) + /// + pub fn clone_core(&self) -> CoreMembers { + self.core.clone() + } +} + +impl OnMessageHandler for InnerAtt { + fn on_message(&mut self, data: &Bytes) { + println!("generic"); + } +} diff --git a/src/asyncv/msg/att_bool.rs b/src/asyncv/msg/att_bool.rs new file mode 100644 index 0000000..ef0c704 --- /dev/null +++ b/src/asyncv/msg/att_bool.rs @@ -0,0 +1,60 @@ +mod inner_msg_att_bool; +use inner_msg_att_bool::InnerAttBool; + +use std::sync::Arc; +use tokio::sync::Mutex; + +use crate::AttributeError; + +use super::att::Att; +pub use super::CoreMembers; +pub use super::OnMessageHandler; +pub use super::ReactorData; + +pub trait OnChangeHandler: Send + Sync { + fn on_change(&self, new_value: bool); +} + +pub struct AttBool { + inner: Arc>, +} + +impl AttBool { + pub async fn set_on_change_handler(&self, handler: Box) { + self.inner.lock().await.set_on_change_handler(handler); + } + + /// Set the value of the attribute + /// + pub async fn set(&self, value: bool) -> Result<(), AttributeError> { + self.inner.lock().await.set(value).await?; + let cv = self.inner.lock().await.set_ensure_lock_clone(); + cv.with_lock(|mut done| { + while !*done { + done.wait(); + } + }); + Ok(()) + } + + /// Get the value of the attribute + /// + pub async fn get(&self) -> Option { + self.inner.lock().await.get() + } + + pub async fn from_core_members(core_data: CoreMembers) -> Self { + let inner = InnerAttBool::from(core_data).to_arc_mutex(); + inner.lock().await.register(inner.clone()).await.unwrap(); + Self { inner: inner } + } +} + +// impl Into for Att { +// fn into(self) -> AttBool { +// match self.take_core_members() { +// Ok(core_data) => AttBool::from(core_data), +// Err(_) => panic!("Error"), +// } +// } +// } diff --git a/src/asyncv/msg/att_bool/inner_msg_att_bool.rs b/src/asyncv/msg/att_bool/inner_msg_att_bool.rs new file mode 100644 index 0000000..46aae81 --- /dev/null +++ b/src/asyncv/msg/att_bool/inner_msg_att_bool.rs @@ -0,0 +1,128 @@ +use std::sync::Arc; +use tokio::sync::Mutex; + +use bytes::Bytes; + +use crate::AttributeError; + +use super::CoreMembers; +use super::OnChangeHandler; +use super::OnMessageHandler; + +use monitor::Monitor; + +/// Inner implementation of the boolean message attribute +/// +pub struct InnerAttBool { + /// Members at the core of each attribute + core: CoreMembers, + /// Current value of the attribute + value: Option, + /// Requested value of the attribute (set by the user) + requested_value: Option, + /// Handler to call when the value change + on_change_handler: Option>, + + set_ensure_lock: Arc>, +} + +impl InnerAttBool { + pub fn to_arc_mutex(self) -> Arc> { + Arc::new(Mutex::new(self)) + } + + pub fn set_ensure_lock_clone(&mut self) -> Arc> { + return self.set_ensure_lock.clone(); + } + + fn set_ensure_update(&mut self) { + if self.set_ensure_ok() { + self.set_ensure_lock.with_lock(|mut done| { + *done = true; + done.notify_one(); + }); + } + } + + fn set_ensure_ok(&self) -> bool { + return self.requested_value == self.value; + } + + /// Set the value of the attribute + /// + pub async fn set(&mut self, new_value: bool) -> Result<(), AttributeError> { + // Do not go further if the value is already set + if let Some(current_value) = self.value { + if current_value == new_value { + return Ok(()); + } + } + + // Set the requested value and publish the request + self.requested_value = Some(new_value); + match self.requested_value { + Some(requested_value) => match requested_value { + true => self.core.publish("1").await, + false => self.core.publish("0").await, + }, + None => Err(AttributeError::Unkonwn), + } + } + + /// Get the value of the attribute + /// If None, the first value is not yet received + /// + pub fn get(&self) -> Option { + return self.value; + } + + pub fn set_on_change_handler(&mut self, handler: Box) { + self.on_change_handler = Some(handler); + } + + /// Register the attribute to the reactor + /// + pub async fn register( + &self, + attribute: Arc>, + ) -> Result<(), AttributeError> { + self.core.register(attribute).await + } +} + +impl OnMessageHandler for InnerAttBool { + fn on_message(&mut self, data: &Bytes) { + println!("boolean"); + if data.len() == 1 { + match data[0] { + b'1' => { + self.value = Some(true); + self.set_ensure_update(); + } + b'0' => { + self.value = Some(false); + self.set_ensure_update(); + } + _ => { + println!("unexcpedted payload {:?}", data); + return; + } + }; + // Do something with the value + } else { + println!("wierd payload {:?}", data); + } + } +} + +impl From for InnerAttBool { + fn from(core_data: CoreMembers) -> Self { + return Self { + core: core_data, + value: None, + requested_value: None, + on_change_handler: None, + set_ensure_lock: Arc::new(Monitor::new(false)), + }; + } +} diff --git a/src/asyncv/msg/core_members.rs b/src/asyncv/msg/core_members.rs new file mode 100644 index 0000000..6f9e4bc --- /dev/null +++ b/src/asyncv/msg/core_members.rs @@ -0,0 +1,94 @@ +use rumqttc::{AsyncClient, QoS}; +use std::sync::Arc; +use std::sync::Weak; +use tokio::sync::Mutex; + +use crate::AttributeError; + +use super::OnMessageHandler; +use super::ReactorData; + +/// The core data of the attribute +/// Those attributes will be moved between Att types +#[derive(Clone)] +pub struct CoreMembers { + /// The data of the reactor, to be able to subscribe to the + /// reactor and route messages to the attribute + reactor_data: Weak>, + + /// The mqtt client + mqtt_client: AsyncClient, + + /// The topic of the attribute + topic: String, + + /// The topic for commands + topic_cmd: String, +} + +impl CoreMembers { + /// Create a new core data + pub fn new( + reactor_data: Weak>, + topic: String, + mqtt_client: AsyncClient, + ) -> Self { + Self { + reactor_data: reactor_data, + mqtt_client: mqtt_client, + topic: topic.clone(), + topic_cmd: format!("{}/cmd", topic), + } + } + + /// Initialize the attribute + /// + pub async fn init( + &self, + attribute: Arc>, + ) -> Result<(), AttributeError> { + self.register(attribute).await?; + self.subscribe().await + } + + /// Publish a command + /// + pub async fn publish(&self, value: V) -> Result<(), AttributeError> + where + V: Into>, + { + self.mqtt_client + .publish(&self.topic_cmd, QoS::AtMostOnce, true, value) + .await + .map_err(|e| AttributeError::Message(e)) + } + + /// Subscribe to the topic + /// + pub async fn subscribe(&self) -> Result<(), AttributeError> { + // no need to store the att topic + let topic_att = format!("{}/att", self.topic); + self.mqtt_client + .subscribe(topic_att, QoS::AtMostOnce) + .await + .map_err(|e| AttributeError::Message(e)) + } + + /// Register the attribute to the reactor + /// + pub async fn register( + &self, + attribute: Arc>, + ) -> Result<(), AttributeError> { + // no need to store the att topic + let topic_att = format!("{}/att", self.topic); + self.reactor_data + .upgrade() + .ok_or(AttributeError::InternalPointerUpgrade)? + .lock() + .await + // .map_err(|e| AttributeError::InternalMutex(e.to_string()))? + .register_message_attribute(topic_att, attribute); + Ok(()) + } +} diff --git a/src/asyncv/reactor.rs b/src/asyncv/reactor.rs new file mode 100644 index 0000000..16a9c8c --- /dev/null +++ b/src/asyncv/reactor.rs @@ -0,0 +1,82 @@ +pub mod reactor_core; +pub mod reactor_data; + +use std::sync::Arc; +use tokio::sync::Mutex; + +use crate::{AttributeError, ReactorSettings}; + +use reactor_core::ReactorCore; +pub use reactor_data::ReactorData; + +use rumqttc::AsyncClient; +use rumqttc::{Client, MqttOptions, QoS}; + +use std::time::Duration; + +use super::msg::att::Att; +use super::msg::OnMessageHandler; + +/// The reactor is the main structure that will handle the connections and the events +/// +/// All the attribute and objects will be powered by the reactor +/// +#[derive(Clone)] +pub struct Reactor { + core: Arc>, + data: Arc>, + + mqtt_client: AsyncClient, +} + +impl Reactor { + /// Create a new Reactor + /// + /// # Arguments + /// + /// * `core` - The core of the reactor + /// + pub fn new(settings: ReactorSettings) -> Self { + println!("ReactorCore is running"); + let mut mqttoptions = MqttOptions::new("rumqtt-sync", "localhost", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(3)); + + let (client, connection) = AsyncClient::new(mqttoptions, 100); + + let data = Arc::new(Mutex::new(ReactorData::new())); + + Reactor { + core: Arc::new(Mutex::new(ReactorCore::new(data.clone(), connection))), + data: data, + mqtt_client: client, + } + } + + pub fn run_in_thread(&self) { + let core = self.core.clone(); + tokio::spawn(async move { + core.lock().await.run().await; + println!("ReactorCore is not runiing !!!!!!!!!!!!!!!!!!!!!!"); + }); + } + + /// Create a new attribute from its topic + /// + pub async fn attribute_from_topic(&self, topic: &str) -> Result { + Att::new( + self.data.clone(), + topic.to_string(), + self.mqtt_client.clone(), + ) + .init() + .await + } + + pub async fn scan_platforms(&self) { + println!("publish"); + self.mqtt_client + .publish("pza", QoS::AtLeastOnce, true, "pok") + .await + .unwrap(); + } +} diff --git a/src/asyncv/reactor/reactor_core.rs b/src/asyncv/reactor/reactor_core.rs new file mode 100644 index 0000000..4607d2f --- /dev/null +++ b/src/asyncv/reactor/reactor_core.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; +use tokio::sync::Mutex; + +use rumqttc::{Connection, EventLoop}; + +use super::reactor_data::ReactorData; + +pub struct ReactorCore { + data: Arc>, + mqtt_connection: EventLoop, +} + +impl ReactorCore { + pub fn new(data: Arc>, mqtt_connection: EventLoop) -> Self { + ReactorCore { + data: data, + mqtt_connection: mqtt_connection, + } + } + + pub async fn run(&mut self) { + while let Ok(event) = self.mqtt_connection.poll().await { + // println!("Notification = {:?}", notification); + // match notification { + // Ok(event) => { + match event { + rumqttc::Event::Incoming(incoming) => { + // println!("Incoming = {:?}", incoming); + + match incoming { + // rumqttc::Packet::Connect(_) => todo!(), + // rumqttc::Packet::ConnAck(_) => todo!(), + rumqttc::Packet::Publish(packet) => { + // let payload = packet.payload; + // let payload_str = std::str::from_utf8(&payload).unwrap(); + // println!("Received = {:?} {:?}", payload_str, packet.topic); + + self.data + .lock() + .await + .trigger_on_change(&packet.topic, &packet.payload) + .await; + } + // rumqttc::Packet::PubAck(_) => todo!(), + // rumqttc::Packet::PubRec(_) => todo!(), + // rumqttc::Packet::PubRel(_) => todo!(), + // rumqttc::Packet::PubComp(_) => todo!(), + // rumqttc::Packet::Subscribe(_) => todo!(), + // rumqttc::Packet::SubAck(_) => todo!(), + // rumqttc::Packet::Unsubscribe(_) => todo!(), + // rumqttc::Packet::UnsubAck(_) => todo!(), + // rumqttc::Packet::PingReq => todo!(), + // rumqttc::Packet::PingResp => todo!(), + // rumqttc::Packet::Disconnect => todo!(), + _ => {} + } + } + rumqttc::Event::Outgoing(outgoing) => { + // println!("Outgoing = {:?}", outgoing); + match outgoing { + rumqttc::Outgoing::Publish(packet) => { + // println!("Publish = {:?}", packet); + } + rumqttc::Outgoing::Subscribe(p) => { + // println!("Subscribe = {:?}", p); + } + // rumqttc::Outgoing::Unsubscribe(_) => todo!(), + // rumqttc::Outgoing::PubAck(_) => todo!(), + // rumqttc::Outgoing::PubRec(_) => todo!(), + // rumqttc::Outgoing::PubRel(_) => todo!(), + // rumqttc::Outgoing::PubComp(_) => todo!(), + // rumqttc::Outgoing::PingReq => todo!(), + // rumqttc::Outgoing::PingResp => todo!(), + // rumqttc::Outgoing::Disconnect => todo!(), + // rumqttc::Outgoing::AwaitAck(_) => todo!(), + _ => {} + } + } // } + // } + // Err(_) => todo!(), + } + } + } +} diff --git a/src/asyncv/reactor/reactor_data.rs b/src/asyncv/reactor/reactor_data.rs new file mode 100644 index 0000000..730738b --- /dev/null +++ b/src/asyncv/reactor/reactor_data.rs @@ -0,0 +1,49 @@ +use std::sync::Weak; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::Mutex; + +use bytes::Bytes; + +use super::OnMessageHandler; + +/// Data used by the core the dispatch input data +/// +pub struct ReactorData { + /// List of attributes to trigger on message + message_attributes: HashMap>>, +} + +impl ReactorData { + /// Create a new ReactorData + /// + pub fn new() -> Self { + Self { + message_attributes: HashMap::new(), + } + } + + pub fn register_message_attribute( + &mut self, + topic: String, + attribute: Arc>, + ) { + self.message_attributes + .insert(topic, Arc::downgrade(&attribute)); + } + + /// Trigger the on_message of the attribute + /// + pub async fn trigger_on_change(&self, topic: &str, new_value: &Bytes) { + // println!("{:?}", self.message_attributes.keys()); + if let Some(attribute) = self.message_attributes.get(topic) { + match attribute.upgrade() { + Some(attribute) => { + attribute.lock().await.on_message(new_value); + } + None => { + println!("Attribute not found"); + } + } + } + } +} diff --git a/src/asyncv/mod.rs b/src/asyncv/stream.rs similarity index 100% rename from src/asyncv/mod.rs rename to src/asyncv/stream.rs diff --git a/src/examples/test_async.rs b/src/examples/test_async.rs new file mode 100644 index 0000000..44bb806 --- /dev/null +++ b/src/examples/test_async.rs @@ -0,0 +1,31 @@ +use panduza::asyncv::Reactor; +use panduza::ReactorSettings; +use tokio::time::sleep; +use tokio::time::Duration; + +#[tokio::main] +async fn main() { + let settings = ReactorSettings::new("localhost", 1883); + let reactor = Reactor::new(settings); + + reactor.run_in_thread(); + + // // wait for connection + + // // sleep(time::Duration::from_secs(5)); + + // println!("-----------"); + + // reactor.scan_platforms(); + + let pp = reactor + .attribute_from_topic("ooo") + .await + .unwrap() + .into_att_bool() + .await; + + pp.set(true).await.unwrap(); + + sleep(Duration::from_secs(60)).await; +}