Skip to content

Commit

Permalink
improve builder
Browse files Browse the repository at this point in the history
  • Loading branch information
XdoctorwhoZ committed Aug 3, 2024
1 parent 83d5701 commit 2090d53
Show file tree
Hide file tree
Showing 19 changed files with 458 additions and 319 deletions.
13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ edition = "2021"
# path = "src/examples/test_sync.rs"


# [[bin]]
# name = "test_async"
# path = "src/examples/test_async.rs"


[[bin]]
name = "test_async"
path = "src/examples/test_cb.rs"
path = "src/examples/test_async.rs"


# [[bin]]
# name = "test_async"
# path = "src/examples/test_cb.rs"


[[test]]
Expand All @@ -37,6 +37,7 @@ panduza-proc = { path = "lib/panduza-proc" }
tokio = { version = "1", features = ["full"] }

futures = "0.3.30"
async-trait = "0.1.81"

[dev-dependencies]

Expand Down
13 changes: 10 additions & 3 deletions src/asyncv.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
pub mod attribute;
pub mod builder;

pub use builder::AttributeBuilder;

pub use attribute::message::MessageDispatcher;

/// This module manage the message attributes (MQTT/TCP)
pub mod msg;
// pub mod msg;
pub type MessageClient = rumqttc::AsyncClient;

/// This module manage the stream attributes (CUSTOM/QUIC)
pub mod stream;
// pub mod stream;

/// This module manage the reactor
mod reactor;
pub type Reactor = reactor::Reactor;
pub type ReactorData = reactor::ReactorData;
5 changes: 5 additions & 0 deletions src/asyncv/attribute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod message;

pub use super::AttributeBuilder;
pub use super::MessageClient;
pub use super::MessageDispatcher;
20 changes: 20 additions & 0 deletions src/asyncv/attribute/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
pub mod boolean;
mod core_members;
mod dispatcher;

use async_trait::async_trait;
use bytes::Bytes;

pub use core_members::MessageCoreMembers;
pub use dispatcher::MessageDispatcher;

pub use super::AttributeBuilder;

pub type MessageClient = rumqttc::AsyncClient;

/// Trait to manage an message attribute (MQTT)
/// Sync version
#[async_trait]
pub trait OnMessageHandler: Send + Sync {
async fn on_message(&mut self, data: &Bytes);
}
12 changes: 12 additions & 0 deletions src/asyncv/attribute/message/boolean.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
pub mod attribute;
pub mod builder;

pub use attribute::AttributeBoolean;
pub use builder::BuilderBoolean;

pub use super::AttributeBuilder;
pub use super::MessageClient;
pub use super::MessageDispatcher;

pub use super::MessageCoreMembers;
pub use super::OnMessageHandler;
62 changes: 62 additions & 0 deletions src/asyncv/attribute/message/boolean/attribute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
mod inner;
use inner::InnerBoolean;

use std::future::Future;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::AttributeError;
use async_trait::async_trait;

pub use super::BuilderBoolean;
use super::MessageCoreMembers;

pub use super::OnMessageHandler;

// use super::att::Att;
// pub use super::CoreMembers;
// pub use super::OnMessageHandler;
// pub use super::ReactorData;

// pub use inner_msg_att_bool::OnChangeHandlerFunction;

/// Attribute to manage a boolean
pub struct AttributeBoolean {
///
inner: Arc<Mutex<InnerBoolean>>,
}

impl AttributeBoolean {
///
pub fn new(builder: BuilderBoolean) -> AttributeBoolean {
AttributeBoolean {
inner: InnerBoolean::new(builder).to_arc_mutex(),
}
}

// pub async fn on_change_handler(&self, handler: Box<dyn OnChangeHandler>) {
// self.inner.lock().await.on_change_handler(handler);
// }
// pub async fn on_change(&self, handler: OnChangeHandlerFunction) {
// self.inner.lock().await.on_change(handler);
// }

/// Set the value of the attribute
///
// pub async fn set(&self, value: bool) -> Result<(), AttributeError> {
// self.inner.lock().await.set(value).await?;
// // let cv = self.inner.lock().await.set_ensure_lock_clone();
// // cv.with_lock(|mut done| {
// // while !*done {
// // done.wait();
// // }
// // });
// // Ok(())
// }

/// Get the value of the attribute
///
pub async fn get(&self) -> Option<bool> {
self.inner.lock().await.get()
}
}
178 changes: 178 additions & 0 deletions src/asyncv/attribute/message/boolean/attribute/inner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::FutureExt;
use tokio::sync::Mutex;

use bytes::Bytes;

use crate::AttributeError;

use super::MessageCoreMembers;
// use super::OnChangeHandler;
use super::OnMessageHandler;

pub use super::BuilderBoolean;
use monitor::Monitor;
// pub type OnChangeHandlerFunction = Arc<Box<dyn Future<Output = bool> + Send + Sync>>;
// pub type OnChangeHandlerFunction = Pin<Box<dyn Future<Output = bool> + Send + Sync>>;
// pub type OnChangeHandlerFunction = BoxFuture<'static, ()>;

// type OnChangeHandlerFunction = Arc<dyn 'static + Send + Sync + Fn(bool) -> BoxFuture<'static, ()>>;

/// Inner implementation of the boolean message attribute
///
pub struct InnerBoolean {
/// Members at the core of each attribute
core: MessageCoreMembers,
/// Current value of the attribute
value: Option<bool>,
/// Requested value of the attribute (set by the user)
requested_value: Option<bool>,
// / Handler to call when the value change
// on_change_handler: Option<Box<dyn OnChangeHandler>>,
// on_change_handler: Option<OnChangeHandlerFunction>,
// set_ensure_lock: Arc<Monitor<bool>>,
}

impl InnerBoolean {
///
pub fn new(builder: BuilderBoolean) -> InnerBoolean {
InnerBoolean {
core: MessageCoreMembers::new(
builder.message_client,
builder.message_dispatcher,
builder.topic.unwrap(),
),
value: None,
requested_value: None,
// set_ensure_lock: None,
}
}

pub fn to_arc_mutex(self) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(self))
}

// pub fn set_ensure_lock_clone(&mut self) -> Arc<Monitor<bool>> {
// return self.set_ensure_lock.clone();
// }

// fn set_ensure_update(&mut self) {
// if self.set_ensure_ok() {
// self.set_ensure_lock.with_lock(|mut done| {
// *done = true;
// done.notify_one();
// });
// }
// }

fn set_ensure_ok(&self) -> bool {
return self.requested_value == self.value;
}

/// Set the value of the attribute
///
pub async fn set(&mut self, new_value: bool) -> Result<(), AttributeError> {
// Do not go further if the value is already set
if let Some(current_value) = self.value {
if current_value == new_value {
return Ok(());
}
}

// Set the requested value and publish the request
self.requested_value = Some(new_value);
match self.requested_value {
Some(requested_value) => match requested_value {
true => self.core.publish("1").await,
false => self.core.publish("0").await,
},
None => Err(AttributeError::Unkonwn),
}
}

/// Get the value of the attribute
/// If None, the first value is not yet received
///
pub fn get(&self) -> Option<bool> {
return self.value;
}

// pub fn on_change_handler(&mut self, handler: Box<dyn OnChangeHandler>) {
// self.on_change_handler = Some(handler);
// }

// pub fn on_change(&mut self, handler: OnChangeHandlerFunction) {
// self.on_change_handler = Some(handler);
// }

/// Register the attribute to the reactor
///
pub async fn register(
&self,
attribute: Arc<Mutex<dyn OnMessageHandler>>,
) -> Result<(), AttributeError> {
self.core.register(attribute).await
}
}

// impl OnMessageHandler for InnerBoolean {
// fn on_message(&mut self, data: &Bytes) {
// println!("boolean");

// // OnChangeHandlerFunction

// // let a = async { true };
// // let b = || a;

// // self.on_change_handler = Some(Arc::new(|| a));
// // tokio::spawn(b.clone()());
// // tokio::spawn(b());
// // tokio::spawn(b());
// // tokio::spawn(pp55);
// // tokio::spawn(pp55);

// // let pp: Pin<Box<dyn Future<Output = bool> + Send>> = async move { true }.boxed();
// // tokio::spawn(pp);
// // tokio::spawn(pp);
// // tokio::spawn(pp);

// // if let Some(handler) = self.on_change_handler.as_ref() {
// // tokio::spawn(*(handler.clone()));
// // }
// if data.len() == 1 {
// match data[0] {
// b'1' => {
// self.value = Some(true);
// self.set_ensure_update();
// }
// b'0' => {
// self.value = Some(false);
// self.set_ensure_update();
// }
// _ => {
// println!("unexcpedted payload {:?}", data);
// return;
// }
// };
// // Do something with the value
// } else {
// println!("wierd payload {:?}", data);
// }
// }
// }

// impl From<CoreMembers> for InnerBoolean {
// fn from(core_data: CoreMembers) -> Self {
// return Self {
// core: core_data,
// value: None,
// requested_value: None,
// on_change_handler: None,
// set_ensure_lock: Arc::new(Monitor::new(false)),
// };
// }
// }
36 changes: 36 additions & 0 deletions src/asyncv/attribute/message/boolean/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::sync::Weak;

use tokio::sync::Mutex;

use super::AttributeBoolean;
use super::AttributeBuilder;

pub use super::MessageClient;
pub use super::MessageDispatcher;

pub struct BuilderBoolean {
/// The mqtt client
pub message_client: MessageClient,

/// The Object that allow the reactor to dispatch
/// incoming messages on attributes
pub message_dispatcher: Weak<Mutex<MessageDispatcher>>,

/// Topic of the attribute
pub topic: Option<String>,
}

impl BuilderBoolean {
/// New boolean builder
pub fn new(parent_builder: AttributeBuilder) -> BuilderBoolean {
BuilderBoolean {
message_client: parent_builder.message_client,
message_dispatcher: parent_builder.message_dispatcher,
topic: parent_builder.topic,
}
}

pub fn finish(self) -> AttributeBoolean {
AttributeBoolean::new(self)
}
}
Loading

0 comments on commit 2090d53

Please sign in to comment.