Skip to content

Commit 8ac26d2

Browse files
committed
Fixing the null
1 parent 016ead6 commit 8ac26d2

File tree

6 files changed

+178
-33
lines changed

6 files changed

+178
-33
lines changed

src/Eventuous.Producers.EventStoreDB/EventStoreProducer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@ public class EventStoreProducer : BaseProducer<EventStoreProduceOptions> {
2222
/// </summary>
2323
/// <param name="eventStoreClient">EventStoreDB gRPC client</param>
2424
/// <param name="serializer">Event serializer instance</param>
25-
public EventStoreProducer(EventStoreClient eventStoreClient, IEventSerializer serializer) {
25+
public EventStoreProducer(EventStoreClient eventStoreClient, IEventSerializer? serializer = null) {
2626
_client = Ensure.NotNull(eventStoreClient, nameof(eventStoreClient));
27-
_serializer = Ensure.NotNull(serializer, nameof(serializer));
27+
_serializer = serializer ?? DefaultEventSerializer.Instance;
2828
}
2929

3030
/// <summary>
3131
/// Create a new EventStoreDB producer instance
3232
/// </summary>
3333
/// <param name="clientSettings">EventStoreDB gRPC client settings</param>
3434
/// <param name="serializer">Event serializer instance</param>
35-
public EventStoreProducer(EventStoreClientSettings clientSettings, IEventSerializer serializer)
35+
public EventStoreProducer(EventStoreClientSettings clientSettings, IEventSerializer? serializer = null)
3636
: this(new EventStoreClient(Ensure.NotNull(clientSettings, nameof(clientSettings))), serializer) { }
3737

3838
public override Task Initialize(CancellationToken cancellationToken = default) => Task.CompletedTask;
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
using System.Collections.Generic;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Eventuous.Producers;
5+
using Eventuous.Subscriptions;
6+
using JetBrains.Annotations;
7+
using Microsoft.Extensions.Hosting;
8+
using Microsoft.Extensions.Logging;
9+
10+
namespace Eventuous.Shovel {
11+
[PublicAPI]
12+
public class ShovelService<TSubscription, TProducer, TProduceOptions> : IHostedService
13+
where TSubscription : SubscriptionService
14+
where TProducer : class, IEventProducer<TProduceOptions>
15+
where TProduceOptions : class {
16+
readonly TSubscription _subscription;
17+
readonly TProducer _producer;
18+
19+
public record ShovelMessage(string TargetStream, object Message, TProduceOptions ProduceOptions);
20+
21+
public delegate ValueTask<ShovelMessage> RouteAndTransform(object message);
22+
23+
public delegate TSubscription CreateSubscription(
24+
string subscriptionId,
25+
IEnumerable<IEventHandler> eventHandlers,
26+
IEventSerializer? serializer,
27+
ILoggerFactory? loggerFactory,
28+
SubscriptionGapMeasure? measure
29+
);
30+
31+
/// <summary>
32+
/// Creates a shovel service instance, which must be registered as a hosted service
33+
/// </summary>
34+
/// <param name="subscriptionId">Shovel subscription id</param>
35+
/// <param name="eventSerializer">Event serializer</param>
36+
/// <param name="createSubscription">Function to create a subscription</param>
37+
/// <param name="producer">Producer instance</param>
38+
/// <param name="routeAndTransform">Routing and transformation function</param>
39+
/// <param name="loggerFactory">Logger factory</param>
40+
/// <param name="measure">Subscription gap measurement</param>
41+
public ShovelService(
42+
string subscriptionId,
43+
CreateSubscription createSubscription,
44+
TProducer producer,
45+
RouteAndTransform routeAndTransform,
46+
IEventSerializer? eventSerializer = null,
47+
ILoggerFactory? loggerFactory = null,
48+
SubscriptionGapMeasure? measure = null
49+
) {
50+
_producer = Ensure.NotNull(producer, nameof(producer));
51+
52+
_subscription = createSubscription(
53+
Ensure.NotEmptyString(subscriptionId, nameof(subscriptionId)),
54+
new[] {
55+
new ShovelHandler<TSubscription, TProducer, TProduceOptions>(
56+
subscriptionId,
57+
producer,
58+
Ensure.NotNull(routeAndTransform, nameof(routeAndTransform))
59+
)
60+
},
61+
eventSerializer,
62+
loggerFactory,
63+
measure
64+
);
65+
}
66+
67+
public async Task StartAsync(CancellationToken cancellationToken) {
68+
await _producer.Initialize(cancellationToken).NoContext();
69+
await _subscription.StartAsync(cancellationToken).NoContext();
70+
}
71+
72+
public async Task StopAsync(CancellationToken cancellationToken) {
73+
await _subscription.StopAsync(cancellationToken).NoContext();
74+
await _producer.Shutdown(cancellationToken).NoContext();
75+
}
76+
}
77+
78+
class ShovelHandler<TSubscription, TProducer, TProduceOptions> : IEventHandler
79+
where TProducer : class, IEventProducer<TProduceOptions>
80+
where TSubscription : SubscriptionService
81+
where TProduceOptions : class {
82+
readonly TProducer _eventProducer;
83+
84+
readonly ShovelService<TSubscription, TProducer, TProduceOptions>.RouteAndTransform _transform;
85+
86+
public string SubscriptionId { get; }
87+
88+
public ShovelHandler(
89+
string subscriptionId,
90+
TProducer eventProducer,
91+
ShovelService<TSubscription, TProducer, TProduceOptions>.RouteAndTransform transform
92+
) {
93+
_eventProducer = eventProducer;
94+
_transform = transform;
95+
SubscriptionId = subscriptionId;
96+
}
97+
98+
public async Task HandleEvent(object evt, long? position, CancellationToken cancellationToken) {
99+
var (targetStream, message, options) = await _transform(evt).NoContext();
100+
await _eventProducer.Produce(targetStream, new[] { message }, options, cancellationToken).NoContext();
101+
}
102+
}
103+
}

src/Eventuous.Shovel/ShovelService.cs

Lines changed: 67 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@
88
using Microsoft.Extensions.Logging;
99

1010
namespace Eventuous.Shovel {
11-
public delegate ValueTask<ShovelMessage> RouteAndTransform(object message);
12-
13-
public record ShovelMessage(string TargetStream, object Message);
14-
1511
/// <summary>
1612
/// Super-simple shovel, which allows to use a subscription to receive events, and then
1713
/// shovel them as-is, or after a transformation, to a producer. For example, you can
@@ -27,10 +23,14 @@ public class ShovelService<TSubscription, TProducer> : IHostedService
2723
readonly TSubscription _subscription;
2824
readonly TProducer _producer;
2925

26+
public record ShovelMessage(string TargetStream, object? Message);
27+
28+
public delegate ValueTask<ShovelMessage> RouteAndTransform(object message);
29+
3030
public delegate TSubscription CreateSubscription(
3131
string subscriptionId,
32-
IEventSerializer serializer,
3332
IEnumerable<IEventHandler> eventHandlers,
33+
IEventSerializer? serializer,
3434
ILoggerFactory? loggerFactory,
3535
SubscriptionGapMeasure? measure
3636
);
@@ -47,21 +47,61 @@ public delegate TSubscription CreateSubscription(
4747
/// <param name="measure">Subscription gap measurement</param>
4848
public ShovelService(
4949
string subscriptionId,
50-
IEventSerializer eventSerializer,
5150
CreateSubscription createSubscription,
5251
TProducer producer,
5352
RouteAndTransform routeAndTransform,
54-
ILoggerFactory? loggerFactory = null,
55-
SubscriptionGapMeasure? measure = null
53+
IEventSerializer? eventSerializer = null,
54+
ILoggerFactory? loggerFactory = null,
55+
SubscriptionGapMeasure? measure = null
56+
) {
57+
_producer = Ensure.NotNull(producer, nameof(producer));
58+
59+
_subscription = createSubscription(
60+
Ensure.NotEmptyString(subscriptionId, nameof(subscriptionId)),
61+
new[] {
62+
new ShovelHandler<TSubscription, TProducer>(
63+
subscriptionId,
64+
producer,
65+
Ensure.NotNull(routeAndTransform, nameof(routeAndTransform))
66+
)
67+
},
68+
eventSerializer,
69+
loggerFactory,
70+
measure
71+
);
72+
}
73+
74+
/// <summary>
75+
/// Creates a shovel service instance, which must be registered as a hosted service
76+
/// </summary>
77+
/// <param name="subscriptionId">Shovel subscription id</param>
78+
/// <param name="createSubscription">Function to create a subscription</param>
79+
/// <param name="targetStream">The stream where events will be produced</param>
80+
/// <param name="producer">Producer instance</param>
81+
/// <param name="eventSerializer">Event serializer</param>
82+
/// <param name="loggerFactory">Logger factory</param>
83+
/// <param name="measure">Subscription gap measurement</param>
84+
public ShovelService(
85+
string subscriptionId,
86+
CreateSubscription createSubscription,
87+
TProducer producer,
88+
string targetStream,
89+
IEventSerializer? eventSerializer = null,
90+
ILoggerFactory? loggerFactory = null,
91+
SubscriptionGapMeasure? measure = null
5692
) {
5793
_producer = Ensure.NotNull(producer, nameof(producer));
5894

5995
_subscription = createSubscription(
6096
Ensure.NotEmptyString(subscriptionId, nameof(subscriptionId)),
61-
Ensure.NotNull(eventSerializer, nameof(eventSerializer)),
6297
new[] {
63-
new ShovelHandler<TProducer>(subscriptionId, producer, Ensure.NotNull(routeAndTransform, nameof(routeAndTransform)))
98+
new ShovelHandler<TSubscription, TProducer>(
99+
subscriptionId,
100+
producer,
101+
new DefaultRoute(Ensure.NotNull(targetStream, nameof(targetStream))).Route
102+
)
64103
},
104+
eventSerializer ?? DefaultEventSerializer.Instance,
65105
loggerFactory,
66106
measure
67107
);
@@ -76,18 +116,28 @@ public async Task StopAsync(CancellationToken cancellationToken) {
76116
await _subscription.StopAsync(cancellationToken).NoContext();
77117
await _producer.Shutdown(cancellationToken).NoContext();
78118
}
119+
120+
public class DefaultRoute {
121+
readonly string _targetStream;
122+
123+
public DefaultRoute(string targetStream) => _targetStream = targetStream;
124+
125+
public ValueTask<ShovelMessage> Route(object message) => new(new ShovelMessage(_targetStream, message));
126+
}
79127
}
80128

81-
class ShovelHandler<TProducer> : IEventHandler where TProducer : IEventProducer {
82-
readonly TProducer _eventProducer;
83-
readonly RouteAndTransform _transform;
129+
class ShovelHandler<TSubscription, TProducer> : IEventHandler
130+
where TProducer : class, IEventProducer
131+
where TSubscription : SubscriptionService {
132+
readonly TProducer _eventProducer;
133+
readonly ShovelService<TSubscription, TProducer>.RouteAndTransform _transform;
84134

85135
public string SubscriptionId { get; }
86136

87137
public ShovelHandler(
88-
string subscriptionId,
89-
TProducer eventProducer,
90-
RouteAndTransform transform
138+
string subscriptionId,
139+
TProducer eventProducer,
140+
ShovelService<TSubscription, TProducer>.RouteAndTransform transform
91141
) {
92142
_eventProducer = eventProducer;
93143
_transform = transform;
@@ -96,16 +146,8 @@ RouteAndTransform transform
96146

97147
public async Task HandleEvent(object evt, long? position, CancellationToken cancellationToken) {
98148
var (targetStream, message) = await _transform(evt).NoContext();
149+
if (message == null) return;
99150
await _eventProducer.Produce(targetStream, new[] { message }, cancellationToken).NoContext();
100151
}
101152
}
102-
103-
[PublicAPI]
104-
public class DefaultRoute {
105-
readonly string _targetStream;
106-
107-
public DefaultRoute(string targetStream) => _targetStream = targetStream;
108-
109-
public ValueTask<ShovelMessage> Route(object message) => new(new ShovelMessage(_targetStream, message));
110-
}
111153
}

src/Eventuous/AggregateStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ public class AggregateStore : IAggregateStore {
1010
readonly IEventStore _eventStore;
1111
readonly IEventSerializer _serializer;
1212

13-
public AggregateStore(IEventStore eventStore, IEventSerializer serializer) {
13+
public AggregateStore(IEventStore eventStore, IEventSerializer? serializer = null) {
1414
_eventStore = Ensure.NotNull(eventStore, nameof(eventStore));
15-
_serializer = Ensure.NotNull(serializer, nameof(serializer));
15+
_serializer = serializer ?? DefaultEventSerializer.Instance;
1616
}
1717

1818
public async Task<AppendEventsResult> Store<T>(T aggregate, CancellationToken cancellationToken)

src/Eventuous/StateStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ public class StateStore : IStateStore {
99
readonly IEventStore _eventStore;
1010
readonly IEventSerializer _serializer;
1111

12-
public StateStore(IEventStore eventStore, IEventSerializer serializer) {
12+
public StateStore(IEventStore eventStore, IEventSerializer? serializer = null) {
1313
_eventStore = Ensure.NotNull(eventStore, nameof(eventStore));
14-
_serializer = Ensure.NotNull(serializer, nameof(serializer));
14+
_serializer = serializer ?? DefaultEventSerializer.Instance;
1515
}
1616

1717
public async Task<T> LoadState<T, TId>(StreamName stream, CancellationToken cancellationToken)

test/Eventuous.Tests.GooglePubSub/PubSubTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ namespace Eventuous.Tests.GooglePubSub {
1616
public class PubSubTests : IAsyncLifetime {
1717
static PubSubTests() => TypeMap.AddType<TestEvent>("test-event");
1818

19-
static readonly Fixture Auto = new();
19+
static readonly Fixture Auto = new();
2020

2121
readonly GooglePubSubSubscription _subscription;
2222
readonly GooglePubSubProducer _producer;

0 commit comments

Comments
 (0)