Skip to content

Commit

Permalink
Add Backpressure Support (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Jun 10, 2024
1 parent 580b99a commit af2333b
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 121 deletions.
62 changes: 43 additions & 19 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public HiveMQClient(HiveMQClientOptions? options = null)

this.Options = options;
this.cancellationTokenSource = new CancellationTokenSource();
this.ClientReceiveSemaphore = new SemaphoreSlim(this.Options.ClientReceiveMaximum);

// Set protocol default until ConnAck is received
this.BrokerReceiveSemaphore = new SemaphoreSlim(65535);
}

/// <inheritdoc />
Expand Down Expand Up @@ -167,7 +171,9 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)

try
{
disconnectPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
disconnectPacket = await taskCompletionSource.Task
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
.ConfigureAwait(false);
}
catch (TimeoutException)
{
Expand All @@ -179,9 +185,7 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
this.OnDisconnectSent -= eventHandler;
}

await this.HandleDisconnectionAsync().ConfigureAwait(false);

return true;
return await this.HandleDisconnectionAsync().ConfigureAwait(false);
}

/// <inheritdoc />
Expand Down Expand Up @@ -210,8 +214,25 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

// Wait on the QoS 1 handshake
var pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
PubAckPacket pubAckPacket;
try
{
// Wait on the QoS 1 handshake
pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
.ConfigureAwait(false);
}
catch (TimeoutException)
{
Logger.Error("PublishAsync: QoS 1 timeout. No PUBACK response received in time.");
var disconnectOptions = new DisconnectOptions
{
ReasonCode = DisconnectReasonCode.UnspecifiedError,
};
await this.DisconnectAsync(disconnectOptions).ConfigureAwait(false);
throw;
}

return new PublishResult(publishPacket.Message, pubAckPacket);
}
else if (message.QoS == QualityOfService.ExactlyOnceDelivery)
Expand All @@ -225,24 +246,20 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
try
{
// Wait on the QoS 2 handshake
packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
.ConfigureAwait(false);
}
catch (TimeoutException)
{
Logger.Error("PublishAsync: QoS 2 timeout. No response received in time.");

// Remove the transaction chain
if (this.TransactionQueue.Remove(publishPacket.PacketIdentifier, out var publishQoS2Chain))
var disconnectOptions = new DisconnectOptions
{
Logger.Debug($"PublishAsync: QoS 2 timeout. Removing transaction chain for packet identifier {publishPacket.PacketIdentifier}.");
}

// Prepare PublishResult
publishResult = new PublishResult(publishPacket.Message)
{
QoS2ReasonCode = null,
ReasonCode = DisconnectReasonCode.UnspecifiedError,
};
return publishResult;
await this.DisconnectAsync(disconnectOptions).ConfigureAwait(false);
throw;
}

foreach (var packet in packetList)
Expand Down Expand Up @@ -331,7 +348,9 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
SubscribeResult subscribeResult;
try
{
subAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
subAck = await taskCompletionSource.Task
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
.ConfigureAwait(false);
}
catch (TimeoutException)
{
Expand Down Expand Up @@ -441,7 +460,9 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
UnsubscribeResult unsubscribeResult;
try
{
unsubAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
unsubAck = await taskCompletionSource.Task
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
.ConfigureAwait(false);

// FIXME: Validate that the packet identifier matches
}
Expand Down Expand Up @@ -488,6 +509,9 @@ private async Task<bool> HandleDisconnectionAsync(bool clean = true)

// Cancel all background tasks and close the socket
this.ConnectState = ConnectState.Disconnected;

// Don't use CancelAsync here to maintain backwards compatibility
// with >=.net6.0. CancelAsync was introduced in .net8.0
this.cancellationTokenSource.Cancel();

Check warning on line 515 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Cancel synchronously blocks. Await CancelAsync instead. (https://github.com/Microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD103.md)

Check warning on line 515 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Cancel synchronously blocks. Await CancelAsync instead. (https://github.com/Microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD103.md)

Check warning on line 515 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Cancel synchronously blocks. Await CancelAsync instead. (https://github.com/Microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD103.md)

Check warning on line 515 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Cancel synchronously blocks. Await CancelAsync instead. (https://github.com/Microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD103.md)

Check warning on line 515 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Cancel synchronously blocks. Await CancelAsync instead. (https://github.com/Microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD103.md)

Check warning on line 515 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Cancel synchronously blocks. Await CancelAsync instead. (https://github.com/Microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD103.md)
this.CloseSocket();

Expand Down
Loading

0 comments on commit af2333b

Please sign in to comment.