Skip to content

Commit

Permalink
Added ConfigureAwait(false) with an analyzer
Browse files Browse the repository at this point in the history
  • Loading branch information
mburumaxwell committed Oct 14, 2022
1 parent 905141a commit 8363786
Show file tree
Hide file tree
Showing 19 changed files with 223 additions and 222 deletions.
1 change: 1 addition & 0 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ConfigureAwaitChecker.Analyzer" Version="5.0.0.1" PrivateAssets="All"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All"/>
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public AmazonKinesisTransport(IServiceScopeFactory serviceScopeFactory,
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
{
await base.StartAsync(cancellationToken);
await base.StartAsync(cancellationToken).ConfigureAwait(false);

// if there are consumers for this transport, throw exception
var registrations = GetRegistrations();
Expand All @@ -51,7 +51,7 @@ public override async Task StartAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override async Task StopAsync(CancellationToken cancellationToken)
{
await base.StopAsync(cancellationToken);
await base.StopAsync(cancellationToken).ConfigureAwait(false);

// if there are consumers for this transport, throw exception
var registrations = GetRegistrations();
Expand All @@ -78,7 +78,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
var body = await SerializeAsync(scope: scope,
@event: @event,
registration: registration,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken).ConfigureAwait(false);

// prepare the record
var streamName = registration.EventName!;
Expand All @@ -91,7 +91,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)

// send the event
Logger.SendingToStream(eventBusId: @event.Id, streamName: streamName, scheduled: scheduled);
var response = await kinesisClient.PutRecordAsync(request, cancellationToken);
var response = await kinesisClient.PutRecordAsync(request, cancellationToken).ConfigureAwait(false);
response.EnsureSuccess();

// return the sequence number
Expand Down Expand Up @@ -119,7 +119,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
var body = await SerializeAsync(scope: scope,
@event: @event,
registration: registration,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken).ConfigureAwait(false);

var record = new PutRecordsRequestEntry
{
Expand All @@ -139,7 +139,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)

// send the events
Logger.SendingEventsToStream(events, streamName, scheduled);
var response = await kinesisClient.PutRecordsAsync(request, cancellationToken);
var response = await kinesisClient.PutRecordsAsync(request, cancellationToken).ConfigureAwait(false);
response.EnsureSuccess();

// Should we check for failed records and throw exception?
Expand Down
66 changes: 33 additions & 33 deletions src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ public AzureEventHubsTransport(IServiceScopeFactory serviceScopeFactory,
/// <inheritdoc/>
public override async Task StartAsync(CancellationToken cancellationToken)
{
await base.StartAsync(cancellationToken);
await base.StartAsync(cancellationToken).ConfigureAwait(false);

var registrations = GetRegistrations();
foreach (var reg in registrations)
{
foreach (var ecr in reg.Consumers)
{
var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken);
var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false);

// register handlers for error and processing
processor.PartitionClosingAsync += delegate (PartitionClosingEventArgs args)
Expand All @@ -73,15 +73,15 @@ public override async Task StartAsync(CancellationToken cancellationToken)
};

// start processing
await processor.StartProcessingAsync(cancellationToken: cancellationToken);
await processor.StartProcessingAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
}

/// <inheritdoc/>
public override async Task StopAsync(CancellationToken cancellationToken)
{
await base.StopAsync(cancellationToken);
await base.StopAsync(cancellationToken).ConfigureAwait(false);

var clients = processorsCache.Select(kvp => (key: kvp.Key, proc: kvp.Value)).ToList();
foreach (var (key, proc) in clients)
Expand All @@ -90,7 +90,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)

try
{
await proc.StopProcessingAsync(cancellationToken);
await proc.StopProcessingAsync(cancellationToken).ConfigureAwait(false);
processorsCache.Remove(key);

Logger.StoppedProcessor(processor: key);
Expand Down Expand Up @@ -124,7 +124,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
var body = await SerializeAsync(scope: scope,
@event: @event,
registration: registration,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken).ConfigureAwait(false);

var data = new EventData(body)
{
Expand All @@ -145,9 +145,9 @@ public override async Task StopAsync(CancellationToken cancellationToken)
.AddIfNotDefault(MetadataNames.ActivityId, Activity.Current?.Id);

// get the producer and send the event accordingly
var producer = await GetProducerAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken);
var producer = await GetProducerAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken).ConfigureAwait(false);
Logger.SendingEvent(eventBusId: @event.Id, eventHubName: producer.EventHubName, scheduled: scheduled);
await producer.SendAsync(new[] { data }, cancellationToken);
await producer.SendAsync(new[] { data }, cancellationToken).ConfigureAwait(false);

// return the sequence number
return scheduled != null ? new ScheduledResult(id: data.SequenceNumber, scheduled: scheduled.Value) : null;
Expand Down Expand Up @@ -178,7 +178,7 @@ public override async Task StopAsync(CancellationToken cancellationToken)
var body = await SerializeAsync(scope: scope,
@event: @event,
registration: registration,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken).ConfigureAwait(false);

var data = new EventData(body)
{
Expand All @@ -201,9 +201,9 @@ public override async Task StopAsync(CancellationToken cancellationToken)
}

// get the producer and send the events accordingly
var producer = await GetProducerAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken);
var producer = await GetProducerAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken).ConfigureAwait(false);
Logger.SendingEvents(events: events, eventHubName: producer.EventHubName, scheduled: scheduled);
await producer.SendAsync(datas, cancellationToken);
await producer.SendAsync(datas, cancellationToken).ConfigureAwait(false);

// return the sequence numbers
return scheduled != null ? datas.Select(m => new ScheduledResult(id: m.SequenceNumber, scheduled: scheduled.Value)).ToList() : null;
Expand All @@ -227,7 +227,7 @@ protected override Task CancelCoreAsync<TEvent>(IList<string> ids,

private async Task<EventHubProducerClient> GetProducerAsync(EventRegistration reg, bool deadletter, CancellationToken cancellationToken)
{
await producersCacheLock.WaitAsync(cancellationToken);
await producersCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false);

try
{
Expand Down Expand Up @@ -277,7 +277,7 @@ private async Task<EventHubProducerClient> GetProducerAsync(EventRegistration re

private async Task<EventProcessorClient> GetProcessorAsync(EventRegistration reg, EventConsumerRegistration ecr, CancellationToken cancellationToken)
{
await processorsCacheLock.WaitAsync(cancellationToken);
await processorsCacheLock.WaitAsync(cancellationToken).ConfigureAwait(false);

try
{
Expand Down Expand Up @@ -405,7 +405,7 @@ private async Task OnEventReceivedAsync<TEvent, TConsumer>(EventRegistration reg
registration: reg,
identifier: data.SequenceNumber.ToString(),
raw: data,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken).ConfigureAwait(false);
Logger.ReceivedEvent(eventBusId: context.Id,
eventHubName: processor.EventHubName,
consumerGroup: processor.ConsumerGroup,
Expand All @@ -421,13 +421,13 @@ private async Task OnEventReceivedAsync<TEvent, TConsumer>(EventRegistration reg
ecr: ecr,
@event: context,
scope: scope,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken).ConfigureAwait(false);

if (!successful && ecr.UnhandledErrorBehaviour == UnhandledConsumerErrorBehaviour.Deadletter)
{
// get the producer for the dead letter event hub and send the event there
var dlqProcessor = await GetProducerAsync(reg: reg, deadletter: true, cancellationToken: cancellationToken);
await dlqProcessor.SendAsync(new[] { data }, cancellationToken);
var dlqProcessor = await GetProducerAsync(reg: reg, deadletter: true, cancellationToken: cancellationToken).ConfigureAwait(false);
await dlqProcessor.SendAsync(new[] { data }, cancellationToken).ConfigureAwait(false);
}

/*
Expand All @@ -441,7 +441,7 @@ private async Task OnEventReceivedAsync<TEvent, TConsumer>(EventRegistration reg
eventHubName: processor.EventHubName,
consumerGroup: processor.ConsumerGroup,
sequenceNumber: data.SequenceNumber);
await args.UpdateCheckpointAsync(args.CancellationToken);
await args.UpdateCheckpointAsync(args.CancellationToken).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public IotHubEventSerializer(IOptionsMonitor<EventBusSerializationOptions> optio
telemetry = await JsonSerializer.DeserializeAsync(utf8Json: stream,
returnType: mapped.TelemetryType,
options: serializerOptions,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken).ConfigureAwait(false);
}
else if (source is IotHubEventMessageSource.TwinChangeEvents
or IotHubEventMessageSource.DeviceLifecycleEvents
Expand All @@ -62,7 +62,7 @@ or IotHubEventMessageSource.DeviceLifecycleEvents
var twinChangeEvent = await JsonSerializer.DeserializeAsync(utf8Json: stream,
returnType: mapped.TwinChangeEventType,
options: serializerOptions,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken).ConfigureAwait(false);

var twinChangeOpEventType = typeof(IotHubOperationalEvent<>).MakeGenericType(mapped.TwinChangeEventType);
twinChange = Activator.CreateInstance(twinChangeOpEventType, new[] { hubName, deviceId, moduleId, type, operationTimestamp, twinChangeEvent, });
Expand All @@ -72,7 +72,7 @@ or IotHubEventMessageSource.DeviceLifecycleEvents
var lifecycleEvent = await JsonSerializer.DeserializeAsync(utf8Json: stream,
returnType: mapped.LifecycleEventType,
options: serializerOptions,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken).ConfigureAwait(false);

var lifecycleOpEventType = typeof(IotHubOperationalEvent<>).MakeGenericType(mapped.LifecycleEventType);
lifecycle = Activator.CreateInstance(lifecycleOpEventType, new[] { hubName, deviceId, moduleId, type, operationTimestamp, lifecycleEvent, });
Expand All @@ -82,7 +82,7 @@ or IotHubEventMessageSource.DeviceLifecycleEvents
var connectionStateEvent = await JsonSerializer.DeserializeAsync<IotHubDeviceConnectionStateEvent>(
utf8Json: stream,
options: serializerOptions,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken).ConfigureAwait(false);

connectionState = new IotHubOperationalEvent<IotHubDeviceConnectionStateEvent>(
hubName: hubName,
Expand Down
Loading

0 comments on commit 8363786

Please sign in to comment.