Skip to content

Commit

Permalink
naif portage to async
Browse files Browse the repository at this point in the history
  • Loading branch information
XdoctorwhoZ committed Aug 2, 2024
1 parent 7b788ba commit 669e048
Show file tree
Hide file tree
Showing 13 changed files with 680 additions and 2 deletions.
11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ version = "0.0.0"
edition = "2021"


# [[bin]]
# name = "test_sync"
# path = "src/examples/test_sync.rs"


[[bin]]
name = "test_sync"
path = "src/examples/test_sync.rs"
name = "test_async"
path = "src/examples/test_async.rs"


[[test]]
Expand All @@ -24,6 +29,8 @@ bytes = "1.0.1"
thiserror = "1.0.63"
panduza-proc = { path = "lib/panduza-proc" }

tokio = { version = "1", features = ["full"] }

[dev-dependencies]


Expand Down
10 changes: 10 additions & 0 deletions src/asyncv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/// This module manage the message attributes (MQTT/TCP)
pub mod msg;

/// This module manage the stream attributes (CUSTOM/QUIC)
pub mod stream;

/// This module manage the reactor
mod reactor;
pub type Reactor = reactor::Reactor;
pub type ReactorData = reactor::ReactorData;
17 changes: 17 additions & 0 deletions src/asyncv/msg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use bytes::Bytes;

/// Members shared by all attributes
mod core_members;
pub type CoreMembers = core_members::CoreMembers;

///
pub mod att;
pub mod att_bool;

pub use super::ReactorData;

/// Trait to manage an message attribute (MQTT)
/// Sync version
pub trait OnMessageHandler: Send + Sync {
fn on_message(&mut self, data: &Bytes);
}
60 changes: 60 additions & 0 deletions src/asyncv/msg/att.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
mod inner_msg_att;
pub type InnerAtt = inner_msg_att::InnerAtt;

use rumqttc::AsyncClient;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::AttributeError;

use super::att_bool::AttBool;
pub use super::CoreMembers;
pub use super::OnMessageHandler;
pub use super::ReactorData;

/// Generic Message Attribute
pub struct Att {
/// Attribute are mainly a wrapper for the inner manager
inner: Arc<Mutex<InnerAtt>>,
}

impl Att {
/// Create a new Message Attribute
pub fn new(
reactor_data: Arc<Mutex<ReactorData>>,
topic: String,
mqtt_client: AsyncClient,
) -> Self {
// Create a new inner manager
let inner = InnerAtt::new(
Arc::downgrade(&reactor_data),
topic.clone(),
mqtt_client.clone(),
)
.into_arc_mutex();

Self { inner: inner }
}

/// Initialize the attribute
///
pub async fn init(self) -> Result<Self, AttributeError> {
self.inner.lock().await.init(self.inner.clone()).await?;
Ok(self)
}

/// Take the inner core data
///
pub async fn take_core_members(self) -> Result<CoreMembers, AttributeError> {
Ok(self.inner.lock().await.clone_core())
}

/// Easy conversion to AttBool
///
pub async fn into_att_bool(self) -> AttBool {
match self.take_core_members().await {
Ok(core_data) => AttBool::from_core_members(core_data).await,
Err(_) => panic!("Error"),
}
}
}
56 changes: 56 additions & 0 deletions src/asyncv/msg/att/inner_msg_att.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
pub use super::CoreMembers;
use super::OnMessageHandler;
use super::ReactorData;
use crate::AttributeError;
use bytes::Bytes;
use rumqttc::AsyncClient;
use std::sync::{Arc, Weak};
use tokio::sync::Mutex;

/// Inner implementation of the generic message attribute
///
pub struct InnerAtt {
/// Members at the core of each attribute
core: CoreMembers,
}

impl InnerAtt {
/// Create a new InnerAtt
///
pub fn new(
reactor_data: Weak<Mutex<ReactorData>>,
topic: String,
mqtt_client: AsyncClient,
) -> Self {
Self {
core: CoreMembers::new(reactor_data, topic, mqtt_client),
}
}

/// Convert the InnerAtt to an Arc<Mutex<InnerAtt>>
///
pub fn into_arc_mutex(self) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(self))
}

/// Initialize the attribute
///
pub async fn init(
&self,
attribute: Arc<Mutex<dyn OnMessageHandler>>,
) -> Result<(), AttributeError> {
self.core.init(attribute).await
}

/// Clone the core members (to mutate into an other type)
///
pub fn clone_core(&self) -> CoreMembers {
self.core.clone()
}
}

impl OnMessageHandler for InnerAtt {
fn on_message(&mut self, data: &Bytes) {
println!("generic");
}
}
60 changes: 60 additions & 0 deletions src/asyncv/msg/att_bool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
mod inner_msg_att_bool;
use inner_msg_att_bool::InnerAttBool;

use std::sync::Arc;
use tokio::sync::Mutex;

use crate::AttributeError;

use super::att::Att;
pub use super::CoreMembers;
pub use super::OnMessageHandler;
pub use super::ReactorData;

pub trait OnChangeHandler: Send + Sync {
fn on_change(&self, new_value: bool);
}

pub struct AttBool {
inner: Arc<Mutex<InnerAttBool>>,
}

impl AttBool {
pub async fn set_on_change_handler(&self, handler: Box<dyn OnChangeHandler>) {
self.inner.lock().await.set_on_change_handler(handler);
}

/// Set the value of the attribute
///
pub async fn set(&self, value: bool) -> Result<(), AttributeError> {
self.inner.lock().await.set(value).await?;
let cv = self.inner.lock().await.set_ensure_lock_clone();
cv.with_lock(|mut done| {
while !*done {
done.wait();
}
});
Ok(())
}

/// Get the value of the attribute
///
pub async fn get(&self) -> Option<bool> {
self.inner.lock().await.get()
}

pub async fn from_core_members(core_data: CoreMembers) -> Self {
let inner = InnerAttBool::from(core_data).to_arc_mutex();
inner.lock().await.register(inner.clone()).await.unwrap();
Self { inner: inner }
}
}

// impl Into<AttBool> for Att {
// fn into(self) -> AttBool {
// match self.take_core_members() {
// Ok(core_data) => AttBool::from(core_data),
// Err(_) => panic!("Error"),
// }
// }
// }
128 changes: 128 additions & 0 deletions src/asyncv/msg/att_bool/inner_msg_att_bool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use std::sync::Arc;
use tokio::sync::Mutex;

use bytes::Bytes;

use crate::AttributeError;

use super::CoreMembers;
use super::OnChangeHandler;
use super::OnMessageHandler;

use monitor::Monitor;

/// Inner implementation of the boolean message attribute
///
pub struct InnerAttBool {
/// Members at the core of each attribute
core: CoreMembers,
/// Current value of the attribute
value: Option<bool>,
/// Requested value of the attribute (set by the user)
requested_value: Option<bool>,
/// Handler to call when the value change
on_change_handler: Option<Box<dyn OnChangeHandler>>,

set_ensure_lock: Arc<Monitor<bool>>,
}

impl InnerAttBool {
pub fn to_arc_mutex(self) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(self))
}

pub fn set_ensure_lock_clone(&mut self) -> Arc<Monitor<bool>> {
return self.set_ensure_lock.clone();
}

fn set_ensure_update(&mut self) {
if self.set_ensure_ok() {
self.set_ensure_lock.with_lock(|mut done| {
*done = true;
done.notify_one();
});
}
}

fn set_ensure_ok(&self) -> bool {
return self.requested_value == self.value;
}

/// Set the value of the attribute
///
pub async fn set(&mut self, new_value: bool) -> Result<(), AttributeError> {
// Do not go further if the value is already set
if let Some(current_value) = self.value {
if current_value == new_value {
return Ok(());
}
}

// Set the requested value and publish the request
self.requested_value = Some(new_value);
match self.requested_value {
Some(requested_value) => match requested_value {
true => self.core.publish("1").await,
false => self.core.publish("0").await,
},
None => Err(AttributeError::Unkonwn),
}
}

/// Get the value of the attribute
/// If None, the first value is not yet received
///
pub fn get(&self) -> Option<bool> {
return self.value;
}

pub fn set_on_change_handler(&mut self, handler: Box<dyn OnChangeHandler>) {
self.on_change_handler = Some(handler);
}

/// Register the attribute to the reactor
///
pub async fn register(
&self,
attribute: Arc<Mutex<dyn OnMessageHandler>>,
) -> Result<(), AttributeError> {
self.core.register(attribute).await
}
}

impl OnMessageHandler for InnerAttBool {
fn on_message(&mut self, data: &Bytes) {
println!("boolean");
if data.len() == 1 {
match data[0] {
b'1' => {
self.value = Some(true);
self.set_ensure_update();
}
b'0' => {
self.value = Some(false);
self.set_ensure_update();
}
_ => {
println!("unexcpedted payload {:?}", data);
return;
}
};
// Do something with the value
} else {
println!("wierd payload {:?}", data);
}
}
}

impl From<CoreMembers> for InnerAttBool {
fn from(core_data: CoreMembers) -> Self {
return Self {
core: core_data,
value: None,
requested_value: None,
on_change_handler: None,
set_ensure_lock: Arc::new(Monitor::new(false)),
};
}
}
Loading

0 comments on commit 669e048

Please sign in to comment.