Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
XdoctorwhoZ committed Aug 4, 2024
1 parent bd183ee commit a4bd814
Show file tree
Hide file tree
Showing 11 changed files with 420 additions and 224 deletions.
3 changes: 2 additions & 1 deletion src/asyncv/attribute.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod message;

pub use message::MessageAttribute;
pub use message::MessageAttributeRo;
pub use message::MessageAttributeRw;

pub use super::AttributeBuilder;
pub use super::MessageClient;
Expand Down
11 changes: 9 additions & 2 deletions src/asyncv/attribute/message.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
mod attribute;
mod dispatcher;
mod ro_attribute;
mod ro_inner;
mod rw_attribute;
mod rw_inner;

use async_trait::async_trait;
use bytes::Bytes;

pub type MessageAttribute<TYPE> = attribute::Attribute<TYPE>;
pub use ro_inner::MessageAttributeRoInner;
pub use rw_inner::MessageAttributeRwInner;

pub use ro_attribute::MessageAttributeRo;
pub use rw_attribute::MessageAttributeRw;

pub use dispatcher::MessageDispatcher;

Expand Down
184 changes: 0 additions & 184 deletions src/asyncv/attribute/message/attribute/inner.rs

This file was deleted.

55 changes: 55 additions & 0 deletions src/asyncv/attribute/message/ro_attribute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;

use crate::AttributeError;
use crate::MessagePayloadManager;

use super::AttributeBuilder;
use super::MessageAttributeRoInner;

/// Attribute to manage a boolean
#[derive(Clone)]
pub struct MessageAttributeRo<TYPE: MessagePayloadManager> {
/// Inner implementation
inner: Arc<Mutex<MessageAttributeRoInner<TYPE>>>,
}

impl<TYPE: MessagePayloadManager> MessageAttributeRo<TYPE> {
/// Initialize the attribute
pub async fn init(self) -> Result<Self, AttributeError> {
self.inner.lock().await.init(self.inner.clone()).await?;
Ok(self)
}

pub async fn wait_change(&self) {
let change_notifier = self.inner.lock().await.clone_change_notifier();
change_notifier.notified().await
}

pub async fn wait_change_then<F>(&self, function: F)
where
F: Future<Output = ()> + Send + 'static,
{
let change_notifier = self.inner.lock().await.clone_change_notifier();
change_notifier.notified().await;
function.await;
}

/// Get the value of the attribute
///
pub async fn get(&self) -> Option<TYPE> {
self.inner.lock().await.get()
}
}

/// Allow creation from the builder
impl<TYPE: MessagePayloadManager> From<AttributeBuilder> for MessageAttributeRo<TYPE> {
fn from(builder: AttributeBuilder) -> Self {
MessageAttributeRo {
inner: MessageAttributeRoInner::from(builder).into(),
}
}
}
123 changes: 123 additions & 0 deletions src/asyncv/attribute/message/ro_inner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use rumqttc::QoS;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::Mutex;

use crate::asyncv::MessageDispatcher;
use crate::AttributeError;

use super::MessageClient;
use super::OnMessageHandler;

use bytes::Bytes;

use async_trait::async_trait;

use crate::asyncv::AttributeBuilder;

use tokio::sync::Notify;

use crate::MessagePayloadManager;

/// Read Only Inner implementation of the message attribute
/// This inner implementation allow the public part to be cloneable easly
pub struct MessageAttributeRoInner<TYPE: MessagePayloadManager> {
/// Reactor message dispatcher
/// (to attach this attribute to the incoming messages)
message_dispatcher: Weak<Mutex<MessageDispatcher>>,
/// The message client (MQTT)
pub message_client: MessageClient,

/// The topic of the attribute
topic: String,

/// Current value of the attribute
pub value: Option<TYPE>,

///
change_notifier: Arc<Notify>,
}

impl<TYPE: MessagePayloadManager> MessageAttributeRoInner<TYPE> {
/// Initialize the attribute
/// Register the attribute on the message dispatcher then subscribe to att topic
pub async fn init(
&self,
attribute: Arc<Mutex<dyn OnMessageHandler>>,
) -> Result<(), AttributeError> {
self.register(attribute).await?;
self.subscribe().await
}

/// Get the value of the attribute
/// If None, the first value is not yet received
pub fn get(&self) -> Option<TYPE> {
return self.value;
}

/// Subscribe to the topic
pub async fn subscribe(&self) -> Result<(), AttributeError> {
// no need to store the att topic
let topic_att = format!("{}/att", self.topic);
self.message_client
.subscribe(topic_att, QoS::AtMostOnce)
.await
.map_err(|e| AttributeError::Message(e))
}

/// Register the attribute to the reactor
///
pub async fn register(
&self,
attribute: Arc<Mutex<dyn OnMessageHandler>>,
) -> Result<(), AttributeError> {
// no need to store the att topic
let topic_att = format!("{}/att", self.topic);
self.message_dispatcher
.upgrade()
.ok_or(AttributeError::InternalPointerUpgrade)?
.lock()
.await
// .map_err(|e| AttributeError::InternalMutex(e.to_string()))?
.register_message_attribute(topic_att, attribute);
Ok(())
}

///
///
pub fn clone_change_notifier(&self) -> Arc<Notify> {
self.change_notifier.clone()
}
}

/// Allow creation from the builder
impl<TYPE: MessagePayloadManager> From<AttributeBuilder> for MessageAttributeRoInner<TYPE> {
fn from(builder: AttributeBuilder) -> Self {
MessageAttributeRoInner {
message_dispatcher: builder.message_dispatcher,
message_client: builder.message_client,
topic: builder.topic.as_ref().unwrap().clone(),
value: None,
change_notifier: Arc::new(Notify::new()),
}
}
}

/// Allow mutation into Arc pointer
impl<TYPE: MessagePayloadManager> Into<Arc<Mutex<MessageAttributeRoInner<TYPE>>>>
for MessageAttributeRoInner<TYPE>
{
fn into(self) -> Arc<Mutex<MessageAttributeRoInner<TYPE>>> {
Arc::new(Mutex::new(self))
}
}

#[async_trait]
impl<TYPE: MessagePayloadManager> OnMessageHandler for MessageAttributeRoInner<TYPE> {
async fn on_message(&mut self, data: &Bytes) {
println!("ro_inner::on_message");
let new_value = TYPE::from(data.to_vec());
self.value = Some(new_value);
self.change_notifier.notify_waiters();
}
}
Loading

0 comments on commit a4bd814

Please sign in to comment.