diff --git a/README.md b/README.md index a602f3bc..b23b4229 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ A number fo the documents below are still a work in progress and would be added #### How to ... -* [Work with IoTHub](docs/Work-with-IoTHub.md) +* [Work with Azure IoT Hub](docs/Work-with-Azure-IoT-Hub.md) * [Advanced Service Bus options](docs/Advanced-Service-Bus-options.md) * [Work with Feature Management](docs/Work-with-Feature-Management.md) * [Extend event configuration](docs/Extend-Event-Configuration.md) diff --git a/Tingle.EventBus.sln b/Tingle.EventBus.sln index 6cef390e..35af4aa2 100644 --- a/Tingle.EventBus.sln +++ b/Tingle.EventBus.sln @@ -50,6 +50,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{62F6 samples\Directory.Build.props = samples\Directory.Build.props EndProjectSection EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureIotHub", "samples\AzureIotHub\AzureIotHub.csproj", "{3759B206-BF8D-4E46-9B04-1C19F156D295}" +EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CustomEventConfigurator", "samples\CustomEventConfigurator\CustomEventConfigurator.csproj", "{8C0EE13F-701F-45EF-BADF-6B7A22AA6785}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CustomEventSerializer", "samples\CustomEventSerializer\CustomEventSerializer.csproj", "{2C55FABC-8C94-4104-BE05-42A477D2AD9E}" @@ -134,6 +136,10 @@ Global {E4D62A60-39E4-401E-B146-0EA8DA272664}.Debug|Any CPU.Build.0 = Debug|Any CPU {E4D62A60-39E4-401E-B146-0EA8DA272664}.Release|Any CPU.ActiveCfg = Release|Any CPU {E4D62A60-39E4-401E-B146-0EA8DA272664}.Release|Any CPU.Build.0 = Release|Any CPU + {3759B206-BF8D-4E46-9B04-1C19F156D295}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3759B206-BF8D-4E46-9B04-1C19F156D295}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3759B206-BF8D-4E46-9B04-1C19F156D295}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3759B206-BF8D-4E46-9B04-1C19F156D295}.Release|Any CPU.Build.0 = Release|Any CPU {8C0EE13F-701F-45EF-BADF-6B7A22AA6785}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {8C0EE13F-701F-45EF-BADF-6B7A22AA6785}.Debug|Any CPU.Build.0 = Debug|Any CPU {8C0EE13F-701F-45EF-BADF-6B7A22AA6785}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -183,6 +189,7 @@ Global {B52EE5B6-D86E-4415-8688-E835220AD219} = {BDD324B6-9EFC-49A3-9CF6-6CE494446C4B} {C2E8CDC9-E607-4DDF-9796-98EF9CAFC65D} = {BDD324B6-9EFC-49A3-9CF6-6CE494446C4B} {E4D62A60-39E4-401E-B146-0EA8DA272664} = {BDD324B6-9EFC-49A3-9CF6-6CE494446C4B} + {3759B206-BF8D-4E46-9B04-1C19F156D295} = {62F603F3-FF36-4E36-AC0C-08D1883525BE} {8C0EE13F-701F-45EF-BADF-6B7A22AA6785} = {62F603F3-FF36-4E36-AC0C-08D1883525BE} {2C55FABC-8C94-4104-BE05-42A477D2AD9E} = {62F603F3-FF36-4E36-AC0C-08D1883525BE} {C9293277-90BA-4F1A-BEA7-85CE41103B8D} = {62F603F3-FF36-4E36-AC0C-08D1883525BE} diff --git a/samples/AzureIotHub/AzureIotEventsConsumer.cs b/samples/AzureIotHub/AzureIotEventsConsumer.cs new file mode 100644 index 00000000..035210cb --- /dev/null +++ b/samples/AzureIotHub/AzureIotEventsConsumer.cs @@ -0,0 +1,26 @@ +namespace AzureIotHub; + +internal class AzureIotEventsConsumer : IEventConsumer +{ + private readonly ILogger logger; + + public AzureIotEventsConsumer(ILogger logger) + { + this.logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public Task ConsumeAsync(EventContext context, CancellationToken cancellationToken) + { + var deviceId = context.GetIotHubDeviceId(); + var source = context.GetIotHubMessageSource(); + var enqueued = context.GetIotHubEnqueuedTime(); + + logger.LogInformation("Received {Source} from {DeviceId}\r\nEnqueued: {EnqueuedTime}\r\nTimestamped: {Timestamp}.", + source, + deviceId, + enqueued, + context.Event.Timestamp); + + return Task.CompletedTask; + } +} diff --git a/samples/AzureIotHub/AzureIotHub.csproj b/samples/AzureIotHub/AzureIotHub.csproj new file mode 100644 index 00000000..1902def4 --- /dev/null +++ b/samples/AzureIotHub/AzureIotHub.csproj @@ -0,0 +1,15 @@ + + + + 710bcdca-052b-456d-be46-4efede662bb2 + + + + + + + + + + + diff --git a/samples/AzureIotHub/MyIotHubEvent.cs b/samples/AzureIotHub/MyIotHubEvent.cs new file mode 100644 index 00000000..28ca9557 --- /dev/null +++ b/samples/AzureIotHub/MyIotHubEvent.cs @@ -0,0 +1,14 @@ +using System.Text.Json.Nodes; +using System.Text.Json.Serialization; +using Tingle.EventBus.Configuration; + +namespace AzureIotHub; + +[EventSerializer(typeof(MyIotHubEventSerializer))] +internal class MyIotHubEvent +{ + public DateTimeOffset Timestamp { get; set; } + + [JsonExtensionData] + public JsonObject? Extras { get; set; } +} diff --git a/samples/AzureIotHub/MyIotHubEventSerializer.cs b/samples/AzureIotHub/MyIotHubEventSerializer.cs new file mode 100644 index 00000000..fd700dcd --- /dev/null +++ b/samples/AzureIotHub/MyIotHubEventSerializer.cs @@ -0,0 +1,37 @@ +using Microsoft.Extensions.Options; +using System.Net.Mime; +using System.Text.Json; +using Tingle.EventBus.Serialization; + +namespace AzureIotHub; + +internal class MyIotHubEventSerializer : AbstractEventSerializer +{ + public MyIotHubEventSerializer(IOptionsMonitor optionsAccessor, + ILoggerFactory loggerFactory) + : base(optionsAccessor, loggerFactory) { } + + /// + protected override IList SupportedMediaTypes => JsonContentTypes; + + /// + protected override async Task?> DeserializeToEnvelopeAsync(Stream stream, + ContentType? contentType, + CancellationToken cancellationToken = default) + { + var serializerOptions = OptionsAccessor.CurrentValue.SerializerOptions; + var @event = await JsonSerializer.DeserializeAsync(utf8Json: stream, + options: serializerOptions, + cancellationToken: cancellationToken); + + return new EventEnvelope { Event = @event, }; + } + + /// + protected override Task SerializeEnvelopeAsync(Stream stream, + EventEnvelope envelope, + CancellationToken cancellationToken = default) + { + throw new NotSupportedException("Serialization of IotHubEvent events should never happen."); + } +} diff --git a/samples/AzureIotHub/Program.cs b/samples/AzureIotHub/Program.cs new file mode 100644 index 00000000..bc678cee --- /dev/null +++ b/samples/AzureIotHub/Program.cs @@ -0,0 +1,28 @@ +using AzureIotHub; +using Tingle.EventBus.Configuration; + +var host = Host.CreateDefaultBuilder(args) + .ConfigureServices((hostContext, services) => + { + var configuration = hostContext.Configuration; + + services.AddEventBus(builder => + { + builder.Configure(o => o.ConfigureEvent(reg => reg.ConfigureAsIotHubEvent(configuration["IotHubEventHubName"]))); + + builder.AddConsumer(); + + // setup extra serializers + builder.Services.AddSingleton(); + + // Transport specific configuration + builder.AddAzureEventHubsTransport(options => + { + options.Credentials = configuration.GetConnectionString("EventHub"); + options.BlobStorageCredentials = configuration.GetConnectionString("AzureStorage"); + }); + }); + }) + .Build(); + +await host.RunAsync(); diff --git a/samples/AzureIotHub/Properties/launchSettings.json b/samples/AzureIotHub/Properties/launchSettings.json new file mode 100644 index 00000000..05ffa069 --- /dev/null +++ b/samples/AzureIotHub/Properties/launchSettings.json @@ -0,0 +1,10 @@ +{ + "profiles": { + "AzureIotHub": { + "commandName": "Project", + "environmentVariables": { + "DOTNET_ENVIRONMENT": "Development" + } + } + } +} \ No newline at end of file diff --git a/samples/AzureIotHub/appsettings.Development.json b/samples/AzureIotHub/appsettings.Development.json new file mode 100644 index 00000000..14b27546 --- /dev/null +++ b/samples/AzureIotHub/appsettings.Development.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Debug", + "Microsoft": "Information", + "System": "Information" + } + } +} diff --git a/samples/AzureIotHub/appsettings.json b/samples/AzureIotHub/appsettings.json new file mode 100644 index 00000000..03fe9621 --- /dev/null +++ b/samples/AzureIotHub/appsettings.json @@ -0,0 +1,14 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + }, + "AllowedHosts": "*", + + "ConnectionStrings:AzureStorage": "UseDevelopmentStorage=true;", + "ConnectionStrings:EventHub": "Endpoint=sb://abcd.servicebus.windows.net/;SharedAccessKeyName=xyz;SharedAccessKey=AAAAAAAAAAAAAAAAAAAAAA==", + "IotHubEventHubName": "iothub-ehub-test-dev-0000000-0aaaa000aa" +} diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs index 82201425..61ee122c 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs @@ -126,11 +126,19 @@ public override async Task StopAsync(CancellationToken cancellationToken) registration: registration, cancellationToken: cancellationToken); - var data = new EventData(body); - data.Properties.AddIfNotDefault(MetadataNames.Id, @event.Id) - .AddIfNotDefault(MetadataNames.CorrelationId, @event.CorrelationId) - .AddIfNotDefault(MetadataNames.ContentType, @event.ContentType?.ToString()) - .AddIfNotDefault(MetadataNames.RequestId, @event.RequestId) + var data = new EventData(body) + { + MessageId = @event.Id, + ContentType = @event.ContentType?.ToString(), + }; + + // If CorrelationId is present, set it + if (@event.CorrelationId != null) + { + data.CorrelationId = @event.CorrelationId; + } + + data.Properties.AddIfNotDefault(MetadataNames.RequestId, @event.RequestId) .AddIfNotDefault(MetadataNames.InitiatorId, @event.InitiatorId) .AddIfNotDefault(MetadataNames.EventName, registration.EventName) .AddIfNotDefault(MetadataNames.EventType, registration.EventType.FullName) @@ -172,11 +180,19 @@ public override async Task StopAsync(CancellationToken cancellationToken) registration: registration, cancellationToken: cancellationToken); - var data = new EventData(body); - data.Properties.AddIfNotDefault(MetadataNames.Id, @event.Id) - .AddIfNotDefault(MetadataNames.CorrelationId, @event.CorrelationId) - .AddIfNotDefault(MetadataNames.ContentType, @event.ContentType?.ToString()) - .AddIfNotDefault(MetadataNames.RequestId, @event.RequestId) + var data = new EventData(body) + { + MessageId = @event.Id, + ContentType = @event.ContentType?.ToString(), + }; + + // If CorrelationId is present, set it + if (@event.CorrelationId != null) + { + data.CorrelationId = @event.CorrelationId; + } + + data.Properties.AddIfNotDefault(MetadataNames.RequestId, @event.RequestId) .AddIfNotDefault(MetadataNames.InitiatorId, @event.InitiatorId) .AddIfNotDefault(MetadataNames.EventName, registration.EventName) .AddIfNotDefault(MetadataNames.EventType, registration.EventType.FullName) @@ -265,8 +281,12 @@ private async Task GetProcessorAsync(EventRegistration reg try { - var eventHubName = reg.EventName; - var consumerGroup = TransportOptions.UseBasicTier ? EventHubConsumerClient.DefaultConsumerGroupName : ecr.ConsumerName; + // For events configured as sourced from IoT Hub, + // 1. The event hub name is in the metadata + // 2. The ConsumerGroup is set to $Default (this may be changed to support more) + var isIotHub = reg.IsConfiguredAsIotHub(); + var eventHubName = isIotHub ? reg.GetIotHubEventHubName() : reg.EventName; + var consumerGroup = isIotHub || TransportOptions.UseBasicTier ? EventHubConsumerClient.DefaultConsumerGroupName : ecr.ConsumerName; var key = $"{eventHubName}/{consumerGroup}"; if (!processorsCache.TryGetValue(key, out var processor)) @@ -290,7 +310,7 @@ private async Task GetProcessorAsync(EventRegistration reg ? new BlobContainerClient(blobContainerUri: new Uri($"{abstc.BlobServiceUrl}/{TransportOptions.BlobContainerName}"), credential: abstc.TokenCredential) : new BlobContainerClient(connectionString: (string)cred_bs, - blobContainerName: TransportOptions.BlobContainerName); + blobContainerName: TransportOptions.BlobContainerName); // Create the processor client options var epco = new EventProcessorClientOptions @@ -312,16 +332,16 @@ private async Task GetProcessorAsync(EventRegistration reg var cred = TransportOptions.Credentials!.Value!; processor = cred is AzureEventHubsTransportCredentials aehtc ? new EventProcessorClient(checkpointStore: blobContainerClient, - consumerGroup: consumerGroup, - fullyQualifiedNamespace: aehtc.FullyQualifiedNamespace, - eventHubName: eventHubName, - credential: aehtc.TokenCredential, - clientOptions: epco) + consumerGroup: consumerGroup, + fullyQualifiedNamespace: aehtc.FullyQualifiedNamespace, + eventHubName: eventHubName, + credential: aehtc.TokenCredential, + clientOptions: epco) : new EventProcessorClient(checkpointStore: blobContainerClient, - consumerGroup: consumerGroup, - connectionString: (string)cred, - eventHubName: eventHubName, - clientOptions: epco); + consumerGroup: consumerGroup, + connectionString: (string)cred, + eventHubName: eventHubName, + clientOptions: epco); processorsCache[key] = processor; } @@ -354,16 +374,14 @@ private async Task OnEventReceivedAsync(EventRegistration reg var data = args.Data; var cancellationToken = args.CancellationToken; + var messageId = data.MessageId; - data.Properties.TryGetValue(MetadataNames.Id, out var eventId); - data.Properties.TryGetValue(MetadataNames.CorrelationId, out var correlationId); - data.Properties.TryGetValue(MetadataNames.ContentType, out var contentType_str); data.Properties.TryGetValue(MetadataNames.EventName, out var eventName); data.Properties.TryGetValue(MetadataNames.EventType, out var eventType); data.Properties.TryGetValue(MetadataNames.ActivityId, out var parentActivityId); - using var log_scope = BeginLoggingScopeForConsume(id: eventId?.ToString(), - correlationId: correlationId?.ToString(), + using var log_scope = BeginLoggingScopeForConsume(id: messageId, + correlationId: data.CorrelationId, sequenceNumber: data.SequenceNumber.ToString(), extras: new Dictionary { @@ -381,13 +399,13 @@ private async Task OnEventReceivedAsync(EventRegistration reg activity?.AddTag(ActivityTagNames.MessagingSystem, Name); activity?.AddTag(ActivityTagNames.MessagingDestination, processor.EventHubName); - Logger.ProcessingEvent(eventId: eventId, - partitionKey: data.PartitionKey, - sequenceNumber: data.SequenceNumber, + Logger.ProcessingEvent(messageId: messageId, eventHubName: processor.EventHubName, - consumerGroup: processor.ConsumerGroup); + consumerGroup: processor.ConsumerGroup, + partitionKey: data.PartitionKey, + sequenceNumber: data.SequenceNumber); using var scope = CreateScope(); - var contentType = contentType_str is string ctts ? new ContentType(ctts) : null; + var contentType = new ContentType(data.ContentType); var context = await DeserializeAsync(scope: scope, body: data.EventBody, contentType: contentType, @@ -395,10 +413,10 @@ private async Task OnEventReceivedAsync(EventRegistration reg identifier: data.SequenceNumber.ToString(), cancellationToken: cancellationToken); Logger.ReceivedEvent(eventId: context.Id, - partitionKey: data.PartitionKey, - sequenceNumber: data.SequenceNumber, eventHubName: processor.EventHubName, - consumerGroup: processor.ConsumerGroup); + consumerGroup: processor.ConsumerGroup, + partitionKey: data.PartitionKey, + sequenceNumber: data.SequenceNumber); // set the extras context.SetConsumerGroup(processor.ConsumerGroup) @@ -426,8 +444,7 @@ private async Task OnEventReceivedAsync(EventRegistration reg Logger.Checkpointing(partition: args.Partition, eventHubName: processor.EventHubName, consumerGroup: processor.ConsumerGroup, - sequenceNumber: data.SequenceNumber, - eventId: eventId); + sequenceNumber: data.SequenceNumber); await args.UpdateCheckpointAsync(args.CancellationToken); } } @@ -443,15 +460,16 @@ private Task OnPartitionClosingAsync(EventProcessorClient processor, PartitionCl private Task OnPartitionInitializingAsync(EventProcessorClient processor, PartitionInitializingEventArgs args) { - Logger.OpeningProcessor(eventHubName: args.PartitionId, - consumerGroup: processor.EventHubName, - partitionId: processor.ConsumerGroup, + Logger.OpeningProcessor(eventHubName: processor.EventHubName, + consumerGroup: processor.ConsumerGroup, + partitionId: args.PartitionId, position: args.DefaultStartingPosition); return Task.CompletedTask; } private Task OnProcessErrorAsync(EventProcessorClient processor, ProcessErrorEventArgs args) { + // TODO: decide on whether to restart (Stop() then Start()) or terminate (recreate processor) processing Logger.ProcessingError(operation: args.Operation, eventHubName: processor.EventHubName, consumerGroup: processor.ConsumerGroup, diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/EventContextExtensions.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/EventContextExtensions.cs index 043c1a8e..47f8617f 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/EventContextExtensions.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/EventContextExtensions.cs @@ -7,7 +7,7 @@ namespace Tingle.EventBus; /// /// Extension methods on and . /// -public static class EventContextExtensions +public static partial class EventContextExtensions { internal const string ItemsKeyConsumerGroup = "azure.eventhubs.consumer-group"; internal const string ItemsKeyPartitionContext = "azure.eventhubs.partition-context"; @@ -97,6 +97,22 @@ public static bool TryGetEventData(this EventContext context, [NotNullWhen(true) return false; } + /// + /// Gets the associated with the specified + /// if the event uses Azure Event Hubs transport. + /// + /// The to use. + /// + /// true if the data is found; otherwise, false. + /// + /// The context is null + public static EventData GetEventData(this EventContext context) + { + if (context.TryGetEventData(out var data)) return data; + + throw new InvalidOperationException("The EventData has not been set in this context."); + } + /// /// Set the ConsumerGroup for an event. /// diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/EventRegistrationExtensions.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/EventRegistrationExtensions.cs new file mode 100644 index 00000000..2c223118 --- /dev/null +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/EventRegistrationExtensions.cs @@ -0,0 +1,45 @@ +using System.Diagnostics.CodeAnalysis; + +namespace Tingle.EventBus.Configuration; + +/// +/// Extension methods on . +/// +public static class EventRegistrationExtensions +{ + internal const string MetadataKeyIotHubEventHubName = "azure.iothub.eventhub-name"; + + /// + /// Configure the event hub name to be used when connected to the inbuilt Event Hubs endpoint for an Azure IoT Hub instance. + /// + /// The to confugure. + /// The Event Hub-compatible name e.g. iothub-ehub-test-dev-1973449-3febe524cb + /// + /// + public static EventRegistration ConfigureAsIotHubEvent(this EventRegistration registration, [NotNull] string name) + { + if (registration is null) throw new ArgumentNullException(nameof(registration)); + if (string.IsNullOrWhiteSpace(name)) + { + throw new ArgumentException($"'{nameof(name)}' cannot be null or whitespace.", nameof(name)); + } + + registration.Metadata[MetadataKeyIotHubEventHubName] = name; + + return registration; + } + + internal static string GetIotHubEventHubName(this EventRegistration registration) + { + if (registration is null) throw new ArgumentNullException(nameof(registration)); + + return (string)registration.Metadata[MetadataKeyIotHubEventHubName]; + } + + internal static bool IsConfiguredAsIotHub(this EventRegistration registration) + { + if (registration is null) throw new ArgumentNullException(nameof(registration)); + + return registration.Metadata.ContainsKey(MetadataKeyIotHubEventHubName); + } +} diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs index 2baf5196..51229df0 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs @@ -9,13 +9,13 @@ namespace Microsoft.Extensions.Logging; /// internal static partial class ILoggerExtensions { - [LoggerMessage(100, LogLevel.Information, "Opening processor for EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId:{PartitionId}, DefaultStartingPosition:{Position}")] + [LoggerMessage(100, LogLevel.Information, "Opening processor for EventHub: {EventHubName}\r\nConsumerGroup: {ConsumerGroup}\r\nPartitionId: {PartitionId}\r\nDefaultStartingPosition: {Position}")] public static partial void OpeningProcessor(this ILogger logger, string eventHubName, string consumerGroup, string partitionId, EventPosition position); - [LoggerMessage(101, LogLevel.Information, "Closing processor for EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId:{PartitionId} (Reason:{Reason})")] + [LoggerMessage(101, LogLevel.Information, "Closing processor for EventHub: {EventHubName}\r\nConsumerGroup: {ConsumerGroup}\r\nPartitionId: {PartitionId} (Reason: {Reason})")] public static partial void ClosingProcessor(this ILogger logger, string eventHubName, string consumerGroup, string partitionId, ProcessingStoppedReason reason); - [LoggerMessage(102, LogLevel.Error, "Event processing faulted. Operation:{Operation}, EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId: {PartitionId}")] + [LoggerMessage(102, LogLevel.Error, "Event processing faulted. Operation: {Operation}\r\nEventHub: {EventHubName}\r\nConsumerGroup: {ConsumerGroup}\r\nPartitionId: {PartitionId}")] public static partial void ProcessingError(this ILogger logger, string operation, string eventHubName, string consumerGroup, string partitionId, Exception ex); [LoggerMessage(103, LogLevel.Debug, "Stopping processor: {Processor}.")] @@ -56,16 +56,16 @@ public static void SendingEvents(this ILogger logger, IList> logger.SendingEvents(events.Select(e => e.Id).ToList(), eventHubName, scheduled); } - [LoggerMessage(204, LogLevel.Debug, "Checkpointing {Partition} of '{EventHubName}/{ConsumerGroup}', at {SequenceNumber}. Event: '{EventId}'.")] - public static partial void Checkpointing(this ILogger logger, PartitionContext partition, string eventHubName, string consumerGroup, long sequenceNumber, object? eventId); + [LoggerMessage(204, LogLevel.Debug, "Checkpointing {Partition} of '{EventHubName}/{ConsumerGroup}' at {SequenceNumber}.")] + public static partial void Checkpointing(this ILogger logger, PartitionContext partition, string eventHubName, string consumerGroup, long sequenceNumber); - [LoggerMessage(300, LogLevel.Debug, "Processor received event on EventHub:{EventHubName}, ConsumerGroup:{ConsumerGroup}, PartitionId:{PartitionId}")] + [LoggerMessage(300, LogLevel.Debug, "Processor received event on EventHub: {EventHubName}\r\nConsumerGroup: {ConsumerGroup}\r\nPartitionId: {PartitionId}")] public static partial void ProcessorReceivedEvent(this ILogger logger, string eventHubName, string consumerGroup, string partitionId); - [LoggerMessage(301, LogLevel.Debug, "Processing '{EventId} in {PartitionKey}|{SequenceNumber}' from '{EventHubName}/{ConsumerGroup}'")] - public static partial void ProcessingEvent(this ILogger logger, object? eventId, string partitionKey, long sequenceNumber, string eventHubName, string consumerGroup); + [LoggerMessage(301, LogLevel.Debug, "Processing '{MessageId}' from '{EventHubName}/{ConsumerGroup}'.\r\nPartitionKey: {PartitionKey}\r\nSequenceNumber: {SequenceNumber}'")] + public static partial void ProcessingEvent(this ILogger logger, string messageId, string eventHubName, string consumerGroup, string partitionKey, long sequenceNumber); - [LoggerMessage(302, LogLevel.Information, "Received event: '{EventId} in {PartitionKey}|{SequenceNumber}' from '{EventHubName}/{ConsumerGroup}'")] - public static partial void ReceivedEvent(this ILogger logger, object? eventId, string partitionKey, long sequenceNumber, string eventHubName, string consumerGroup); + [LoggerMessage(302, LogLevel.Information, "Received event: '{EventId}' from '{EventHubName}/{ConsumerGroup}'.\r\nPartitionKey: {PartitionKey}\r\nSequenceNumber: {SequenceNumber}'")] + public static partial void ReceivedEvent(this ILogger logger, string? eventId, string eventHubName, string consumerGroup, string partitionKey, long sequenceNumber); } diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/EventContextExtensions.IotHub.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/EventContextExtensions.IotHub.cs new file mode 100644 index 00000000..46dd317c --- /dev/null +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/EventContextExtensions.IotHub.cs @@ -0,0 +1,66 @@ +using Azure.Messaging.EventHubs; +using Tingle.EventBus.Transports.Azure.EventHubs.IotHub; + +namespace Tingle.EventBus; + +/// +/// Extension methods on and . +/// +public static partial class EventContextExtensions +{ + /// Gets the message identifier for the IoT Hub message. + /// The to use. + public static string? GetIotHubMessageId(this EventContext context) => context.GetEventData().GetIotHubMessageId(); + + /// Gets the enqueued time for the IoT Hub message. + /// The to use. + public static DateTime? GetIotHubEnqueuedTime(this EventContext context) => context.GetEventData().GetIotHubEnqueuedTime(); + + /// Gets the device identifier for the IoT Hub message. + /// The to use. + public static string? GetIotHubDeviceId(this EventContext context) => context.GetEventData().GetIotHubDeviceId(); + + /// Gets whether the message is from an IoT Hub. + /// The to use. + public static bool IsIotHubMessage(this EventContext context) => !string.IsNullOrEmpty(GetIotHubDeviceId(context)); + + /// Gets the module identifier for the IoT Hub message. + /// The to use. + public static string? GetIotHubModuleId(this EventContext context) => context.GetEventData().GetIotHubModuleId(); + + /// Gets the connection authentication generation identifier for the IoT Hub message. + /// The to use. + public static string? GetIotHubConnectionAuthGenerationId(this EventContext context) => context.GetEventData().GetIotHubConnectionAuthGenerationId(); + + /// Gets the raw connection authentication method for the IoT Hub message. + /// The to use. + public static string? GetIotHubConnectionAuthMethodRaw(this EventContext context) => context.GetEventData().GetIotHubConnectionAuthMethodRaw(); + + /// Gets the connection authentication method for the IoT Hub message. + /// The to use. + public static IotHubConnectionAuthMethod? GetIotHubConnectionAuthMethod(this EventContext context) => context.GetEventData().GetIotHubConnectionAuthMethod(); + + /// Gets the source for the IoT Hub message. + /// The to use. + public static string? GetIotHubMessageSource(this EventContext context) => context.GetEventData().GetIotHubMessageSource(); + + /// Gets whether the message is sourced from telemetry. + /// The to use. + public static bool IsIotHubTelemetry(this EventContext context) => context.GetEventData().IsIotHubTelemetry(); + + /// Gets whether the message is sourced from device/module twin changes. + /// The to use. + public static bool IsIotHubTwinChangeEvent(this EventContext context) => context.GetEventData().IsIotHubTwinChangeEvent(); + + /// Gets whether the message is sourced from lifecycle events. + /// The to use. + public static bool IsIotHubDeviceLifeCycleEvent(this EventContext context) => context.GetEventData().IsIotHubDeviceLifeCycleEvent(); + + /// Gets the data schema for the IoT Hub message. + /// The to use. + public static string? GetIotHubDataSchema(this EventContext context) => context.GetEventData().GetIotHubDataSchema(); + + /// Gets the subject for the IoT Hub message. + /// The to use. + public static string? GetIotHubSubject(this EventContext context) => context.GetEventData().GetIotHubSubject(); +} diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/EventDataExtensions.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/EventDataExtensions.cs new file mode 100644 index 00000000..b95c158d --- /dev/null +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/EventDataExtensions.cs @@ -0,0 +1,105 @@ +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Transports.Azure.EventHubs.IotHub; + +namespace Azure.Messaging.EventHubs; + +/// +/// Extension methods on . +/// +public static partial class EventDataExtensions +{ + private const string IotHubPropertyNameMessageId = "message-id"; + private const string IotHubPropertyNameEnqueuedTime = "iothub-enqueuedtime"; + private const string IotHubPropertyNameDeviceId = "iothub-connection-device-id"; + private const string IotHubPropertyNameModuleId = "iothub-connection-module-id"; + private const string IotHubPropertyNameConnectionAuthGenerationId = "iothub-connection-auth-generation-id"; + private const string IotHubPropertyNameConnectionAuthMethod = "iothub-connection-auth-method"; + private const string IotHubPropertyNameMessageSource = "iothub-message-source"; + private const string IotHubPropertyNameDataSchema = "dt-dataschema"; + private const string IotHubPropertyNameSubject = "dt-subject"; + + private const string IotHubMessageSourceTelemetry = "Telemetry"; + private const string IotHubMessageSourceTwinChangeEvents = "twinChangeEvents"; + private const string IotHubMessageSourceDeviceLifeCycleEvents = "deviceLifecycleEvents"; + + private static bool TryGetPropertyValue(this EventData data, string key, [NotNullWhen(true)] out object? value) + { + if (data is null) throw new ArgumentNullException(nameof(data)); + if (string.IsNullOrWhiteSpace(key)) + { + throw new ArgumentException($"'{nameof(key)}' cannot be null or whitespace.", nameof(key)); + } + + return data.SystemProperties.TryGetValue(key, out value) + || data.Properties.TryGetValue(key, out value); + } + + private static T? GetPropertyValue(this EventData data, string key) + { + return data.TryGetPropertyValue(key, out var value) && value is not null ? (T?)value : default; + } + + /// Gets the message identifier for the IoT Hub message. + /// The to use. + public static string? GetIotHubMessageId(this EventData data) => data.GetPropertyValue(IotHubPropertyNameMessageId); + + /// Gets the enqueued time for the IoT Hub message. + /// The to use. + public static DateTime? GetIotHubEnqueuedTime(this EventData data) => data.GetPropertyValue(IotHubPropertyNameEnqueuedTime); + + /// Gets the device identifier for the IoT Hub message. + /// The to use. + public static string? GetIotHubDeviceId(this EventData data) => data.GetPropertyValue(IotHubPropertyNameDeviceId); + + /// Gets whether the message is from an IoT Hub. + /// The to use. + public static bool IsIotHubMessage(this EventData data) => !string.IsNullOrEmpty(GetIotHubDeviceId(data)); + + /// Gets the module identifier for the IoT Hub message. + /// The to use. + public static string? GetIotHubModuleId(this EventData data) => data.GetPropertyValue(IotHubPropertyNameModuleId); + + /// Gets the connection authentication generation identifier for the IoT Hub message. + /// The to use. + public static string? GetIotHubConnectionAuthGenerationId(this EventData data) => data.GetPropertyValue(IotHubPropertyNameConnectionAuthGenerationId); + + /// Gets the raw connection authentication method for the IoT Hub message. + /// The to use. + public static string? GetIotHubConnectionAuthMethodRaw(this EventData data) => data.GetPropertyValue(IotHubPropertyNameConnectionAuthMethod); + + /// Gets the connection authentication method for the IoT Hub message. + /// The to use. + public static IotHubConnectionAuthMethod? GetIotHubConnectionAuthMethod(this EventData data) + { + var json = data.GetIotHubConnectionAuthMethodRaw(); + if (string.IsNullOrEmpty(json)) return null; + return System.Text.Json.JsonSerializer.Deserialize(json); + } + + /// Gets the source for the IoT Hub message. + /// The to use. + public static string? GetIotHubMessageSource(this EventData data) => data.GetPropertyValue(IotHubPropertyNameMessageSource); + + private static bool IsIotHubMessageIsFromSource(this EventData data, string exceptedSource) => string.Equals(exceptedSource, data.GetIotHubMessageSource()); + + /// Gets whether the message is sourced from telemetry. + /// The to use. + public static bool IsIotHubTelemetry(this EventData data) => data.IsIotHubMessageIsFromSource(IotHubMessageSourceTelemetry); + + /// Gets whether the message is sourced from device/module twin changes. + /// The to use. + public static bool IsIotHubTwinChangeEvent(this EventData data) => data.IsIotHubMessageIsFromSource(IotHubMessageSourceTwinChangeEvents); + + /// Gets whether the message is sourced from lifecycle events. + /// The to use. + public static bool IsIotHubDeviceLifeCycleEvent(this EventData data) => data.IsIotHubMessageIsFromSource(IotHubMessageSourceDeviceLifeCycleEvents); + + /// Gets the data schema for the IoT Hub message. + /// The to use. + public static string? GetIotHubDataSchema(this EventData data) => data.GetPropertyValue(IotHubPropertyNameDataSchema); + + /// Gets the subject for the IoT Hub message. + /// The to use. + public static string? GetIotHubSubject(this EventData data) => data.GetPropertyValue(IotHubPropertyNameSubject); + +} diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/IotHubConnectionAuthMethod.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/IotHubConnectionAuthMethod.cs new file mode 100644 index 00000000..2e60c785 --- /dev/null +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/IotHub/IotHubConnectionAuthMethod.cs @@ -0,0 +1,31 @@ +using System.Text.Json.Nodes; +using System.Text.Json.Serialization; + +namespace Tingle.EventBus.Transports.Azure.EventHubs.IotHub; + +/// +/// Model representing the data found in iothub-connection-auth-method property. +/// +/// +/// { +/// "scope": "hub", +/// "type": "sas", +/// "issuer": "iothub", +/// "acceptingIpFilterRule": null +/// } +/// +public record IotHubConnectionAuthMethod +{ + /// hub + public virtual string? Scope { get; set; } + + /// sas + public virtual string? Type { get; set; } + + /// iothub + public virtual string? Issuer { get; set; } + + /// + [JsonExtensionData] + public JsonObject? Extras { get; set; } +}