diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index 8b845d2c..c2ec4fd2 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -197,13 +197,8 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) } /// - public async Task PublishAsync(MQTT5PublishMessage message) + public async Task PublishAsync(MQTT5PublishMessage message, CancellationToken cancellationToken = default) { - if (!this.IsConnected()) - { - throw new HiveMQttClientException("PublishAsync: Client is not connected. Check client.IsConnected() before calling PublishAsync."); - } - message.Validate(); var packetIdentifier = this.GeneratePacketIdentifier(); @@ -227,17 +222,12 @@ public async Task PublishAsync(MQTT5PublishMessage message) { // Wait on the QoS 1 handshake pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task - .WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs)) + .WaitAsync(cancellationToken) .ConfigureAwait(false); } - catch (TimeoutException) + catch (OperationCanceledException) { - Logger.Error("PublishAsync: QoS 1 timeout. No PUBACK response received in time."); - var disconnectOptions = new DisconnectOptions - { - ReasonCode = DisconnectReasonCode.UnspecifiedError, - }; - await this.DisconnectAsync(disconnectOptions).ConfigureAwait(false); + Logger.Debug("PublishAsync: Operation cancelled by user."); throw; } @@ -255,18 +245,12 @@ public async Task PublishAsync(MQTT5PublishMessage message) { // Wait on the QoS 2 handshake packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task - .WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs)) + .WaitAsync(cancellationToken) .ConfigureAwait(false); } - catch (TimeoutException) + catch (OperationCanceledException) { - Logger.Error("PublishAsync: QoS 2 timeout. No response received in time."); - - var disconnectOptions = new DisconnectOptions - { - ReasonCode = DisconnectReasonCode.UnspecifiedError, - }; - await this.DisconnectAsync(disconnectOptions).ConfigureAwait(false); + Logger.Debug("PublishAsync: Operation cancelled by user."); throw; } @@ -278,11 +262,6 @@ public async Task PublishAsync(MQTT5PublishMessage message) } } - if (publishResult is null) - { - throw new HiveMQttClientException("PublishAsync: QoS 2 complete but no PubRec packet received."); - } - return publishResult; } @@ -330,11 +309,6 @@ public async Task SubscribeAsync(string topic, QualityOfService /// public async Task SubscribeAsync(SubscribeOptions options) { - if (!this.IsConnected()) - { - throw new HiveMQttClientException("SubscribeAsync: Client is not connected. Check client.IsConnected() before calling SubscribeAsync."); - } - // Fire the corresponding event this.BeforeSubscribeEventLauncher(options); @@ -445,11 +419,6 @@ public async Task UnsubscribeAsync(List subscri public async Task UnsubscribeAsync(UnsubscribeOptions unsubOptions) { - if (!this.IsConnected()) - { - throw new HiveMQttClientException("UnsubscribeAsync: Client is not connected. Check client.IsConnected() before calling UnsubscribeAsync."); - } - // Fire the corresponding event this.BeforeUnsubscribeEventLauncher(unsubOptions.Subscriptions); @@ -513,6 +482,12 @@ public async Task UnsubscribeAsync(UnsubscribeOptions unsubOp /// Indicates whether the disconnect was intended or not. private async Task HandleDisconnectionAsync(bool clean = true) { + if (this.ConnectState == ConnectState.Disconnected) + { + Logger.Trace("HandleDisconnection: Already disconnected."); + return false; + } + Logger.Debug($"HandleDisconnection: Handling disconnection. clean={clean}."); // Cancel all background tasks and close the socket @@ -537,6 +512,9 @@ private async Task HandleDisconnectionAsync(bool clean = true) this.OutgoingPublishQueue.Clear(); } + // Delay for 1 seconds before launching the AfterDisconnect event + await Task.Delay(1000).ConfigureAwait(false); + // Fire the corresponding after event this.AfterDisconnectEventLauncher(clean); return true; diff --git a/Source/HiveMQtt/Client/HiveMQClientSocket.cs b/Source/HiveMQtt/Client/HiveMQClientSocket.cs index 8c3ddd66..e5344c7e 100644 --- a/Source/HiveMQtt/Client/HiveMQClientSocket.cs +++ b/Source/HiveMQtt/Client/HiveMQClientSocket.cs @@ -213,7 +213,7 @@ private async Task CreateTLSConnectionAsync(Stream stream) } else { - tlsOptions.RemoteCertificateValidationCallback = HiveMQClient.ValidateServerCertificate; + tlsOptions.RemoteCertificateValidationCallback = ValidateServerCertificate; } try diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index d38c1a16..1f7b5440 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -84,8 +84,8 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task { if (cancellationToken.IsCancellationRequested) { - Logger.Trace($"{this.Options.ClientId}-(CM)- Cancelled"); - break; + Logger.Trace($"{this.Options.ClientId}-(CM)- Canceled & exiting..."); + return; } // If connected and no recent traffic, send a ping @@ -100,7 +100,7 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task } // Dumping Client State - Logger.Debug($"{this.Options.ClientId}-(CM)- {this.ConnectState} lastCommunicationTimer:{this.lastCommunicationTimer.Elapsed}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- {this.ConnectState}: last communications {this.lastCommunicationTimer.Elapsed} ago"); Logger.Debug($"{this.Options.ClientId}-(CM)- SendQueue:...............{this.SendQueue.Count}"); Logger.Debug($"{this.Options.ClientId}-(CM)- ReceivedQueue:...........{this.ReceivedQueue.Count}"); Logger.Debug($"{this.Options.ClientId}-(CM)- OutgoingPublishQueue:....{this.OutgoingPublishQueue.Count}"); @@ -117,8 +117,8 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task } catch (TaskCanceledException) { - Logger.Debug($"{this.Options.ClientId}-(CM)- Cancelled"); - break; + Logger.Debug($"{this.Options.ClientId}-(CM)- Canceled & exiting...."); + return; } catch (Exception ex) { @@ -149,8 +149,8 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = { if (cancellationToken.IsCancellationRequested) { - Logger.Trace($"{this.Options.ClientId}-(PW)- Cancelled with {this.OutgoingPublishQueue.Count} publish packets remaining."); - break; + Logger.Trace($"{this.Options.ClientId}-(PW)- Cancelled & existing with {this.OutgoingPublishQueue.Count} publish packets remaining."); + return; } while (this.ConnectState != ConnectState.Connected) @@ -186,20 +186,29 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = if (writeResult.IsCanceled) { - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Write Cancelled"); - break; + Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Write Cancelled (exiting)..."); + return; } if (writeResult.IsCompleted) { Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: end of the stream"); + + if (this.ConnectState == ConnectState.Connected) + { + // This is an unexpected exit and may be due to a network failure. + Logger.Debug($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: this was unexpected"); + await this.HandleDisconnectionAsync(false).ConfigureAwait(false); + } + + Logger.Info($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}"); break; } } catch (TaskCanceledException) { - Logger.Debug($"{this.Options.ClientId}-(PW)- Cancelled"); - break; + Logger.Debug($"{this.Options.ClientId}-(PW)- Cancelled & exiting..."); + return; } catch (Exception ex) { @@ -230,8 +239,8 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. { if (cancellationToken.IsCancellationRequested) { - Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled with {this.SendQueue.Count} packets remaining."); - break; + Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled & exiting with {this.SendQueue.Count} packets remaining."); + return; } // We allow this task to run in Connecting, Connected, and Disconnecting states @@ -305,20 +314,28 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. if (writeResult.IsCanceled) { - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled"); - break; + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled. Exiting..."); + return; } if (writeResult.IsCompleted) { - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream"); - break; + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream. Exiting..."); + if (this.ConnectState == ConnectState.Connected) + { + // This is an unexpected exit and may be due to a network failure. + Logger.Debug($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: this was unexpected"); + await this.HandleDisconnectionAsync(false).ConfigureAwait(false); + } + + Logger.Info($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}"); + return; } } catch (TaskCanceledException) { - Logger.Debug($"{this.Options.ClientId}-(W)- Cancelled"); - break; + Logger.Debug($"{this.Options.ClientId}-(W)- Cancelled & exiting..."); + return; } catch (Exception ex) { @@ -352,16 +369,16 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => { if (cancellationToken.IsCancellationRequested) { - Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled"); - break; + Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled & exiting..."); + return true; } readResult = await this.ReadAsync().ConfigureAwait(false); if (readResult.IsCanceled) { - Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled read result."); - break; + Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled read result. Exiting..."); + return true; } if (readResult.IsCompleted) @@ -370,11 +387,11 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => if (this.ConnectState == ConnectState.Connected) { // This is an unexpected exit and may be due to a network failure. - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: this was unexpected"); + Logger.Debug($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: this was unexpected"); await this.HandleDisconnectionAsync(false).ConfigureAwait(false); } - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); + Logger.Info($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); return true; } @@ -412,8 +429,6 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => // We handle disconnects immediately if (decodedPacket is DisconnectPacket disconnectPacket) { - // FIXME: If we received disconnect another client with the same id connected else where, should we - // Call the OnDisconnectReceivedEventLauncher? Logger.Error($"--> Disconnect received <--: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); await this.HandleDisconnectionAsync(false).ConfigureAwait(false); this.OnDisconnectReceivedEventLauncher(disconnectPacket); @@ -445,14 +460,11 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => // Add the packet to the received queue for processing later by ReceivedPacketsHandlerAsync this.ReceivedQueue.Enqueue(decodedPacket); } // while (buffer.Length > 0 - - // FIXME - await Task.Yield(); } catch (TaskCanceledException) { - Logger.Debug($"{this.Options.ClientId}-(R)- Cancelled"); - break; + Logger.Debug($"{this.Options.ClientId}-(R)- Cancelled & exiting..."); + return true; } catch (Exception ex) { @@ -485,8 +497,8 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => { if (cancellationToken.IsCancellationRequested) { - Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.ReceivedQueue.Count} received packets remaining."); - break; + Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.ReceivedQueue.Count} received packets remaining. Exiting..."); + return; } var packet = await this.ReceivedQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); @@ -563,8 +575,8 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => } catch (TaskCanceledException) { - Logger.Debug($"{this.Options.ClientId}-(RPH)- Cancelled"); - break; + Logger.Debug($"{this.Options.ClientId}-(RPH)- Cancelled & exiting..."); + return; } catch (Exception ex) { @@ -590,6 +602,7 @@ internal async void HandleIncomingPublishPacket(PublishPacket publishPacket) { Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Publish id={publishPacket.PacketIdentifier}"); this.OnPublishReceivedEventLauncher(publishPacket); + bool success; if (publishPacket.Message.QoS is QualityOfService.AtMostOnceDelivery) { @@ -597,18 +610,40 @@ internal async void HandleIncomingPublishPacket(PublishPacket publishPacket) } else if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery) { - // We've received a QoS 1 publish. Send a PubAck and notify subscribers. + // We've received a QoS 1 publish. The transaction chain was created & added + // by ConnectionReaderAsync to enforce the client's ReceiveMaximum + // Send a PubAck and update the chain. Once the PubAckPacket is sent, + // the transaction chain will be deleted and the appropriate events will be + // launched in HandleSentPubAckPacket. var pubAckResponse = new PubAckPacket(publishPacket.PacketIdentifier, PubAckReasonCode.Success); + success = this.IPubTransactionQueue.TryGetValue(publishPacket.PacketIdentifier, out var publishQoS1Chain); + publishQoS1Chain.Add(pubAckResponse); + + if (success) + { + // Update the chain in the queue + if (!this.IPubTransactionQueue.TryUpdate(publishPacket.PacketIdentifier, publishQoS1Chain, publishQoS1Chain)) + { + Logger.Error($"QoS1: Couldn't update Publish --> PubAck QoS1 Chain for packet identifier {publishPacket.PacketIdentifier}. Discarded."); + this.IPubTransactionQueue.Remove(publishPacket.PacketIdentifier, out _); + } + } + else + { + // FIXME: This should never happen if ConnectionReaderAsync is working correctly + Logger.Error($"QoS1: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}. Discarded."); + return; + } + this.SendQueue.Enqueue(pubAckResponse); - this.OnMessageReceivedEventLauncher(publishPacket); } else if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) { - - // We've received a QoS 2 publish. Send a PubRec and add to QoS2 transaction register. - // When we get the PubRel, we'll notify subscribers and send the PubComp in HandleIncomingPubRelPacket - bool success; + // We've received a QoS 2 publish. The transaction chain was created & added + // by ConnectionReaderAsync to enforce the client's ReceiveMaximum. + // Send a PubRec and add to QoS2 transaction register. Once PubComp is sent, + // Subscribers will be notified and the transaction chain will be deleted. var pubRecResponse = new PubRecPacket(publishPacket.PacketIdentifier, PubRecReasonCode.Success); // Get the QoS2 transaction chain for this packet identifier and add the PubRec to it @@ -626,6 +661,7 @@ internal async void HandleIncomingPublishPacket(PublishPacket publishPacket) } else { + // FIXME: This should never happen if ConnectionReaderAsync is working correctly Logger.Error($"QoS2: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}. Discarded."); return; } @@ -660,30 +696,6 @@ internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket) } } - /// - /// Handle an incoming PubComp packet. - /// - /// The received PubComp packet. - internal void HandleSentPubAckPacket(PubAckPacket pubAckPacket) - { - // Remove the transaction chain from the transaction queue - var success = this.IPubTransactionQueue.Remove(pubAckPacket.PacketIdentifier, out var publishQoS1Chain); - - if (success) - { - var publishPacket = (PublishPacket)publishQoS1Chain.First(); - - // Trigger the packet specific event - publishPacket.OnPublishQoS1CompleteEventLauncher(pubAckPacket); - } - else - { - Logger.Warn($"QoS1: Couldn't remove PubAck --> Publish QoS1 Chain for packet identifier {pubAckPacket.PacketIdentifier}."); - } - - this.OnPubAckSentEventLauncher(pubAckPacket); - } - /// /// Handle an incoming PubRec packet. /// @@ -767,7 +779,36 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) } this.SendQueue.Enqueue(pubCompResponsePacket); + } + /// + /// Handle an incoming PubComp packet. + /// + /// The received PubComp packet. + internal void HandleSentPubAckPacket(PubAckPacket pubAckPacket) + { + // Remove the transaction chain from the transaction queue + var success = this.IPubTransactionQueue.Remove(pubAckPacket.PacketIdentifier, out var publishQoS1Chain); + PublishPacket publishPacket; + + if (success) + { + publishPacket = (PublishPacket)publishQoS1Chain.First(); + + // Trigger the packet specific event + publishPacket.OnPublishQoS1CompleteEventLauncher(pubAckPacket); + + // The Application Message Event + this.OnMessageReceivedEventLauncher(publishPacket); + } + else + { + // FIXME: Send an appropriate disconnect packet? + Logger.Warn($"QoS1: Couldn't remove PubAck --> Publish QoS1 Chain for packet identifier {pubAckPacket.PacketIdentifier}."); + } + + // The Packet Event + this.OnPubAckSentEventLauncher(pubAckPacket); } /// diff --git a/Source/HiveMQtt/Client/HiveMQClientUtil.cs b/Source/HiveMQtt/Client/HiveMQClientUtil.cs index 149ca7e2..6adbd2f3 100644 --- a/Source/HiveMQtt/Client/HiveMQClientUtil.cs +++ b/Source/HiveMQtt/Client/HiveMQClientUtil.cs @@ -87,7 +87,7 @@ public static bool MatchTopic(string pattern, string candidate) if (pattern == "#") { // A subscription to “#” will not receive any messages published to a topic beginning with a $ - if (candidate.StartsWith("$", System.StringComparison.CurrentCulture)) + if (candidate.StartsWith("$", StringComparison.CurrentCulture)) { return false; } @@ -100,8 +100,8 @@ public static bool MatchTopic(string pattern, string candidate) if (pattern == "+") { // A subscription to “+” will not receive any messages published to a topic beginning with a $ or / - if (candidate.StartsWith("$", System.StringComparison.CurrentCulture) || - candidate.StartsWith("/", System.StringComparison.CurrentCulture)) + if (candidate.StartsWith("$", StringComparison.CurrentCulture) || + candidate.StartsWith("/", StringComparison.CurrentCulture)) { return false; } @@ -126,7 +126,7 @@ public static bool MatchTopic(string pattern, string candidate) var regexp = "\\A" + Regex.Escape(pattern).Replace(@"\+", @"?[/][^/]*") + "\\z"; regexp = regexp.Replace(@"/\#", @"(/?|.+)"); - regexp = regexp.EndsWith("\\z", System.StringComparison.CurrentCulture) ? regexp : regexp + "\\z"; + regexp = regexp.EndsWith("\\z", StringComparison.CurrentCulture) ? regexp : regexp + "\\z"; return Regex.IsMatch(candidate, regexp); } @@ -168,7 +168,7 @@ from executing a second time. /// or indirectly by a user's code. Managed and unmanaged resources /// can be disposed. /// If disposing equals false, the method has been called by the - /// runtime from inside the finalizer and you should not reference + /// runtime from inside finalize and you should not reference /// other objects. Only unmanaged resources can be disposed. /// /// True if called from user code. diff --git a/Source/HiveMQtt/Client/IHiveMQClient.cs b/Source/HiveMQtt/Client/IHiveMQClient.cs index bb574697..274e49d6 100644 --- a/Source/HiveMQtt/Client/IHiveMQClient.cs +++ b/Source/HiveMQtt/Client/IHiveMQClient.cs @@ -71,13 +71,14 @@ public interface IHiveMQClient : IDisposable /// Publish a message to an MQTT topic. /// /// The for the Publish. + /// A to cancel the operation. /// A representing the result of the publish operation. - Task PublishAsync(MQTT5PublishMessage message); + Task PublishAsync(MQTT5PublishMessage message, CancellationToken cancellationToken = default); /// /// Publish a message to an MQTT topic. /// - /// This is a convenience method that routes to . + /// This is a convenience method that routes to . /// /// /// The string topic to publish to. @@ -89,7 +90,7 @@ public interface IHiveMQClient : IDisposable /// /// Publish a message to an MQTT topic. /// - /// This is a convenience method that routes to . + /// This is a convenience method that routes to . /// /// /// The string topic to publish to. diff --git a/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs b/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs index 9598f585..2b285002 100644 --- a/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs +++ b/Source/HiveMQtt/Client/internal/BoundedDictionaryX.cs @@ -42,7 +42,7 @@ public BoundedDictionaryX(int capacity) } /// - /// Attempts to add an item to the dictionary. + /// Attempts to add an item to the dictionary. If there is not slot available, this method will asynchronously wait for an available slot. /// /// The key to add. /// The value to add.