Skip to content

Commit

Permalink
Events: Efficiency & Performance Improvements (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored May 21, 2024
1 parent 1a7906e commit 684385d
Show file tree
Hide file tree
Showing 6 changed files with 465 additions and 315 deletions.
20 changes: 10 additions & 10 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public async Task<ConnectResult> ConnectAsync()

// Construct the MQTT Connect packet and queue to send
var connPacket = new ConnectPacket(this.Options);
Logger.Trace($"Queuing packet for send: {connPacket}");
Logger.Trace($"Queuing packet for send: {connPacket.GetType().Name} id={connPacket.PacketIdentifier}");
this.SendQueue.Enqueue(connPacket);

ConnAckPacket connAck;
Expand Down Expand Up @@ -162,7 +162,7 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
EventHandler<OnDisconnectSentEventArgs> eventHandler = TaskHandler;
this.OnDisconnectSent += eventHandler;

Logger.Trace($"Queuing packet for send: {disconnectPacket}");
Logger.Trace($"Queuing packet for send: {disconnectPacket.GetType().Name} id={disconnectPacket.PacketIdentifier}");
this.SendQueue.Enqueue(disconnectPacket);

try
Expand All @@ -187,7 +187,7 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
/// <inheritdoc />
public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
{
if (this.IsConnected() == false)
if (!this.IsConnected())
{
throw new HiveMQttClientException("PublishAsync: Client is not connected. Check client.IsConnected() before calling PublishAsync.");
}
Expand All @@ -200,7 +200,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
// QoS 0: Fast Service
if (message.QoS == QualityOfService.AtMostOnceDelivery)
{
Logger.Trace($"Queuing packet for send: {publishPacket}");
Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
this.OutgoingPublishQueue.Enqueue(publishPacket);
return new PublishResult(publishPacket.Message);
}
Expand All @@ -212,7 +212,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
EventHandler<OnPublishQoS1CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS1Complete += eventHandler;

Logger.Trace($"Queuing packet for send: {publishPacket}");
Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
Expand All @@ -229,7 +229,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
EventHandler<OnPublishQoS2CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS2Complete += eventHandler;

Logger.Trace($"Queuing packet for send: {publishPacket}");
Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

List<ControlPacket> packetList;
Expand All @@ -244,7 +244,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
Logger.Error("PublishAsync: QoS 2 timeout. No response received in time.");

// Remove the transaction chain
if (this.transactionQueue.Remove(publishPacket.PacketIdentifier, out var publishQoS2Chain))
if (this.TransactionQueue.Remove(publishPacket.PacketIdentifier, out var publishQoS2Chain))
{
Logger.Debug($"PublishAsync: QoS 2 timeout. Removing transaction chain for packet identifier {publishPacket.PacketIdentifier}.");
}
Expand Down Expand Up @@ -320,7 +320,7 @@ public async Task<SubscribeResult> SubscribeAsync(string topic, QualityOfService
/// <inheritdoc />
public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
{
if (this.IsConnected() == false)
if (!this.IsConnected())
{
throw new HiveMQttClientException("SubscribeAsync: Client is not connected. Check client.IsConnected() before calling SubscribeAsync.");
}
Expand All @@ -329,7 +329,7 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
this.BeforeSubscribeEventLauncher(options);

// FIXME: We should only ever have one subscribe in flight at any time (for now)
// Construct the MQTT Connect packet
// Construct the MQTT Subscribe packet
var packetIdentifier = this.GeneratePacketIdentifier();
var subscribePacket = new SubscribePacket(options, (ushort)packetIdentifier);

Expand Down Expand Up @@ -433,7 +433,7 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(List<Subscription> subscri

public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOptions)
{
if (this.IsConnected() == false)
if (!this.IsConnected())
{
throw new HiveMQttClientException("UnsubscribeAsync: Client is not connected. Check client.IsConnected() before calling UnsubscribeAsync.");
}
Expand Down
Loading

0 comments on commit 684385d

Please sign in to comment.