Skip to content

Commit

Permalink
SubscribeTopic search mqtt client from ancestors
Browse files Browse the repository at this point in the history
  • Loading branch information
foxzool committed Oct 11, 2024
1 parent 4341216 commit 8cbabbe
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

- SubscribeTopic search mqtt client from ancestors

## [0.4.2] - 2024-09-26

- MqttPublishPacket contains entity
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rust-tls = ["rumqttc/use-rustls"]
bevy_app = { version = "0.14.2" }
bevy_ecs = { version = "0.14.2" }
bevy_log = { version = "0.14.2" }
bevy_reflect = { version = "0.14.2" }
bevy_hierarchy = { version = "0.14.2" }

rumqttc = { version = "0.24.0" }
Expand Down
84 changes: 74 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use bevy_app::{App, Plugin, Update};
use bevy_ecs::prelude::*;
use bevy_hierarchy::Parent;
use bevy_hierarchy::{HierarchyQueryExt, Parent};
use bevy_log::{debug, trace};
use bevy_reflect::Reflect;
use bytes::Bytes;
use flume::{bounded, Receiver};
use regex::Regex;
Expand All @@ -23,6 +24,7 @@ impl Plugin for MqttPlugin {
.add_event::<MqttPublishOutgoing>()
.add_event::<MqttPublishPacket>()
.add_event::<DisconnectMqttClient>()
.register_type::<SubscribeTopic>()
.add_systems(
Update,
(on_added_setting_component, pending_subscribe_topic),
Expand Down Expand Up @@ -202,18 +204,76 @@ fn on_added_setting_component(
}

/// A component to store the topic and qos to subscribe
#[derive(Debug, Clone, Component)]
#[derive(Debug, Clone, Reflect, Component)]
#[reflect(from_reflect = false)]
pub struct SubscribeTopic {
topic: String,
re: Regex,
#[reflect(ignore)]
qos: QoS,
#[reflect(ignore)]
re: Regex,
}

pub trait ToQos {
fn to_qos(&self) -> QoS;
}

impl ToQos for i32 {
fn to_qos(&self) -> QoS {
match self {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => QoS::AtMostOnce,
}
}
}

impl ToQos for u8 {
fn to_qos(&self) -> QoS {
match self {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => QoS::AtMostOnce,
}
}
}

impl ToQos for u16 {
fn to_qos(&self) -> QoS {
match self {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => QoS::AtMostOnce,
}
}
}

impl ToQos for u32 {
fn to_qos(&self) -> QoS {
match self {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => QoS::AtMostOnce,
}
}
}

impl ToQos for QoS {
fn to_qos(&self) -> QoS {
*self
}
}

impl SubscribeTopic {
pub fn new(topic: impl ToString, qos: QoS) -> Self {
pub fn new(topic: impl ToString, qos: impl ToQos) -> Self {
let topic = topic.to_string();
let regex_pattern = topic.replace("+", "[^/]+").replace("#", ".+");
let re = Regex::new(&format!("^{}$", regex_pattern)).unwrap();
let qos = qos.to_qos();
Self { topic, re, qos }
}

Expand Down Expand Up @@ -281,13 +341,17 @@ fn pending_subscribe_topic(

fn on_add_subscribe(
mut clients: Query<&mut MqttClient>,
query: Query<(&Parent, &SubscribeTopic), Added<SubscribeTopic>>,
parent_query: Query<&Parent>,
query: Query<(Entity, &SubscribeTopic), Added<SubscribeTopic>>,
) {
for (parent, subscribe) in query.iter() {
let mut client = clients.get_mut(**parent).unwrap();
client
.pedding_subscribes
.push(SubscribeFilter::new(subscribe.topic.clone(), subscribe.qos));
for (entity, subscribe) in query.iter() {
for ancestor in parent_query.iter_ancestors(entity) {
if let Ok(mut client) = clients.get_mut(ancestor) {
client
.pedding_subscribes
.push(SubscribeFilter::new(subscribe.topic.clone(), subscribe.qos));
}
}
}
}

Expand Down

0 comments on commit 8cbabbe

Please sign in to comment.