diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 20ea4356..c543d010 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -31,6 +31,7 @@ + diff --git a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs index c87709c0..0755c216 100644 --- a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs +++ b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs @@ -37,7 +37,7 @@ public AmazonKinesisTransport(IServiceScopeFactory serviceScopeFactory, /// public override async Task StartAsync(CancellationToken cancellationToken) { - await base.StartAsync(cancellationToken); + await base.StartAsync(cancellationToken).ConfigureAwait(false); // if there are consumers for this transport, throw exception var registrations = GetRegistrations(); @@ -51,7 +51,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) /// public override async Task StopAsync(CancellationToken cancellationToken) { - await base.StopAsync(cancellationToken); + await base.StopAsync(cancellationToken).ConfigureAwait(false); // if there are consumers for this transport, throw exception var registrations = GetRegistrations(); @@ -78,7 +78,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // prepare the record var streamName = registration.EventName!; @@ -91,7 +91,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // send the event Logger.SendingToStream(eventBusId: @event.Id, streamName: streamName, scheduled: scheduled); - var response = await kinesisClient.PutRecordAsync(request, cancellationToken); + var response = await kinesisClient.PutRecordAsync(request, cancellationToken).ConfigureAwait(false); response.EnsureSuccess(); // return the sequence number @@ -119,7 +119,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var record = new PutRecordsRequestEntry { @@ -139,7 +139,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // send the events Logger.SendingEventsToStream(events, streamName, scheduled); - var response = await kinesisClient.PutRecordsAsync(request, cancellationToken); + var response = await kinesisClient.PutRecordsAsync(request, cancellationToken).ConfigureAwait(false); response.EnsureSuccess(); // Should we check for failed records and throw exception? diff --git a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs index 45d3634b..d3370083 100644 --- a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs +++ b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs @@ -53,7 +53,7 @@ public AmazonSqsTransport(IServiceScopeFactory serviceScopeFactory, /// public override async Task StartAsync(CancellationToken cancellationToken) { - await base.StartAsync(cancellationToken); + await base.StartAsync(cancellationToken).ConfigureAwait(false); var registrations = GetRegistrations(); foreach (var reg in registrations) @@ -63,7 +63,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) var queueUrl = await GetQueueUrlAsync(reg: reg, ecr: ecr, deadletter: false, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var t = ReceiveAsync(reg: reg, ecr: ecr, queueUrl: queueUrl, @@ -76,7 +76,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) /// public override async Task StopAsync(CancellationToken cancellationToken) { - await base.StopAsync(cancellationToken); + await base.StopAsync(cancellationToken).ConfigureAwait(false); // Stop called without start or there was no consumers registered if (receiverTasks.Count == 0) return; @@ -90,7 +90,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { // Wait until the tasks complete or the stop token triggers var tasks = receiverTasks.Concat(new[] { Task.Delay(Timeout.Infinite, cancellationToken), }); - await Task.WhenAny(tasks); + await Task.WhenAny(tasks).ConfigureAwait(false); } } @@ -104,7 +104,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); string sequenceNumber; if (registration.EntityKind != EntityKind.Broadcast) @@ -116,17 +116,17 @@ public override async Task StopAsync(CancellationToken cancellationToken) } // get the topic arn and send the message - var topicArn = await GetTopicArnAsync(registration, cancellationToken); + var topicArn = await GetTopicArnAsync(registration, cancellationToken).ConfigureAwait(false); var request = new PublishRequest(topicArn: topicArn, message: body.ToString()).SetAttributes(@event); Logger.SendingToTopic(eventBusId: @event.Id, topicArn: topicArn, scheduled: scheduled); - var response = await snsClient.PublishAsync(request: request, cancellationToken: cancellationToken); + var response = await snsClient.PublishAsync(request: request, cancellationToken: cancellationToken).ConfigureAwait(false); response.EnsureSuccess(); sequenceNumber = response.SequenceNumber; } else { // get the queueUrl and prepare the message - var queueUrl = await GetQueueUrlAsync(registration, cancellationToken: cancellationToken); + var queueUrl = await GetQueueUrlAsync(registration, cancellationToken: cancellationToken).ConfigureAwait(false); var request = new SendMessageRequest(queueUrl: queueUrl, body.ToString()).SetAttributes(@event); // if scheduled for later, set the delay in the message @@ -149,7 +149,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // send the message Logger.SendingToQueue(eventBusId: @event.Id, queueUrl: queueUrl, scheduled: scheduled); - var response = await sqsClient.SendMessageAsync(request: request, cancellationToken: cancellationToken); + var response = await sqsClient.SendMessageAsync(request: request, cancellationToken: cancellationToken).ConfigureAwait(false); response.EnsureSuccess(); sequenceNumber = response.SequenceNumber; } @@ -185,13 +185,13 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // get the topic arn and send the message - var topicArn = await GetTopicArnAsync(registration, cancellationToken); + var topicArn = await GetTopicArnAsync(registration, cancellationToken).ConfigureAwait(false); var request = new PublishRequest(topicArn: topicArn, message: body.ToString()).SetAttributes(@event); Logger.SendingToTopic(eventBusId: @event.Id, topicArn: topicArn, scheduled: scheduled); - var response = await snsClient.PublishAsync(request: request, cancellationToken: cancellationToken); + var response = await snsClient.PublishAsync(request: request, cancellationToken: cancellationToken).ConfigureAwait(false); response.EnsureSuccess(); // collect the sequence number @@ -207,7 +207,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var entry = new SendMessageBatchRequestEntry(id: @event.Id, messageBody: body.ToString()).SetAttributes(@event); @@ -233,9 +233,9 @@ public override async Task StopAsync(CancellationToken cancellationToken) } // get the queueUrl and send the messages - var queueUrl = await GetQueueUrlAsync(registration, cancellationToken: cancellationToken); + var queueUrl = await GetQueueUrlAsync(registration, cancellationToken: cancellationToken).ConfigureAwait(false); var request = new SendMessageBatchRequest(queueUrl: queueUrl, entries: entries); - var response = await sqsClient.SendMessageBatchAsync(request: request, cancellationToken: cancellationToken); + var response = await sqsClient.SendMessageBatchAsync(request: request, cancellationToken: cancellationToken).ConfigureAwait(false); response.EnsureSuccess(); sequenceNumbers = response.Successful.Select(smbre => smbre.SequenceNumber).ToList(); } @@ -263,7 +263,7 @@ protected override Task CancelCoreAsync(IList ids, private async Task GetTopicArnAsync(EventRegistration reg, CancellationToken cancellationToken) { - await topicArnsCacheLock.WaitAsync(cancellationToken); + await topicArnsCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -271,7 +271,7 @@ private async Task GetTopicArnAsync(EventRegistration reg, CancellationT { // ensure topic is created, then add it's arn to the cache var name = reg.EventName!; - topicArn = await CreateTopicIfNotExistsAsync(topicName: name, reg: reg, cancellationToken: cancellationToken); + topicArn = await CreateTopicIfNotExistsAsync(topicName: name, reg: reg, cancellationToken: cancellationToken).ConfigureAwait(false); topicArnsCache[reg.EventType] = topicArn; } @@ -285,7 +285,7 @@ private async Task GetTopicArnAsync(EventRegistration reg, CancellationT private async Task GetQueueUrlAsync(EventRegistration reg, EventConsumerRegistration? ecr = null, bool deadletter = false, CancellationToken cancellationToken = default) { - await queueUrlsCacheLock.WaitAsync(cancellationToken); + await queueUrlsCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -293,17 +293,17 @@ private async Task GetQueueUrlAsync(EventRegistration reg, EventConsumer if (!queueUrlsCache.TryGetValue(key, out var queueUrl)) { // ensure queue is created before creating subscription - queueUrl = await CreateQueueIfNotExistsAsync(queueName: key.Name, reg: reg, ecr: ecr, cancellationToken: cancellationToken); + queueUrl = await CreateQueueIfNotExistsAsync(queueName: key.Name, reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false); // for non dead-letter broadcast types, we need to ensure the topic exists and the queue is subscribed to it if (!deadletter && reg.EntityKind == EntityKind.Broadcast) { // ensure topic is created before creating the subscription var topicName = reg.EventName!; - var topicArn = await CreateTopicIfNotExistsAsync(topicName: topicName, reg: reg, cancellationToken: cancellationToken); + var topicArn = await CreateTopicIfNotExistsAsync(topicName: topicName, reg: reg, cancellationToken: cancellationToken).ConfigureAwait(false); // create subscription from the topic to the queue - await snsClient.SubscribeQueueAsync(topicArn: topicArn, sqsClient, queueUrl); + await snsClient.SubscribeQueueAsync(topicArn: topicArn, sqsClient, queueUrl).ConfigureAwait(false); } queueUrlsCache[key] = queueUrl; @@ -344,7 +344,7 @@ private QueueCacheKey CreateQueueCacheKey(EventRegistration reg, EventConsumerRe private async Task CreateTopicIfNotExistsAsync(string topicName, EventRegistration reg, CancellationToken cancellationToken) { // check if the topic exists - var topic = await snsClient.FindTopicAsync(topicName: topicName); + var topic = await snsClient.FindTopicAsync(topicName: topicName).ConfigureAwait(false); if (topic != null) return topic.TopicArn; // if entity creation is not enabled, throw exception @@ -356,7 +356,7 @@ private async Task CreateTopicIfNotExistsAsync(string topicName, EventRe // create the topic var request = new CreateTopicRequest(name: topicName); TransportOptions.SetupCreateTopicRequest?.Invoke(reg, request); - var response = await snsClient.CreateTopicAsync(request: request, cancellationToken: cancellationToken); + var response = await snsClient.CreateTopicAsync(request: request, cancellationToken: cancellationToken).ConfigureAwait(false); response.EnsureSuccess(); return response.TopicArn; @@ -365,7 +365,7 @@ private async Task CreateTopicIfNotExistsAsync(string topicName, EventRe private async Task CreateQueueIfNotExistsAsync(string queueName, EventRegistration reg, EventConsumerRegistration? ecr, CancellationToken cancellationToken) { // check if the queue exists - var urlResponse = await sqsClient.GetQueueUrlAsync(queueName: queueName, cancellationToken); + var urlResponse = await sqsClient.GetQueueUrlAsync(queueName: queueName, cancellationToken).ConfigureAwait(false); if (urlResponse != null && urlResponse.Successful()) return urlResponse.QueueUrl; // if entity creation is not enabled, throw exception @@ -377,7 +377,7 @@ private async Task CreateQueueIfNotExistsAsync(string queueName, EventRe // create the queue var request = new CreateQueueRequest(queueName: queueName); TransportOptions.SetupCreateQueueRequest?.Invoke(reg, ecr, request); - var response = await sqsClient.CreateQueueAsync(request: request, cancellationToken: cancellationToken); + var response = await sqsClient.CreateQueueAsync(request: request, cancellationToken: cancellationToken).ConfigureAwait(false); response.EnsureSuccess(); return response.QueueUrl; @@ -393,7 +393,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration { try { - var response = await sqsClient.ReceiveMessageAsync(queueUrl, cancellationToken); + var response = await sqsClient.ReceiveMessageAsync(queueUrl, cancellationToken).ConfigureAwait(false); response.EnsureSuccess(); var messages = response.Messages; @@ -402,7 +402,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration { var delay = TransportOptions.EmptyResultsDelay; Logger.NoMessages(queueUrl: queueUrl, delay: delay); - await Task.Delay(delay, cancellationToken); + await Task.Delay(delay, cancellationToken).ConfigureAwait(false); } else { @@ -410,7 +410,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration using var scope = CreateScope(); // shared foreach (var message in messages) { - await (Task)method.Invoke(this, new object[] { reg, ecr, queueUrl, message, cancellationToken, })!; + await ((Task)method.Invoke(this, new object[] { reg, ecr, queueUrl, message, cancellationToken, })!).ConfigureAwait(false); } } } @@ -471,7 +471,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r registration: reg, identifier: messageId, raw: message, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedMessage(messageId: messageId, eventBusId: context.Id, queueUrl: queueUrl); @@ -479,26 +479,26 @@ private async Task OnMessageReceivedAsync(EventRegistration r ecr: ecr, @event: context, scope: scope, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); if (!successful && ecr.UnhandledErrorBehaviour == UnhandledConsumerErrorBehaviour.Deadletter) { // get the queueUrl for the dead letter queue and send the message there - var dlqQueueUrl = await GetQueueUrlAsync(reg: reg, ecr: ecr, deadletter: true, cancellationToken: cancellationToken); + var dlqQueueUrl = await GetQueueUrlAsync(reg: reg, ecr: ecr, deadletter: true, cancellationToken: cancellationToken).ConfigureAwait(false); var dlqRequest = new SendMessageRequest { MessageAttributes = message.MessageAttributes, MessageBody = message.Body, QueueUrl = dlqQueueUrl, }; - await sqsClient.SendMessageAsync(request: dlqRequest, cancellationToken: cancellationToken); + await sqsClient.SendMessageAsync(request: dlqRequest, cancellationToken: cancellationToken).ConfigureAwait(false); } // whether or not successful, always delete the message from the current queue Logger.DeletingMessage(messageId: messageId, queueUrl: queueUrl); await sqsClient.DeleteMessageAsync(queueUrl: queueUrl, receiptHandle: message.ReceiptHandle, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); } /// diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs index 942056db..dc8f5382 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs @@ -42,14 +42,14 @@ public AzureEventHubsTransport(IServiceScopeFactory serviceScopeFactory, /// public override async Task StartAsync(CancellationToken cancellationToken) { - await base.StartAsync(cancellationToken); + await base.StartAsync(cancellationToken).ConfigureAwait(false); var registrations = GetRegistrations(); foreach (var reg in registrations) { foreach (var ecr in reg.Consumers) { - var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken); + var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false); // register handlers for error and processing processor.PartitionClosingAsync += delegate (PartitionClosingEventArgs args) @@ -73,7 +73,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) }; // start processing - await processor.StartProcessingAsync(cancellationToken: cancellationToken); + await processor.StartProcessingAsync(cancellationToken: cancellationToken).ConfigureAwait(false); } } } @@ -81,7 +81,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) /// public override async Task StopAsync(CancellationToken cancellationToken) { - await base.StopAsync(cancellationToken); + await base.StopAsync(cancellationToken).ConfigureAwait(false); var clients = processorsCache.Select(kvp => (key: kvp.Key, proc: kvp.Value)).ToList(); foreach (var (key, proc) in clients) @@ -90,7 +90,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) try { - await proc.StopProcessingAsync(cancellationToken); + await proc.StopProcessingAsync(cancellationToken).ConfigureAwait(false); processorsCache.Remove(key); Logger.StoppedProcessor(processor: key); @@ -124,7 +124,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var data = new EventData(body) { @@ -145,9 +145,9 @@ public override async Task StopAsync(CancellationToken cancellationToken) .AddIfNotDefault(MetadataNames.ActivityId, Activity.Current?.Id); // get the producer and send the event accordingly - var producer = await GetProducerAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken); + var producer = await GetProducerAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken).ConfigureAwait(false); Logger.SendingEvent(eventBusId: @event.Id, eventHubName: producer.EventHubName, scheduled: scheduled); - await producer.SendAsync(new[] { data }, cancellationToken); + await producer.SendAsync(new[] { data }, cancellationToken).ConfigureAwait(false); // return the sequence number return scheduled != null ? new ScheduledResult(id: data.SequenceNumber, scheduled: scheduled.Value) : null; @@ -178,7 +178,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var data = new EventData(body) { @@ -201,9 +201,9 @@ public override async Task StopAsync(CancellationToken cancellationToken) } // get the producer and send the events accordingly - var producer = await GetProducerAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken); + var producer = await GetProducerAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken).ConfigureAwait(false); Logger.SendingEvents(events: events, eventHubName: producer.EventHubName, scheduled: scheduled); - await producer.SendAsync(datas, cancellationToken); + await producer.SendAsync(datas, cancellationToken).ConfigureAwait(false); // return the sequence numbers return scheduled != null ? datas.Select(m => new ScheduledResult(id: m.SequenceNumber, scheduled: scheduled.Value)).ToList() : null; @@ -227,7 +227,7 @@ protected override Task CancelCoreAsync(IList ids, private async Task GetProducerAsync(EventRegistration reg, bool deadletter, CancellationToken cancellationToken) { - await producersCacheLock.WaitAsync(cancellationToken); + await producersCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -277,7 +277,7 @@ private async Task GetProducerAsync(EventRegistration re private async Task GetProcessorAsync(EventRegistration reg, EventConsumerRegistration ecr, CancellationToken cancellationToken) { - await processorsCacheLock.WaitAsync(cancellationToken); + await processorsCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -405,7 +405,7 @@ private async Task OnEventReceivedAsync(EventRegistration reg registration: reg, identifier: data.SequenceNumber.ToString(), raw: data, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedEvent(eventBusId: context.Id, eventHubName: processor.EventHubName, consumerGroup: processor.ConsumerGroup, @@ -421,13 +421,13 @@ private async Task OnEventReceivedAsync(EventRegistration reg ecr: ecr, @event: context, scope: scope, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); if (!successful && ecr.UnhandledErrorBehaviour == UnhandledConsumerErrorBehaviour.Deadletter) { // get the producer for the dead letter event hub and send the event there - var dlqProcessor = await GetProducerAsync(reg: reg, deadletter: true, cancellationToken: cancellationToken); - await dlqProcessor.SendAsync(new[] { data }, cancellationToken); + var dlqProcessor = await GetProducerAsync(reg: reg, deadletter: true, cancellationToken: cancellationToken).ConfigureAwait(false); + await dlqProcessor.SendAsync(new[] { data }, cancellationToken).ConfigureAwait(false); } /* @@ -441,7 +441,7 @@ private async Task OnEventReceivedAsync(EventRegistration reg eventHubName: processor.EventHubName, consumerGroup: processor.ConsumerGroup, sequenceNumber: data.SequenceNumber); - await args.UpdateCheckpointAsync(args.CancellationToken); + await args.UpdateCheckpointAsync(args.CancellationToken).ConfigureAwait(false); } } diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/IotHubEventSerializer.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/IotHubEventSerializer.cs index 73534d05..afe72fb5 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/IotHubEventSerializer.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/IotHubEventSerializer.cs @@ -44,7 +44,7 @@ public IotHubEventSerializer(IOptionsMonitor optio telemetry = await JsonSerializer.DeserializeAsync(utf8Json: stream, returnType: mapped.TelemetryType, options: serializerOptions, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); } else if (source is IotHubEventMessageSource.TwinChangeEvents or IotHubEventMessageSource.DeviceLifecycleEvents @@ -62,7 +62,7 @@ or IotHubEventMessageSource.DeviceLifecycleEvents var twinChangeEvent = await JsonSerializer.DeserializeAsync(utf8Json: stream, returnType: mapped.TwinChangeEventType, options: serializerOptions, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var twinChangeOpEventType = typeof(IotHubOperationalEvent<>).MakeGenericType(mapped.TwinChangeEventType); twinChange = Activator.CreateInstance(twinChangeOpEventType, new[] { hubName, deviceId, moduleId, type, operationTimestamp, twinChangeEvent, }); @@ -72,7 +72,7 @@ or IotHubEventMessageSource.DeviceLifecycleEvents var lifecycleEvent = await JsonSerializer.DeserializeAsync(utf8Json: stream, returnType: mapped.LifecycleEventType, options: serializerOptions, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var lifecycleOpEventType = typeof(IotHubOperationalEvent<>).MakeGenericType(mapped.LifecycleEventType); lifecycle = Activator.CreateInstance(lifecycleOpEventType, new[] { hubName, deviceId, moduleId, type, operationTimestamp, lifecycleEvent, }); @@ -82,7 +82,7 @@ or IotHubEventMessageSource.DeviceLifecycleEvents var connectionStateEvent = await JsonSerializer.DeserializeAsync( utf8Json: stream, options: serializerOptions, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); connectionState = new IotHubOperationalEvent( hubName: hubName, diff --git a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs index f9e01499..674b64a0 100644 --- a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs @@ -44,7 +44,7 @@ public AzureQueueStorageTransport(IServiceScopeFactory serviceScopeFactory, /// public override async Task StartAsync(CancellationToken cancellationToken) { - await base.StartAsync(cancellationToken); + await base.StartAsync(cancellationToken).ConfigureAwait(false); var registrations = GetRegistrations(); foreach (var reg in registrations) @@ -60,7 +60,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) /// public override async Task StopAsync(CancellationToken cancellationToken) { - await base.StopAsync(cancellationToken); + await base.StopAsync(cancellationToken).ConfigureAwait(false); // Stop called without start or there was no consumers registered if (receiverTasks.Count == 0) return; @@ -74,7 +74,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { // Wait until the tasks complete or the stop token triggers var tasks = receiverTasks.Concat(new[] { Task.Delay(Timeout.Infinite, cancellationToken), }); - await Task.WhenAny(tasks); + await Task.WhenAny(tasks).ConfigureAwait(false); } } @@ -88,7 +88,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // if scheduled for later, calculate the visibility timeout @@ -98,12 +98,12 @@ public override async Task StopAsync(CancellationToken cancellationToken) var ttl = @event.Expires - DateTimeOffset.UtcNow; // get the queue client and send the message - var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken); + var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken).ConfigureAwait(false); Logger.SendingMessage(eventBusId: @event.Id, queueName: queueClient.Name, scheduled: scheduled); var response = await queueClient.SendMessageAsync(messageText: body.ToString(), visibilityTimeout: visibilityTimeout, timeToLive: ttl, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // return the sequence number; both MessageId and PopReceipt are needed to update or delete return scheduled != null @@ -124,13 +124,13 @@ public override async Task StopAsync(CancellationToken cancellationToken) // work on each event var sequenceNumbers = new List(); - var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken); + var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken).ConfigureAwait(false); foreach (var @event in events) { var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // if scheduled for later, calculate the visibility timeout var visibilityTimeout = scheduled - DateTimeOffset.UtcNow; @@ -142,7 +142,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var response = await queueClient.SendMessageAsync(messageText: body.ToString(), visibilityTimeout: visibilityTimeout, timeToLive: ttl, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // collect the sequence number sequenceNumbers.Add((AzureQueueStorageSchedulingId)response.Value); } @@ -167,11 +167,11 @@ protected override async Task CancelCoreAsync(string id, } // get the queue client and cancel the message accordingly - var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken); + var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken).ConfigureAwait(false); Logger.CancelingMessage(messageId: sid.MessageId, popReceipt: sid.PopReceipt, queueName: queueClient.Name); await queueClient.DeleteMessageAsync(messageId: sid.MessageId, popReceipt: sid.PopReceipt, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); } /// @@ -197,19 +197,19 @@ protected override async Task CancelCoreAsync(IList ids, }).ToList(); // get the queue client and cancel the messages accordingly - var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken); + var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken).ConfigureAwait(false); foreach (var id in sids) { Logger.CancelingMessage(messageId: id.MessageId, popReceipt: id.PopReceipt, queueName: queueClient.Name); await queueClient.DeleteMessageAsync(messageId: id.MessageId, popReceipt: id.PopReceipt, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); } } private async Task GetQueueClientAsync(EventRegistration reg, bool deadletter, CancellationToken cancellationToken) { - await queueClientsCacheLock.WaitAsync(cancellationToken); + await queueClientsCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -239,7 +239,7 @@ private async Task GetQueueClientAsync(EventRegistration reg, bool { // ensure queue is created if it does not exist Logger.EnsuringQueue(queueName: name); - await queueClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken); + await queueClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false); } else { @@ -263,13 +263,13 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration var mt = GetType().GetMethod(nameof(OnMessageReceivedAsync), flags) ?? throw new InvalidOperationException("Methods should be null"); var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType); - var queueClient = await GetQueueClientAsync(reg: reg, deadletter: false, cancellationToken: cancellationToken); + var queueClient = await GetQueueClientAsync(reg: reg, deadletter: false, cancellationToken: cancellationToken).ConfigureAwait(false); while (!cancellationToken.IsCancellationRequested) { try { - var response = await queueClient.ReceiveMessagesAsync(cancellationToken); + var response = await queueClient.ReceiveMessagesAsync(cancellationToken).ConfigureAwait(false); var messages = response.Value; // if the response is empty, introduce a delay @@ -277,7 +277,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration { var delay = TransportOptions.EmptyResultsDelay; Logger.NoMessages(queueName: queueClient.Name, delay: delay); - await Task.Delay(delay, cancellationToken); + await Task.Delay(delay, cancellationToken).ConfigureAwait(false); } else { @@ -285,7 +285,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration using var scope = CreateScope(); // shared foreach (var message in messages) { - await (Task)method.Invoke(this, new object[] { reg, ecr, queueClient, message, scope, cancellationToken, })!; + await ((Task)method.Invoke(this, new object[] { reg, ecr, queueClient, message, scope, cancellationToken, })!).ConfigureAwait(false); } } } @@ -334,7 +334,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r registration: reg, identifier: (AzureQueueStorageSchedulingId)message, raw: message, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedMessage(messageId: messageId, eventBusId: context.Id, queueName: queueClient.Name); @@ -348,20 +348,20 @@ private async Task OnMessageReceivedAsync(EventRegistration r ecr: ecr, @event: context, scope: scope, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); if (!successful && ecr.UnhandledErrorBehaviour == UnhandledConsumerErrorBehaviour.Deadletter) { // get the client for the dead letter queue and send the message there - var dlqClient = await GetQueueClientAsync(reg: reg, deadletter: true, cancellationToken: cancellationToken); - await dlqClient.SendMessageAsync(message.MessageText, cancellationToken); + var dlqClient = await GetQueueClientAsync(reg: reg, deadletter: true, cancellationToken: cancellationToken).ConfigureAwait(false); + await dlqClient.SendMessageAsync(message.MessageText, cancellationToken).ConfigureAwait(false); } // whether or not successful, always delete the message from the current queue Logger.DeletingMessage(messageId: messageId, queueName: queueClient.Name); await queueClient.DeleteMessageAsync(messageId: messageId, popReceipt: message.PopReceipt, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); } /// diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs index 712fdf0f..422fbaba 100644 --- a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs @@ -58,17 +58,17 @@ public AzureServiceBusTransport(IServiceScopeFactory serviceScopeFactory, /// public override async Task StartAsync(CancellationToken cancellationToken) { - await base.StartAsync(cancellationToken); + await base.StartAsync(cancellationToken).ConfigureAwait(false); // Get the namespace properties once at the start - _ = await GetNamespacePropertiesAsync(cancellationToken); + _ = await GetNamespacePropertiesAsync(cancellationToken).ConfigureAwait(false); var registrations = GetRegistrations(); foreach (var reg in registrations) { foreach (var ecr in reg.Consumers) { - var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken); + var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false); // register handlers for error and processing processor.ProcessErrorAsync += OnMessageFaultedAsync; @@ -82,7 +82,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) // start processing Logger.StartingProcessing(entityPath: processor.EntityPath); - await processor.StartProcessingAsync(cancellationToken: cancellationToken); + await processor.StartProcessingAsync(cancellationToken: cancellationToken).ConfigureAwait(false); } } } @@ -90,7 +90,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) /// public override async Task StopAsync(CancellationToken cancellationToken) { - await base.StopAsync(cancellationToken); + await base.StopAsync(cancellationToken).ConfigureAwait(false); var clients = processorsCache.Select(kvp => (key: kvp.Key, proc: kvp.Value)).ToList(); foreach (var (key, proc) in clients) @@ -99,7 +99,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) try { - await proc.StopProcessingAsync(cancellationToken); + await proc.StopProcessingAsync(cancellationToken).ConfigureAwait(false); processorsCache.Remove(key); Logger.StoppedProcessor(processor: key); @@ -121,7 +121,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var message = new ServiceBusMessage(body) { @@ -154,18 +154,18 @@ public override async Task StopAsync(CancellationToken cancellationToken) .AddIfNotDefault(MetadataNames.ActivityId, Activity.Current?.Id); // Get the sender and send the message accordingly - var sender = await GetSenderAsync(registration, cancellationToken); + var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false); Logger.SendingMessage(eventBusId: @event.Id, entityPath: sender.EntityPath, scheduled: scheduled); if (scheduled != null) { var seqNum = await sender.ScheduleMessageAsync(message: message, scheduledEnqueueTime: message.ScheduledEnqueueTime, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); return new ScheduledResult(id: seqNum, scheduled: scheduled.Value); // return the sequence number } else { - await sender.SendMessageAsync(message, cancellationToken); + await sender.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); return null; // no sequence number available } } @@ -183,7 +183,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var message = new ServiceBusMessage(body) { @@ -219,18 +219,18 @@ public override async Task StopAsync(CancellationToken cancellationToken) } // Get the sender and send the messages accordingly - var sender = await GetSenderAsync(registration, cancellationToken); + var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false); Logger.SendingMessages(events: events, entityPath: sender.EntityPath, scheduled: scheduled); if (scheduled != null) { var seqNums = await sender.ScheduleMessagesAsync(messages: messages, scheduledEnqueueTime: messages.First().ScheduledEnqueueTime, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); return seqNums.Select(n => new ScheduledResult(id: n, scheduled: scheduled.Value)).ToList(); // return the sequence numbers } else { - await sender.SendMessagesAsync(messages: messages, cancellationToken: cancellationToken); + await sender.SendMessagesAsync(messages: messages, cancellationToken: cancellationToken).ConfigureAwait(false); return Array.Empty(); // no sequence numbers available } } @@ -251,9 +251,9 @@ protected override async Task CancelCoreAsync(string id, } // get the sender and cancel the message accordingly - var sender = await GetSenderAsync(registration, cancellationToken); + var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false); Logger.CancelingMessage(sequenceNumber: seqNum, entityPath: sender.EntityPath); - await sender.CancelScheduledMessageAsync(sequenceNumber: seqNum, cancellationToken: cancellationToken); + await sender.CancelScheduledMessageAsync(sequenceNumber: seqNum, cancellationToken: cancellationToken).ConfigureAwait(false); } /// @@ -276,14 +276,14 @@ protected override async Task CancelCoreAsync(IList ids, }).ToList(); // get the sender and cancel the messages accordingly - var sender = await GetSenderAsync(registration, cancellationToken); + var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false); Logger.CancelingMessages(sequenceNumbers: seqNums, entityPath: sender.EntityPath); - await sender.CancelScheduledMessagesAsync(sequenceNumbers: seqNums, cancellationToken: cancellationToken); + await sender.CancelScheduledMessagesAsync(sequenceNumbers: seqNums, cancellationToken: cancellationToken).ConfigureAwait(false); } private async Task GetSenderAsync(EventRegistration reg, CancellationToken cancellationToken) { - await sendersCacheLock.WaitAsync(cancellationToken); + await sendersCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -292,17 +292,17 @@ private async Task GetSenderAsync(EventRegistration reg, Cance var name = reg.EventName!; // Create the entity. - if (await ShouldUseQueueAsync(reg, cancellationToken)) + if (await ShouldUseQueueAsync(reg, cancellationToken).ConfigureAwait(false)) { // Ensure Queue is created Logger.CreatingQueueSender(queueName: name); - await CreateQueueIfNotExistsAsync(reg: reg, name: name, cancellationToken: cancellationToken); + await CreateQueueIfNotExistsAsync(reg: reg, name: name, cancellationToken: cancellationToken).ConfigureAwait(false); } else { // Ensure topic is created Logger.CreatingTopicSender(topicName: name); - await CreateTopicIfNotExistsAsync(reg: reg, name: name, cancellationToken: cancellationToken); + await CreateTopicIfNotExistsAsync(reg: reg, name: name, cancellationToken: cancellationToken).ConfigureAwait(false); } // Create the sender @@ -320,7 +320,7 @@ private async Task GetSenderAsync(EventRegistration reg, Cance private async Task GetProcessorAsync(EventRegistration reg, EventConsumerRegistration ecr, CancellationToken cancellationToken) { - await processorsCacheLock.WaitAsync(cancellationToken); + await processorsCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -352,10 +352,10 @@ private async Task GetProcessorAsync(EventRegistration reg, TransportOptions.SetupProcessorOptions?.Invoke(reg, ecr, sbpo); // Create the processor. - if (await ShouldUseQueueAsync(reg, cancellationToken)) + if (await ShouldUseQueueAsync(reg, cancellationToken).ConfigureAwait(false)) { // Ensure Queue is created - await CreateQueueIfNotExistsAsync(reg: reg, name: topicName, cancellationToken: cancellationToken); + await CreateQueueIfNotExistsAsync(reg: reg, name: topicName, cancellationToken: cancellationToken).ConfigureAwait(false); // Create the processor for the Queue Logger.CreatingQueueProcessor(queueName: topicName); @@ -364,13 +364,13 @@ private async Task GetProcessorAsync(EventRegistration reg, else { // Ensure Topic is created before creating the Subscription - await CreateTopicIfNotExistsAsync(reg: reg, name: topicName, cancellationToken: cancellationToken); + await CreateTopicIfNotExistsAsync(reg: reg, name: topicName, cancellationToken: cancellationToken).ConfigureAwait(false); // Ensure Subscription is created await CreateSubscriptionIfNotExistsAsync(ecr: ecr, topicName: topicName, subscriptionName: subscriptionName, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // Create the processor for the Subscription Logger.CreatingSubscriptionProcessor(topicName: topicName, subscriptionName: subscriptionName); @@ -401,7 +401,7 @@ private async Task CreateQueueIfNotExistsAsync(EventRegistration reg, string nam // If the queue does not exist, create it Logger.CheckingQueueExistence(queueName: name); - if (!await managementClient.QueueExistsAsync(name: name, cancellationToken: cancellationToken)) + if (!await managementClient.QueueExistsAsync(name: name, cancellationToken: cancellationToken).ConfigureAwait(false)) { Logger.CreatingQueuePreparation(queueName: name); var options = new CreateQueueOptions(name: name) @@ -416,7 +416,7 @@ private async Task CreateQueueIfNotExistsAsync(EventRegistration reg, string nam }; // Certain properties are not allowed in Basic Tier or have lower limits - if (!await IsBasicTierAsync(cancellationToken)) + if (!await IsBasicTierAsync(cancellationToken).ConfigureAwait(false)) { options.DefaultMessageTimeToLive = TransportOptions.DefaultMessageTimeToLive; // defaults to 14days in basic tier options.RequiresDuplicateDetection = BusOptions.EnableDeduplication; @@ -427,7 +427,7 @@ private async Task CreateQueueIfNotExistsAsync(EventRegistration reg, string nam // Allow for the defaults to be overridden TransportOptions.SetupQueueOptions?.Invoke(reg, options); Logger.CreatingQueue(queueName: name); - _ = await managementClient.CreateQueueAsync(options: options, cancellationToken: cancellationToken); + _ = await managementClient.CreateQueueAsync(options: options, cancellationToken: cancellationToken).ConfigureAwait(false); } } @@ -442,7 +442,7 @@ private async Task CreateTopicIfNotExistsAsync(EventRegistration reg, string nam // If the topic does not exist, create it Logger.CheckingTopicExistence(topicName: name); - if (!await managementClient.TopicExistsAsync(name: name, cancellationToken: cancellationToken)) + if (!await managementClient.TopicExistsAsync(name: name, cancellationToken: cancellationToken).ConfigureAwait(false)) { Logger.CreatingTopicPreparation(topicName: name); var options = new CreateTopicOptions(name: name) @@ -459,7 +459,7 @@ private async Task CreateTopicIfNotExistsAsync(EventRegistration reg, string nam // Allow for the defaults to be overridden TransportOptions.SetupTopicOptions?.Invoke(reg, options); Logger.CreatingTopic(topicName: name); - _ = await managementClient.CreateTopicAsync(options: options, cancellationToken: cancellationToken); + _ = await managementClient.CreateTopicAsync(options: options, cancellationToken: cancellationToken).ConfigureAwait(false); } } @@ -474,7 +474,7 @@ private async Task CreateSubscriptionIfNotExistsAsync(EventConsumerRegistration // If the subscription does not exist, create it Logger.CheckingSubscriptionExistence(subscriptionName: subscriptionName, topicName: topicName); - if (!await managementClient.SubscriptionExistsAsync(topicName, subscriptionName, cancellationToken)) + if (!await managementClient.SubscriptionExistsAsync(topicName, subscriptionName, cancellationToken).ConfigureAwait(false)) { Logger.CreatingSubscriptionPreparation(subscriptionName: subscriptionName, topicName: topicName); var options = new CreateSubscriptionOptions(topicName: topicName, subscriptionName: subscriptionName) @@ -491,7 +491,7 @@ private async Task CreateSubscriptionIfNotExistsAsync(EventConsumerRegistration // Allow for the defaults to be overridden TransportOptions.SetupSubscriptionOptions?.Invoke(ecr, options); Logger.CreatingSubscription(subscriptionName: subscriptionName, topicName: topicName); - await managementClient.CreateSubscriptionAsync(options: options, cancellationToken: cancellationToken); + await managementClient.CreateSubscriptionAsync(options: options, cancellationToken: cancellationToken).ConfigureAwait(false); } } @@ -523,7 +523,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r activity?.AddTag(ActivityTagNames.EventBusEventType, typeof(TEvent).FullName); activity?.AddTag(ActivityTagNames.EventBusConsumerType, typeof(TConsumer).FullName); activity?.AddTag(ActivityTagNames.MessagingSystem, Name); - var destination = await ShouldUseQueueAsync(reg, cancellationToken) ? reg.EventName : ecr.ConsumerName; + var destination = await ShouldUseQueueAsync(reg, cancellationToken).ConfigureAwait(false) ? reg.EventName : ecr.ConsumerName; activity?.AddTag(ActivityTagNames.MessagingDestination, destination); // name of the queue/subscription activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); // the spec does not know subscription so we can only use queue for both @@ -536,7 +536,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r registration: reg, identifier: message.SequenceNumber.ToString(), raw: message, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedMessage(sequenceNumber: message.SequenceNumber, eventBusId: context.Id, entityPath: entityPath); @@ -547,7 +547,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r ecr: ecr, @event: context, scope: scope, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // Decide the action to execute then execute var action = DecideAction(successful, ecr.UnhandledErrorBehaviour, processor.AutoCompleteMessages); @@ -555,7 +555,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r if (action == PostConsumeAction.Complete) { - await args.CompleteMessageAsync(message: message, cancellationToken: cancellationToken); + await args.CompleteMessageAsync(message: message, cancellationToken: cancellationToken).ConfigureAwait(false); } else if (action == PostConsumeAction.Throw) { @@ -566,11 +566,11 @@ private async Task OnMessageReceivedAsync(EventRegistration r } else if (action == PostConsumeAction.Deadletter) { - await args.DeadLetterMessageAsync(message, cancellationToken: cancellationToken); + await args.DeadLetterMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false); } else if (action == PostConsumeAction.Abandon) { - await args.AbandonMessageAsync(message, cancellationToken: cancellationToken); + await args.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false); } } @@ -589,22 +589,22 @@ private async Task ShouldUseQueueAsync(EventRegistration reg, Cancellation * Queues are used in the basic tier or when explicitly mapped to Queue. * Otherwise, Topics and Subscriptions are used. */ - return reg.EntityKind == EntityKind.Queue || await IsBasicTierAsync(cancellationToken); + return reg.EntityKind == EntityKind.Queue || await IsBasicTierAsync(cancellationToken).ConfigureAwait(false); } private async Task IsBasicTierAsync(CancellationToken cancellationToken) { - var np = await GetNamespacePropertiesAsync(cancellationToken); + var np = await GetNamespacePropertiesAsync(cancellationToken).ConfigureAwait(false); return np.MessagingSku == MessagingSku.Basic; } private async Task GetNamespacePropertiesAsync(CancellationToken cancellationToken) { - await propertiesCacheLock.WaitAsync(cancellationToken); + await propertiesCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { - properties ??= await managementClient.GetNamespacePropertiesAsync(cancellationToken); + properties ??= await managementClient.GetNamespacePropertiesAsync(cancellationToken).ConfigureAwait(false); } finally { diff --git a/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryProcessor.cs b/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryProcessor.cs index 6f322050..66d7f569 100644 --- a/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryProcessor.cs +++ b/src/Tingle.EventBus.Transports.InMemory/Client/InMemoryProcessor.cs @@ -20,7 +20,7 @@ public InMemoryProcessor(string entityPath, ChannelReader reade private async Task RunAsync(CancellationToken cancellationToken) { var messages = reader.ReadAllAsync(cancellationToken); - await foreach (var message in messages) + await foreach (var message in messages.ConfigureAwait(false)) { try { diff --git a/src/Tingle.EventBus.Transports.InMemory/Client/InMemorySender.cs b/src/Tingle.EventBus.Transports.InMemory/Client/InMemorySender.cs index 7506b8fc..ef2f740e 100644 --- a/src/Tingle.EventBus.Transports.InMemory/Client/InMemorySender.cs +++ b/src/Tingle.EventBus.Transports.InMemory/Client/InMemorySender.cs @@ -31,7 +31,7 @@ private async Task RunAsync(CancellationToken cancellationToken) while (!cancellationToken.IsCancellationRequested) { // wait to be notified of an item added - await available.WaitAsync(waitTimeout, cancellationToken); + await available.WaitAsync(waitTimeout, cancellationToken).ConfigureAwait(false); var cached = items.ToList(); // just in case it is changed foreach (var msg in cached) @@ -39,7 +39,7 @@ private async Task RunAsync(CancellationToken cancellationToken) if (msg.Scheduled <= DateTimeOffset.UtcNow) { // write the message and remove it - await writer.WriteAsync(msg, cancellationToken); + await writer.WriteAsync(msg, cancellationToken).ConfigureAwait(false); items.Remove(msg); } } @@ -69,7 +69,7 @@ public async Task SendMessageAsync(InMemoryMessage message, CancellationToken ca { if (message is null) throw new ArgumentNullException(nameof(message)); - await updateLock.WaitAsync(cancellationToken); + await updateLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { message.SequenceNumber = sng.Generate(); @@ -92,7 +92,7 @@ public async Task SendMessagesAsync(IEnumerable messages, Cance { if (messages is null) throw new ArgumentNullException(nameof(messages)); - await updateLock.WaitAsync(cancellationToken); + await updateLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { // set sequence numbers @@ -118,7 +118,7 @@ public async Task SendMessagesAsync(IEnumerable messages, Cance /// public async Task ScheduleMessageAsync(InMemoryMessage message, CancellationToken cancellationToken = default) { - await SendMessageAsync(message, cancellationToken); + await SendMessageAsync(message, cancellationToken).ConfigureAwait(false); return message.SequenceNumber; } @@ -130,7 +130,7 @@ public async Task ScheduleMessageAsync(InMemoryMessage message, Cancellati /// public async Task> ScheduleMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) { - await SendMessagesAsync(messages, cancellationToken); + await SendMessagesAsync(messages, cancellationToken).ConfigureAwait(false); return messages.Select(m => m.SequenceNumber).ToArray(); } @@ -144,7 +144,7 @@ public async Task> ScheduleMessagesAsync(IEnumerable public async Task CancelScheduledMessageAsync(long sequenceNumber, CancellationToken cancellationToken) { - await updateLock.WaitAsync(cancellationToken); + await updateLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { // get matching @@ -173,7 +173,7 @@ public async Task CancelScheduledMessageAsync(long sequenceNumber, CancellationT /// public async Task CancelScheduledMessagesAsync(IEnumerable sequenceNumbers, CancellationToken cancellationToken) { - await updateLock.WaitAsync(cancellationToken); + await updateLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { // get matching diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTestHarness.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTestHarness.cs index 744e9faf..a03748c6 100644 --- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTestHarness.cs +++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTestHarness.cs @@ -23,13 +23,13 @@ public InMemoryTestHarness(IEnumerable transports, IOptions< /// public async Task StartAsync(CancellationToken cancellationToken = default) { - await transport.StartAsync(cancellationToken); + await transport.StartAsync(cancellationToken).ConfigureAwait(false); } /// public async Task StopAsync(CancellationToken cancellationToken = default) { - await transport.StopAsync(cancellationToken); + await transport.StopAsync(cancellationToken).ConfigureAwait(false); } /// @@ -48,7 +48,7 @@ public async Task StopAsync(CancellationToken cancellationToken = default) /// public async Task> PublishedAsync(TimeSpan? delay = null, CancellationToken cancellationToken = default) { - await Task.Delay(delay ?? options.DefaultDelay, cancellationToken); + await Task.Delay(delay ?? options.DefaultDelay, cancellationToken).ConfigureAwait(false); return Published(); } @@ -71,7 +71,7 @@ public async Task> PublishedAsync(TimeSpan? delay = nu public async Task>> PublishedAsync(TimeSpan? delay = null, CancellationToken cancellationToken = default) where T : class { - await Task.Delay(delay ?? options.DefaultDelay, cancellationToken); + await Task.Delay(delay ?? options.DefaultDelay, cancellationToken).ConfigureAwait(false); return Published(); } @@ -91,7 +91,7 @@ public async Task>> PublishedAsync(TimeSpan? dela /// public async Task> CancelledAsync(TimeSpan? delay = null, CancellationToken cancellationToken = default) { - await Task.Delay(delay ?? options.DefaultDelay, cancellationToken); + await Task.Delay(delay ?? options.DefaultDelay, cancellationToken).ConfigureAwait(false); return Cancelled(); } @@ -111,7 +111,7 @@ public async Task> CancelledAsync(TimeSpan? delay = null, Canc /// public async Task> ConsumedAsync(TimeSpan? delay = null, CancellationToken cancellationToken = default) { - await Task.Delay(delay ?? options.DefaultDelay, cancellationToken); + await Task.Delay(delay ?? options.DefaultDelay, cancellationToken).ConfigureAwait(false); return Consumed(); } @@ -134,7 +134,7 @@ public async Task> ConsumedAsync(TimeSpan? delay = nul public async Task>> ConsumedAsync(TimeSpan? delay = null, CancellationToken cancellationToken = default) where T : class { - await Task.Delay(delay ?? options.DefaultDelay, cancellationToken); + await Task.Delay(delay ?? options.DefaultDelay, cancellationToken).ConfigureAwait(false); return Consumed(); } @@ -154,7 +154,7 @@ public async Task>> ConsumedAsync(TimeSpan? delay /// public async Task> FailedAsync(TimeSpan? delay = null, CancellationToken cancellationToken = default) { - await Task.Delay(delay ?? options.DefaultDelay, cancellationToken); + await Task.Delay(delay ?? options.DefaultDelay, cancellationToken).ConfigureAwait(false); return Failed(); } @@ -176,7 +176,7 @@ public async Task> FailedAsync(TimeSpan? delay = null, public async Task>> FailedAsync(TimeSpan? delay = null, CancellationToken cancellationToken = default) where T : class { - await Task.Delay(delay ?? options.DefaultDelay, cancellationToken); + await Task.Delay(delay ?? options.DefaultDelay, cancellationToken).ConfigureAwait(false); return Failed(); } } diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs index ad33728a..3c37b4e8 100644 --- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs +++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs @@ -69,14 +69,14 @@ public InMemoryTransport(IServiceScopeFactory serviceScopeFactory, /// public override async Task StartAsync(CancellationToken cancellationToken) { - await base.StartAsync(cancellationToken); + await base.StartAsync(cancellationToken).ConfigureAwait(false); var registrations = GetRegistrations(); foreach (var reg in registrations) { foreach (var ecr in reg.Consumers) { - var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken); + var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false); // register handlers for error and processing processor.ProcessErrorAsync += OnMessageFaultedAsync; @@ -90,7 +90,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) // start processing Logger.StartingProcessing(entityPath: processor.EntityPath); - await processor.StartProcessingAsync(cancellationToken: cancellationToken); + await processor.StartProcessingAsync(cancellationToken: cancellationToken).ConfigureAwait(false); } } } @@ -98,7 +98,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) /// public override async Task StopAsync(CancellationToken cancellationToken) { - await base.StopAsync(cancellationToken); + await base.StopAsync(cancellationToken).ConfigureAwait(false); var clients = processorsCache.Select(kvp => (key: kvp.Key, proc: kvp.Value)).ToList(); foreach (var (key, proc) in clients) @@ -107,7 +107,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) try { - await proc.StopProcessingAsync(cancellationToken); + await proc.StopProcessingAsync(cancellationToken).ConfigureAwait(false); processorsCache.Remove(key); Logger.StoppedProcessor(processor: key); @@ -135,7 +135,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var message = new InMemoryMessage(body) { @@ -159,16 +159,16 @@ public override async Task StopAsync(CancellationToken cancellationToken) published.Add(@event); // Get the queue and send the message accordingly - var sender = await GetSenderAsync(registration, cancellationToken); + var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false); Logger.SendingMessage(eventBusId: @event.Id, entityPath: sender.EntityPath, scheduled: scheduled); if (scheduled != null) { - var seqNum = await sender.ScheduleMessageAsync(message: message, cancellationToken: cancellationToken); + var seqNum = await sender.ScheduleMessageAsync(message: message, cancellationToken: cancellationToken).ConfigureAwait(false); return new ScheduledResult(id: seqNum, scheduled: scheduled.Value); // return the sequence number } else { - await sender.SendMessageAsync(message, cancellationToken); + await sender.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); return null; // no sequence number available } } @@ -193,7 +193,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); var message = new InMemoryMessage(body) { @@ -220,16 +220,16 @@ public override async Task StopAsync(CancellationToken cancellationToken) AddBatch(published, events); // Get the queue and send the message accordingly - var sender = await GetSenderAsync(registration, cancellationToken); + var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false); Logger.SendingMessages(events: events, entityPath: sender.EntityPath, scheduled: scheduled); if (scheduled != null) { - var seqNums = await sender.ScheduleMessagesAsync(messages: messages, cancellationToken: cancellationToken); + var seqNums = await sender.ScheduleMessagesAsync(messages: messages, cancellationToken: cancellationToken).ConfigureAwait(false); return seqNums.Select(n => new ScheduledResult(id: n, scheduled: scheduled.Value)).ToList(); // return the sequence numbers } else { - await sender.SendMessagesAsync(messages, cancellationToken); + await sender.SendMessagesAsync(messages, cancellationToken).ConfigureAwait(false); return Array.Empty(); // no sequence numbers available } } @@ -250,9 +250,9 @@ protected override async Task CancelCoreAsync(string id, } // get the entity and cancel the message accordingly - var sender = await GetSenderAsync(registration, cancellationToken); + var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false); Logger.CancelingMessage(sequenceNumber: seqNum, entityPath: sender.EntityPath); - await sender.CancelScheduledMessageAsync(sequenceNumber: seqNum, cancellationToken: cancellationToken); + await sender.CancelScheduledMessageAsync(sequenceNumber: seqNum, cancellationToken: cancellationToken).ConfigureAwait(false); // Add to cancelled list cancelled.Add(seqNum); @@ -275,9 +275,9 @@ protected override async Task CancelCoreAsync(IList ids, }).ToList(); // get the entity and cancel the message accordingly - var sender = await GetSenderAsync(registration, cancellationToken); + var sender = await GetSenderAsync(registration, cancellationToken).ConfigureAwait(false); Logger.CancelingMessages(sequenceNumbers: seqNums, entityPath: sender.EntityPath); - await sender.CancelScheduledMessagesAsync(sequenceNumbers: seqNums, cancellationToken: cancellationToken); + await sender.CancelScheduledMessagesAsync(sequenceNumbers: seqNums, cancellationToken: cancellationToken).ConfigureAwait(false); // Add to cancelled list AddBatch(cancelled, seqNums); @@ -285,7 +285,7 @@ protected override async Task CancelCoreAsync(IList ids, private async Task GetSenderAsync(EventRegistration reg, CancellationToken cancellationToken) { - await sendersCacheLock.WaitAsync(cancellationToken); + await sendersCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -307,7 +307,7 @@ private async Task GetSenderAsync(EventRegistration reg, Cancell private async Task GetProcessorAsync(EventRegistration reg, EventConsumerRegistration ecr, CancellationToken cancellationToken) { - await processorsCacheLock.WaitAsync(cancellationToken); + await processorsCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -387,7 +387,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r registration: reg, identifier: message.SequenceNumber.ToString(), raw: message, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedMessage(sequenceNumber: message.SequenceNumber, eventBusId: context.Id, entityPath: entityPath); @@ -398,7 +398,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r ecr: ecr, @event: context, scope: scope, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); if (successful) { diff --git a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs index ba49f4f4..c3af3440 100644 --- a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs +++ b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs @@ -68,7 +68,7 @@ public KafkaTransport(IHostEnvironment environment, /// public override async Task StartAsync(CancellationToken cancellationToken) { - await base.StartAsync(cancellationToken); + await base.StartAsync(cancellationToken).ConfigureAwait(false); var registrations = GetRegistrations(); var topics = registrations.Where(r => r.Consumers.Count > 0) // filter out those with consumers @@ -85,7 +85,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) /// public override async Task StopAsync(CancellationToken cancellationToken) { - await base.StopAsync(cancellationToken); + await base.StopAsync(cancellationToken).ConfigureAwait(false); // Stop called without start or there was no consumers registered if (receiverTasks.Count == 0) return; @@ -102,7 +102,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { // Wait until the tasks complete or the stop token triggers var tasks = receiverTasks.Concat(new[] { Task.Delay(Timeout.Infinite, cancellationToken), }); - await Task.WhenAny(tasks); + await Task.WhenAny(tasks).ConfigureAwait(false); } } @@ -122,7 +122,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // prepare the message var message = new Message(); @@ -136,7 +136,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // send the event var topic = registration.EventName; - var result = await producer.ProduceAsync(topic: topic, message: message, cancellationToken: cancellationToken); + var result = await producer.ProduceAsync(topic: topic, message: message, cancellationToken: cancellationToken).ConfigureAwait(false); // Should we check persistence status? // return the sequence number @@ -167,7 +167,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // prepare the message var message = new Message(); @@ -181,7 +181,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) // send the event var topic = registration.EventName; - var result = await producer.ProduceAsync(topic: topic, message: message, cancellationToken: cancellationToken); + var result = await producer.ProduceAsync(topic: topic, message: message, cancellationToken: cancellationToken).ConfigureAwait(false); // Should we check persistence status? // collect the sequence number @@ -233,7 +233,7 @@ private async Task ProcessAsync(CancellationToken cancellationToken) // form the generic method var ecr = reg.Consumers.Single(); // only one consumer per event var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType); - await (Task)method.Invoke(this, new object[] { reg, ecr, result, cancellationToken, })!; + await ((Task)method.Invoke(this, new object[] { reg, ecr, result, cancellationToken, })!).ConfigureAwait(false); // if configured to checkpoint at intervals, respect it @@ -291,19 +291,19 @@ private async Task OnEventReceivedAsync(EventRegistration reg registration: reg, identifier: result.Offset.ToString(), raw: message, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedEvent(messageKey, context.Id); var (successful, _) = await ConsumeAsync(registration: reg, ecr: ecr, @event: context, scope: scope, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); if (!successful && ecr.UnhandledErrorBehaviour == UnhandledConsumerErrorBehaviour.Deadletter) { // produce message on dead-letter topic var dlt = reg.EventName += TransportOptions.DeadLetterSuffix; - await producer.ProduceAsync(topic: dlt, message: message, cancellationToken: cancellationToken); + await producer.ProduceAsync(topic: dlt, message: message, cancellationToken: cancellationToken).ConfigureAwait(false); } // TODO: find a better way to handle the checkpointing when there is an error diff --git a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs index e6269231..264d2473 100644 --- a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs +++ b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs @@ -54,15 +54,15 @@ public RabbitMqTransport(IServiceScopeFactory serviceScopeFactory, /// public override async Task StartAsync(CancellationToken cancellationToken) { - await base.StartAsync(cancellationToken); + await base.StartAsync(cancellationToken).ConfigureAwait(false); - await ConnectConsumersAsync(cancellationToken); + await ConnectConsumersAsync(cancellationToken).ConfigureAwait(false); } /// public override async Task StopAsync(CancellationToken cancellationToken) { - await base.StopAsync(cancellationToken); + await base.StopAsync(cancellationToken).ConfigureAwait(false); var channels = subscriptionChannelsCache.Select(kvp => (key: kvp.Key, sc: kvp.Value)).ToList(); foreach (var (key, channel) in channels) @@ -94,7 +94,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { if (!IsConnected) { - await TryConnectAsync(cancellationToken); + await TryConnectAsync(cancellationToken).ConfigureAwait(false); } // create channel, declare a fanout exchange @@ -107,7 +107,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // publish message string? scheduledId = null; @@ -165,7 +165,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { if (!IsConnected) { - await TryConnectAsync(cancellationToken); + await TryConnectAsync(cancellationToken).ConfigureAwait(false); } // create channel, declare a fanout exchange @@ -181,7 +181,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) var body = await SerializeAsync(scope: scope, @event: @event, registration: registration, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); serializedEvents.Add((@event, @event.ContentType, body)); } @@ -256,7 +256,7 @@ private async Task ConnectConsumersAsync(CancellationToken cancellationToken) { if (!IsConnected) { - await TryConnectAsync(cancellationToken); + await TryConnectAsync(cancellationToken).ConfigureAwait(false); } var registrations = GetRegistrations(); @@ -267,7 +267,7 @@ private async Task ConnectConsumersAsync(CancellationToken cancellationToken) { var queueName = ecr.ConsumerName!; - var channel = await GetSubscriptionChannelAsync(exchangeName: exchangeName, queueName: queueName, cancellationToken); + var channel = await GetSubscriptionChannelAsync(exchangeName: exchangeName, queueName: queueName, cancellationToken).ConfigureAwait(false); var consumer = new AsyncEventingBasicConsumer(channel); consumer.Received += delegate (object sender, BasicDeliverEventArgs @event) { @@ -318,7 +318,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r registration: reg, identifier: messageId, raw: args, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.LogInformation("Received message: '{MessageId}' containing Event '{Id}'", messageId, context.Id); @@ -326,7 +326,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r ecr: ecr, @event: context, scope: scope, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); // Decide the action to execute then execute var action = DecideAction(successful, ecr.UnhandledErrorBehaviour); @@ -354,7 +354,7 @@ private async Task OnMessageReceivedAsync(EventRegistration r private async Task GetSubscriptionChannelAsync(string exchangeName, string queueName, CancellationToken cancellationToken) { - await subscriptionChannelsCacheLock.WaitAsync(cancellationToken); + await subscriptionChannelsCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { @@ -389,7 +389,7 @@ private async Task GetSubscriptionChannelAsync(string exchangeName, stri private async Task TryConnectAsync(CancellationToken cancellationToken) { Logger.LogDebug("RabbitMQ Client is trying to connect."); - await connectionLock.WaitAsync(cancellationToken); + await connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { diff --git a/src/Tingle.EventBus/EventBus.cs b/src/Tingle.EventBus/EventBus.cs index 1505b7c6..81a2a1ed 100644 --- a/src/Tingle.EventBus/EventBus.cs +++ b/src/Tingle.EventBus/EventBus.cs @@ -88,7 +88,7 @@ public EventBus(IEventIdGenerator idGenerator, return await transport.PublishAsync(@event: @event, registration: reg, scheduled: scheduled, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); } /// @@ -141,7 +141,7 @@ public EventBus(IEventIdGenerator idGenerator, return await transport.PublishAsync(events: events, registration: reg, scheduled: scheduled, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); } @@ -167,7 +167,7 @@ public async Task CancelAsync(string id, CancellationToken cancellationT // Cancel on the transport logger.CancelingEvent(id, transport.Name); - await transport.CancelAsync(id: id, registration: reg, cancellationToken: cancellationToken); + await transport.CancelAsync(id: id, registration: reg, cancellationToken: cancellationToken).ConfigureAwait(false); } /// @@ -192,7 +192,7 @@ public async Task CancelAsync(IList ids, CancellationToken cance // Cancel on the transport logger.CancelingEvents(ids, transport.Name); - await transport.CancelAsync(ids: ids, registration: reg, cancellationToken: cancellationToken); + await transport.CancelAsync(ids: ids, registration: reg, cancellationToken: cancellationToken).ConfigureAwait(false); } /// @@ -207,8 +207,8 @@ public async Task StartAsync(CancellationToken cancellationToken) try { logger.DelayedBusStartup(delay.Value); - await Task.Delay(delay.Value, cancellationToken); - await StartTransportsAsync(cancellationToken); + await Task.Delay(delay.Value, cancellationToken).ConfigureAwait(false); + await StartTransportsAsync(cancellationToken).ConfigureAwait(false); } catch (Exception ex) when (!(ex is OperationCanceledException || ex is TaskCanceledException)) // skip operation cancel @@ -219,7 +219,7 @@ public async Task StartAsync(CancellationToken cancellationToken) else { // Without a delay, just start the transports directly - await StartTransportsAsync(cancellationToken); + await StartTransportsAsync(cancellationToken).ConfigureAwait(false); } } @@ -229,7 +229,7 @@ private async Task StartTransportsAsync(CancellationToken cancellationToken) logger.StartingBus(transports.Count); foreach (var t in transports) { - await t.StartAsync(cancellationToken); + await t.StartAsync(cancellationToken).ConfigureAwait(false); } } @@ -239,7 +239,7 @@ public async Task StopAsync(CancellationToken cancellationToken) // Stop the transports in parallel logger.StoppingTransports(); var tasks = transports.Select(t => t.StopAsync(cancellationToken)); - await Task.WhenAll(tasks); + await Task.WhenAll(tasks).ConfigureAwait(false); } internal (EventRegistration registration, IEventBusTransport transport) GetTransportForEvent() diff --git a/src/Tingle.EventBus/EventBusHost.cs b/src/Tingle.EventBus/EventBusHost.cs index e6d741ff..03f0bcbb 100644 --- a/src/Tingle.EventBus/EventBusHost.cs +++ b/src/Tingle.EventBus/EventBusHost.cs @@ -34,15 +34,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) return; } - await bus.StartAsync(stoppingToken); + await bus.StartAsync(stoppingToken).ConfigureAwait(false); } /// public override async Task StopAsync(CancellationToken cancellationToken) { logger.StoppingBus(); - await base.StopAsync(cancellationToken); - await bus.StopAsync(cancellationToken); + await base.StopAsync(cancellationToken).ConfigureAwait(false); + await bus.StopAsync(cancellationToken).ConfigureAwait(false); } private static async Task WaitForAppStartupAsync(IHostApplicationLifetime lifetime, CancellationToken stoppingToken) diff --git a/src/Tingle.EventBus/Publisher/EventPublisher.cs b/src/Tingle.EventBus/Publisher/EventPublisher.cs index 63b0be26..7421ded5 100644 --- a/src/Tingle.EventBus/Publisher/EventPublisher.cs +++ b/src/Tingle.EventBus/Publisher/EventPublisher.cs @@ -22,7 +22,7 @@ public EventContext CreateEventContext(TEvent @event, string? co CancellationToken cancellationToken = default) where TEvent : class { - return await bus.PublishAsync(@event, scheduled, cancellationToken); + return await bus.PublishAsync(@event, scheduled, cancellationToken).ConfigureAwait(false); } /// @@ -31,18 +31,18 @@ public EventContext CreateEventContext(TEvent @event, string? co CancellationToken cancellationToken = default) where TEvent : class { - return await bus.PublishAsync(events: events, scheduled: scheduled, cancellationToken: cancellationToken); + return await bus.PublishAsync(events: events, scheduled: scheduled, cancellationToken: cancellationToken).ConfigureAwait(false); } /// public async Task CancelAsync(string id, CancellationToken cancellationToken = default) where TEvent : class { - await bus.CancelAsync(id: id, cancellationToken: cancellationToken); + await bus.CancelAsync(id: id, cancellationToken: cancellationToken).ConfigureAwait(false); } /// public async Task CancelAsync(IList ids, CancellationToken cancellationToken = default) where TEvent : class { - await bus.CancelAsync(ids: ids, cancellationToken: cancellationToken); + await bus.CancelAsync(ids: ids, cancellationToken: cancellationToken).ConfigureAwait(false); } } diff --git a/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs b/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs index 38deaec7..10860475 100644 --- a/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs +++ b/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs @@ -59,7 +59,7 @@ protected AbstractEventSerializer(IOptionsMonitor // Deserialize using var stream = context.Body.ToStream(); - var envelope = await DeserializeToEnvelopeAsync(stream: stream, context: context, cancellationToken: cancellationToken); + var envelope = await DeserializeToEnvelopeAsync(stream: stream, context: context, cancellationToken: cancellationToken).ConfigureAwait(false); if (envelope is null) { Logger.DeserializationResultedInNull(identifier: context.Identifier, eventType: context.Registration.EventType.FullName); @@ -111,12 +111,12 @@ public async Task SerializeAsync(SerializationContext context, // Serialize using var stream = new MemoryStream(); - await SerializeEnvelopeAsync(stream: stream, envelope: envelope, cancellationToken: cancellationToken); + await SerializeEnvelopeAsync(stream: stream, envelope: envelope, cancellationToken: cancellationToken).ConfigureAwait(false); // Return to the beginning of the stream stream.Seek(0, SeekOrigin.Begin); - context.Body = await BinaryData.FromStreamAsync(stream, cancellationToken); + context.Body = await BinaryData.FromStreamAsync(stream, cancellationToken).ConfigureAwait(false); } /// diff --git a/src/Tingle.EventBus/Serialization/DefaultJsonEventSerializer.cs b/src/Tingle.EventBus/Serialization/DefaultJsonEventSerializer.cs index 4044876a..aff6724f 100644 --- a/src/Tingle.EventBus/Serialization/DefaultJsonEventSerializer.cs +++ b/src/Tingle.EventBus/Serialization/DefaultJsonEventSerializer.cs @@ -30,7 +30,7 @@ public DefaultJsonEventSerializer(IOptionsMonitor var serializerOptions = OptionsAccessor.CurrentValue.SerializerOptions; return await JsonSerializer.DeserializeAsync>(utf8Json: stream, options: serializerOptions, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); } /// @@ -42,6 +42,6 @@ protected override async Task SerializeEnvelopeAsync(Stream stream, await JsonSerializer.SerializeAsync(utf8Json: stream, value: envelope, options: serializerOptions, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); } } diff --git a/src/Tingle.EventBus/Transports/EventBusTransportBase.cs b/src/Tingle.EventBus/Transports/EventBusTransportBase.cs index c84b250e..18b2fbd5 100644 --- a/src/Tingle.EventBus/Transports/EventBusTransportBase.cs +++ b/src/Tingle.EventBus/Transports/EventBusTransportBase.cs @@ -75,7 +75,7 @@ public EventBusTransportBase(IServiceScopeFactory scopeFactory, var retryPolicy = registration.RetryPolicy; if (retryPolicy != null) { - return await retryPolicy.ExecuteAsync(ct => PublishCoreAsync(@event, registration, scheduled, ct), cancellationToken); + return await retryPolicy.ExecuteAsync(ct => PublishCoreAsync(@event, registration, scheduled, ct), cancellationToken).ConfigureAwait(false); } else { @@ -94,7 +94,7 @@ public EventBusTransportBase(IServiceScopeFactory scopeFactory, var retryPolicy = registration.RetryPolicy; if (retryPolicy != null) { - return await retryPolicy.ExecuteAsync(ct => PublishCoreAsync(events, registration, scheduled, ct), cancellationToken); + return await retryPolicy.ExecuteAsync(ct => PublishCoreAsync(events, registration, scheduled, ct), cancellationToken).ConfigureAwait(false); } else { @@ -128,7 +128,7 @@ public virtual async Task CancelAsync(string id, EventRegistration regis var retryPolicy = registration.RetryPolicy; if (retryPolicy != null) { - await retryPolicy.ExecuteAsync(ct => CancelCoreAsync(id, registration, ct), cancellationToken); + await retryPolicy.ExecuteAsync(ct => CancelCoreAsync(id, registration, ct), cancellationToken).ConfigureAwait(false); } else { @@ -144,7 +144,7 @@ public virtual async Task CancelAsync(IList ids, EventRegistrati var retryPolicy = registration.RetryPolicy; if (retryPolicy != null) { - await retryPolicy.ExecuteAsync(ct => CancelCoreAsync(ids, registration, ct), cancellationToken); + await retryPolicy.ExecuteAsync(ct => CancelCoreAsync(ids, registration, ct), cancellationToken).ConfigureAwait(false); } else { @@ -216,7 +216,7 @@ protected async Task> DeserializeAsync(Deserializat var serializer = (IEventSerializer)ActivatorUtilities.GetServiceOrCreateInstance(ctx.ServiceProvider, registration.EventSerializerType!); // Deserialize the content into a context - var context = await serializer.DeserializeAsync(ctx, cancellationToken); + var context = await serializer.DeserializeAsync(ctx, cancellationToken).ConfigureAwait(false); // Ensure we are not null (throwing helps track the error) if (context is null) @@ -253,7 +253,7 @@ protected async Task> DeserializeAsync(IServiceScop ContentType = contentType, RawTransportData = raw, }; - return await DeserializeAsync(ctx, cancellationToken); + return await DeserializeAsync(ctx, cancellationToken).ConfigureAwait(false); } /// @@ -272,7 +272,7 @@ protected async Task SerializeAsync(SerializationContext ctx, var serializer = (IEventSerializer)ActivatorUtilities.GetServiceOrCreateInstance(ctx.ServiceProvider, registration.EventSerializerType!); // Serialize - await serializer.SerializeAsync(ctx, cancellationToken); + await serializer.SerializeAsync(ctx, cancellationToken).ConfigureAwait(false); } /// @@ -291,7 +291,7 @@ protected async Task SerializeAsync(IServiceScope scope, where TEvent : class { var ctx = new SerializationContext(scope.ServiceProvider, @event, registration); - await SerializeAsync(ctx, cancellationToken); + await SerializeAsync(ctx, cancellationToken).ConfigureAwait(false); return ctx.Body!; } @@ -328,7 +328,7 @@ protected async Task ConsumeAsync(EventRe var retryPolicy = registration.RetryPolicy; if (retryPolicy != null) { - await retryPolicy.ExecuteAsync(ct => consumer.ConsumeAsync(@event, ct), cancellationToken); + await retryPolicy.ExecuteAsync(ct => consumer.ConsumeAsync(@event, ct), cancellationToken).ConfigureAwait(false); } else {