From 5d66deb5e74b7a0d1b7315246c32f66401babbb0 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Mon, 1 Jul 2024 15:50:33 +0200 Subject: [PATCH] Handle Retransmitted Publishes (#181) --- .../Client/HiveMQClientTrafficProcessor.cs | 28 ++++++++++++++++--- .../Client/internal/BoundedDictionaryX.cs | 10 +++++-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index f7120ced..ed76aa66 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -427,6 +427,14 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery || publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery) { + if (publishPacket.Message.Duplicate) + { + // We've received a retransmitted publish packet. + // Remove any prior transaction chain and reprocess the packet. + Logger.Debug($"{this.Options.ClientId}-(R)- Received a retransmitted publish packet with id={publishPacket.PacketIdentifier}. Removing any prior transaction chain."); + _ = this.IPubTransactionQueue.Remove(publishPacket.PacketIdentifier, out _); + } + var success = await this.IPubTransactionQueue.AddAsync( publishPacket.PacketIdentifier, new List { publishPacket }).ConfigureAwait(false); @@ -643,7 +651,14 @@ internal async Task HandleIncomingPublishPacketAsync(PublishPacket publishPacket } else { - throw new HiveMQttClientException($"QoS1: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}. Discarded."); + var opts = new DisconnectOptions + { + ReasonCode = DisconnectReasonCode.UnspecifiedError, + ReasonString = "Client internal error managing publish transaction chain.", + }; + await this.DisconnectAsync(opts).ConfigureAwait(false); + await this.PacketIDManager.MarkPacketIDAsAvailableAsync(publishPacket.PacketIdentifier).ConfigureAwait(false); + Logger.Error($"QoS1: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}."); } } else if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) @@ -671,9 +686,14 @@ internal async Task HandleIncomingPublishPacketAsync(PublishPacket publishPacket } else { - // FIXME: This should never happen if ConnectionReaderAsync is working correctly - Logger.Error($"QoS2: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}. Discarded."); - return; + var opts = new DisconnectOptions + { + ReasonCode = DisconnectReasonCode.UnspecifiedError, + ReasonString = "Client internal error managing publish transaction chain.", + }; + await this.DisconnectAsync(opts).ConfigureAwait(false); + await this.PacketIDManager.MarkPacketIDAsAvailableAsync(publishPacket.PacketIdentifier).ConfigureAwait(false); + Logger.Error($"QoS2: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}."); } this.SendQueue.Enqueue(pubRecResponse); diff --git a/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs b/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs index 9fd4886d..42919a22 100644 --- a/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs +++ b/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs @@ -104,10 +104,11 @@ public bool Remove(TKey key, out TVal value) try { - if (this.dictionary.TryRemove(key, out value)) + if (this.dictionary.TryRemove(key, out var removedValue)) { // Item successfully removed, release the slot this.semaphore.Release(); + value = removedValue; return true; } else @@ -143,7 +144,12 @@ public bool Remove(TKey key, out TVal value) /// The key to get. /// The value retrieved. /// true if the item was retrieved; otherwise, false. - public bool TryGetValue(TKey key, out TVal value) => this.dictionary.TryGetValue(key, out value); + public bool TryGetValue(TKey key, out TVal value) + { + var result = this.dictionary.TryGetValue(key, out var retrievedValue); + value = retrievedValue is null ? default! : retrievedValue; + return result; + } /// /// Determines whether the dictionary contains the specified key.