diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..14f989d --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,19 @@ +{ + "editor.formatOnSave": true, + "cucumber.glue": [ + "${workspaceFolder}/**/*.rs", + "*specs*/**/*.cs", + "features/**/*.js", + "features/**/*.jsx", + "features/**/*.php", + "features/**/*.py", + "features/**/*.rs", + "features/**/*.rb", + "features/**/*.ts", + "features/**/*.tsx", + "features/**/*_test.go", + "src/test/**/*.java", + "tests/**/*.py", + "tests/**/*.rs" + ] +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..04cc0dc --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "panduza" +version = "0.0.0" +edition = "2021" + + +[[bin]] +name = "test_sync" +path = "src/examples/test_sync.rs" + + +[[test]] +name = "mqtt_connection" +harness = false + + +[dependencies] + +rumqttc = "0.24.0" + +monitor = "0.1.0" + +bytes = "1.0.1" +thiserror = "1.0.63" +panduza-proc = { path = "lib/panduza-proc" } + +[dev-dependencies] + + +# rumqttd = { git = "https://github.com/Panduza/rumqtt", branch = "pza_tests" } + +cucumber = { version = "0.21.1", features = ["tracing", "timestamps"] } +futures = "0.3" +# tokio = { version = "1", features = ["full", "tracing"] } + +config = "0.14.0" diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index 9b8b2f3..a622443 100644 --- a/README.md +++ b/README.md @@ -1 +1,2 @@ -# panduza-rust \ No newline at end of file +# panduza-rust +Panduza Rust Client diff --git a/features/mqtt_connection.feature b/features/mqtt_connection.feature new file mode 100644 index 0000000..987966a --- /dev/null +++ b/features/mqtt_connection.feature @@ -0,0 +1,9 @@ +Feature: Mqtt Connection + As a user of the Panduza library + I want to connect to an MQTT broker + So that I can send and receive messages + + Scenario: Connect to an MQTT broker + Given A broker is running + When I connect to the broker with the reactor + Then I should be connected to the broker diff --git a/lib/panduza-proc/Cargo.toml b/lib/panduza-proc/Cargo.toml new file mode 100644 index 0000000..516ece9 --- /dev/null +++ b/lib/panduza-proc/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "panduza-proc" +version = "0.0.0" +edition = "2021" + +[lib] +proc-macro = true + +[dependencies] +quote = "1.0.9" +syn = "2.0.72" diff --git a/lib/panduza-proc/src/lib.rs b/lib/panduza-proc/src/lib.rs new file mode 100644 index 0000000..58a86d8 --- /dev/null +++ b/lib/panduza-proc/src/lib.rs @@ -0,0 +1,48 @@ + + + +extern crate proc_macro; +extern crate syn; +#[macro_use] +extern crate quote; +use proc_macro::TokenStream; +use quote::quote; +use syn::{parse_macro_input, DeriveInput}; + + +// il faut aussi utiliser les proc attributes pour les attributs et non pas derive + + +#[proc_macro_derive(HelloWorld)] +pub fn hello_world(input: TokenStream) -> TokenStream { + + let input = parse_macro_input!(input as DeriveInput); + + + let name = input.ident; + + // Build the output, possibly using quasi-quotation + let expanded = quote! { + + impl HelloWorld for #name { + fn hello_world() { + println!("Hello, World! My name is {}", stringify!(#name)); + } + } + + }; + + // Hand the output tokens back to the compiler + TokenStream::from(expanded) +} + +// fn impl_hello_world(ast: &syn::DeriveInput) -> quote::ToTokens { +// let name = &ast.ident; +// quote! { +// impl HelloWorld for #name { +// fn hello_world() { +// println!("Hello, World! My name is {}", stringify!(#name)); +// } +// } +// } +// } diff --git a/src/asyncv/mod.rs b/src/asyncv/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/common.rs b/src/common.rs new file mode 100644 index 0000000..41efec5 --- /dev/null +++ b/src/common.rs @@ -0,0 +1,6 @@ +mod error; +mod pza_topic; +mod reactor_settings; + +pub type AttributeError = error::AttributeError; +pub type ReactorSettings = reactor_settings::ReactorSettings; diff --git a/src/common/error.rs b/src/common/error.rs new file mode 100644 index 0000000..8d23f1d --- /dev/null +++ b/src/common/error.rs @@ -0,0 +1,16 @@ +use rumqttc::ClientError; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum AttributeError { + #[error("error with mqtt connection")] + Message(#[from] ClientError), + #[error("error on mutex lock")] + InternalMutex(String), + #[error("weak pointer cannot be upgraded")] + InternalPointerUpgrade, + #[error("set request sent but no response received")] + EnsureTimeout, + #[error("we do not know what happened")] + Unkonwn, +} diff --git a/src/common/pza_topic.rs b/src/common/pza_topic.rs new file mode 100644 index 0000000..10a66d9 --- /dev/null +++ b/src/common/pza_topic.rs @@ -0,0 +1,26 @@ + + + + +struct PzaTopic { + + topic: String, + +} + +impl PzaTopic { + + pub fn new(topic: String) -> Self + { + Self + { + topic: topic + } + } + + pub fn get_topic(&self) -> String + { + self.topic.clone() + } + +} \ No newline at end of file diff --git a/src/common/reactor_settings.rs b/src/common/reactor_settings.rs new file mode 100644 index 0000000..c17026b --- /dev/null +++ b/src/common/reactor_settings.rs @@ -0,0 +1,21 @@ + + + +pub struct ReactorSettings +{ + addr: String, + port_mqtt: u16, +} + + +impl ReactorSettings +{ + pub fn new>(addr: A, port_mqtt: u16) -> Self + { + Self + { + addr: addr.into(), + port_mqtt: port_mqtt + } + } +} diff --git a/src/examples/test_sync.rs b/src/examples/test_sync.rs new file mode 100644 index 0000000..5321fdf --- /dev/null +++ b/src/examples/test_sync.rs @@ -0,0 +1,25 @@ +use std::{thread::sleep, time}; + +use panduza::syncv::Reactor; +use panduza::ReactorSettings; + +fn main() { + let settings = ReactorSettings::new("localhost", 1883); + let reactor = Reactor::new(settings); + + reactor.run_in_thread(); + + // wait for connection + + // sleep(time::Duration::from_secs(5)); + + println!("-----------"); + + // reactor.scan_platforms(); + + let pp = reactor.attribute_from_topic("ooo").unwrap().into_att_bool(); + + pp.set(true); + + sleep(time::Duration::from_secs(60)); +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..9abdb71 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,16 @@ +use bytes::Bytes; + +pub mod asyncv; +mod common; +pub mod syncv; + +// --- COMMON --- + +pub type AttributeError = common::AttributeError; +pub type ReactorSettings = common::ReactorSettings; + +/// Trait to manage an message attribute (MQTT) +/// Sync version +pub trait SyncMessageAttribute: Send + Sync { + fn on_message(&self, data: &Bytes); +} diff --git a/src/syncv.rs b/src/syncv.rs new file mode 100644 index 0000000..f65b58e --- /dev/null +++ b/src/syncv.rs @@ -0,0 +1,10 @@ +/// This module manage the message attributes (MQTT/TCP) +pub mod msg; + +/// This module manage the stream attributes (CUSTOM/QUIC) +pub mod stream; + +/// This module manage the reactor +mod reactor; +pub type Reactor = reactor::Reactor; +pub type ReactorData = reactor::ReactorData; diff --git a/src/syncv/msg.rs b/src/syncv/msg.rs new file mode 100644 index 0000000..9f7a8e8 --- /dev/null +++ b/src/syncv/msg.rs @@ -0,0 +1,17 @@ +use bytes::Bytes; + +/// Members shared by all attributes +mod core_members; +pub type CoreMembers = core_members::CoreMembers; + +/// +pub mod att; +pub mod att_bool; + +pub use super::ReactorData; + +/// Trait to manage an message attribute (MQTT) +/// Sync version +pub trait OnMessageHandler: Send + Sync { + fn on_message(&mut self, data: &Bytes); +} diff --git a/src/syncv/msg/att.rs b/src/syncv/msg/att.rs new file mode 100644 index 0000000..dd03166 --- /dev/null +++ b/src/syncv/msg/att.rs @@ -0,0 +1,59 @@ +mod inner_msg_att; +pub type InnerAtt = inner_msg_att::InnerAtt; + +use rumqttc::Client; +use std::sync::{Arc, Mutex}; + +use crate::AttributeError; + +use super::att_bool::AttBool; +pub use super::CoreMembers; +pub use super::OnMessageHandler; +pub use super::ReactorData; + +/// Generic Message Attribute +pub struct Att { + /// Attribute are mainly a wrapper for the inner manager + inner: Arc>, +} + +impl Att { + /// Create a new Message Attribute + pub fn new(reactor_data: Arc>, topic: String, mqtt_client: Client) -> Self { + // Create a new inner manager + let inner = InnerAtt::new( + Arc::downgrade(&reactor_data), + topic.clone(), + mqtt_client.clone(), + ) + .into_arc_mutex(); + + Self { inner: inner } + } + + /// Initialize the attribute + /// + pub fn init(self) -> Result { + self.inner + .lock() + .map_err(|e| AttributeError::InternalMutex(e.to_string()))? + .init(self.inner.clone())?; + Ok(self) + } + + /// Take the inner core data + /// + pub fn take_core_members(self) -> Result { + Ok(self + .inner + .lock() + .map_err(|e| AttributeError::InternalMutex(e.to_string()))? + .clone_core()) + } + + /// Easy conversion to AttBool + /// + pub fn into_att_bool(self) -> AttBool { + self.into() + } +} diff --git a/src/syncv/msg/att/inner_msg_att.rs b/src/syncv/msg/att/inner_msg_att.rs new file mode 100644 index 0000000..8c8fc93 --- /dev/null +++ b/src/syncv/msg/att/inner_msg_att.rs @@ -0,0 +1,48 @@ +pub use super::CoreMembers; +use super::OnMessageHandler; +use super::ReactorData; +use crate::AttributeError; +use bytes::Bytes; +use rumqttc::Client; +use std::sync::{Arc, Mutex, Weak}; + +/// Inner implementation of the generic message attribute +/// +pub struct InnerAtt { + /// Members at the core of each attribute + core: CoreMembers, +} + +impl InnerAtt { + /// Create a new InnerAtt + /// + pub fn new(reactor_data: Weak>, topic: String, mqtt_client: Client) -> Self { + Self { + core: CoreMembers::new(reactor_data, topic, mqtt_client), + } + } + + /// Convert the InnerAtt to an Arc> + /// + pub fn into_arc_mutex(self) -> Arc> { + Arc::new(Mutex::new(self)) + } + + /// Initialize the attribute + /// + pub fn init(&self, attribute: Arc>) -> Result<(), AttributeError> { + self.core.init(attribute) + } + + /// Clone the core members (to mutate into an other type) + /// + pub fn clone_core(&self) -> CoreMembers { + self.core.clone() + } +} + +impl OnMessageHandler for InnerAtt { + fn on_message(&mut self, data: &Bytes) { + println!("generic"); + } +} diff --git a/src/syncv/msg/att_bool.rs b/src/syncv/msg/att_bool.rs new file mode 100644 index 0000000..e536b0f --- /dev/null +++ b/src/syncv/msg/att_bool.rs @@ -0,0 +1,61 @@ +mod inner_msg_att_bool; +use inner_msg_att_bool::InnerAttBool; + +use std::sync::{Arc, Mutex}; + +use crate::AttributeError; + +use super::att::Att; +pub use super::CoreMembers; +pub use super::OnMessageHandler; +pub use super::ReactorData; + +pub trait OnChangeHandler: Send + Sync { + fn on_change(&self, new_value: bool); +} + +pub struct AttBool { + inner: Arc>, +} + +impl AttBool { + pub fn set_on_change_handler(&self, handler: Box) { + self.inner.lock().unwrap().set_on_change_handler(handler); + } + + /// Set the value of the attribute + /// + pub fn set(&self, value: bool) -> Result<(), AttributeError> { + self.inner.lock().unwrap().set(value)?; + let cv = self.inner.lock().unwrap().set_ensure_lock_clone(); + cv.with_lock(|mut done| { + while !*done { + done.wait(); + } + }); + Ok(()) + } + + /// Get the value of the attribute + /// + pub fn get(&self) -> Option { + self.inner.lock().unwrap().get() + } +} + +impl From for AttBool { + fn from(core_data: CoreMembers) -> Self { + let inner = InnerAttBool::from(core_data).to_arc_mutex(); + inner.lock().unwrap().register(inner.clone()).unwrap(); + Self { inner: inner } + } +} + +impl Into for Att { + fn into(self) -> AttBool { + match self.take_core_members() { + Ok(core_data) => AttBool::from(core_data), + Err(_) => panic!("Error"), + } + } +} diff --git a/src/syncv/msg/att_bool/inner_msg_att_bool.rs b/src/syncv/msg/att_bool/inner_msg_att_bool.rs new file mode 100644 index 0000000..590274f --- /dev/null +++ b/src/syncv/msg/att_bool/inner_msg_att_bool.rs @@ -0,0 +1,127 @@ +use std::sync::{Arc, Mutex}; + +use bytes::Bytes; + +use crate::AttributeError; + +use super::CoreMembers; +use super::OnChangeHandler; +use super::OnMessageHandler; + +use monitor::Monitor; + +/// Inner implementation of the boolean message attribute +/// +pub struct InnerAttBool { + /// Members at the core of each attribute + core: CoreMembers, + /// Current value of the attribute + value: Option, + /// Requested value of the attribute (set by the user) + requested_value: Option, + /// Handler to call when the value change + on_change_handler: Option>, + + set_ensure_lock: Arc>, +} + +impl InnerAttBool { + pub fn to_arc_mutex(self) -> Arc> { + Arc::new(Mutex::new(self)) + } + + pub fn set_ensure_lock_clone(&mut self) -> Arc> { + return self.set_ensure_lock.clone(); + } + + fn set_ensure_update(&mut self) { + if self.set_ensure_ok() { + self.set_ensure_lock.with_lock(|mut done| { + *done = true; + done.notify_one(); + }); + } + } + + fn set_ensure_ok(&self) -> bool { + return self.requested_value == self.value; + } + + /// Set the value of the attribute + /// + pub fn set(&mut self, new_value: bool) -> Result<(), AttributeError> { + // Do not go further if the value is already set + if let Some(current_value) = self.value { + if current_value == new_value { + return Ok(()); + } + } + + // Set the requested value and publish the request + self.requested_value = Some(new_value); + match self.requested_value { + Some(requested_value) => match requested_value { + true => self.core.publish("1"), + false => self.core.publish("0"), + }, + None => Err(AttributeError::Unkonwn), + } + } + + /// Get the value of the attribute + /// If None, the first value is not yet received + /// + pub fn get(&self) -> Option { + return self.value; + } + + pub fn set_on_change_handler(&mut self, handler: Box) { + self.on_change_handler = Some(handler); + } + + /// Register the attribute to the reactor + /// + pub fn register( + &self, + attribute: Arc>, + ) -> Result<(), AttributeError> { + self.core.register(attribute) + } +} + +impl OnMessageHandler for InnerAttBool { + fn on_message(&mut self, data: &Bytes) { + println!("boolean"); + if data.len() == 1 { + match data[0] { + b'1' => { + self.value = Some(true); + self.set_ensure_update(); + } + b'0' => { + self.value = Some(false); + self.set_ensure_update(); + } + _ => { + println!("unexcpedted payload {:?}", data); + return; + } + }; + // Do something with the value + } else { + println!("wierd payload {:?}", data); + } + } +} + +impl From for InnerAttBool { + fn from(core_data: CoreMembers) -> Self { + return Self { + core: core_data, + value: None, + requested_value: None, + on_change_handler: None, + set_ensure_lock: Arc::new(Monitor::new(false)), + }; + } +} diff --git a/src/syncv/msg/core_members.rs b/src/syncv/msg/core_members.rs new file mode 100644 index 0000000..edb1096 --- /dev/null +++ b/src/syncv/msg/core_members.rs @@ -0,0 +1,84 @@ +use rumqttc::{Client, QoS}; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::Weak; + +use crate::AttributeError; + +use super::OnMessageHandler; +use super::ReactorData; + +/// The core data of the attribute +/// Those attributes will be moved between Att types +#[derive(Clone)] +pub struct CoreMembers { + /// The data of the reactor, to be able to subscribe to the + /// reactor and route messages to the attribute + reactor_data: Weak>, + + /// The mqtt client + mqtt_client: Client, + + /// The topic of the attribute + topic: String, + + /// The topic for commands + topic_cmd: String, +} + +impl CoreMembers { + /// Create a new core data + pub fn new(reactor_data: Weak>, topic: String, mqtt_client: Client) -> Self { + Self { + reactor_data: reactor_data, + mqtt_client: mqtt_client, + topic: topic.clone(), + topic_cmd: format!("{}/cmd", topic), + } + } + + /// Initialize the attribute + /// + pub fn init(&self, attribute: Arc>) -> Result<(), AttributeError> { + self.register(attribute)?; + self.subscribe() + } + + /// Publish a command + /// + pub fn publish(&self, value: V) -> Result<(), AttributeError> + where + V: Into>, + { + self.mqtt_client + .publish(&self.topic_cmd, QoS::AtMostOnce, true, value) + .map_err(|e| AttributeError::Message(e)) + } + + /// Subscribe to the topic + /// + pub fn subscribe(&self) -> Result<(), AttributeError> { + // no need to store the att topic + let topic_att = format!("{}/att", self.topic); + self.mqtt_client + .subscribe(topic_att, QoS::AtMostOnce) + .map_err(|e| AttributeError::Message(e)) + } + + /// Register the attribute to the reactor + /// + pub fn register( + &self, + attribute: Arc>, + ) -> Result<(), AttributeError> { + // no need to store the att topic + let topic_att = format!("{}/att", self.topic); + self.reactor_data + .upgrade() + .ok_or(AttributeError::InternalPointerUpgrade)? + .lock() + .map_err(|e| AttributeError::InternalMutex(e.to_string()))? + .register_message_attribute(topic_att, attribute); + Ok(()) + } +} diff --git a/src/syncv/reactor.rs b/src/syncv/reactor.rs new file mode 100644 index 0000000..1873ee6 --- /dev/null +++ b/src/syncv/reactor.rs @@ -0,0 +1,78 @@ +pub mod reactor_core; +pub mod reactor_data; + +use std::sync::{Arc, Mutex}; + +use crate::{AttributeError, ReactorSettings}; + +use reactor_core::ReactorCore; +pub use reactor_data::ReactorData; + +use rumqttc::{Client, MqttOptions, QoS}; + +use std::time::Duration; + +use super::msg::att::Att; +use super::msg::OnMessageHandler; + +/// The reactor is the main structure that will handle the connections and the events +/// +/// All the attribute and objects will be powered by the reactor +/// +#[derive(Clone)] +pub struct Reactor { + core: Arc>, + data: Arc>, + + mqtt_client: Client, +} + +impl Reactor { + /// Create a new Reactor + /// + /// # Arguments + /// + /// * `core` - The core of the reactor + /// + pub fn new(settings: ReactorSettings) -> Self { + println!("ReactorCore is running"); + let mut mqttoptions = MqttOptions::new("rumqtt-sync", "localhost", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(3)); + + let (client, connection) = Client::new(mqttoptions, 100); + + let data = Arc::new(Mutex::new(ReactorData::new())); + + Reactor { + core: Arc::new(Mutex::new(ReactorCore::new(data.clone(), connection))), + data: data, + mqtt_client: client, + } + } + + pub fn run_in_thread(&self) { + let core = self.core.clone(); + std::thread::spawn(move || { + core.lock().unwrap().run(); + println!("ReactorCore is not runiing !!!!!!!!!!!!!!!!!!!!!!"); + }); + } + + /// Create a new attribute from its topic + /// + pub fn attribute_from_topic(&self, topic: &str) -> Result { + Att::new( + self.data.clone(), + topic.to_string(), + self.mqtt_client.clone(), + ) + .init() + } + + pub fn scan_platforms(&self) { + println!("publish"); + self.mqtt_client + .publish("pza", QoS::AtLeastOnce, true, "pok") + .unwrap(); + } +} diff --git a/src/syncv/reactor/reactor_core.rs b/src/syncv/reactor/reactor_core.rs new file mode 100644 index 0000000..0315310 --- /dev/null +++ b/src/syncv/reactor/reactor_core.rs @@ -0,0 +1,84 @@ +use std::sync::{Arc, Mutex}; + +use rumqttc::Connection; + +use super::reactor_data::ReactorData; + +pub struct ReactorCore { + data: Arc>, + mqtt_connection: Connection, +} + +impl ReactorCore { + pub fn new(data: Arc>, mqtt_connection: Connection) -> Self { + ReactorCore { + data: data, + mqtt_connection: mqtt_connection, + } + } + + pub fn run(&mut self) { + // Iterate to poll the eventloop for connection progress + for (i, notification) in self.mqtt_connection.iter().enumerate() { + // println!("Notification = {:?}", notification); + match notification { + Ok(event) => { + match event { + rumqttc::Event::Incoming(incoming) => { + // println!("Incoming = {:?}", incoming); + + match incoming { + // rumqttc::Packet::Connect(_) => todo!(), + // rumqttc::Packet::ConnAck(_) => todo!(), + rumqttc::Packet::Publish(packet) => { + // let payload = packet.payload; + // let payload_str = std::str::from_utf8(&payload).unwrap(); + // println!("Received = {:?} {:?}", payload_str, packet.topic); + + self.data + .lock() + .unwrap() + .trigger_on_change(&packet.topic, &packet.payload); + } + // rumqttc::Packet::PubAck(_) => todo!(), + // rumqttc::Packet::PubRec(_) => todo!(), + // rumqttc::Packet::PubRel(_) => todo!(), + // rumqttc::Packet::PubComp(_) => todo!(), + // rumqttc::Packet::Subscribe(_) => todo!(), + // rumqttc::Packet::SubAck(_) => todo!(), + // rumqttc::Packet::Unsubscribe(_) => todo!(), + // rumqttc::Packet::UnsubAck(_) => todo!(), + // rumqttc::Packet::PingReq => todo!(), + // rumqttc::Packet::PingResp => todo!(), + // rumqttc::Packet::Disconnect => todo!(), + _ => {} + } + } + rumqttc::Event::Outgoing(outgoing) => { + // println!("Outgoing = {:?}", outgoing); + match outgoing { + rumqttc::Outgoing::Publish(packet) => { + // println!("Publish = {:?}", packet); + } + rumqttc::Outgoing::Subscribe(p) => { + // println!("Subscribe = {:?}", p); + } + // rumqttc::Outgoing::Unsubscribe(_) => todo!(), + // rumqttc::Outgoing::PubAck(_) => todo!(), + // rumqttc::Outgoing::PubRec(_) => todo!(), + // rumqttc::Outgoing::PubRel(_) => todo!(), + // rumqttc::Outgoing::PubComp(_) => todo!(), + // rumqttc::Outgoing::PingReq => todo!(), + // rumqttc::Outgoing::PingResp => todo!(), + // rumqttc::Outgoing::Disconnect => todo!(), + // rumqttc::Outgoing::AwaitAck(_) => todo!(), + _ => {} + } + } + } + } + Err(_) => todo!(), + } + } + } +} diff --git a/src/syncv/reactor/reactor_data.rs b/src/syncv/reactor/reactor_data.rs new file mode 100644 index 0000000..6257f3c --- /dev/null +++ b/src/syncv/reactor/reactor_data.rs @@ -0,0 +1,51 @@ +use std::sync::Weak; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use bytes::Bytes; + +use super::OnMessageHandler; + +/// Data used by the core the dispatch input data +/// +pub struct ReactorData { + /// List of attributes to trigger on message + message_attributes: HashMap>>, +} + +impl ReactorData { + /// Create a new ReactorData + /// + pub fn new() -> Self { + Self { + message_attributes: HashMap::new(), + } + } + + pub fn register_message_attribute( + &mut self, + topic: String, + attribute: Arc>, + ) { + self.message_attributes + .insert(topic, Arc::downgrade(&attribute)); + } + + /// Trigger the on_message of the attribute + /// + pub fn trigger_on_change(&self, topic: &str, new_value: &Bytes) { + // println!("{:?}", self.message_attributes.keys()); + if let Some(attribute) = self.message_attributes.get(topic) { + match attribute.upgrade() { + Some(attribute) => { + attribute.lock().unwrap().on_message(new_value); + } + None => { + println!("Attribute not found"); + } + } + } + } +} diff --git a/src/syncv/stream.rs b/src/syncv/stream.rs new file mode 100644 index 0000000..e69de29 diff --git a/tests/mqtt_connection.rs b/tests/mqtt_connection.rs new file mode 100644 index 0000000..300614c --- /dev/null +++ b/tests/mqtt_connection.rs @@ -0,0 +1,27 @@ +mod pza_world; + +use cucumber::World; +use futures::executor::block_on; +use pza_world::PanduzaWorld; + + + +fn main() { + + + block_on( + PanduzaWorld::cucumber() + .init_tracing() + // .after(|_feature, _rule, _scenario, _ev, _world| { + // if let Some(w) = _world { + // if w.serial_stream.is_some() { + // tracing::info!("Closing serial connection"); + // w.serial_stream.as_mut().unwrap().clear_line().unwrap(); + // w.serial_stream = None; + // } + // } + // sleep(Duration::from_millis(1000)).boxed_local() + // }) + .run("features/mqtt_connection.feature")); + +} diff --git a/tests/pza_world/mod.rs b/tests/pza_world/mod.rs new file mode 100644 index 0000000..c2ecb4b --- /dev/null +++ b/tests/pza_world/mod.rs @@ -0,0 +1,5 @@ +mod panduza_world; +mod steps; + +pub type PanduzaWorld = panduza_world::PanduzaWorld; + diff --git a/tests/pza_world/panduza_world.rs b/tests/pza_world/panduza_world.rs new file mode 100644 index 0000000..c3939d8 --- /dev/null +++ b/tests/pza_world/panduza_world.rs @@ -0,0 +1,73 @@ +use std::fmt::Debug; + + +// use super::api_dio::PicohaDioRequest; +// use super::api_dio::RequestType; + +use cucumber::World; + + +use panduza::ReactorSettings; +use panduza::SyncReactor; + + +// `World` is your shared, likely mutable state. +// Cucumber constructs it via `Default::default()` for each scenario. +#[derive(World)] +pub struct PanduzaWorld { + pub reactor: Option, + // pub serial_settings: SerialSettings, + // pub serial_stream: Option, + // // Accumulated incoming data buffer + // pub in_buf: [u8; 512], + // // Keep track of number of data in the buffer + // pub in_buf_size: usize, + + // decode_buffer: serial_line_ip::DecoderBuffer<512>, + + // pub last_answer: Option, +} + +impl Debug for PanduzaWorld { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PanduzaWorld") + // .field("usb_settings", &self.usb_settings) + // .field("serial_settings", &self.serial_settings) + // .field("serial_stream", &self.serial_stream) + // .field("in_buf", &self.in_buf) + // .field("in_buf_size", &self.in_buf_size) + // // .field("decode_buffer", &self.decode_buffer) + // .field("last_answer", &self.last_answer) + .finish() + } +} + +impl PanduzaWorld { + + + + + +} + +impl std::default::Default for PanduzaWorld { + fn default() -> Self { + // let usb_s = UsbSettings::new().set_vendor(0x16c0).set_model(0x05E1); + // let serial_s = SerialSettings::new() + // .set_port_name_from_usb_settings(&usb_s) + // .unwrap() + // .set_baudrate(9600) + // .set_read_timeout(std::time::Duration::from_secs(5)); + + PanduzaWorld { + reactor: None, + // usb_settings: usb_s, + // serial_settings: serial_s, + // serial_stream: None, + // in_buf: [0u8; 512], + // in_buf_size: 0, + // last_answer: None, + // decode_buffer: serial_line_ip::DecoderBuffer::new(), + } + } +} diff --git a/tests/pza_world/steps.rs b/tests/pza_world/steps.rs new file mode 100644 index 0000000..53d4cbd --- /dev/null +++ b/tests/pza_world/steps.rs @@ -0,0 +1,44 @@ +use core::time; + +use super::PanduzaWorld; + +use cucumber::given; +use cucumber::when; +use panduza::AttributeBoolean; +use panduza::ReactorSettings; +use panduza::SyncReactor; + +use std::thread::sleep; + +#[given("A broker is running")] +fn a_broker_is_running(world: &mut PanduzaWorld) { + // world.start_the_test_broker(); +} + +#[when("I connect to the broker with the reactor")] +fn i_connect_to_the_broker_with_the_reactor(world: &mut PanduzaWorld) { + + let settings = ReactorSettings::new("localhost", 1883); + let reactor = SyncReactor::new(settings); + + reactor.run_in_thread(); + + // wait for connection + + sleep(time::Duration::from_secs(5)); + + println!("-----------"); + + reactor.scan_platforms(); + + let pp: AttributeBoolean = reactor.create_attribute_from_topic("ooo").into(); + + + pp.set(true); + + + sleep(time::Duration::from_secs(60)); + +} + +