Skip to content

Commit

Permalink
add mqtt event
Browse files Browse the repository at this point in the history
  • Loading branch information
foxzool committed Mar 27, 2024
1 parent d9c3279 commit ebb9d96
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 15 deletions.
34 changes: 30 additions & 4 deletions examples/pub_and_sub.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bevy::prelude::*;
use rumqttc::MqttOptions;
use bevy::time::common_conditions::on_timer;
use bevy_mqtt::rumqttc::{MqttOptions, QoS};

use bevy_mqtt::prelude::*;

Expand All @@ -9,9 +10,34 @@ fn main() {
mqtt_options: MqttOptions::new("rumqtt-sync", "127.0.0.1", 1883),
cap: 10,
})
.add_plugins((MinimalPlugins, MqttPlugin)).add_systems(Update, publish).run();
.add_plugins((MinimalPlugins, MqttPlugin)).add_systems(Update, (handle_message, handle_error))
.add_systems(Startup, sub_topic)
.add_systems(Update, publish_message.run_if(on_timer(std::time::Duration::from_secs(1))))
.run();
}

fn publish() {
// println!("Publishing message");
fn handle_message(mut mqtt_event: EventReader<MqttEvent>) {
for event in mqtt_event.read() {
println!("Received: {:?}", event.0);
}
}

fn handle_error(mut error_events: EventReader<MqttError>) {
for error in error_events.read() {
println!("Error: {:?}", error);
}
}

fn sub_topic(mut mqtt_client: Res<MqttClient>) {
mqtt_client.subscribe("hello/+/world", QoS::AtMostOnce).unwrap();
}

fn publish_message(mut mqtt_client: Res<MqttClient>) {
for i in 0..3 {
let payload = vec![1; i];
let topic = format!("hello/{i}/world");
let qos = QoS::AtLeastOnce;

mqtt_client.publish(topic, qos, false, payload).unwrap();
}
}
10 changes: 10 additions & 0 deletions src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use bevy::prelude::{Deref, DerefMut, Event};

/// A wrapper around rumqttc::Event
#[derive(Debug, Clone, PartialEq, Eq, Deref, DerefMut, Event)]
pub struct MqttEvent(pub rumqttc::Event);


/// A wrapper around rumqttc::ConnectionError
#[derive(Debug, Deref, DerefMut, Event)]
pub struct MqttError(pub rumqttc::ConnectionError);
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

pub use rumqttc;


pub mod events;
pub mod client;
pub mod plugin;
pub mod prelude;
Expand Down
15 changes: 8 additions & 7 deletions src/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use bevy::prelude::*;
use rumqttc::{Client, QoS};
use rumqttc::Client;

use crate::client::{MqttClient, MqttConnection, MqttSetting};
use crate::systems::*;
use crate::events::{MqttError, MqttEvent};
use crate::systems::recv_connection;

#[derive(Default)]
pub struct MqttPlugin;
Expand All @@ -14,13 +15,13 @@ impl Plugin for MqttPlugin {
}
let setting = app.world.get_resource::<MqttSetting>().unwrap();

let (client, connection) = Client::new(setting.mqtt_options.clone(), setting.cap);

client.subscribe("hello/+/world", QoS::AtMostOnce).unwrap();
let (client, connection) = Client::new(setting.mqtt_options.clone(), setting.cap);

app.insert_resource(MqttClient(client));

app.insert_non_send_resource(MqttConnection(connection));
app.add_systems(Update, recv_connection);
app
.add_event::<MqttEvent>()
.add_event::<MqttError>()
.add_systems(Update, recv_connection);
}
}
2 changes: 1 addition & 1 deletion src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#[doc(hidden)]
pub use crate::{client::*, plugin::*};
pub use crate::{client::*, events::*, plugin::*};

17 changes: 14 additions & 3 deletions src/systems.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
use std::time::Duration;

use bevy::prelude::NonSendMut;
use bevy::log::trace;
use bevy::prelude::{ EventWriter, NonSendMut};
use rumqttc::{ConnectionError, Event};

use crate::client::MqttConnection;
use crate::events::{MqttError, MqttEvent};

pub(crate) fn recv_connection(mut connection: NonSendMut<MqttConnection>) {
pub(crate) fn recv_connection(mut connection: NonSendMut<MqttConnection>, mut mqtt_events: EventWriter<MqttEvent>, mut error_events: EventWriter<MqttError>) {
while let Ok(notification) = connection.recv_timeout(Duration::from_millis(1)) {
println!("Received: {:?}", notification);
trace!("Received: {:?}", notification);
match notification {
Ok(event) => {
mqtt_events.send(MqttEvent(event));
}
Err(err) => {
error_events.send(MqttError(err));
}
}
}
}

0 comments on commit ebb9d96

Please sign in to comment.