Skip to content

Commit 45ef2cb

Browse files
Allow using persistent subscription client (#354)
1 parent c475ab2 commit 45ef2cb

File tree

30 files changed

+270
-166
lines changed

30 files changed

+270
-166
lines changed

.editorconfig

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ dotnet_style_parentheses_in_relational_binary_operators = never_if_unnecessary:n
122122
dotnet_style_predefined_type_for_locals_parameters_members = true:suggestion
123123
dotnet_style_predefined_type_for_member_access = true:suggestion
124124
dotnet_style_require_accessibility_modifiers = never:suggestion
125+
object_creation_when_type_evident = target_typed
126+
object_creation_when_type_not_evident = target_typed
125127

126128
resharper_max_attribute_length_for_same_line = 80
127129
resharper_nested_ternary_style = expanded

src/Core/src/Eventuous.Subscriptions/EventSubscription.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public abstract class EventSubscription<T> : IMessageSubscription, IAsyncDisposa
2727
internal ConsumePipe Pipe { get; }
2828
protected ILoggerFactory? LoggerFactory { get; }
2929
protected LogContext Log { get; }
30-
protected CancellationTokenSource Stopping { get; } = new();
30+
protected CancellationTokenSource Stopping { get; set; } = new();
3131

3232
protected ulong Sequence;
3333

@@ -47,11 +47,13 @@ protected EventSubscription(T options, ConsumePipe consumePipe, ILoggerFactory?
4747
public string SubscriptionId => Options.SubscriptionId;
4848

4949
public async ValueTask Subscribe(OnSubscribed onSubscribed, OnDropped onDropped, CancellationToken cancellationToken) {
50-
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, Stopping.Token);
50+
if (IsRunning) return;
51+
52+
Stopping = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
5153

5254
_onSubscribed = onSubscribed;
5355
_onDropped = onDropped;
54-
await Subscribe(cts.Token).NoContext();
56+
await Subscribe(Stopping.Token).NoContext();
5557
IsRunning = true;
5658
Log.SubscriptionStarted();
5759
onSubscribed(Options.SubscriptionId);
@@ -64,6 +66,7 @@ public async ValueTask Unsubscribe(OnUnsubscribed onUnsubscribed, CancellationTo
6466
onUnsubscribed(Options.SubscriptionId);
6567
await Finalize(cancellationToken);
6668
Sequence = 0;
69+
Stopping.Dispose();
6770
}
6871

6972
protected virtual ValueTask Finalize(CancellationToken cancellationToken) => default;

src/Core/src/Eventuous.Subscriptions/Registrations/SubscriptionRegistrationExtensions.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,7 @@ Action<SubscriptionBuilder<T, TOptions>> configureSubscription
3333
.AddSubscriptionBuilder(builder)
3434
.AddSingleton(sp => GetBuilder(sp).ResolveSubscription(sp))
3535
.AddSingleton<IHostedService, SubscriptionHostedService>(
36-
sp =>
37-
new SubscriptionHostedService(
38-
GetBuilder(sp).ResolveSubscription(sp),
39-
sp.GetService<ISubscriptionHealth>(),
40-
sp.GetService<ILoggerFactory>()
41-
)
36+
sp => new(GetBuilder(sp).ResolveSubscription(sp), sp.GetService<ISubscriptionHealth>(), sp.GetService<ILoggerFactory>())
4237
);
4338

4439
SubscriptionBuilder<T, TOptions> GetBuilder(IServiceProvider sp) => sp.GetSubscriptionBuilder<T, TOptions>(subscriptionId);

src/Core/test/Eventuous.Tests.Application/Eventuous.Tests.Application.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44
<IncludeSutApp>true</IncludeSutApp>
55
</PropertyGroup>
66
<ItemGroup>
7-
<ProjectReference Include="..\..\..\Testing\src\Eventuous.Testing\Eventuous.Testing.csproj" />
7+
<ProjectReference Include="$(SrcRoot)\Testing\src\Eventuous.Testing\Eventuous.Testing.csproj" />
88
</ItemGroup>
99
</Project>

src/Core/test/Eventuous.Tests.Persistence.Base/Eventuous.Tests.Persistence.Base.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,7 @@
1010
<PackageReference Include="NodaTime.Serialization.SystemTextJson" />
1111
<PackageReference Include="Testcontainers" />
1212
</ItemGroup>
13+
<ItemGroup>
14+
<ProjectReference Include="$(RepoRoot)\test\Eventuous.TestHelpers\Eventuous.TestHelpers.csproj" />
15+
</ItemGroup>
1316
</Project>

src/Core/test/Eventuous.Tests.Persistence.Base/Fixtures/StoreFixtureBase.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
// Copyright (C) Ubiquitous AS.All rights reserved
22
// Licensed under the Apache License, Version 2.0.
33

4-
using System.Text.Json;
54
using System.Text.RegularExpressions;
65
using Bogus;
76
using DotNet.Testcontainers.Containers;
7+
using Eventuous.TestHelpers;
88
using MicroElements.AutoFixture.NodaTime;
99
using Microsoft.Extensions.DependencyInjection;
1010
using Microsoft.Extensions.Hosting;
11-
using NodaTime;
12-
using NodaTime.Serialization.SystemTextJson;
1311

1412
namespace Eventuous.Tests.Persistence.Base.Fixtures;
1513

@@ -24,8 +22,7 @@ public abstract class StoreFixtureBase {
2422
}
2523

2624
public abstract partial class StoreFixtureBase<TContainer> : StoreFixtureBase, IAsyncLifetime where TContainer : DockerContainer {
27-
IEventSerializer Serializer { get; } =
28-
new DefaultEventSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web).ConfigureForNodaTime(DateTimeZoneProviders.Tzdb));
25+
IEventSerializer Serializer { get; } = new DefaultEventSerializer(TestPrimitives.DefaultOptions);
2926

3027
public virtual async Task InitializeAsync() {
3128
Container = CreateContainer();
@@ -80,7 +77,7 @@ public virtual async Task DisposeAsync() {
8077

8178
protected virtual void GetDependencies(IServiceProvider provider) { }
8279

83-
protected TContainer Container { get; private set; } = null!;
80+
public TContainer Container { get; private set; } = null!;
8481

8582
bool _disposed;
8683

src/Core/test/Eventuous.Tests.Subscriptions.Base/SubscribeToStream.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public async Task ShouldUseExistingCheckpoint() {
6868

6969
await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, default);
7070
Logger.ConfigureIfNull(fixture.SubscriptionId, fixture.LoggerFactory);
71-
await fixture.CheckpointStore.StoreCheckpoint(new Checkpoint(fixture.SubscriptionId, 9), true, default);
71+
await fixture.CheckpointStore.StoreCheckpoint(new(fixture.SubscriptionId, 9), true, default);
7272

7373
await fixture.StartSubscription();
7474
await Task.Delay(TimeSpan.FromSeconds(1));
@@ -88,7 +88,7 @@ async Task<List<BookingImported>> GenerateAndProduceEvents(int count) {
8888
.ToList();
8989

9090
var events = commands.Select(ToEvent).ToList();
91-
var streamEvents = events.Select(x => new StreamEvent(Guid.NewGuid(), x, new Metadata(), "", 0));
91+
var streamEvents = events.Select(x => new StreamEvent(Guid.NewGuid(), x, new(), "", 0));
9292
await fixture.EventStore.AppendEvents(streamName, ExpectedStreamVersion.Any, streamEvents.ToArray(), default);
9393

9494
return events;

src/EventStore/src/Eventuous.EventStore/Subscriptions/AllPersistentSubscription.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,21 @@ public AllPersistentSubscription(
2626
)
2727
: base(eventStoreClient, options, consumePipe, loggerFactory) { }
2828

29+
/// <summary>
30+
/// Persistent subscription for EventStoreDB, for $all stream
31+
/// </summary>
32+
/// <param name="eventStoreClient">EventStoreDB persistent subscription client instance</param>
33+
/// <param name="options">Persistent subscription options</param>
34+
/// <param name="consumePipe">Consume pipe, usually provided by the builder</param>
35+
/// <param name="loggerFactory">Optional logger factory</param>
36+
public AllPersistentSubscription(
37+
EventStorePersistentSubscriptionsClient eventStoreClient,
38+
AllPersistentSubscriptionOptions options,
39+
ConsumePipe consumePipe,
40+
ILoggerFactory? loggerFactory
41+
)
42+
: base(eventStoreClient, options, consumePipe, loggerFactory) { }
43+
2944
/// <summary>
3045
/// Creates EventStoreDB persistent subscription service for a given stream
3146
/// </summary>

src/EventStore/src/Eventuous.EventStore/Subscriptions/EventStoreCatchUpSubscriptionBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ protected EventStoreCatchUpSubscriptionBase(
4343
/// <param name="cancellationToken"></param>
4444
protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) {
4545
try {
46+
Subscription?.Dispose();
4647
Stopping.Cancel(false);
4748
await Task.Delay(100, cancellationToken);
48-
Subscription?.Dispose();
4949
} catch (Exception) {
5050
// Nothing to see here
5151
}

src/EventStore/src/Eventuous.EventStore/Subscriptions/EventStoreExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
namespace Eventuous.EventStore.Subscriptions;
77

88
static class EventStoreExtensions {
9-
public static EventStoreClientSettings GetSettings(this EventStoreClient client) {
10-
var prop = typeof(EventStoreClient).GetProperty("Settings", BindingFlags.NonPublic | BindingFlags.Instance);
9+
public static EventStoreClientSettings GetSettings(this EventStoreClientBase client) {
10+
var prop = typeof(EventStoreClientBase).GetProperty("Settings", BindingFlags.NonPublic | BindingFlags.Instance);
1111

1212
var getter = prop!.GetGetMethod(true);
1313
return (EventStoreClientSettings) getter!.Invoke(client, null)!;

0 commit comments

Comments
 (0)