Skip to content

Commit

Permalink
add serde example
Browse files Browse the repository at this point in the history
  • Loading branch information
foxzool committed Mar 27, 2024
1 parent 8ed795d commit 91301ab
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 13 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ websocket = ["rumqttc/websocket"]

[dependencies]
bevy = { version = "0.13.1", default-features = false }
rumqttc = "0.24.0"
rumqttc = "0.24.0"

[dev-dependencies]
bincode = "1.3.3"
serde = { version = "1", features = ["derive"] }
62 changes: 52 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,45 @@ A Bevy Plugin for MQTT client.
## Example

```rust
use bevy::prelude::*;
use bevy::time::common_conditions::on_timer;
use std::time::SystemTime;

use bevy::{prelude::*, time::common_conditions::on_timer};
use bincode::ErrorKind;
use serde::{Deserialize, Serialize};

use bevy_mqtt::prelude::*;
use bevy_mqtt::rumqttc::{MqttOptions, QoS};

#[derive(Serialize, Deserialize, Debug)]
struct Message {
i: usize,
time: SystemTime,
}

impl From<&Message> for Vec<u8> {
fn from(value: &Message) -> Self {
bincode::serialize(value).unwrap()
}
}

impl From<Message> for Vec<u8> {
fn from(value: Message) -> Self {
bincode::serialize(&value).unwrap()
}
}

impl TryFrom<&[u8]> for Message {
type Error = Box<ErrorKind>;

fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
bincode::deserialize(value)
}
}

fn main() {
App::new()
.insert_resource(MqttSetting {
mqtt_options: MqttOptions::new("rumqtt-sync", "localhost", 1883),
mqtt_options: MqttOptions::new("rumqtt-serde", "127.0.0.1", 1883),
cap: 10,
})
.add_plugins((MinimalPlugins, MqttPlugin))
Expand All @@ -34,7 +63,16 @@ fn main() {

fn handle_message(mut mqtt_event: EventReader<MqttEvent>) {
for event in mqtt_event.read() {
println!("Received: {:?}", event.0);
match &event.0 {
rumqttc::Event::Incoming(income) => match income {
rumqttc::Incoming::Publish(publish) => {
let message: Message = bincode::deserialize(&publish.payload).unwrap();
println!("Received: {:?}", message);
}
_ => {}
},
rumqttc::Event::Outgoing(_) => {}
}
}
}

Expand All @@ -46,20 +84,24 @@ fn handle_error(mut error_events: EventReader<MqttError>) {

fn sub_topic(mut mqtt_client: Res<MqttClient>) {
mqtt_client
.subscribe("hello/+/world", QoS::AtMostOnce)
.subscribe("hello/mqtt", QoS::AtMostOnce)
.unwrap();
}

fn publish_message(mut mqtt_client: Res<MqttClient>) {
for i in 0..3 {
let payload = vec![1; i];
let topic = format!("hello/{i}/world");
let qos = QoS::AtLeastOnce;

mqtt_client.publish(topic, qos, false, payload).unwrap();
let message = Message {
i,
time: SystemTime::now(),
};

mqtt_client
.publish("hello/mqtt", QoS::AtLeastOnce, false, message)
.unwrap();
}
}


```

## Supported Versions
Expand Down
4 changes: 2 additions & 2 deletions examples/pub_and_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use bevy_mqtt::rumqttc::{MqttOptions, QoS};
fn main() {
App::new()
.insert_resource(MqttSetting {
mqtt_options: MqttOptions::new("rumqtt-sync", "localhost", 1883),
mqtt_options: MqttOptions::new("rumqtt-sync", "127.0.0.1", 1883),
cap: 10,
})
.add_plugins((MinimalPlugins, MqttPlugin))
Expand All @@ -22,7 +22,7 @@ fn main() {

fn handle_message(mut mqtt_event: EventReader<MqttEvent>) {
for event in mqtt_event.read() {
println!("Received: {:?}", event.0);
println!("Received: {:?}", event);
}
}

Expand Down
90 changes: 90 additions & 0 deletions examples/serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::time::SystemTime;

use bevy::{prelude::*, time::common_conditions::on_timer};
use bincode::ErrorKind;
use serde::{Deserialize, Serialize};

use bevy_mqtt::prelude::*;
use bevy_mqtt::rumqttc::{MqttOptions, QoS};

#[derive(Serialize, Deserialize, Debug)]
struct Message {
i: usize,
time: SystemTime,
}

impl From<&Message> for Vec<u8> {
fn from(value: &Message) -> Self {
bincode::serialize(value).unwrap()
}
}

impl From<Message> for Vec<u8> {
fn from(value: Message) -> Self {
bincode::serialize(&value).unwrap()
}
}

impl TryFrom<&[u8]> for Message {
type Error = Box<ErrorKind>;

fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
bincode::deserialize(value)
}
}

fn main() {
App::new()
.insert_resource(MqttSetting {
mqtt_options: MqttOptions::new("rumqtt-serde", "127.0.0.1", 1883),
cap: 10,
})
.add_plugins((MinimalPlugins, MqttPlugin))
.add_systems(Update, (handle_message, handle_error))
.add_systems(Startup, sub_topic)
.add_systems(
Update,
publish_message.run_if(on_timer(std::time::Duration::from_secs(1))),
)
.run();
}

fn handle_message(mut mqtt_event: EventReader<MqttEvent>) {
for event in mqtt_event.read() {
match &event.0 {
rumqttc::Event::Incoming(income) => match income {
rumqttc::Incoming::Publish(publish) => {
let message: Message = bincode::deserialize(&publish.payload).unwrap();
println!("Received: {:?}", message);
}
_ => {}
},
rumqttc::Event::Outgoing(_) => {}
}
}
}

fn handle_error(mut error_events: EventReader<MqttError>) {
for error in error_events.read() {
println!("Error: {:?}", error);
}
}

fn sub_topic(mut mqtt_client: Res<MqttClient>) {
mqtt_client
.subscribe("hello/mqtt", QoS::AtMostOnce)
.unwrap();
}

fn publish_message(mut mqtt_client: Res<MqttClient>) {
for i in 0..3 {
let message = Message {
i,
time: SystemTime::now(),
};

mqtt_client
.publish("hello/mqtt", QoS::AtLeastOnce, false, message)
.unwrap();
}
}

0 comments on commit 91301ab

Please sign in to comment.