Skip to content

Commit

Permalink
Handle Retransmitted Publishes (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Jul 1, 2024
1 parent 73cda0f commit 5d66deb
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
28 changes: 24 additions & 4 deletions Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,14 @@ private Task<bool> ConnectionReaderAsync(CancellationToken cancellationToken) =>
if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery ||
publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery)
{
if (publishPacket.Message.Duplicate)
{
// We've received a retransmitted publish packet.
// Remove any prior transaction chain and reprocess the packet.
Logger.Debug($"{this.Options.ClientId}-(R)- Received a retransmitted publish packet with id={publishPacket.PacketIdentifier}. Removing any prior transaction chain.");
_ = this.IPubTransactionQueue.Remove(publishPacket.PacketIdentifier, out _);
}

var success = await this.IPubTransactionQueue.AddAsync(
publishPacket.PacketIdentifier,
new List<ControlPacket> { publishPacket }).ConfigureAwait(false);
Expand Down Expand Up @@ -643,7 +651,14 @@ internal async Task HandleIncomingPublishPacketAsync(PublishPacket publishPacket
}
else
{
throw new HiveMQttClientException($"QoS1: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}. Discarded.");
var opts = new DisconnectOptions
{
ReasonCode = DisconnectReasonCode.UnspecifiedError,
ReasonString = "Client internal error managing publish transaction chain.",
};
await this.DisconnectAsync(opts).ConfigureAwait(false);
await this.PacketIDManager.MarkPacketIDAsAvailableAsync(publishPacket.PacketIdentifier).ConfigureAwait(false);
Logger.Error($"QoS1: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}.");
}
}
else if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery)
Expand Down Expand Up @@ -671,9 +686,14 @@ internal async Task HandleIncomingPublishPacketAsync(PublishPacket publishPacket
}
else
{
// FIXME: This should never happen if ConnectionReaderAsync is working correctly
Logger.Error($"QoS2: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}. Discarded.");
return;
var opts = new DisconnectOptions
{
ReasonCode = DisconnectReasonCode.UnspecifiedError,
ReasonString = "Client internal error managing publish transaction chain.",
};
await this.DisconnectAsync(opts).ConfigureAwait(false);
await this.PacketIDManager.MarkPacketIDAsAvailableAsync(publishPacket.PacketIdentifier).ConfigureAwait(false);
Logger.Error($"QoS2: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}.");
}

this.SendQueue.Enqueue(pubRecResponse);
Expand Down
10 changes: 8 additions & 2 deletions Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,11 @@ public bool Remove(TKey key, out TVal value)

try
{
if (this.dictionary.TryRemove(key, out value))
if (this.dictionary.TryRemove(key, out var removedValue))
{
// Item successfully removed, release the slot
this.semaphore.Release();
value = removedValue;
return true;
}
else
Expand Down Expand Up @@ -143,7 +144,12 @@ public bool Remove(TKey key, out TVal value)
/// <param name="key">The key to get.</param>
/// <param name="value">The value retrieved.</param>
/// <returns><c>true</c> if the item was retrieved; otherwise, <c>false</c>.</returns>
public bool TryGetValue(TKey key, out TVal value) => this.dictionary.TryGetValue(key, out value);
public bool TryGetValue(TKey key, out TVal value)
{
var result = this.dictionary.TryGetValue(key, out var retrievedValue);
value = retrievedValue is null ? default! : retrievedValue;
return result;
}

/// <summary>
/// Determines whether the dictionary contains the specified key.
Expand Down

0 comments on commit 5d66deb

Please sign in to comment.