Skip to content

Commit

Permalink
Support for Azure IoT Hub (#379)
Browse files Browse the repository at this point in the history
Reading messages from Azure IoT Hub is done using the Event Hubs compatible endpoint by default. This Pr adds convenience extensions for working with Azure IoT Hub in the `Tingle.EventBus.Transports.EventHubs` project/package.

In addition, a functional sample is added.

Other changes:
1.  Logging in EventHubs is streamlined further.
2. `MessageId, `CorrelationId`, and `ContentType` headers in the `EventData` are now used instead of own properties.
  • Loading branch information
mburumaxwell authored Dec 27, 2021
1 parent 6214698 commit 67d05d0
Show file tree
Hide file tree
Showing 17 changed files with 493 additions and 52 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ A number fo the documents below are still a work in progress and would be added

#### How to ...

* [Work with IoTHub](docs/Work-with-IoTHub.md)
* [Work with Azure IoT Hub](docs/Work-with-Azure-IoT-Hub.md)
* [Advanced Service Bus options](docs/Advanced-Service-Bus-options.md)
* [Work with Feature Management](docs/Work-with-Feature-Management.md)
* [Extend event configuration](docs/Extend-Event-Configuration.md)
Expand Down
7 changes: 7 additions & 0 deletions Tingle.EventBus.sln
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{62F6
samples\Directory.Build.props = samples\Directory.Build.props
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureIotHub", "samples\AzureIotHub\AzureIotHub.csproj", "{3759B206-BF8D-4E46-9B04-1C19F156D295}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CustomEventConfigurator", "samples\CustomEventConfigurator\CustomEventConfigurator.csproj", "{8C0EE13F-701F-45EF-BADF-6B7A22AA6785}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CustomEventSerializer", "samples\CustomEventSerializer\CustomEventSerializer.csproj", "{2C55FABC-8C94-4104-BE05-42A477D2AD9E}"
Expand Down Expand Up @@ -134,6 +136,10 @@ Global
{E4D62A60-39E4-401E-B146-0EA8DA272664}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E4D62A60-39E4-401E-B146-0EA8DA272664}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E4D62A60-39E4-401E-B146-0EA8DA272664}.Release|Any CPU.Build.0 = Release|Any CPU
{3759B206-BF8D-4E46-9B04-1C19F156D295}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3759B206-BF8D-4E46-9B04-1C19F156D295}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3759B206-BF8D-4E46-9B04-1C19F156D295}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3759B206-BF8D-4E46-9B04-1C19F156D295}.Release|Any CPU.Build.0 = Release|Any CPU
{8C0EE13F-701F-45EF-BADF-6B7A22AA6785}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8C0EE13F-701F-45EF-BADF-6B7A22AA6785}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8C0EE13F-701F-45EF-BADF-6B7A22AA6785}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -183,6 +189,7 @@ Global
{B52EE5B6-D86E-4415-8688-E835220AD219} = {BDD324B6-9EFC-49A3-9CF6-6CE494446C4B}
{C2E8CDC9-E607-4DDF-9796-98EF9CAFC65D} = {BDD324B6-9EFC-49A3-9CF6-6CE494446C4B}
{E4D62A60-39E4-401E-B146-0EA8DA272664} = {BDD324B6-9EFC-49A3-9CF6-6CE494446C4B}
{3759B206-BF8D-4E46-9B04-1C19F156D295} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{8C0EE13F-701F-45EF-BADF-6B7A22AA6785} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{2C55FABC-8C94-4104-BE05-42A477D2AD9E} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
{C9293277-90BA-4F1A-BEA7-85CE41103B8D} = {62F603F3-FF36-4E36-AC0C-08D1883525BE}
Expand Down
26 changes: 26 additions & 0 deletions samples/AzureIotHub/AzureIotEventsConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace AzureIotHub;

internal class AzureIotEventsConsumer : IEventConsumer<MyIotHubEvent>
{
private readonly ILogger logger;

public AzureIotEventsConsumer(ILogger<AzureIotEventsConsumer> logger)
{
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public Task ConsumeAsync(EventContext<MyIotHubEvent> context, CancellationToken cancellationToken)
{
var deviceId = context.GetIotHubDeviceId();
var source = context.GetIotHubMessageSource();
var enqueued = context.GetIotHubEnqueuedTime();

logger.LogInformation("Received {Source} from {DeviceId}\r\nEnqueued: {EnqueuedTime}\r\nTimestamped: {Timestamp}.",
source,
deviceId,
enqueued,
context.Event.Timestamp);

return Task.CompletedTask;
}
}
15 changes: 15 additions & 0 deletions samples/AzureIotHub/AzureIotHub.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">

<PropertyGroup>
<UserSecretsId>710bcdca-052b-456d-be46-4efede662bb2</UserSecretsId>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Tingle.EventBus.Transports.Azure.EventHubs\Tingle.EventBus.Transports.Azure.EventHubs.csproj" />
</ItemGroup>

</Project>
14 changes: 14 additions & 0 deletions samples/AzureIotHub/MyIotHubEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
using Tingle.EventBus.Configuration;

namespace AzureIotHub;

[EventSerializer(typeof(MyIotHubEventSerializer))]
internal class MyIotHubEvent
{
public DateTimeOffset Timestamp { get; set; }

[JsonExtensionData]
public JsonObject? Extras { get; set; }
}
37 changes: 37 additions & 0 deletions samples/AzureIotHub/MyIotHubEventSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Microsoft.Extensions.Options;
using System.Net.Mime;
using System.Text.Json;
using Tingle.EventBus.Serialization;

namespace AzureIotHub;

internal class MyIotHubEventSerializer : AbstractEventSerializer
{
public MyIotHubEventSerializer(IOptionsMonitor<EventBusOptions> optionsAccessor,
ILoggerFactory loggerFactory)
: base(optionsAccessor, loggerFactory) { }

/// <inheritdoc/>
protected override IList<string> SupportedMediaTypes => JsonContentTypes;

/// <inheritdoc/>
protected override async Task<IEventEnvelope<T>?> DeserializeToEnvelopeAsync<T>(Stream stream,
ContentType? contentType,
CancellationToken cancellationToken = default)
{
var serializerOptions = OptionsAccessor.CurrentValue.SerializerOptions;
var @event = await JsonSerializer.DeserializeAsync<T>(utf8Json: stream,
options: serializerOptions,
cancellationToken: cancellationToken);

return new EventEnvelope<T> { Event = @event, };
}

/// <inheritdoc/>
protected override Task SerializeEnvelopeAsync<T>(Stream stream,
EventEnvelope<T> envelope,
CancellationToken cancellationToken = default)
{
throw new NotSupportedException("Serialization of IotHubEvent events should never happen.");
}
}
28 changes: 28 additions & 0 deletions samples/AzureIotHub/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using AzureIotHub;
using Tingle.EventBus.Configuration;

var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
var configuration = hostContext.Configuration;

services.AddEventBus(builder =>
{
builder.Configure(o => o.ConfigureEvent<MyIotHubEvent>(reg => reg.ConfigureAsIotHubEvent(configuration["IotHubEventHubName"])));

builder.AddConsumer<AzureIotEventsConsumer>();

// setup extra serializers
builder.Services.AddSingleton<MyIotHubEventSerializer>();

// Transport specific configuration
builder.AddAzureEventHubsTransport(options =>
{
options.Credentials = configuration.GetConnectionString("EventHub");
options.BlobStorageCredentials = configuration.GetConnectionString("AzureStorage");
});
});
})
.Build();

await host.RunAsync();
10 changes: 10 additions & 0 deletions samples/AzureIotHub/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"profiles": {
"AzureIotHub": {
"commandName": "Project",
"environmentVariables": {
"DOTNET_ENVIRONMENT": "Development"
}
}
}
}
9 changes: 9 additions & 0 deletions samples/AzureIotHub/appsettings.Development.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Information",
"System": "Information"
}
}
}
14 changes: 14 additions & 0 deletions samples/AzureIotHub/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*",

"ConnectionStrings:AzureStorage": "UseDevelopmentStorage=true;",
"ConnectionStrings:EventHub": "Endpoint=sb://abcd.servicebus.windows.net/;SharedAccessKeyName=xyz;SharedAccessKey=AAAAAAAAAAAAAAAAAAAAAA==",
"IotHubEventHubName": "iothub-ehub-test-dev-0000000-0aaaa000aa"
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,19 @@ public override async Task StopAsync(CancellationToken cancellationToken)
registration: registration,
cancellationToken: cancellationToken);

var data = new EventData(body);
data.Properties.AddIfNotDefault(MetadataNames.Id, @event.Id)
.AddIfNotDefault(MetadataNames.CorrelationId, @event.CorrelationId)
.AddIfNotDefault(MetadataNames.ContentType, @event.ContentType?.ToString())
.AddIfNotDefault(MetadataNames.RequestId, @event.RequestId)
var data = new EventData(body)
{
MessageId = @event.Id,
ContentType = @event.ContentType?.ToString(),
};

// If CorrelationId is present, set it
if (@event.CorrelationId != null)
{
data.CorrelationId = @event.CorrelationId;
}

data.Properties.AddIfNotDefault(MetadataNames.RequestId, @event.RequestId)
.AddIfNotDefault(MetadataNames.InitiatorId, @event.InitiatorId)
.AddIfNotDefault(MetadataNames.EventName, registration.EventName)
.AddIfNotDefault(MetadataNames.EventType, registration.EventType.FullName)
Expand Down Expand Up @@ -172,11 +180,19 @@ public override async Task StopAsync(CancellationToken cancellationToken)
registration: registration,
cancellationToken: cancellationToken);

var data = new EventData(body);
data.Properties.AddIfNotDefault(MetadataNames.Id, @event.Id)
.AddIfNotDefault(MetadataNames.CorrelationId, @event.CorrelationId)
.AddIfNotDefault(MetadataNames.ContentType, @event.ContentType?.ToString())
.AddIfNotDefault(MetadataNames.RequestId, @event.RequestId)
var data = new EventData(body)
{
MessageId = @event.Id,
ContentType = @event.ContentType?.ToString(),
};

// If CorrelationId is present, set it
if (@event.CorrelationId != null)
{
data.CorrelationId = @event.CorrelationId;
}

data.Properties.AddIfNotDefault(MetadataNames.RequestId, @event.RequestId)
.AddIfNotDefault(MetadataNames.InitiatorId, @event.InitiatorId)
.AddIfNotDefault(MetadataNames.EventName, registration.EventName)
.AddIfNotDefault(MetadataNames.EventType, registration.EventType.FullName)
Expand Down Expand Up @@ -265,8 +281,12 @@ private async Task<EventProcessorClient> GetProcessorAsync(EventRegistration reg

try
{
var eventHubName = reg.EventName;
var consumerGroup = TransportOptions.UseBasicTier ? EventHubConsumerClient.DefaultConsumerGroupName : ecr.ConsumerName;
// For events configured as sourced from IoT Hub,
// 1. The event hub name is in the metadata
// 2. The ConsumerGroup is set to $Default (this may be changed to support more)
var isIotHub = reg.IsConfiguredAsIotHub();
var eventHubName = isIotHub ? reg.GetIotHubEventHubName() : reg.EventName;
var consumerGroup = isIotHub || TransportOptions.UseBasicTier ? EventHubConsumerClient.DefaultConsumerGroupName : ecr.ConsumerName;

var key = $"{eventHubName}/{consumerGroup}";
if (!processorsCache.TryGetValue(key, out var processor))
Expand All @@ -290,7 +310,7 @@ private async Task<EventProcessorClient> GetProcessorAsync(EventRegistration reg
? new BlobContainerClient(blobContainerUri: new Uri($"{abstc.BlobServiceUrl}/{TransportOptions.BlobContainerName}"),
credential: abstc.TokenCredential)
: new BlobContainerClient(connectionString: (string)cred_bs,
blobContainerName: TransportOptions.BlobContainerName);
blobContainerName: TransportOptions.BlobContainerName);

// Create the processor client options
var epco = new EventProcessorClientOptions
Expand All @@ -312,16 +332,16 @@ private async Task<EventProcessorClient> GetProcessorAsync(EventRegistration reg
var cred = TransportOptions.Credentials!.Value!;
processor = cred is AzureEventHubsTransportCredentials aehtc
? new EventProcessorClient(checkpointStore: blobContainerClient,
consumerGroup: consumerGroup,
fullyQualifiedNamespace: aehtc.FullyQualifiedNamespace,
eventHubName: eventHubName,
credential: aehtc.TokenCredential,
clientOptions: epco)
consumerGroup: consumerGroup,
fullyQualifiedNamespace: aehtc.FullyQualifiedNamespace,
eventHubName: eventHubName,
credential: aehtc.TokenCredential,
clientOptions: epco)
: new EventProcessorClient(checkpointStore: blobContainerClient,
consumerGroup: consumerGroup,
connectionString: (string)cred,
eventHubName: eventHubName,
clientOptions: epco);
consumerGroup: consumerGroup,
connectionString: (string)cred,
eventHubName: eventHubName,
clientOptions: epco);

processorsCache[key] = processor;
}
Expand Down Expand Up @@ -354,16 +374,14 @@ private async Task OnEventReceivedAsync<TEvent, TConsumer>(EventRegistration reg

var data = args.Data;
var cancellationToken = args.CancellationToken;
var messageId = data.MessageId;

data.Properties.TryGetValue(MetadataNames.Id, out var eventId);
data.Properties.TryGetValue(MetadataNames.CorrelationId, out var correlationId);
data.Properties.TryGetValue(MetadataNames.ContentType, out var contentType_str);
data.Properties.TryGetValue(MetadataNames.EventName, out var eventName);
data.Properties.TryGetValue(MetadataNames.EventType, out var eventType);
data.Properties.TryGetValue(MetadataNames.ActivityId, out var parentActivityId);

using var log_scope = BeginLoggingScopeForConsume(id: eventId?.ToString(),
correlationId: correlationId?.ToString(),
using var log_scope = BeginLoggingScopeForConsume(id: messageId,
correlationId: data.CorrelationId,
sequenceNumber: data.SequenceNumber.ToString(),
extras: new Dictionary<string, string?>
{
Expand All @@ -381,24 +399,24 @@ private async Task OnEventReceivedAsync<TEvent, TConsumer>(EventRegistration reg
activity?.AddTag(ActivityTagNames.MessagingSystem, Name);
activity?.AddTag(ActivityTagNames.MessagingDestination, processor.EventHubName);

Logger.ProcessingEvent(eventId: eventId,
partitionKey: data.PartitionKey,
sequenceNumber: data.SequenceNumber,
Logger.ProcessingEvent(messageId: messageId,
eventHubName: processor.EventHubName,
consumerGroup: processor.ConsumerGroup);
consumerGroup: processor.ConsumerGroup,
partitionKey: data.PartitionKey,
sequenceNumber: data.SequenceNumber);
using var scope = CreateScope();
var contentType = contentType_str is string ctts ? new ContentType(ctts) : null;
var contentType = new ContentType(data.ContentType);
var context = await DeserializeAsync<TEvent>(scope: scope,
body: data.EventBody,
contentType: contentType,
registration: reg,
identifier: data.SequenceNumber.ToString(),
cancellationToken: cancellationToken);
Logger.ReceivedEvent(eventId: context.Id,
partitionKey: data.PartitionKey,
sequenceNumber: data.SequenceNumber,
eventHubName: processor.EventHubName,
consumerGroup: processor.ConsumerGroup);
consumerGroup: processor.ConsumerGroup,
partitionKey: data.PartitionKey,
sequenceNumber: data.SequenceNumber);

// set the extras
context.SetConsumerGroup(processor.ConsumerGroup)
Expand Down Expand Up @@ -426,8 +444,7 @@ private async Task OnEventReceivedAsync<TEvent, TConsumer>(EventRegistration reg
Logger.Checkpointing(partition: args.Partition,
eventHubName: processor.EventHubName,
consumerGroup: processor.ConsumerGroup,
sequenceNumber: data.SequenceNumber,
eventId: eventId);
sequenceNumber: data.SequenceNumber);
await args.UpdateCheckpointAsync(args.CancellationToken);
}
}
Expand All @@ -443,15 +460,16 @@ private Task OnPartitionClosingAsync(EventProcessorClient processor, PartitionCl

private Task OnPartitionInitializingAsync(EventProcessorClient processor, PartitionInitializingEventArgs args)
{
Logger.OpeningProcessor(eventHubName: args.PartitionId,
consumerGroup: processor.EventHubName,
partitionId: processor.ConsumerGroup,
Logger.OpeningProcessor(eventHubName: processor.EventHubName,
consumerGroup: processor.ConsumerGroup,
partitionId: args.PartitionId,
position: args.DefaultStartingPosition);
return Task.CompletedTask;
}

private Task OnProcessErrorAsync(EventProcessorClient processor, ProcessErrorEventArgs args)
{
// TODO: decide on whether to restart (Stop() then Start()) or terminate (recreate processor) processing
Logger.ProcessingError(operation: args.Operation,
eventHubName: processor.EventHubName,
consumerGroup: processor.ConsumerGroup,
Expand Down
Loading

0 comments on commit 67d05d0

Please sign in to comment.