Skip to content

Commit 011bb31

Browse files
ESDB client update (#70)
* Moved to the new subscription start * Added persistent subscription to all * Fixed the failing stream subscription when it hits a deleted event
1 parent bb8bf85 commit 011bb31

30 files changed

+424
-342
lines changed

src/Core/src/Eventuous.Subscriptions/Context/DelayedAckConsumeContext.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@ public class DelayedAckConsumeContext : WrappedConsumeContext {
2323
/// <param name="inner">The original message context</param>
2424
/// <param name="acknowledge">Function to ACK the message</param>
2525
/// <param name="fail">Function to NACK the message in case of failure</param>
26-
public DelayedAckConsumeContext(IMessageConsumeContext inner, Acknowledge acknowledge, Fail fail)
27-
: base(inner) {
26+
public DelayedAckConsumeContext(IMessageConsumeContext inner, Acknowledge acknowledge, Fail fail) : base(inner) {
2827
_acknowledge = acknowledge;
29-
_fail = fail;
28+
_fail = fail;
3029
}
3130

3231
/// <summary>

src/Core/src/Eventuous/AppService/ApplicationService.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,9 @@ public async Task<Result<TState, TId>> Handle(object command, CancellationToken
241241
var storeResult = await Store.Store(result, cancellationToken).NoContext();
242242

243243
var changes = result.Changes.Select(x => new Change(x, TypeMap.GetTypeName(x)));
244+
245+
Log.CommandHandled(commandType);
246+
244247
return new OkResult<TState, TId>(result.State, changes, storeResult.GlobalPosition);
245248
}
246249
catch (Exception e) {

src/Core/src/Eventuous/Diagnostics/EventuousEventSource.cs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,27 @@ public class EventuousEventSource : EventSource {
1515

1616
const int CommandHandlerNotFoundId = 1;
1717
const int ErrorHandlingCommandId = 2;
18-
const int CommandHandlerAlreadyRegisteredId = 3;
19-
const int UnableToStoreAggregateId = 4;
20-
const int UnableToReadAggregateId = 5;
21-
const int UnableToAppendEventsId = 6;
22-
const int TypeNotMappedToNameId = 7;
23-
const int TypeNameNotMappedToTypeId = 8;
24-
const int TypeMapRegisteredId = 9;
18+
const int CommandHandledId = 3;
19+
const int CommandHandlerAlreadyRegisteredId = 4;
20+
const int UnableToStoreAggregateId = 5;
21+
const int UnableToReadAggregateId = 6;
22+
const int UnableToAppendEventsId = 7;
23+
const int TypeNotMappedToNameId = 8;
24+
const int TypeNameNotMappedToTypeId = 9;
25+
const int TypeMapRegisteredId = 10;
2526

2627
[NonEvent]
2728
public void CommandHandlerNotFound(Type type) => CommandHandlerNotFound(type.Name);
2829

29-
[NonEvent]
30-
public void CommandHandlerNotFound<T>() => CommandHandlerNotFound(typeof(T).Name);
31-
3230
[NonEvent]
3331
public void ErrorHandlingCommand(Type type, Exception e) => ErrorHandlingCommand(type.Name, e.ToString());
3432

3533
[NonEvent]
36-
public void ErrorHandlingCommand<T>(Exception e) => ErrorHandlingCommand(typeof(T).Name, e.ToString());
37-
34+
public void CommandHandled(Type commandType) {
35+
if (IsEnabled(EventLevel.Verbose, EventKeywords.All))
36+
CommandHandled(commandType.Name);
37+
}
38+
3839
[NonEvent]
3940
public void CommandHandlerAlreadyRegistered<T>() => CommandHandlerAlreadyRegistered(typeof(T).Name);
4041

@@ -64,6 +65,9 @@ public void UnableToLoadAggregate<T>(string id, Exception exception) where T : A
6465
public void ErrorHandlingCommand(string commandType, string exception)
6566
=> WriteEvent(ErrorHandlingCommandId, commandType, exception);
6667

68+
[Event(CommandHandledId, Message = "Command handled: '{0}'", Level = EventLevel.Verbose)]
69+
public void CommandHandled(string commandType) => WriteEvent(CommandHandledId, commandType);
70+
6771
[Event(
6872
CommandHandlerAlreadyRegisteredId,
6973
Message = "Command handler already registered for {0}",

src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/Fixtures/IntegrationFixture.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace Eventuous.Tests.OpenTelemetry.Fixtures;
88

9-
public class IntegrationFixture : IDisposable {
9+
public class IntegrationFixture : IAsyncDisposable {
1010
public IEventStore EventStore { get; }
1111
public IAggregateStore AggregateStore { get; }
1212
public EventStoreClient Client { get; }
@@ -30,8 +30,5 @@ public class IntegrationFixture : IDisposable {
3030
// ActivitySource.AddActivityListener(_listener);
3131
}
3232

33-
public void Dispose() {
34-
// _listener.Dispose();
35-
Client.Dispose();
36-
}
33+
public ValueTask DisposeAsync() => Client.DisposeAsync();
3734
}

src/Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
</PropertyGroup>
4646

4747
<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
48-
<PackageReference Include="IsExternalInit" Version="1.0.1" PrivateAssets="All"/>
48+
<PackageReference Include="IsExternalInit" Version="1.0.2" PrivateAssets="All"/>
4949
</ItemGroup>
5050

5151
<PropertyGroup Condition="'$(IsTestProject)' == 'true'">

src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,12 +271,12 @@ CancellationToken cancellationToken
271271
) => TryExecute(
272272
() => AnyOrNot(
273273
expectedVersion,
274-
() => _client.SoftDeleteAsync(
274+
() => _client.DeleteAsync(
275275
stream,
276276
StreamState.Any,
277277
cancellationToken: cancellationToken
278278
),
279-
() => _client.SoftDeleteAsync(
279+
() => _client.DeleteAsync(
280280
stream,
281281
expectedVersion.AsStreamRevision(),
282282
cancellationToken: cancellationToken

src/EventStore/src/Eventuous.EventStore/Eventuous.EventStore.csproj

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22
<ItemGroup>
3-
<PackageReference Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="21.2.0" />
4-
<PackageReference Include="EventStore.Client.Grpc.Streams" Version="21.2.0" />
3+
<PackageReference Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="22.0.0" />
4+
<PackageReference Include="EventStore.Client.Grpc.Streams" Version="22.0.0" />
55
</ItemGroup>
66
<ItemGroup>
77
<ProjectReference Include="$(CoreRoot)\Eventuous.Producers\Eventuous.Producers.csproj" />
@@ -13,4 +13,7 @@
1313
<Using Include="Eventuous.Subscriptions" />
1414
<Using Include="Microsoft.Extensions.Logging" />
1515
</ItemGroup>
16+
<ItemGroup>
17+
<None Remove="Eventuous.EventStore.csproj.DotSettings" />
18+
</ItemGroup>
1619
</Project>
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
2+
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=subscriptions_005Coptions/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

src/EventStore/src/Eventuous.EventStore/Producers/EventStoreProduceOptions.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@ public record EventStoreProduceOptions {
1616
/// Maximum number of events appended to a single stream in one batch
1717
/// </summary>
1818
public int MaxAppendEventsCount { get; init; } = 500;
19-
20-
/// <summary>
21-
/// Optional function to configure client operation options
22-
/// </summary>
23-
public Action<EventStoreClientOperationOptions>? ConfigureOperation { get; init; }
2419

20+
public TimeSpan? Deadline { get; init; }
21+
2522
public static EventStoreProduceOptions Default { get; } = new();
2623
}

src/EventStore/src/Eventuous.EventStore/Producers/EventStoreProducer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ await _client.AppendToStreamAsync(
6262
stream,
6363
options.ExpectedState,
6464
chunk.Select(CreateMessage),
65-
options.ConfigureOperation,
65+
null,
66+
options.Deadline,
6667
options.Credentials,
6768
cancellationToken
6869
).NoContext();

0 commit comments

Comments
 (0)