Skip to content

Commit

Permalink
Update MQTT example and library handling: Refactor error handling, su…
Browse files Browse the repository at this point in the history
…bscribe method, and event types for clarity and robustness.
  • Loading branch information
foxzool committed Sep 24, 2024
1 parent 9f7d09b commit fb10d0d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 34 deletions.
21 changes: 13 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use bevy::{prelude::*, time::common_conditions::on_timer};
use bevy_log::LogPlugin;
use bevy_mqtt::{
rumqttc::{MqttOptions, QoS},
MqttClient, MqttClientState, MqttError, MqttEvent, MqttPlugin, MqttPublishOutgoing,
MqttSetting, SubscribeTopic, TopicMessage,
MqttClient, MqttClientError, MqttClientState, MqttConnectError, MqttEvent, MqttPlugin,
MqttPublishOutgoing, MqttSetting, SubscribeTopic, TopicMessage,
};
use bevy_state::prelude::OnEnter;
use bincode::ErrorKind;
Expand Down Expand Up @@ -90,9 +90,16 @@ fn handle_message(mut mqtt_event: EventReader<MqttEvent>) {
}
}

fn handle_error(mut error_events: EventReader<MqttError>) {
for error in error_events.read() {
println!("Error: {:?}", error);
fn handle_error(
mut connect_errors: EventReader<MqttConnectError>,
mut client_errors: EventReader<MqttClientError>,
) {
for error in connect_errors.read() {
println!("connect Error: {:?}", error);
}

for error in client_errors.read() {
println!("client Error: {:?}", error);
}
}

Expand All @@ -110,7 +117,7 @@ fn sub_topic_by_component(mut commands: Commands) {
.spawn(SubscribeTopic::new("+/mqtt", QoS::AtMostOnce))
.observe(|topic_message: Trigger<TopicMessage>| {
println!(
"topic: {} Received : {:?}",
"topic: {} received : {:?}",
topic_message.event().topic,
topic_message.event().payload
);
Expand Down Expand Up @@ -146,8 +153,6 @@ fn publish_message(mut pub_events: EventWriter<MqttPublishOutgoing>) {
pub_events.send_batch(list);
}



```

## Supported Versions
Expand Down
17 changes: 12 additions & 5 deletions examples/pub_and_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use bevy::{prelude::*, time::common_conditions::on_timer};
use bevy_log::LogPlugin;
use bevy_mqtt::{
rumqttc::{MqttOptions, QoS},
MqttClient, MqttClientState, MqttError, MqttEvent, MqttPlugin, MqttPublishOutgoing,
MqttSetting, SubscribeTopic, TopicMessage,
MqttClient, MqttClientError, MqttClientState, MqttConnectError, MqttEvent, MqttPlugin,
MqttPublishOutgoing, MqttSetting, SubscribeTopic, TopicMessage,
};
use bevy_state::prelude::OnEnter;
use bincode::ErrorKind;
Expand Down Expand Up @@ -73,9 +73,16 @@ fn handle_message(mut mqtt_event: EventReader<MqttEvent>) {
}
}

fn handle_error(mut error_events: EventReader<MqttError>) {
for error in error_events.read() {
println!("Error: {:?}", error);
fn handle_error(
mut connect_errors: EventReader<MqttConnectError>,
mut client_errors: EventReader<MqttClientError>,
) {
for error in connect_errors.read() {
println!("connect Error: {:?}", error);
}

for error in client_errors.read() {
println!("client Error: {:?}", error);
}
}

Expand Down
17 changes: 8 additions & 9 deletions examples/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use bevy::{prelude::*, time::common_conditions::on_timer};
use bevy_log::LogPlugin;
use bevy_mqtt::{
rumqttc::{MqttOptions, QoS, Transport},
MqttClientState, MqttError, MqttPlugin, MqttPublishOutgoing, MqttPublishPacket, MqttSetting,
MqttSubTopicOutgoing,
MqttClient, MqttClientState, MqttConnectError, MqttPlugin, MqttPublishOutgoing,
MqttPublishPacket, MqttSetting,
};
use bevy_state::state::OnEnter;

Expand All @@ -24,7 +24,7 @@ fn main() {
.add_systems(Update, (handle_mqtt_publish, handle_error))
.add_systems(
Update,
publish_message.run_if(on_timer(std::time::Duration::from_secs(1))),
publish_message.run_if(on_timer(Duration::from_secs(1))),
)
.run();
}
Expand All @@ -41,17 +41,16 @@ fn handle_mqtt_publish(mut mqtt_event: EventReader<MqttPublishPacket>) {
}
}

fn handle_error(mut error_events: EventReader<MqttError>) {
fn handle_error(mut error_events: EventReader<MqttConnectError>) {
for error in error_events.read() {
println!("Error: {:?}", error);
}
}

fn sub_topic(mut sub_events: EventWriter<MqttSubTopicOutgoing>) {
sub_events.send(MqttSubTopicOutgoing {
topic: "hello/mqtt".to_string(),
qos: QoS::AtMostOnce,
});
fn sub_topic(client: Res<MqttClient>) {
client
.try_subscribe("hello/mqtt", QoS::AtMostOnce)
.expect("subscribe failed");
}

fn publish_message(mut pub_events: EventWriter<MqttPublishOutgoing>) {
Expand Down
32 changes: 20 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use bytes::Bytes;
use kanal::{bounded_async, AsyncReceiver};
use regex::Regex;
pub use rumqttc;
use rumqttc::{ConnectionError, QoS};
use rumqttc::{ClientError, ConnectionError, QoS};
use std::ops::{Deref, DerefMut};

#[derive(Default)]
Expand All @@ -31,7 +31,8 @@ impl Plugin for MqttPlugin {

app.init_state::<MqttClientState>()
.add_event::<MqttEvent>()
.add_event::<MqttError>()
.add_event::<MqttConnectError>()
.add_event::<MqttClientError>()
.add_event::<MqttPublishOutgoing>()
.add_event::<MqttPublishPacket>()
.add_event::<DisconnectMqttClient>()
Expand Down Expand Up @@ -95,7 +96,11 @@ pub struct MqttEvent(pub rumqttc::Event);

/// A wrapper around rumqttc::ConnectionError
#[derive(Debug, Deref, DerefMut, Event)]
pub struct MqttError(pub ConnectionError);
pub struct MqttConnectError(pub ConnectionError);

/// A wrapper around rumqttc::ClientError
#[derive(Debug, Deref, DerefMut, Event)]
pub struct MqttClientError(pub ClientError);

#[derive(Debug, Event)]
pub struct MqttPublishOutgoing {
Expand All @@ -115,14 +120,14 @@ pub struct MqttPublishPacket {
pub payload: Bytes,
}

/// A event to disconnect an MQTT client
/// AN event to disconnect an MQTT client
#[derive(Event)]
pub struct DisconnectMqttClient;

fn handle_mqtt_events(
client: Res<MqttClient>,
mut mqtt_events: EventWriter<MqttEvent>,
mut error_events: EventWriter<MqttError>,
mut error_events: EventWriter<MqttConnectError>,
mut next_state: ResMut<NextState<MqttClientState>>,
mut publish_incoming: EventWriter<MqttPublishPacket>,
) {
Expand Down Expand Up @@ -151,7 +156,7 @@ fn handle_mqtt_events(

while let Ok(Some(err)) = client.from_async_error.try_recv() {
next_state.set(MqttClientState::Disconnected);
error_events.send(MqttError(err));
error_events.send(MqttConnectError(err));
}
}

Expand Down Expand Up @@ -260,39 +265,42 @@ fn on_add_subscribe(
trigger: Trigger<OnAdd, SubscribeTopic>,
query: Query<&SubscribeTopic>,
client: Res<MqttClient>,
mut client_error: EventWriter<MqttClientError>,
) {
let subscribe = query.get(trigger.entity()).unwrap();
debug!("subscribe to {:?}", subscribe.topic);
client
let _ = client
.try_subscribe(subscribe.topic.clone(), subscribe.qos)
.expect("subscribe failed");
.map_err(|e| client_error.send(MqttClientError(e)));
}

fn on_remove_subscribe(
trigger: Trigger<OnRemove, SubscribeTopic>,
query: Query<&SubscribeTopic>,
client: Res<MqttClient>,
mut client_error: EventWriter<MqttClientError>,
) {
let subscribe = query.get(trigger.entity()).unwrap();
debug!("unsubscribe to {:?}", subscribe.topic);
client
let _ = client
.try_unsubscribe(subscribe.topic.clone())
.expect("unsubscribe failed");
.map_err(|e| client_error.send(MqttClientError(e)));
}

fn handle_outgoing_publish(
mut pub_events: EventReader<MqttPublishOutgoing>,
client: Res<MqttClient>,
mut client_error: EventWriter<MqttClientError>,
) {
for event in pub_events.read() {
client
let _ = client
.try_publish(
event.topic.clone(),
event.qos,
event.retain,
event.payload.clone(),
)
.expect("publish failed");
.map_err(|e| client_error.send(MqttClientError(e)));
}
}

Expand Down

0 comments on commit fb10d0d

Please sign in to comment.