Skip to content

Commit

Permalink
Refactor Packet ID Generation (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Jun 27, 2024
1 parent 5894e4f commit 17f8cbf
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 49 deletions.
27 changes: 16 additions & 11 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public async Task<ConnectResult> ConnectAsync()

// Construct the MQTT Connect packet and queue to send
var connPacket = new ConnectPacket(this.Options);
Logger.Trace($"Queuing packet for send: {connPacket.GetType().Name} id={connPacket.PacketIdentifier}");
Logger.Trace($"Queuing CONNECT packet for send.");
this.SendQueue.Enqueue(connPacket);

ConnAckPacket connAck;
Expand Down Expand Up @@ -174,7 +174,7 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
EventHandler<OnDisconnectSentEventArgs> eventHandler = TaskHandler;
this.OnDisconnectSent += eventHandler;

Logger.Trace($"Queuing packet for send: {disconnectPacket.GetType().Name} id={disconnectPacket.PacketIdentifier}");
Logger.Trace($"Queuing DISCONNECT packet for send.");
this.SendQueue.Enqueue(disconnectPacket);

try
Expand All @@ -201,23 +201,25 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message, Cance
{
message.Validate();

var packetIdentifier = this.GeneratePacketIdentifier();
var publishPacket = new PublishPacket(message, (ushort)packetIdentifier);

// QoS 0: Fast Service
if (message.QoS == QualityOfService.AtMostOnceDelivery)
{
Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
var publishPacket = new PublishPacket(message, 0);
Logger.Trace($"Queuing QoS 0 publish packet for send: {publishPacket.GetType().Name}");

this.OutgoingPublishQueue.Enqueue(publishPacket);
return new PublishResult(publishPacket.Message);
}
else if (message.QoS == QualityOfService.AtLeastOnceDelivery)
{
// QoS 1: Acknowledged Delivery
Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
var packetIdentifier = await this.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false);
var publishPacket = new PublishPacket(message, (ushort)packetIdentifier);
PubAckPacket pubAckPacket;

Logger.Trace($"Queuing QoS 1 publish packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

PubAckPacket pubAckPacket;
try
{
// Wait on the QoS 1 handshake
Expand All @@ -236,8 +238,11 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message, Cance
else if (message.QoS == QualityOfService.ExactlyOnceDelivery)
{
// QoS 2: Assured Delivery
var packetIdentifier = await this.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false);
var publishPacket = new PublishPacket(message, (ushort)packetIdentifier);
PublishResult? publishResult = null;
Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");

Logger.Trace($"Queuing QoS 2 publish packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

List<ControlPacket> packetList;
Expand Down Expand Up @@ -314,7 +319,7 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)

// FIXME: We should only ever have one subscribe in flight at any time (for now)
// Construct the MQTT Subscribe packet
var packetIdentifier = this.GeneratePacketIdentifier();
var packetIdentifier = await this.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false);
var subscribePacket = new SubscribePacket(options, (ushort)packetIdentifier);

// Setup the task completion source to wait for the SUBACK
Expand Down Expand Up @@ -422,7 +427,7 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
// Fire the corresponding event
this.BeforeUnsubscribeEventLauncher(unsubOptions.Subscriptions);

var packetIdentifier = this.GeneratePacketIdentifier();
var packetIdentifier = await this.PacketIDManager.GetAvailablePacketIDAsync().ConfigureAwait(false);
var unsubscribePacket = new UnsubscribePacket(unsubOptions, (ushort)packetIdentifier);

var taskCompletionSource = new TaskCompletionSource<UnsubAckPacket>();
Expand Down
Loading

0 comments on commit 17f8cbf

Please sign in to comment.