Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.payload(b"Hello, MQTT!")
.packet_id(connection.acquire_packet_id()?)
.packet_id(Some(connection.acquire_packet_id()?))
.build()?;

let events = connection.checked_send(publish_packet);
Expand Down
23 changes: 12 additions & 11 deletions examples/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,21 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
handle_events(&mut stream, &mut connection, events)?;
}

let mut publish_builder = mqtt::packet::v5_0::Publish::builder()
let packet_id = match qos_level {
mqtt::packet::Qos::AtMostOnce => None,
_ => Some(
connection
.acquire_packet_id()
.map_err(|e| format!("Failed to acquire packet ID: {e:?}"))?,
),
};

let publish_packet = mqtt::packet::v5_0::Publish::builder()
.topic_name(topic)
.unwrap()
.qos(qos_level)
.payload(payload.as_bytes());

if qos_level != mqtt::packet::Qos::AtMostOnce {
let packet_id = connection
.acquire_packet_id()
.map_err(|e| format!("Failed to acquire packet ID: {e:?}"))?;
publish_builder = publish_builder.packet_id(packet_id);
}

let publish_packet = publish_builder
.packet_id(packet_id)
.payload(payload.as_bytes())
.build()
.map_err(|e| format!("Failed to build PUBLISH packet: {e:?}"))?;

Expand Down
6 changes: 3 additions & 3 deletions src/mqtt/packet/v3_1_1/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,10 +861,10 @@ where
/// .topic_name("test/topic")
/// .unwrap()
/// .qos(Qos::AtLeastOnce)
/// .packet_id(123);
/// .packet_id(Some(123));
/// ```
pub fn packet_id(mut self, id: PacketIdType) -> Self {
self.packet_id_buf = Some(Some(id.to_buffer()));
pub fn packet_id(mut self, id: Option<PacketIdType>) -> Self {
self.packet_id_buf = id.map(|id| Some(id.to_buffer()));
self
}

Expand Down
6 changes: 3 additions & 3 deletions src/mqtt/packet/v5_0/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1155,10 +1155,10 @@ where
/// use mqtt_protocol_core::mqtt;
///
/// let builder = mqtt::packet::v5_0::Publish::builder()
/// .packet_id(42);
/// .packet_id(Some(42));
/// ```
pub fn packet_id(mut self, id: PacketIdType) -> Self {
self.packet_id_buf = Some(Some(id.to_buffer()));
pub fn packet_id(mut self, id: Option<PacketIdType>) -> Self {
self.packet_id_buf = id.map(|id| Some(id.to_buffer()));
self
}

Expand Down
12 changes: 6 additions & 6 deletions tests/connection-core-auto-res.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn auto_pub_response_v3_1_1() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -89,7 +89,7 @@ fn auto_pub_response_v3_1_1() {
.topic_name("topic/b")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id_b)
.packet_id(Some(packet_id_b))
.payload(b"payload B".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -188,7 +188,7 @@ fn auto_pub_response_v5_0() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -222,7 +222,7 @@ fn auto_pub_response_v5_0() {
.topic_name("topic/b")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id_b)
.packet_id(Some(packet_id_b))
.payload(b"payload B".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -323,7 +323,7 @@ fn qos2_pubrel_send_request_v3_1_1() {
.topic_name("test/topic")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id)
.packet_id(Some(packet_id))
.payload(b"test payload".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -393,7 +393,7 @@ fn qos2_pubrel_send_request_v5_0() {
.topic_name("test/topic")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id)
.packet_id(Some(packet_id))
.payload(b"test payload".to_vec())
.build()
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions tests/connection-core-maximum-packet-size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ fn client_over_maximum_packet_size_recv() {
.topic_name("topic/c")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(1u16)
.packet_id(Some(1u16))
.payload(b"012345678901234567890123456789".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -408,7 +408,7 @@ fn server_over_maximum_packet_size_recv() {
.topic_name("topic/c")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(1u16)
.packet_id(Some(1u16))
.payload(b"012345678901234567890123456789".to_vec())
.build()
.unwrap();
Expand Down
12 changes: 6 additions & 6 deletions tests/connection-core-misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn offline_publish_v3_1_1() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand All @@ -98,7 +98,7 @@ fn offline_publish_v3_1_1() {
.topic_name("topic/b")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id_b)
.packet_id(Some(packet_id_b))
.payload(b"payload B".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -170,7 +170,7 @@ fn offline_publish_v5_0() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand All @@ -183,7 +183,7 @@ fn offline_publish_v5_0() {
.topic_name("topic/b")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id_b)
.packet_id(Some(packet_id_b))
.payload(b"payload B".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -253,7 +253,7 @@ fn puback_match_v3_1_1() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -930,7 +930,7 @@ fn qos2_publish_handled_v3_1_1() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand Down
12 changes: 6 additions & 6 deletions tests/connection-core-notify-closed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ fn notify_closed_with_acquired_packet_ids() {
.topic_name("test/qos1")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(publish_qos1_pid)
.packet_id(Some(publish_qos1_pid))
.payload(b"qos1 payload")
.build()
.expect("Failed to build Publish QoS1 packet")
Expand All @@ -287,7 +287,7 @@ fn notify_closed_with_acquired_packet_ids() {
.topic_name("test/qos2")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(publish_qos2_pid)
.packet_id(Some(publish_qos2_pid))
.payload(b"qos2 payload")
.build()
.expect("Failed to build Publish QoS2 packet")
Expand Down Expand Up @@ -391,7 +391,7 @@ fn notify_closed_with_acquired_packet_ids_session_storage() {
.topic_name("test/qos1")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(publish_qos1_pid)
.packet_id(Some(publish_qos1_pid))
.payload(b"qos1 payload")
.build()
.expect("Failed to build Publish QoS1 packet")
Expand All @@ -406,7 +406,7 @@ fn notify_closed_with_acquired_packet_ids_session_storage() {
.topic_name("test/qos2")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(publish_qos2_pid)
.packet_id(Some(publish_qos2_pid))
.payload(b"qos2 payload")
.build()
.expect("Failed to build Publish QoS2 packet")
Expand Down Expand Up @@ -506,7 +506,7 @@ fn notify_closed_v5_0_with_acquired_packet_ids() {
.topic_name("test/qos1")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(publish_qos1_pid)
.packet_id(Some(publish_qos1_pid))
.payload(b"qos1 payload")
.build()
.expect("Failed to build Publish QoS1 packet")
Expand All @@ -521,7 +521,7 @@ fn notify_closed_v5_0_with_acquired_packet_ids() {
.topic_name("test/qos2")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(publish_qos2_pid)
.packet_id(Some(publish_qos2_pid))
.payload(b"qos2 payload")
.build()
.expect("Failed to build Publish QoS2 packet")
Expand Down
24 changes: 12 additions & 12 deletions tests/connection-core-receive-maximum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn get_receive_maximum_vacancy_for_send_client() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand All @@ -64,7 +64,7 @@ fn get_receive_maximum_vacancy_for_send_client() {
.topic_name("topic/b")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id_b)
.packet_id(Some(packet_id_b))
.payload(b"payload B".to_vec())
.build()
.unwrap();
Expand All @@ -78,7 +78,7 @@ fn get_receive_maximum_vacancy_for_send_client() {
.topic_name("topic/c")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id_c)
.packet_id(Some(packet_id_c))
.payload(b"payload C".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -173,7 +173,7 @@ fn get_receive_maximum_vacancy_for_send_server() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand All @@ -187,7 +187,7 @@ fn get_receive_maximum_vacancy_for_send_server() {
.topic_name("topic/b")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id_b)
.packet_id(Some(packet_id_b))
.payload(b"payload B".to_vec())
.build()
.unwrap();
Expand All @@ -201,7 +201,7 @@ fn get_receive_maximum_vacancy_for_send_server() {
.topic_name("topic/c")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
.packet_id(packet_id_c)
.packet_id(Some(packet_id_c))
.payload(b"payload C".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -286,7 +286,7 @@ fn receive_maximum_exceeded_send() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand All @@ -298,7 +298,7 @@ fn receive_maximum_exceeded_send() {
.topic_name("topic/b")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_b)
.packet_id(Some(packet_id_b))
.payload(b"payload B".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -361,7 +361,7 @@ fn receive_maximum_exceeded_recv() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand All @@ -374,7 +374,7 @@ fn receive_maximum_exceeded_recv() {
.topic_name("topic/b")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_b)
.packet_id(Some(packet_id_b))
.payload(b"payload B".to_vec())
.build()
.unwrap();
Expand Down Expand Up @@ -451,7 +451,7 @@ fn receive_maximum_exceeded_recv_server() {
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_a)
.packet_id(Some(packet_id_a))
.payload(b"payload A".to_vec())
.build()
.unwrap();
Expand All @@ -464,7 +464,7 @@ fn receive_maximum_exceeded_recv_server() {
.topic_name("topic/b")
.unwrap()
.qos(mqtt::packet::Qos::AtLeastOnce)
.packet_id(packet_id_b)
.packet_id(Some(packet_id_b))
.payload(b"payload B".to_vec())
.build()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests/connection-core-recv-success.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ fn client_recv_pubrel_success_v5_0() {
v5_0_client_establish_connection(&mut connection);

let packet = mqtt::packet::v5_0::Publish::builder()
.packet_id(1)
.packet_id(Some(1))
.topic_name("topic/a")
.unwrap()
.qos(mqtt::packet::Qos::ExactlyOnce)
Expand Down
Loading