diff --git a/README.md b/README.md index 1be25c5..7bb9292 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,8 @@ use bevy::{prelude::*, time::common_conditions::on_timer}; use bevy_log::LogPlugin; use bevy_mqtt::{ rumqttc::{MqttOptions, QoS}, - MqttClient, MqttClientState, MqttError, MqttEvent, MqttPlugin, MqttPublishOutgoing, - MqttSetting, SubscribeTopic, TopicMessage, + MqttClient, MqttClientError, MqttClientState, MqttConnectError, MqttEvent, MqttPlugin, + MqttPublishOutgoing, MqttSetting, SubscribeTopic, TopicMessage, }; use bevy_state::prelude::OnEnter; use bincode::ErrorKind; @@ -90,9 +90,16 @@ fn handle_message(mut mqtt_event: EventReader) { } } -fn handle_error(mut error_events: EventReader) { - for error in error_events.read() { - println!("Error: {:?}", error); +fn handle_error( + mut connect_errors: EventReader, + mut client_errors: EventReader, +) { + for error in connect_errors.read() { + println!("connect Error: {:?}", error); + } + + for error in client_errors.read() { + println!("client Error: {:?}", error); } } @@ -110,7 +117,7 @@ fn sub_topic_by_component(mut commands: Commands) { .spawn(SubscribeTopic::new("+/mqtt", QoS::AtMostOnce)) .observe(|topic_message: Trigger| { println!( - "topic: {} Received : {:?}", + "topic: {} received : {:?}", topic_message.event().topic, topic_message.event().payload ); @@ -146,8 +153,6 @@ fn publish_message(mut pub_events: EventWriter) { pub_events.send_batch(list); } - - ``` ## Supported Versions diff --git a/examples/pub_and_sub.rs b/examples/pub_and_sub.rs index 8ded076..5237894 100644 --- a/examples/pub_and_sub.rs +++ b/examples/pub_and_sub.rs @@ -4,8 +4,8 @@ use bevy::{prelude::*, time::common_conditions::on_timer}; use bevy_log::LogPlugin; use bevy_mqtt::{ rumqttc::{MqttOptions, QoS}, - MqttClient, MqttClientState, MqttError, MqttEvent, MqttPlugin, MqttPublishOutgoing, - MqttSetting, SubscribeTopic, TopicMessage, + MqttClient, MqttClientError, MqttClientState, MqttConnectError, MqttEvent, MqttPlugin, + MqttPublishOutgoing, MqttSetting, SubscribeTopic, TopicMessage, }; use bevy_state::prelude::OnEnter; use bincode::ErrorKind; @@ -73,9 +73,16 @@ fn handle_message(mut mqtt_event: EventReader) { } } -fn handle_error(mut error_events: EventReader) { - for error in error_events.read() { - println!("Error: {:?}", error); +fn handle_error( + mut connect_errors: EventReader, + mut client_errors: EventReader, +) { + for error in connect_errors.read() { + println!("connect Error: {:?}", error); + } + + for error in client_errors.read() { + println!("client Error: {:?}", error); } } diff --git a/examples/websocket.rs b/examples/websocket.rs index 4d374f8..0a3e1a3 100644 --- a/examples/websocket.rs +++ b/examples/websocket.rs @@ -4,8 +4,8 @@ use bevy::{prelude::*, time::common_conditions::on_timer}; use bevy_log::LogPlugin; use bevy_mqtt::{ rumqttc::{MqttOptions, QoS, Transport}, - MqttClientState, MqttError, MqttPlugin, MqttPublishOutgoing, MqttPublishPacket, MqttSetting, - MqttSubTopicOutgoing, + MqttClient, MqttClientState, MqttConnectError, MqttPlugin, MqttPublishOutgoing, + MqttPublishPacket, MqttSetting, }; use bevy_state::state::OnEnter; @@ -24,7 +24,7 @@ fn main() { .add_systems(Update, (handle_mqtt_publish, handle_error)) .add_systems( Update, - publish_message.run_if(on_timer(std::time::Duration::from_secs(1))), + publish_message.run_if(on_timer(Duration::from_secs(1))), ) .run(); } @@ -41,17 +41,16 @@ fn handle_mqtt_publish(mut mqtt_event: EventReader) { } } -fn handle_error(mut error_events: EventReader) { +fn handle_error(mut error_events: EventReader) { for error in error_events.read() { println!("Error: {:?}", error); } } -fn sub_topic(mut sub_events: EventWriter) { - sub_events.send(MqttSubTopicOutgoing { - topic: "hello/mqtt".to_string(), - qos: QoS::AtMostOnce, - }); +fn sub_topic(client: Res) { + client + .try_subscribe("hello/mqtt", QoS::AtMostOnce) + .expect("subscribe failed"); } fn publish_message(mut pub_events: EventWriter) { diff --git a/src/lib.rs b/src/lib.rs index ec92680..35bdf28 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,7 +14,7 @@ use bytes::Bytes; use kanal::{bounded_async, AsyncReceiver}; use regex::Regex; pub use rumqttc; -use rumqttc::{ConnectionError, QoS}; +use rumqttc::{ClientError, ConnectionError, QoS}; use std::ops::{Deref, DerefMut}; #[derive(Default)] @@ -31,7 +31,8 @@ impl Plugin for MqttPlugin { app.init_state::() .add_event::() - .add_event::() + .add_event::() + .add_event::() .add_event::() .add_event::() .add_event::() @@ -95,7 +96,11 @@ pub struct MqttEvent(pub rumqttc::Event); /// A wrapper around rumqttc::ConnectionError #[derive(Debug, Deref, DerefMut, Event)] -pub struct MqttError(pub ConnectionError); +pub struct MqttConnectError(pub ConnectionError); + +/// A wrapper around rumqttc::ClientError +#[derive(Debug, Deref, DerefMut, Event)] +pub struct MqttClientError(pub ClientError); #[derive(Debug, Event)] pub struct MqttPublishOutgoing { @@ -115,14 +120,14 @@ pub struct MqttPublishPacket { pub payload: Bytes, } -/// A event to disconnect an MQTT client +/// AN event to disconnect an MQTT client #[derive(Event)] pub struct DisconnectMqttClient; fn handle_mqtt_events( client: Res, mut mqtt_events: EventWriter, - mut error_events: EventWriter, + mut error_events: EventWriter, mut next_state: ResMut>, mut publish_incoming: EventWriter, ) { @@ -151,7 +156,7 @@ fn handle_mqtt_events( while let Ok(Some(err)) = client.from_async_error.try_recv() { next_state.set(MqttClientState::Disconnected); - error_events.send(MqttError(err)); + error_events.send(MqttConnectError(err)); } } @@ -260,39 +265,42 @@ fn on_add_subscribe( trigger: Trigger, query: Query<&SubscribeTopic>, client: Res, + mut client_error: EventWriter, ) { let subscribe = query.get(trigger.entity()).unwrap(); debug!("subscribe to {:?}", subscribe.topic); - client + let _ = client .try_subscribe(subscribe.topic.clone(), subscribe.qos) - .expect("subscribe failed"); + .map_err(|e| client_error.send(MqttClientError(e))); } fn on_remove_subscribe( trigger: Trigger, query: Query<&SubscribeTopic>, client: Res, + mut client_error: EventWriter, ) { let subscribe = query.get(trigger.entity()).unwrap(); debug!("unsubscribe to {:?}", subscribe.topic); - client + let _ = client .try_unsubscribe(subscribe.topic.clone()) - .expect("unsubscribe failed"); + .map_err(|e| client_error.send(MqttClientError(e))); } fn handle_outgoing_publish( mut pub_events: EventReader, client: Res, + mut client_error: EventWriter, ) { for event in pub_events.read() { - client + let _ = client .try_publish( event.topic.clone(), event.qos, event.retain, event.payload.clone(), ) - .expect("publish failed"); + .map_err(|e| client_error.send(MqttClientError(e))); } }