Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
  • Loading branch information
XdoctorwhoZ committed Aug 3, 2024
1 parent 3b33944 commit dadab8a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 111 deletions.
78 changes: 62 additions & 16 deletions src/asyncv/attribute/message/boolean/attribute/inner.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
use rumqttc::QoS;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::Mutex;

use crate::AttributeError;

use super::MessageClient;
use super::OnMessageHandler;

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::FutureExt;
use tokio::sync::Mutex;

use bytes::Bytes;

use async_trait::async_trait;

use crate::asyncv::AttributeBuilder;
use crate::AttributeError;

use super::MessageCoreMembers;
// use super::OnChangeHandler;
Expand All @@ -24,8 +31,19 @@ use super::AttributePayloadManager;
/// Inner implementation of the boolean message attribute
///
pub struct AttributeInner<TYPE: AttributePayloadManager> {
/// Members at the core of each attribute
core: MessageCoreMembers,
/// The data of the reactor, to be able to subscribe to the
/// reactor and route messages to the attribute
message_dispatcher: Weak<Mutex<MessageDispatcher>>,

/// The mqtt client
message_client: MessageClient,

/// The topic of the attribute
topic: String,

/// The topic for commands
topic_cmd: String,

/// Current value of the attribute
value: Option<TYPE>,
/// Requested value of the attribute (set by the user)
Expand All @@ -37,11 +55,10 @@ impl<TYPE: AttributePayloadManager> AttributeInner<TYPE> {
///
pub fn new(builder: AttributeBuilder) -> AttributeInner<TYPE> {
AttributeInner {
core: MessageCoreMembers::new(
builder.message_client,
builder.message_dispatcher,
builder.topic.unwrap(),
),
message_dispatcher: builder.message_dispatcher,
message_client: builder.message_client,
topic: builder.topic.as_ref().unwrap().clone(),
topic_cmd: format!("{}/cmd", builder.topic.as_ref().unwrap()),
value: None,
requested_value: None,
}
Expand Down Expand Up @@ -70,7 +87,8 @@ impl<TYPE: AttributePayloadManager> AttributeInner<TYPE> {
&self,
attribute: Arc<Mutex<dyn OnMessageHandler>>,
) -> Result<(), AttributeError> {
self.core.init(attribute).await
self.register(attribute).await?;
self.subscribe().await
}

// fn set_ensure_ok(&self) -> bool {
Expand All @@ -91,7 +109,7 @@ impl<TYPE: AttributePayloadManager> AttributeInner<TYPE> {
self.requested_value = Some(new_value);
match self.requested_value {
Some(requested_value) => {
self.core.publish(requested_value.into()).await;
self.publish(requested_value.into()).await;
Ok(())
}
None => Err(AttributeError::Unkonwn),
Expand All @@ -105,17 +123,45 @@ impl<TYPE: AttributePayloadManager> AttributeInner<TYPE> {
return self.value;
}

// pub fn on_change(&mut self, handler: OnChangeHandlerFunction) {
// self.on_change_handler = Some(handler);
// }
/// Publish a command
///
pub async fn publish<V>(&self, value: V) -> Result<(), AttributeError>
where
V: Into<Vec<u8>>,
{
self.message_client
.publish(&self.topic_cmd, QoS::AtMostOnce, true, value)
.await
.map_err(|e| AttributeError::Message(e))
}

/// Subscribe to the topic
///
pub async fn subscribe(&self) -> Result<(), AttributeError> {
// no need to store the att topic
let topic_att = format!("{}/att", self.topic);
self.message_client
.subscribe(topic_att, QoS::AtMostOnce)
.await
.map_err(|e| AttributeError::Message(e))
}

/// Register the attribute to the reactor
///
pub async fn register(
&self,
attribute: Arc<Mutex<dyn OnMessageHandler>>,
) -> Result<(), AttributeError> {
self.core.register(attribute).await
// no need to store the att topic
let topic_att = format!("{}/att", self.topic);
self.message_dispatcher
.upgrade()
.ok_or(AttributeError::InternalPointerUpgrade)?
.lock()
.await
// .map_err(|e| AttributeError::InternalMutex(e.to_string()))?
.register_message_attribute(topic_att, attribute);
Ok(())
}
}

Expand Down
95 changes: 0 additions & 95 deletions src/asyncv/attribute/message/core_members.rs

This file was deleted.

0 comments on commit dadab8a

Please sign in to comment.