Skip to content

Commit

Permalink
release 0.3.2
Browse files Browse the repository at this point in the history
support topic message trigger
  • Loading branch information
foxzool committed Sep 13, 2024
1 parent 4500449 commit 9f7d09b
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 81 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "bevy_mqtt"
description = "A simple MQTT client for Bevy"
version = "0.3.1"
description = "A simple MQTT client for Bevy ECS"
version = "0.3.2"
edition = "2021"
readme = "README.md"
repository = "https://github.com/foxzool/bevy_mqtt"
Expand All @@ -28,7 +28,7 @@ bevy-tokio-tasks = { version = "0.14.0" }
kanal = "0.1.0-pre8"
bytes = "1"
tokio = { version = "1", features = ["rt", "sync"] }

regex = { version = "1.10.6" }

[dev-dependencies]
bevy = { version = "0.14.2", default-features = false }
Expand Down
47 changes: 38 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ then run the example
use std::time::SystemTime;

use bevy::{prelude::*, time::common_conditions::on_timer};
use bevy_log::LogPlugin;
use bevy_mqtt::{
rumqttc::{MqttOptions, QoS},
MqttClientState, MqttError, MqttEvent, MqttPlugin, MqttPublishOutgoing, MqttSetting,
MqttSubTopicOutgoing,
MqttClient, MqttClientState, MqttError, MqttEvent, MqttPlugin, MqttPublishOutgoing,
MqttSetting, SubscribeTopic, TopicMessage,
};
use bevy_state::prelude::OnEnter;
use bincode::ErrorKind;
Expand Down Expand Up @@ -59,9 +60,12 @@ fn main() {
mqtt_options: MqttOptions::new("mqtt-serde", "127.0.0.1", 1883),
cap: 10,
})
.add_plugins((MinimalPlugins, MqttPlugin))
.add_plugins((MinimalPlugins, MqttPlugin, LogPlugin::default()))
.add_systems(Update, (handle_message, handle_error))
.add_systems(OnEnter(MqttClientState::Connected), sub_topic)
.add_systems(
OnEnter(MqttClientState::Connected),
(sub_topic_direct, sub_topic_by_component),
)
.add_systems(
Update,
publish_message.run_if(on_timer(std::time::Duration::from_secs(1))),
Expand Down Expand Up @@ -92,11 +96,25 @@ fn handle_error(mut error_events: EventReader<MqttError>) {
}
}

fn sub_topic(mut sub_events: EventWriter<MqttSubTopicOutgoing>) {
sub_events.send(MqttSubTopicOutgoing {
topic: "hello/mqtt".to_string(),
qos: QoS::AtMostOnce,
});
/// there are two ways to subscribe to a topic
/// 1. Directly subscribe to a topic
/// 2. Subscribe to a topic by component
fn sub_topic_direct(client: Res<MqttClient>) {
client
.try_subscribe("hello/mqtt", QoS::AtMostOnce)
.expect("subscribe failed");
}

fn sub_topic_by_component(mut commands: Commands) {
commands
.spawn(SubscribeTopic::new("+/mqtt", QoS::AtMostOnce))
.observe(|topic_message: Trigger<TopicMessage>| {
println!(
"topic: {} Received : {:?}",
topic_message.event().topic,
topic_message.event().payload
);
});
}

fn publish_message(mut pub_events: EventWriter<MqttPublishOutgoing>) {
Expand All @@ -113,12 +131,23 @@ fn publish_message(mut pub_events: EventWriter<MqttPublishOutgoing>) {
retain: false,
payload: message.into(),
});
list.push(MqttPublishOutgoing {
topic: "bevy/mqtt".to_string(),
qos: QoS::AtLeastOnce,
retain: false,
payload: Message {
i: 999,
time: SystemTime::now(),
}
.into(),
});
}

pub_events.send_batch(list);
}



```

## Supported Versions
Expand Down
25 changes: 19 additions & 6 deletions examples/pub_and_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bevy_log::LogPlugin;
use bevy_mqtt::{
rumqttc::{MqttOptions, QoS},
MqttClient, MqttClientState, MqttError, MqttEvent, MqttPlugin, MqttPublishOutgoing,
MqttSetting, MqttSubscribe,
MqttSetting, SubscribeTopic, TopicMessage,
};
use bevy_state::prelude::OnEnter;
use bincode::ErrorKind;
Expand Down Expand Up @@ -89,12 +89,15 @@ fn sub_topic_direct(client: Res<MqttClient>) {
}

fn sub_topic_by_component(mut commands: Commands) {
for component in ["bevy", "ecs"] {
commands.spawn(MqttSubscribe {
topic: format!("{}/mqtt", component),
qos: QoS::AtMostOnce,
commands
.spawn(SubscribeTopic::new("+/mqtt", QoS::AtMostOnce))
.observe(|topic_message: Trigger<TopicMessage>| {
println!(
"topic: {} received : {:?}",
topic_message.event().topic,
topic_message.event().payload
);
});
}
}

fn publish_message(mut pub_events: EventWriter<MqttPublishOutgoing>) {
Expand All @@ -111,6 +114,16 @@ fn publish_message(mut pub_events: EventWriter<MqttPublishOutgoing>) {
retain: false,
payload: message.into(),
});
list.push(MqttPublishOutgoing {
topic: "bevy/mqtt".to_string(),
qos: QoS::AtLeastOnce,
retain: false,
payload: Message {
i: 999,
time: SystemTime::now(),
}
.into(),
});
}

pub_events.send_batch(list);
Expand Down
139 changes: 76 additions & 63 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use bevy_app::{App, Plugin, Update};
use bevy_derive::{Deref, DerefMut};
use bevy_ecs::prelude::*;
use bevy_log::{debug, error};
use bevy_log::{debug, error, trace};
use bevy_state::{
app::{AppExtStates, StatesPlugin},
prelude::in_state,
Expand All @@ -12,8 +12,9 @@ use bevy_state::{
use bevy_tokio_tasks::{TokioTasksPlugin, TokioTasksRuntime};
use bytes::Bytes;
use kanal::{bounded_async, AsyncReceiver};
use regex::Regex;
pub use rumqttc;
use rumqttc::{ConnectionError, QoS, SubscribeFilter};
use rumqttc::{ConnectionError, QoS};
use std::ops::{Deref, DerefMut};

#[derive(Default)]
Expand All @@ -31,25 +32,19 @@ impl Plugin for MqttPlugin {
app.init_state::<MqttClientState>()
.add_event::<MqttEvent>()
.add_event::<MqttError>()
.add_event::<MqttSubTopicOutgoing>()
.add_event::<MqttSubManyOutgoing>()
.add_event::<MqttUnSubTopicOutgoing>()
.add_event::<MqttPublishOutgoing>()
.add_event::<MqttPublishPacket>()
.add_event::<DisconnectMqttClient>()
.add_systems(Update, spawn_client.run_if(resource_added::<MqttSetting>))
.add_systems(
Update,
handle_mqtt_events.run_if(resource_exists::<MqttClient>),
(handle_mqtt_events, dispatch_publish_to_topic)
.run_if(resource_exists::<MqttClient>),
)
.add_systems(
Update,
(
handle_sub_topic,
handle_outgoing_publish,
handle_disconnect_event,
)
.distributive_run_if(in_state(MqttClientState::Connected)),
(handle_outgoing_publish, handle_disconnect_event)
.run_if(in_state(MqttClientState::Connected)),
)
.observe(on_add_subscribe)
.observe(on_remove_subscribe);
Expand Down Expand Up @@ -102,22 +97,6 @@ pub struct MqttEvent(pub rumqttc::Event);
#[derive(Debug, Deref, DerefMut, Event)]
pub struct MqttError(pub ConnectionError);

#[derive(Debug, Event)]
pub struct MqttSubTopicOutgoing {
pub topic: String,
pub qos: QoS,
}

#[derive(Debug, Event)]
pub struct MqttSubManyOutgoing {
pub topics: Vec<SubscribeFilter>,
}

#[derive(Debug, Event)]
pub struct MqttUnSubTopicOutgoing {
pub topic: String,
}

#[derive(Debug, Event)]
pub struct MqttPublishOutgoing {
pub topic: String,
Expand Down Expand Up @@ -214,15 +193,72 @@ fn spawn_client(setting: Res<MqttSetting>, runtime: Res<TokioTasksRuntime>) {
});
}

/// A component to store the topic and qos to subscribe
#[derive(Debug, Clone, Component)]
pub struct MqttSubscribe {
pub struct SubscribeTopic {
topic: String,
re: Regex,
qos: QoS,
}

impl SubscribeTopic {
pub fn new(topic: impl ToString, qos: QoS) -> Self {
let topic = topic.to_string();
let regex_pattern = topic.replace("+", "[^/]+").replace("#", ".+");
let re = Regex::new(&format!("^{}$", regex_pattern)).unwrap();
Self { topic, re, qos }
}

pub fn matches(&self, topic: &str) -> bool {
self.re.is_match(topic)
}

pub fn topic(&self) -> &str {
&self.topic
}

pub fn qos(&self) -> QoS {
self.qos
}
}

#[derive(Debug, Event)]
pub struct TopicMessage {
pub topic: String,
pub qos: QoS,
pub payload: Bytes,
}

fn dispatch_publish_to_topic(
mut publish_incoming: EventReader<MqttPublishPacket>,
topic_query: Query<(Entity, &SubscribeTopic)>,
mut commands: Commands,
) {
for packet in publish_incoming.read() {
let mut match_entities = vec![];
for (e, subscribed_topic) in topic_query.iter() {
if subscribed_topic.matches(&packet.topic) {
trace!(
"{} Received publish packet: {:?}",
subscribed_topic.topic(),
packet
);
match_entities.push(e);
}
}

commands.trigger_targets(
TopicMessage {
topic: packet.topic.clone(),
payload: packet.payload.clone(),
},
match_entities,
);
}
}

fn on_add_subscribe(
trigger: Trigger<OnAdd, MqttSubscribe>,
query: Query<&MqttSubscribe>,
trigger: Trigger<OnAdd, SubscribeTopic>,
query: Query<&SubscribeTopic>,
client: Res<MqttClient>,
) {
let subscribe = query.get(trigger.entity()).unwrap();
Expand All @@ -233,8 +269,8 @@ fn on_add_subscribe(
}

fn on_remove_subscribe(
trigger: Trigger<OnRemove, MqttSubscribe>,
query: Query<&MqttSubscribe>,
trigger: Trigger<OnRemove, SubscribeTopic>,
query: Query<&SubscribeTopic>,
client: Res<MqttClient>,
) {
let subscribe = query.get(trigger.entity()).unwrap();
Expand All @@ -244,35 +280,6 @@ fn on_remove_subscribe(
.expect("unsubscribe failed");
}

fn handle_sub_topic(
mut sub_events: EventReader<MqttSubTopicOutgoing>,
mut sub_many_events: EventReader<MqttSubManyOutgoing>,
mut unsub_events: EventReader<MqttUnSubTopicOutgoing>,
client: Res<MqttClient>,
) {
for sub in sub_events.read() {
debug!("subscribe to {:?}", sub.topic);
client
.try_subscribe(sub.topic.clone(), sub.qos)
.expect("subscribe failed");
}

for sub in sub_many_events.read() {
for topic in &sub.topics {
debug!("subscribe to {:?}", topic.path);
}
client
.try_subscribe_many(sub.topics.clone())
.expect("subscribe many failed");
}

for unsub in unsub_events.read() {
client
.try_unsubscribe(unsub.topic.clone())
.expect("unsubscribe failed");
}
}

fn handle_outgoing_publish(
mut pub_events: EventReader<MqttPublishOutgoing>,
client: Res<MqttClient>,
Expand All @@ -297,3 +304,9 @@ fn handle_disconnect_event(
client.try_disconnect().expect("disconnect failed");
}
}

#[test]
fn test_topic_matches() {
let subscribe = SubscribeTopic::new("hello/+/world".to_string(), QoS::AtMostOnce);
assert!(subscribe.matches("hello/1/world"));
}

0 comments on commit 9f7d09b

Please sign in to comment.