Skip to content

Commit 57633a7

Browse files
committed
Added better handling of ignored events
1 parent 8ac26d2 commit 57633a7

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

src/Eventuous.Shovel/ExtendedShovelService.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ public class ShovelService<TSubscription, TProducer, TProduceOptions> : IHostedS
1616
readonly TSubscription _subscription;
1717
readonly TProducer _producer;
1818

19-
public record ShovelMessage(string TargetStream, object Message, TProduceOptions ProduceOptions);
19+
public record ShovelMessage(string TargetStream, object? Message, TProduceOptions ProduceOptions);
2020

21-
public delegate ValueTask<ShovelMessage> RouteAndTransform(object message);
21+
public delegate ValueTask<ShovelMessage?> RouteAndTransform(object message);
2222

2323
public delegate TSubscription CreateSubscription(
2424
string subscriptionId,
@@ -96,8 +96,15 @@ ShovelService<TSubscription, TProducer, TProduceOptions>.RouteAndTransform trans
9696
}
9797

9898
public async Task HandleEvent(object evt, long? position, CancellationToken cancellationToken) {
99-
var (targetStream, message, options) = await _transform(evt).NoContext();
100-
await _eventProducer.Produce(targetStream, new[] { message }, options, cancellationToken).NoContext();
99+
var shovelMessage = await _transform(evt).NoContext();
100+
if (shovelMessage?.Message == null) return;
101+
102+
await _eventProducer.Produce(
103+
shovelMessage.TargetStream,
104+
new[] { shovelMessage.Message },
105+
shovelMessage.ProduceOptions,
106+
cancellationToken
107+
).NoContext();
101108
}
102109
}
103110
}

src/Eventuous.Shovel/ShovelService.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class ShovelService<TSubscription, TProducer> : IHostedService
2525

2626
public record ShovelMessage(string TargetStream, object? Message);
2727

28-
public delegate ValueTask<ShovelMessage> RouteAndTransform(object message);
28+
public delegate ValueTask<ShovelMessage?> RouteAndTransform(object message);
2929

3030
public delegate TSubscription CreateSubscription(
3131
string subscriptionId,
@@ -145,9 +145,12 @@ ShovelService<TSubscription, TProducer>.RouteAndTransform transform
145145
}
146146

147147
public async Task HandleEvent(object evt, long? position, CancellationToken cancellationToken) {
148-
var (targetStream, message) = await _transform(evt).NoContext();
149-
if (message == null) return;
150-
await _eventProducer.Produce(targetStream, new[] { message }, cancellationToken).NoContext();
148+
var shovelMessage = await _transform(evt).NoContext();
149+
if (shovelMessage?.Message == null) return;
150+
151+
await _eventProducer
152+
.Produce(shovelMessage.TargetStream, new[] { shovelMessage.Message }, cancellationToken)
153+
.NoContext();
151154
}
152155
}
153156
}

0 commit comments

Comments
 (0)