Skip to content

Commit 5aabc40

Browse files
committed
Fixed the resubscribe bug
1 parent a8b6fd3 commit 5aabc40

File tree

25 files changed

+367
-139
lines changed

25 files changed

+367
-139
lines changed

Eventuous.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.ElasticSearch", "
112112
EndProject
113113
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Connectors.EsdbElastic", "src\Experimental\src\Eventuous.Connectors.EsdbElastic\Eventuous.Connectors.EsdbElastic.csproj", "{17A2AEBE-F96F-4F14-A075-C6984303B1B1}"
114114
EndProject
115+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Connectors.Base", "src\Experimental\src\Eventuous.Connectors.Base\Eventuous.Connectors.Base.csproj", "{859AF2D3-3370-412A-A163-425C42C8A04C}"
116+
EndProject
115117
Global
116118
GlobalSection(SolutionConfigurationPlatforms) = preSolution
117119
Debug|Any CPU = Debug|Any CPU
@@ -250,6 +252,10 @@ Global
250252
{17A2AEBE-F96F-4F14-A075-C6984303B1B1}.Debug|Any CPU.Build.0 = Debug|Any CPU
251253
{17A2AEBE-F96F-4F14-A075-C6984303B1B1}.Release|Any CPU.ActiveCfg = Release|Any CPU
252254
{17A2AEBE-F96F-4F14-A075-C6984303B1B1}.Release|Any CPU.Build.0 = Release|Any CPU
255+
{859AF2D3-3370-412A-A163-425C42C8A04C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
256+
{859AF2D3-3370-412A-A163-425C42C8A04C}.Debug|Any CPU.Build.0 = Debug|Any CPU
257+
{859AF2D3-3370-412A-A163-425C42C8A04C}.Release|Any CPU.ActiveCfg = Release|Any CPU
258+
{859AF2D3-3370-412A-A163-425C42C8A04C}.Release|Any CPU.Build.0 = Release|Any CPU
253259
EndGlobalSection
254260
GlobalSection(NestedProjects) = preSolution
255261
{151A0839-2B1F-49D6-B5DD-199A5FAAB610} = {C60C6094-2A03-45B6-AB33-C514C35DF823}
@@ -298,5 +304,6 @@ Global
298304
{F63DCF76-908F-4F4C-B8A7-4CA4EC34F6C9} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
299305
{8B9741E1-EB6A-40C9-B30D-0549A1849B9D} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
300306
{17A2AEBE-F96F-4F14-A075-C6984303B1B1} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
307+
{859AF2D3-3370-412A-A163-425C42C8A04C} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
301308
EndGlobalSection
302309
EndGlobal

src/Core/src/Eventuous.Subscriptions/Checkpoints/CheckpointCommitHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ internal record CommitEvent(string Id, CommitPosition CommitPosition, CommitPosi
2424
public CheckpointCommitHandler(string subscriptionId, CommitCheckpoint commitCheckpoint, int batchSize = 1) {
2525
_subscriptionId = subscriptionId;
2626
_commitCheckpoint = commitCheckpoint;
27-
var channel = Channel.CreateBounded<CommitPosition>(batchSize * 10);
27+
var channel = Channel.CreateBounded<CommitPosition>(batchSize * 1000);
2828
_worker = new ChannelWorker<CommitPosition>(channel, Process, true);
2929

3030
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -58,7 +58,7 @@ async ValueTask CommitInternal(bool force, CancellationToken cancellationToken)
5858
try {
5959
switch (_lastCommit.Valid) {
6060
// There's a gap between the last committed position and the list head
61-
case true when _lastCommit.Sequence + 1 != _positions.Min.Sequence:
61+
case true when _lastCommit.Sequence + 1 != _positions.Min.Sequence && !force:
6262
// The list head is not at the very beginning
6363
case false when _positions.Min.Sequence != 0:
6464
return;

src/Core/src/Eventuous.Subscriptions/EventSubscription.cs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,13 @@ CancellationToken cancellationToken
4646
_onSubscribed = onSubscribed;
4747
_onDropped = onDropped;
4848
await Subscribe(cts.Token).NoContext();
49+
IsRunning = true;
4950
Log.SubscriptionStarted(Options.SubscriptionId);
5051
onSubscribed(Options.SubscriptionId);
5152
}
5253

5354
public async ValueTask Unsubscribe(OnUnsubscribed onUnsubscribed, CancellationToken cancellationToken) {
55+
IsRunning = false;
5456
await Unsubscribe(cancellationToken).NoContext();
5557
Log.SubscriptionStopped(Options.SubscriptionId);
5658
onUnsubscribed(Options.SubscriptionId);
@@ -89,16 +91,14 @@ protected async ValueTask Handler(IMessageConsumeContext context) {
8991
context.Ignore(SubscriptionId);
9092
}
9193

92-
if (context.WasIgnored() && activity != null)
93-
activity.ActivityTraceFlags = ActivityTraceFlags.None;
94+
if (context.WasIgnored() && activity != null) activity.ActivityTraceFlags = ActivityTraceFlags.None;
9495
}
9596
catch (Exception e) {
9697
context.Nack(SubscriptionId, e);
9798
}
9899

99100
if (context.HasFailed()) {
100-
if (activity != null)
101-
activity.ActivityTraceFlags = ActivityTraceFlags.Recorded;
101+
if (activity != null) activity.ActivityTraceFlags = ActivityTraceFlags.Recorded;
102102

103103
var exception = context.HandlingResults.GetException();
104104

@@ -160,18 +160,12 @@ protected async ValueTask Handler(IMessageConsumeContext context) {
160160

161161
protected abstract ValueTask Unsubscribe(CancellationToken cancellationToken);
162162

163-
readonly InterlockedSemaphore _resubscribing = new();
164-
165163
protected virtual async Task Resubscribe(TimeSpan delay, CancellationToken cancellationToken) {
166-
if (_resubscribing.IsClosed()) return;
167-
168-
Log.SubscriptionResubscribing(Options.SubscriptionId);
169-
170164
await Task.Delay(delay, cancellationToken).NoContext();
171165

172-
while (IsRunning && IsDropped) {
166+
while (IsRunning && IsDropped && !cancellationToken.IsCancellationRequested) {
173167
try {
174-
_resubscribing.Close();
168+
Log.SubscriptionResubscribing(Options.SubscriptionId);
175169

176170
await Subscribe(cancellationToken).NoContext();
177171

@@ -180,31 +174,37 @@ protected virtual async Task Resubscribe(TimeSpan delay, CancellationToken cance
180174

181175
Log.SubscriptionRestored(Options.SubscriptionId);
182176
}
177+
catch (OperationCanceledException) { }
183178
catch (Exception e) {
184-
Log.ResubscribeFailed(Options.SubscriptionId, e.ToString());
179+
Log.ResubscribeFailed(Options.SubscriptionId, e.Message);
185180
await Task.Delay(1000, cancellationToken).NoContext();
186181
}
187-
finally {
188-
_resubscribing.Open();
189-
}
190182
}
191183
}
192184

193185
protected void Dropped(DropReason reason, Exception? exception) {
194-
if (!IsRunning || _resubscribing.IsClosed()) return;
186+
if (!IsRunning) return;
195187

196188
Log.SubscriptionDropped(Options.SubscriptionId, reason, exception);
197189

198190
IsDropped = true;
199191
_onDropped?.Invoke(Options.SubscriptionId, reason, exception);
200192

201193
Task.Run(
202-
() => {
194+
async () => {
203195
var delay = reason == DropReason.Stopped
204196
? TimeSpan.FromSeconds(10)
205197
: TimeSpan.FromSeconds(2);
206198

207-
return Resubscribe(delay, Stopping.Token);
199+
Log.Warn($"Will resubscribe after {delay}");
200+
201+
try {
202+
await Resubscribe(delay, Stopping.Token);
203+
}
204+
catch (Exception e) {
205+
Log.Warn(e.Message);
206+
throw;
207+
}
208208
}
209209
);
210210
}
@@ -213,4 +213,4 @@ protected void Dropped(DropReason reason, Exception? exception) {
213213
public record EventPosition(ulong? Position, DateTime Created) {
214214
public static EventPosition FromContext(IMessageConsumeContext context)
215215
=> new(context.Items.TryGetItem<ulong>(ContextKeys.StreamPosition), context.Created);
216-
}
216+
}

src/Core/src/Eventuous.Subscriptions/Filters/ConcurrentFilter.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken c
3131

3232
await ctx.Acknowledge().NoContext();
3333
}
34+
catch (OperationCanceledException) {
35+
ctx.Ignore<ConcurrentFilter>();
36+
}
3437
catch (Exception e) {
3538
Log.MessageHandlingFailed(nameof(ConcurrentFilter), workerTask.Context, e);
3639
activity?.SetActivityStatus(ActivityStatus.Error(e));

src/Core/src/Eventuous/Tools/Interlocked.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,17 @@ public class InterlockedSemaphore {
77

88
public bool CanMove() => CompareExchange(true, false);
99

10-
public bool Close() => Set(true);
11-
12-
public bool Open() => Set(false);
10+
public bool Close() {
11+
return Set(true);
12+
}
1313

14-
public bool IsClosed() => _value == 1;
14+
public bool Open() {
15+
return Set(false);
16+
}
17+
18+
public bool IsClosed() {
19+
return _value == 1;
20+
}
1521

1622
bool Set(bool newValue) {
1723
var oldValue = Interlocked.Exchange(ref _value, newValue ? 1 : 0);

src/Directory.Build.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<DiagRoot>$(RepoRoot)\src\Diagnostics\src</DiagRoot>
1111
<EsdbRoot>$(RepoRoot)\src\EventStore\src</EsdbRoot>
1212
<ExtRoot>$(RepoRoot)\src\Extensions\src</ExtRoot>
13+
<GatewayRoot>$(RepoRoot)\src\Shovel\src</GatewayRoot>
1314
<LocalRoot>..\..\src</LocalRoot>
1415
</PropertyGroup>
1516

src/EventStore/src/Eventuous.EventStore/Subscriptions/AllStreamSubscription.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,9 @@ void HandleDrop(
9292
global::EventStore.Client.StreamSubscription _,
9393
SubscriptionDroppedReason reason,
9494
Exception? ex
95-
)
96-
=> Dropped(EsdbMappings.AsDropReason(reason), ex);
95+
) {
96+
Dropped(EsdbMappings.AsDropReason(reason), ex);
97+
}
9798
}
9899

99100
IMessageConsumeContext CreateContext(ResolvedEvent re, CancellationToken cancellationToken) {
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using Microsoft.AspNetCore.Builder;
2+
using Microsoft.Extensions.Configuration;
3+
4+
namespace Eventuous.Connectors.Base;
5+
6+
public static class Configuration {
7+
public static IConfigurationBuilder AddConfiguration(this WebApplicationBuilder builder)
8+
=> builder.Configuration.AddYamlFile("config.yaml", false, true).AddEnvironmentVariables();
9+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using Eventuous.Gateway;
2+
using Eventuous.Producers;
3+
using Eventuous.Subscriptions;
4+
using Eventuous.Subscriptions.Registrations;
5+
using Microsoft.Extensions.DependencyInjection.Extensions;
6+
7+
// ReSharper disable CheckNamespace
8+
9+
namespace Microsoft.Extensions.DependencyInjection;
10+
11+
public class ConnectorBuilder {
12+
public ConnectorBuilder<TSubscription, TSubscriptionOptions> SubscribeWith<TSubscription, TSubscriptionOptions>(
13+
string subscriptionId
14+
) where TSubscription : EventSubscription<TSubscriptionOptions> where TSubscriptionOptions : SubscriptionOptions
15+
=> new(subscriptionId);
16+
}
17+
18+
public class ConnectorBuilder<TSubscription, TSubscriptionOptions> : ConnectorBuilder
19+
where TSubscription : EventSubscription<TSubscriptionOptions> where TSubscriptionOptions : SubscriptionOptions {
20+
internal string SubscriptionId { get; }
21+
22+
internal ConnectorBuilder(string subscriptionId) => SubscriptionId = subscriptionId;
23+
24+
public ConnectorBuilder<TSubscription, TSubscriptionOptions> ConfigureSubscriptionOptions(
25+
Action<TSubscriptionOptions> configureOptions
26+
) {
27+
_configureOptions = configureOptions;
28+
return this;
29+
}
30+
31+
public ConnectorBuilder<TSubscription, TSubscriptionOptions> ConfigureSubscription(
32+
Action<SubscriptionBuilder<TSubscription, TSubscriptionOptions>> configure
33+
) {
34+
_configure = configure;
35+
return this;
36+
}
37+
38+
public ConnectorBuilder<TSubscription, TSubscriptionOptions, TProducer, TProduceOptions>
39+
ProduceWith<TProducer, TProduceOptions>()
40+
where TProducer : class, IEventProducer<TProduceOptions> where TProduceOptions : class
41+
=> new(this);
42+
43+
internal void ConfigureOptions(TSubscriptionOptions options) => _configureOptions?.Invoke(options);
44+
45+
internal void Configure(SubscriptionBuilder<TSubscription, TSubscriptionOptions> builder)
46+
=> _configure?.Invoke(builder);
47+
48+
Action<TSubscriptionOptions>? _configureOptions;
49+
Action<SubscriptionBuilder<TSubscription, TSubscriptionOptions>>? _configure;
50+
}
51+
52+
public class ConnectorBuilder<TSubscription, TSubscriptionOptions, TProducer, TProduceOptions>
53+
where TSubscription : EventSubscription<TSubscriptionOptions>
54+
where TSubscriptionOptions : SubscriptionOptions
55+
where TProducer : class, IEventProducer<TProduceOptions>
56+
where TProduceOptions : class {
57+
readonly ConnectorBuilder<TSubscription, TSubscriptionOptions> _inner;
58+
Func<IServiceProvider, IGatewayTransform<TProduceOptions>>? _getTransformer;
59+
Type? _transformerType;
60+
61+
public ConnectorBuilder(ConnectorBuilder<TSubscription, TSubscriptionOptions> inner) => _inner = inner;
62+
63+
public ConnectorBuilder<TSubscription, TSubscriptionOptions, TProducer, TProduceOptions> TransformWith<T>(
64+
Func<IServiceProvider, T> getTransformer
65+
) where T : class, IGatewayTransform<TProduceOptions> {
66+
_getTransformer = getTransformer;
67+
_transformerType = typeof(T);
68+
return this;
69+
}
70+
71+
public void Register(IServiceCollection services) {
72+
services.AddSingleton(
73+
Ensure.NotNull(_transformerType, "Transformer"),
74+
Ensure.NotNull(_getTransformer, "GetTransformer")
75+
);
76+
77+
services.TryAddSingleton<TProducer>();
78+
79+
services.AddSubscription<TSubscription, TSubscriptionOptions>(
80+
_inner.SubscriptionId,
81+
builder => {
82+
builder.Configure(_inner.ConfigureOptions);
83+
_inner.Configure(builder);
84+
builder.AddEventHandler(GetHandler);
85+
}
86+
);
87+
88+
IEventHandler GetHandler(IServiceProvider sp) {
89+
var transform = sp.GetRequiredService(_transformerType!) as IGatewayTransform<TProduceOptions>;
90+
var producer = sp.GetRequiredService<TProducer>();
91+
92+
return new GatewayHandler<TProduceOptions>(
93+
new GatewayProducer<TProduceOptions>(producer),
94+
transform!.RouteAndTransform
95+
);
96+
}
97+
}
98+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net6.0</TargetFramework>
4+
</PropertyGroup>
5+
6+
<ItemGroup>
7+
<PackageReference Include="Serilog" Version="2.10.0" />
8+
<PackageReference Include="Serilog.AspNetCore" Version="5.0.0" />
9+
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.1" />
10+
<PackageReference Include="NetEscapades.Configuration.Yaml" Version="2.2.0" />
11+
</ItemGroup>
12+
13+
<ItemGroup>
14+
<ProjectReference Include="$(CoreRoot)\Eventuous.Producers\Eventuous.Producers.csproj" />
15+
<ProjectReference Include="$(CoreRoot)\Eventuous.Subscriptions\Eventuous.Subscriptions.csproj" />
16+
<ProjectReference Include="$(ExtRoot)\Eventuous.AspNetCore\Eventuous.AspNetCore.csproj" />
17+
<ProjectReference Include="$(GatewayRoot)\Eventuous.Gateway\Eventuous.Gateway.csproj" />
18+
</ItemGroup>
19+
</Project>

0 commit comments

Comments
 (0)