Skip to content

Commit

Permalink
Support serialization and deserialization for InMemory transport so t…
Browse files Browse the repository at this point in the history
…hat bugs can be caught during tests
  • Loading branch information
mburumaxwell committed Dec 29, 2020
1 parent e1e5d07 commit 3f5e5f5
Showing 1 changed file with 39 additions and 11 deletions.
50 changes: 39 additions & 11 deletions src/Tingle.EventBus.Transports.InMemory/InMemoryEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Mime;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -69,9 +71,9 @@ public override Task<bool> CheckHealthAsync(EventBusHealthCheckExtras extras,
protected override Task StopBusAsync(CancellationToken cancellationToken) => Task.CompletedTask;

/// <inheritdoc/>
protected override Task<string> PublishOnBusAsync<TEvent>(EventContext<TEvent> @event,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
protected override async Task<string> PublishOnBusAsync<TEvent>(EventContext<TEvent> @event,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
// log warning when trying to publish scheduled message
if (scheduled != null)
Expand All @@ -81,12 +83,22 @@ protected override Task<string> PublishOnBusAsync<TEvent>(EventContext<TEvent> @

var scheduledId = scheduled?.ToUnixTimeMilliseconds().ToString();
published.Add(@event);
var _ = SendToConsumersAsync(@event, scheduled);
return Task.FromResult(scheduledId);

using var scope = CreateScope();
var reg = BusOptions.GetOrCreateEventRegistration<TEvent>();
using var ms = new MemoryStream();
var contentType = await SerializeAsync(body: ms,
@event: @event,
registration: reg,
scope: scope,
cancellationToken: cancellationToken);

var _ = SendToConsumersAsync<TEvent>(ms, contentType, scheduled);
return scheduled != null ? scheduledId : null;
}

/// <inheritdoc/>
protected override Task<IList<string>> PublishOnBusAsync<TEvent>(IList<EventContext<TEvent>> events,
protected async override Task<IList<string>> PublishOnBusAsync<TEvent>(IList<EventContext<TEvent>> events,
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
Expand All @@ -96,9 +108,17 @@ protected override Task<IList<string>> PublishOnBusAsync<TEvent>(IList<EventCont
logger.LogWarning("InMemory EventBus uses a short-lived timer that is not persisted for scheduled publish");
}

using var scope = CreateScope();
var reg = BusOptions.GetOrCreateEventRegistration<TEvent>();
foreach (var @event in events)
{
var _ = SendToConsumersAsync(@event, scheduled);
using var ms = new MemoryStream();
var contentType = await SerializeAsync(body: ms,
@event: @event,
registration: reg,
scope: scope,
cancellationToken: cancellationToken);
var _ = SendToConsumersAsync<TEvent>(ms, contentType, scheduled);
}

var random = new Random();
Expand All @@ -108,7 +128,7 @@ protected override Task<IList<string>> PublishOnBusAsync<TEvent>(IList<EventCont
random.NextBytes(bys);
return Convert.ToString(BitConverter.ToInt64(bys));
});
return Task.FromResult(scheduled != null ? scheduledIds.ToList() : (IList<string>)Array.Empty<string>());
return scheduled != null ? scheduledIds.ToList() : null;
}

/// <inheritdoc/>
Expand All @@ -123,7 +143,8 @@ public override Task CancelAsync<TEvent>(IList<string> ids, CancellationToken ca
throw new NotSupportedException("InMemory EventBus does not support canceling published messages.");
}

private async Task SendToConsumersAsync<TEvent>(EventContext<TEvent> @event, DateTimeOffset? scheduled)
private async Task SendToConsumersAsync<TEvent>(MemoryStream ms, ContentType contentType, DateTimeOffset? scheduled)
where TEvent : class
{
var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
Expand All @@ -138,7 +159,13 @@ private async Task SendToConsumersAsync<TEvent>(EventContext<TEvent> @event, Dat
}
}

logger.LogDebug("Processing sent/incoming event");
using var scope = CreateScope(); // shared
var context = await DeserializeAsync<TEvent>(body: ms,
contentType: contentType,
registration: BusOptions.GetOrCreateEventRegistration<TEvent>(),
scope: scope,
cancellationToken: cancellationToken);

// find consumers registered for the event
var eventType = typeof(TEvent);
Expand All @@ -150,7 +177,7 @@ private async Task SendToConsumersAsync<TEvent>(EventContext<TEvent> @event, Dat
var flags = System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic;
var mt = GetType().GetMethod(nameof(DispatchToConsumerAsync), flags);
var method = mt.MakeGenericMethod(typeof(TEvent), reg.ConsumerType);
return (Task)method.Invoke(this, new object[] { @event, scope, cancellationToken, });
return (Task)method.Invoke(this, new object[] { context, scope, cancellationToken, });
}).ToList();
await Task.WhenAll(tasks);
}
Expand All @@ -170,14 +197,15 @@ private async Task DispatchToConsumerAsync<TEvent, TConsumer>(EventContext<TEven

try
{
logger.LogInformation("Received event '{EventId}'", context.EventId);
await ConsumeAsync<TEvent, TConsumer>(@event: context,
scope: scope,
cancellationToken: cancellationToken);
consumed.Add(context);
}
catch (Exception ex)
{
logger.LogError(ex, "Event processing failed. Moving to deadletter.");
logger.LogError(ex, "Event processing failed. Deadletter is not supported in memory.");
failed.Add(context);
}
}
Expand Down

0 comments on commit 3f5e5f5

Please sign in to comment.