diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryEventBus.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryEventBus.cs index be08a182..41224c5d 100644 --- a/src/Tingle.EventBus.Transports.InMemory/InMemoryEventBus.cs +++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryEventBus.cs @@ -5,7 +5,9 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.IO; using System.Linq; +using System.Net.Mime; using System.Threading; using System.Threading.Tasks; @@ -69,9 +71,9 @@ public override Task CheckHealthAsync(EventBusHealthCheckExtras extras, protected override Task StopBusAsync(CancellationToken cancellationToken) => Task.CompletedTask; /// - protected override Task PublishOnBusAsync(EventContext @event, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task PublishOnBusAsync(EventContext @event, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { // log warning when trying to publish scheduled message if (scheduled != null) @@ -81,12 +83,22 @@ protected override Task PublishOnBusAsync(EventContext @ var scheduledId = scheduled?.ToUnixTimeMilliseconds().ToString(); published.Add(@event); - var _ = SendToConsumersAsync(@event, scheduled); - return Task.FromResult(scheduledId); + + using var scope = CreateScope(); + var reg = BusOptions.GetOrCreateEventRegistration(); + using var ms = new MemoryStream(); + var contentType = await SerializeAsync(body: ms, + @event: @event, + registration: reg, + scope: scope, + cancellationToken: cancellationToken); + + var _ = SendToConsumersAsync(ms, contentType, scheduled); + return scheduled != null ? scheduledId : null; } /// - protected override Task> PublishOnBusAsync(IList> events, + protected async override Task> PublishOnBusAsync(IList> events, DateTimeOffset? scheduled = null, CancellationToken cancellationToken = default) { @@ -96,9 +108,17 @@ protected override Task> PublishOnBusAsync(IList(); foreach (var @event in events) { - var _ = SendToConsumersAsync(@event, scheduled); + using var ms = new MemoryStream(); + var contentType = await SerializeAsync(body: ms, + @event: @event, + registration: reg, + scope: scope, + cancellationToken: cancellationToken); + var _ = SendToConsumersAsync(ms, contentType, scheduled); } var random = new Random(); @@ -108,7 +128,7 @@ protected override Task> PublishOnBusAsync(IList)Array.Empty()); + return scheduled != null ? scheduledIds.ToList() : null; } /// @@ -123,7 +143,8 @@ public override Task CancelAsync(IList ids, CancellationToken ca throw new NotSupportedException("InMemory EventBus does not support canceling published messages."); } - private async Task SendToConsumersAsync(EventContext @event, DateTimeOffset? scheduled) + private async Task SendToConsumersAsync(MemoryStream ms, ContentType contentType, DateTimeOffset? scheduled) + where TEvent : class { var cts = new CancellationTokenSource(); var cancellationToken = cts.Token; @@ -138,7 +159,13 @@ private async Task SendToConsumersAsync(EventContext @event, Dat } } + logger.LogDebug("Processing sent/incoming event"); using var scope = CreateScope(); // shared + var context = await DeserializeAsync(body: ms, + contentType: contentType, + registration: BusOptions.GetOrCreateEventRegistration(), + scope: scope, + cancellationToken: cancellationToken); // find consumers registered for the event var eventType = typeof(TEvent); @@ -150,7 +177,7 @@ private async Task SendToConsumersAsync(EventContext @event, Dat var flags = System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic; var mt = GetType().GetMethod(nameof(DispatchToConsumerAsync), flags); var method = mt.MakeGenericMethod(typeof(TEvent), reg.ConsumerType); - return (Task)method.Invoke(this, new object[] { @event, scope, cancellationToken, }); + return (Task)method.Invoke(this, new object[] { context, scope, cancellationToken, }); }).ToList(); await Task.WhenAll(tasks); } @@ -170,6 +197,7 @@ private async Task DispatchToConsumerAsync(EventContext(@event: context, scope: scope, cancellationToken: cancellationToken); @@ -177,7 +205,7 @@ await ConsumeAsync(@event: context, } catch (Exception ex) { - logger.LogError(ex, "Event processing failed. Moving to deadletter."); + logger.LogError(ex, "Event processing failed. Deadletter is not supported in memory."); failed.Add(context); } }