diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ffb409..a2bbc6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +- SubscribeTopic search mqtt client from ancestors + ## [0.4.2] - 2024-09-26 - MqttPublishPacket contains entity diff --git a/Cargo.toml b/Cargo.toml index 406e83e..9522210 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ rust-tls = ["rumqttc/use-rustls"] bevy_app = { version = "0.14.2" } bevy_ecs = { version = "0.14.2" } bevy_log = { version = "0.14.2" } +bevy_reflect = { version = "0.14.2" } bevy_hierarchy = { version = "0.14.2" } rumqttc = { version = "0.24.0" } diff --git a/src/lib.rs b/src/lib.rs index d6de663..19d6b7d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,8 +2,9 @@ use bevy_app::{App, Plugin, Update}; use bevy_ecs::prelude::*; -use bevy_hierarchy::Parent; +use bevy_hierarchy::{HierarchyQueryExt, Parent}; use bevy_log::{debug, trace}; +use bevy_reflect::Reflect; use bytes::Bytes; use flume::{bounded, Receiver}; use regex::Regex; @@ -23,6 +24,7 @@ impl Plugin for MqttPlugin { .add_event::() .add_event::() .add_event::() + .register_type::() .add_systems( Update, (on_added_setting_component, pending_subscribe_topic), @@ -202,18 +204,76 @@ fn on_added_setting_component( } /// A component to store the topic and qos to subscribe -#[derive(Debug, Clone, Component)] +#[derive(Debug, Clone, Reflect, Component)] +#[reflect(from_reflect = false)] pub struct SubscribeTopic { topic: String, - re: Regex, + #[reflect(ignore)] qos: QoS, + #[reflect(ignore)] + re: Regex, +} + +pub trait ToQos { + fn to_qos(&self) -> QoS; +} + +impl ToQos for i32 { + fn to_qos(&self) -> QoS { + match self { + 0 => QoS::AtMostOnce, + 1 => QoS::AtLeastOnce, + 2 => QoS::ExactlyOnce, + _ => QoS::AtMostOnce, + } + } +} + +impl ToQos for u8 { + fn to_qos(&self) -> QoS { + match self { + 0 => QoS::AtMostOnce, + 1 => QoS::AtLeastOnce, + 2 => QoS::ExactlyOnce, + _ => QoS::AtMostOnce, + } + } +} + +impl ToQos for u16 { + fn to_qos(&self) -> QoS { + match self { + 0 => QoS::AtMostOnce, + 1 => QoS::AtLeastOnce, + 2 => QoS::ExactlyOnce, + _ => QoS::AtMostOnce, + } + } +} + +impl ToQos for u32 { + fn to_qos(&self) -> QoS { + match self { + 0 => QoS::AtMostOnce, + 1 => QoS::AtLeastOnce, + 2 => QoS::ExactlyOnce, + _ => QoS::AtMostOnce, + } + } +} + +impl ToQos for QoS { + fn to_qos(&self) -> QoS { + *self + } } impl SubscribeTopic { - pub fn new(topic: impl ToString, qos: QoS) -> Self { + pub fn new(topic: impl ToString, qos: impl ToQos) -> Self { let topic = topic.to_string(); let regex_pattern = topic.replace("+", "[^/]+").replace("#", ".+"); let re = Regex::new(&format!("^{}$", regex_pattern)).unwrap(); + let qos = qos.to_qos(); Self { topic, re, qos } } @@ -281,13 +341,17 @@ fn pending_subscribe_topic( fn on_add_subscribe( mut clients: Query<&mut MqttClient>, - query: Query<(&Parent, &SubscribeTopic), Added>, + parent_query: Query<&Parent>, + query: Query<(Entity, &SubscribeTopic), Added>, ) { - for (parent, subscribe) in query.iter() { - let mut client = clients.get_mut(**parent).unwrap(); - client - .pedding_subscribes - .push(SubscribeFilter::new(subscribe.topic.clone(), subscribe.qos)); + for (entity, subscribe) in query.iter() { + for ancestor in parent_query.iter_ancestors(entity) { + if let Ok(mut client) = clients.get_mut(ancestor) { + client + .pedding_subscribes + .push(SubscribeFilter::new(subscribe.topic.clone(), subscribe.qos)); + } + } } }