diff --git a/rumqttc/examples/async_manual_acks_v5.rs b/rumqttc/examples/async_manual_acks_v5.rs index bcf1bf356..e76cc214c 100644 --- a/rumqttc/examples/async_manual_acks_v5.rs +++ b/rumqttc/examples/async_manual_acks_v5.rs @@ -2,7 +2,7 @@ use rumqttc::v5::mqttbytes::v5::Packet; use rumqttc::v5::mqttbytes::QoS; use tokio::{task, time}; -use rumqttc::v5::{AsyncClient, Event, EventLoop, MqttOptions}; +use rumqttc::v5::{AsyncClient, Event, EventLoop, ManualAckReason, MqttOptions}; use std::error::Error; use std::time::Duration; @@ -61,12 +61,27 @@ async fn main() -> Result<(), Box> { Packet::Publish(publish) => publish, _ => continue, }; - // this time we will ack incoming publishes. + // this time we will ack incoming publishes in two different ways. // Its important not to block notifier as this can cause deadlock. - let c = client.clone(); - tokio::spawn(async move { - c.ack(&publish).await.unwrap(); - }); + if (publish.pkid & 1) == 0 { + // Ack with all default: reason code success, no reason string, properties none + let c = client.clone(); + tokio::spawn(async move { + c.ack(&publish).await.unwrap(); + }); + } else { + // Ack with custom reason code and reason string + let c = client.clone(); + // Get manual ack packet for later acking + let mut ack = c.get_manual_ack(&publish); + tokio::spawn(async move { + // Customize reason code (if not set, default is Success) + ack.set_reason(ManualAckReason::Success); + // Customize reason string (if not set, default is empty) + ack.set_reason_string("There is no error".to_string().into()); + c.manual_ack(ack).await.unwrap(); + }); + } } } diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index 15cd5f5ad..0b18f0dc8 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -111,6 +111,33 @@ impl AsyncClient { Ok(()) } + /// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck { + match publish.qos { + QoS::AtMostOnce => ManualAck::None, + QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid)), + QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid)), + } + } + + /// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.request_tx.send_async(ack).await?; + } + Ok(()) + } + + /// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.request_tx.try_send(ack)?; + } + Ok(()) + } + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> { let ack = get_ack_req(publish); @@ -235,6 +262,22 @@ impl AsyncClient { } } + +/// ManualAck packet for manual_ack +pub enum ManualAck { + None, + PubAck(PubAck), + PubRec(PubRec), +} + +fn get_manual_ack_req(ack: ManualAck) -> Option { + match ack { + ManualAck::None => None, + ManualAck::PubAck(ack) => Some(Request::PubAck(ack)), + ManualAck::PubRec(ack) => Some(Request::PubRec(ack)), + } +} + fn get_ack_req(publish: &Publish) -> Option { let ack = match publish.qos { QoS::AtMostOnce => return None, @@ -323,6 +366,30 @@ impl Client { Ok(()) } + /// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck { + match publish.qos { + QoS::AtMostOnce => ManualAck::None, + QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid)), + QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid)), + } + } + + /// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.client.request_tx.send(ack)?; + } + Ok(()) + } + + /// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + self.client.try_manual_ack(ack)?; + Ok(()) + } + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> { let ack = get_ack_req(publish); diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index f8629b8c5..591bb551a 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -177,6 +177,37 @@ impl AsyncClient { self.handle_try_publish(topic, qos, retain, payload, None) } + /// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to send to the `EventLoop`. Only needed in if `manual_acks` flag is set. + /// This is useful when you want to ack a publish later. + /// By default the ack reason code is success, you can change it using `ack.set_reason`. + /// By default the ack reason string is empty, you can change it using `ack.set_reason_string`. + /// By default the ack user properties is empty, you can change it using `ack.set_user_properties`. + pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck { + match publish.qos { + QoS::AtMostOnce => ManualAck::None, + QoS::AtLeastOnce => ManualAck::PubAck(PubAck::new(publish.pkid, None)), + QoS::ExactlyOnce => ManualAck::PubRec(PubRec::new(publish.pkid, None)), + } + } + + /// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub async fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.request_tx.send_async(ack).await?; + } + Ok(()) + } + + /// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.request_tx.try_send(ack)?; + } + Ok(()) + } + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> { let ack = get_ack_req(publish); @@ -450,6 +481,91 @@ impl AsyncClient { } } +/// Reasons for ManualAck Preparation +pub enum ManualAckReason { + Success, + NoMatchingSubscribers, + UnspecifiedError, + ImplementationSpecificError, + NotAuthorized, + TopicNameInvalid, + PacketIdentifierInUse, + QuotaExceeded, + PayloadFormatInvalid, +} +impl ManualAckReason { + pub fn code(&self) -> u8 { + match self { + ManualAckReason::Success => 0, + ManualAckReason::NoMatchingSubscribers => 16, + ManualAckReason::UnspecifiedError => 128, + ManualAckReason::ImplementationSpecificError => 131, + ManualAckReason::NotAuthorized => 135, + ManualAckReason::TopicNameInvalid => 144, + ManualAckReason::PacketIdentifierInUse => 145, + ManualAckReason::QuotaExceeded => 151, + ManualAckReason::PayloadFormatInvalid => 153, + } + } +} + +/// ManualAck packet for manual_ack +pub enum ManualAck { + None, + PubAck(PubAck), + PubRec(PubRec), +} + +impl ManualAck { + /// Set reason code for manual_ack sending + pub fn set_reason(&mut self, reason: ManualAckReason) -> &mut Self { + match self { + ManualAck::None => (), + ManualAck::PubAck(ack) => ack.set_code(reason.code()), + ManualAck::PubRec(ack) => ack.set_code(reason.code()), + } + self + } + + /// Set reason code number for manual_ack sending + pub fn set_code(&mut self, code: u8) -> &mut Self { + match self { + ManualAck::None => (), + ManualAck::PubAck(ack) => ack.set_code(code), + ManualAck::PubRec(ack) => ack.set_code(code), + } + self + } + + /// Set reason string on manual_ack properties + pub fn set_reason_string(&mut self, reason_string: Option) -> &mut Self { + match self { + ManualAck::None => (), + ManualAck::PubAck(ack) => ack.set_reason_string(reason_string), + ManualAck::PubRec(ack) => ack.set_reason_string(reason_string), + } + self + } + + /// Set user properties on manual_ack properties + pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) -> &mut Self { + match self { + ManualAck::None => (), + ManualAck::PubAck(ack) => ack.set_user_properties(user_properties), + ManualAck::PubRec(ack) => ack.set_user_properties(user_properties), + } + self + } +} + +fn get_manual_ack_req(ack: ManualAck) -> Option { + match ack { + ManualAck::None => None, + ManualAck::PubAck(ack) => Some(Request::PubAck(ack)), + ManualAck::PubRec(ack) => Some(Request::PubRec(ack)), + } +} + fn get_ack_req(publish: &Publish) -> Option { let ack = match publish.qos { QoS::AtMostOnce => return None, @@ -584,6 +700,30 @@ impl Client { self.client.try_publish(topic, qos, retain, payload) } + /// Get a MQTT ManualAck (PubAck/PubRec) for manual_ack/try_manual_ack to send to the `EventLoop`. Only needed in if `manual_acks` flag is set. + /// This is useful when you want to ack a publish later. + /// By default the ack reason code is success, you can change it using `ack.set_reason`. + /// By default the ack reason string is empty, you can change it using `ack.set_reason_string`. + /// By default the ack user properties is empty, you can change it using `ack.set_user_properties`. + pub fn get_manual_ack(&self, publish: &Publish) -> ManualAck { + self.client.get_manual_ack(publish) + } + + /// Sends a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + let ack = get_manual_ack_req(ack); + if let Some(ack) = ack { + self.client.request_tx.send(ack)?; + } + Ok(()) + } + + /// Attempts to send a prepared MQTT ManualAck (PubAck/PubRec) to the `EventLoop`. Only needed in if `manual_acks` flag is set. + pub fn try_manual_ack(&self, ack: ManualAck) -> Result<(), ClientError> { + self.client.try_manual_ack(ack)?; + Ok(()) + } + /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> { let ack = get_ack_req(publish); diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index 44499cde2..2e2ece399 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -19,7 +19,7 @@ use crate::{NetworkOptions, Transport}; use mqttbytes::v5::*; -pub use client::{AsyncClient, Client, ClientError, Connection, Iter}; +pub use client::{AsyncClient, Client, ClientError, Connection, Iter, ManualAck, ManualAckReason}; pub use eventloop::{ConnectionError, Event, EventLoop}; pub use state::{MqttState, StateError}; diff --git a/rumqttc/src/v5/mqttbytes/v5/puback.rs b/rumqttc/src/v5/mqttbytes/v5/puback.rs index 9905a4500..3a0af3a6e 100644 --- a/rumqttc/src/v5/mqttbytes/v5/puback.rs +++ b/rumqttc/src/v5/mqttbytes/v5/puback.rs @@ -32,6 +32,32 @@ impl PubAck { } } + pub fn set_code(&mut self, code: u8) { + self.reason = reason(code).unwrap(); + } + + pub fn set_reason_string(&mut self, reason_string: Option) { + if let Some(props) = &mut self.properties { + props.reason_string = reason_string; + } else { + self.properties = Some(PubAckProperties { + reason_string, + user_properties: Vec::<(String, String)>::new(), + }); + } + } + + pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) { + if let Some(props) = &mut self.properties { + props.user_properties = user_properties; + } else { + self.properties = Some(PubAckProperties { + reason_string: None, + user_properties, + }); + } + } + pub fn size(&self) -> usize { if self.reason == PubAckReason::Success && self.properties.is_none() { return 4; diff --git a/rumqttc/src/v5/mqttbytes/v5/pubrec.rs b/rumqttc/src/v5/mqttbytes/v5/pubrec.rs index 6b3aad36d..ec98aadb3 100644 --- a/rumqttc/src/v5/mqttbytes/v5/pubrec.rs +++ b/rumqttc/src/v5/mqttbytes/v5/pubrec.rs @@ -33,6 +33,32 @@ impl PubRec { } } + pub fn set_code(&mut self, code: u8) { + self.reason = reason(code).unwrap(); + } + + pub fn set_reason_string(&mut self, reason_string: Option) { + if let Some(props) = &mut self.properties { + props.reason_string = reason_string; + } else { + self.properties = Some(PubRecProperties { + reason_string, + user_properties: Vec::<(String, String)>::new(), + }); + } + } + + pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) { + if let Some(props) = &mut self.properties { + props.user_properties = user_properties; + } else { + self.properties = Some(PubRecProperties { + reason_string: None, + user_properties, + }); + } + } + pub fn size(&self) -> usize { let len = self.len(); let remaining_len_size = len_len(len); @@ -83,12 +109,12 @@ impl PubRec { } let properties = PubRecProperties::read(&mut bytes)?; - let puback = PubRec { + let pubrec = PubRec { pkid, reason: reason(ack_reason)?, properties, }; - Ok(puback) + Ok(pubrec) } pub fn write(&self, buffer: &mut BytesMut) -> Result {