Skip to content

Commit 0b9e335

Browse files
Command service improvements (#187)
* Rename to command * Use fast type map * Pass over a pre-made generic context instead of converting it in each handler * Added command mapping to the controller base * First version of the service without aggregates * Make the test more like a proper sample * Improved Postgres Projector * Made schema create-drop per test. Slower but can run in parallel. * Added Postgres projector tests * Add sync version to get the command * Registrations and HTTP for functional services
1 parent a29bf4a commit 0b9e335

File tree

102 files changed

+2276
-896
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+2276
-896
lines changed

.editorconfig

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ resharper_csharp_blank_lines_around_single_line_field = 0
2222
resharper_csharp_blank_lines_around_single_line_invocable = 1
2323
resharper_csharp_empty_block_style = together
2424
resharper_csharp_int_align_comments = true
25-
resharper_csharp_max_line_length = 120
25+
resharper_csharp_max_line_length = 140
2626
resharper_csharp_other_braces = end_of_line
2727
resharper_csharp_wrap_after_declaration_lpar = true
2828
resharper_csharp_wrap_arguments_style = chop_if_long
2929
resharper_csharp_wrap_before_binary_opsign = true
3030
resharper_csharp_wrap_before_declaration_rpar = true
3131
resharper_csharp_wrap_parameters_style = chop_if_long
32-
resharper_csharp_wrap_ternary_expr_style = wrap_if_long
32+
resharper_csharp_wrap_ternary_expr_style = chop_always
3333
resharper_instance_members_qualify_declared_in = base_class
3434
resharper_int_align_assignments = true
3535
resharper_int_align_fields = true
@@ -38,11 +38,14 @@ resharper_int_align_properties = true
3838
resharper_max_attribute_length_for_same_line = 80
3939
resharper_nested_ternary_style = expanded
4040
resharper_outdent_binary_ops = true
41-
resharper_place_expr_method_on_single_line = true
41+
resharper_place_expr_method_on_single_line = false
4242
resharper_place_expr_property_on_single_line = true
43+
resharper_place_field_attribute_on_same_line = false
4344
resharper_place_simple_embedded_statement_on_same_line = true
4445
resharper_space_within_single_line_array_initializer_braces = true
46+
resharper_use_heuristics_for_body_style = true
4547
resharper_wrap_object_and_collection_initializer_style = chop_if_long
4648
resharper_wrap_parameters_style = wrap_if_long
49+
resharper_wrap_ternary_expr_style = chop_if_long
4750

4851
file_header_template = Copyright (C) Ubiquitous AS. All rights reserved\nLicensed under the Apache License, Version 2.0.

Directory.Packages.props

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
</PropertyGroup>
99
<PropertyGroup Label="Package versions for .NET 6" Condition="$(TargetFramework) == 'net7.0'">
1010
<MicrosoftExtensionsVer>7.0</MicrosoftExtensionsVer>
11-
<MicrosoftTestHostVer>7.0</MicrosoftTestHostVer>
11+
<MicrosoftTestHostVer>7.0.2</MicrosoftTestHostVer>
1212
</PropertyGroup>
1313
<ItemGroup>
14-
<PackageVersion Include="BenchmarkDotNet" Version="0.13.2" />
14+
<PackageVersion Include="BenchmarkDotNet" Version="0.13.4" />
1515
<PackageVersion Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="$(MicrosoftExtensionsVer)" />
1616
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="$(MicrosoftExtensionsVer)" />
1717
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftExtensionsVer)" />
@@ -23,26 +23,25 @@
2323
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.0.0-rc9.4" />
2424
<PackageVersion Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="22.0.0" />
2525
<PackageVersion Include="EventStore.Client.Grpc.Streams" Version="22.0.0" />
26-
<PackageVersion Include="MongoDB.Driver" Version="2.16.0" />
27-
<PackageVersion Include="Google.Cloud.Monitoring.V3" Version="3.0.0" />
28-
<PackageVersion Include="Google.Cloud.PubSub.V1" Version="3.2.0" />
29-
<PackageVersion Include="Confluent.Kafka" Version="1.9.3" />
26+
<PackageVersion Include="MongoDB.Driver" Version="2.19.0" />
27+
<PackageVersion Include="Google.Cloud.PubSub.V1" Version="3.3.0" />
28+
<PackageVersion Include="Confluent.Kafka" Version="2.0.2" />
3029
<PackageVersion Include="Npgsql" Version="6.0.4" />
31-
<PackageVersion Include="RabbitMQ.Client" Version="6.3.0" />
32-
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.0.0" />
30+
<PackageVersion Include="RabbitMQ.Client" Version="6.4.0" />
31+
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.1.0" />
3332
<PackageVersion Include="NEST" Version="7.17.2" />
3433
<PackageVersion Include="Polly" Version="7.2.3" />
3534
<PackageVersion Include="Newtonsoft.Json" Version="13.0.1" />
3635
</ItemGroup>
3736
<ItemGroup Label="References for packable projects">
38-
<PackageVersion Include="MinVer" Version="4.2.0" PrivateAssets="All" />
37+
<PackageVersion Include="MinVer" Version="4.3.0" PrivateAssets="All" />
3938
<PackageVersion Include="JetBrains.Annotations" Version="2022.3.1" PrivateAssets="All" />
4039
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
4140
</ItemGroup>
4241
<ItemGroup Label="References for test projects">
4342
<PackageVersion Include="AutoFixture" Version="4.17.0" />
4443
<PackageVersion Include="Bogus" Version="34.0.2" />
45-
<PackageVersion Include="FluentAssertions" Version="6.8.0" />
44+
<PackageVersion Include="FluentAssertions" Version="6.9.0" />
4645
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
4746
<PackageVersion Include="xunit" Version="2.4.2" />
4847
<PackageVersion Include="xunit.abstractions" Version="2.0.3" />
@@ -52,8 +51,8 @@
5251
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="$(MicrosoftTestHostVer)" />
5352
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="$(MicrosoftTestHostVer)" />
5453
<PackageVersion Include="RestSharp" Version="107.3.0" />
55-
<PackageVersion Include="Hypothesist" Version="2.0.30" />
56-
<PackageVersion Include="NodaTime" Version="3.1.5" />
54+
<PackageVersion Include="Hypothesist" Version="2.1.50" />
55+
<PackageVersion Include="NodaTime" Version="3.1.6" />
5756
<PackageVersion Include="NodaTime.Serialization.SystemTextJson" Version="1.0.0" />
5857
<PackageVersion Include="MicroElements.AutoFixture.NodaTime" Version="1.0.0" />
5958
<PackageVersion Include="MongoDb.Bson.NodaTime" Version="3.0.0" />

Eventuous.sln.DotSettings

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
2+
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_FIELD_ATTRIBUTE_ON_SAME_LINE_EX/@EntryValue">NEVER</s:String>
23
<s:String x:Key="/Default/CodeStyle/FileHeader/FileHeaderRegionName/@EntryValue"></s:String>
34
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateStaticFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String>
5+
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
6+
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
7+
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
8+
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
49
<s:Boolean x:Key="/Default/UserDictionary/Words/=Esdb/@EntryIndexedValue">True</s:Boolean>
510
<s:Boolean x:Key="/Default/UserDictionary/Words/=Eventuous/@EntryIndexedValue">True</s:Boolean>
611
<s:Boolean x:Key="/Default/UserDictionary/Words/=pubsub/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (C) Ubiquitous AS. All rights reserved
2+
// Licensed under the Apache License, Version 2.0.
3+
4+
namespace Eventuous;
5+
6+
public class MessageMap {
7+
readonly TypeMap<Func<object, object>> _typeMap = new();
8+
9+
public MessageMap Add<TIn, TOut>(Func<TIn, TOut> map) where TIn : class where TOut : class {
10+
_typeMap.Add<TIn>(Map);
11+
return this;
12+
13+
object Map(object inCmd)
14+
=> map((TIn)inCmd);
15+
}
16+
17+
public TOut Convert<TIn, TOut>(TIn command) where TIn : class {
18+
if (!_typeMap.TryGetValue<TIn>(out var mapper)) {
19+
throw new Exceptions.CommandMappingException<TIn, TOut>();
20+
}
21+
22+
return (TOut)mapper(command);
23+
}
24+
}

src/Core/src/Eventuous.Application/ApplicationService.cs renamed to src/Core/src/Eventuous.Application/CommandService.cs

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,24 @@
11
// Copyright (C) Ubiquitous AS. All rights reserved
22
// Licensed under the Apache License, Version 2.0.
33

4-
using Eventuous.Tools;
5-
using static Eventuous.Diagnostics.ApplicationEventSource;
6-
7-
// ReSharper disable MemberCanBePrivate.Global
8-
94
namespace Eventuous;
105

6+
using Tools;
7+
using static Diagnostics.ApplicationEventSource;
8+
119
/// <summary>
12-
/// Application service base class. A derived class should be scoped to handle commands for one aggregate type only.
10+
/// Command service base class. A derived class should be scoped to handle commands for one aggregate type only.
1311
/// </summary>
1412
/// <typeparam name="TAggregate">The aggregate type</typeparam>
1513
/// <typeparam name="TState">The aggregate state type</typeparam>
1614
/// <typeparam name="TId">The aggregate identity type</typeparam>
1715
// [PublicAPI]
18-
public abstract class ApplicationService<TAggregate, TState, TId>
19-
: IApplicationService<TAggregate, TState, TId>, IApplicationService<TAggregate>
16+
public abstract class CommandService<TAggregate, TState, TId>
17+
: ICommandService<TAggregate, TState, TId>, ICommandService<TAggregate>
2018
where TAggregate : Aggregate<TState>, new()
2119
where TState : State<TState>, new()
2220
where TId : AggregateId {
21+
[PublicAPI]
2322
protected IAggregateStore Store { get; }
2423

2524
readonly HandlersMap<TAggregate> _handlers = new();
@@ -28,7 +27,7 @@ public abstract class ApplicationService<TAggregate, TState, TId>
2827
readonly StreamNameMap _streamNameMap;
2928
readonly TypeMapper _typeMap;
3029

31-
protected ApplicationService(
30+
protected CommandService(
3231
IAggregateStore store,
3332
AggregateFactoryRegistry? factoryRegistry = null,
3433
StreamNameMap? streamNameMap = null,
@@ -196,19 +195,17 @@ protected void OnAsync<TCommand>(ArbitraryActAsync<TCommand> action)
196195
/// <exception cref="Exceptions.CommandHandlerNotFound{TCommand}"></exception>
197196
public async Task<Result<TState>> Handle<TCommand>(TCommand command, CancellationToken cancellationToken)
198197
where TCommand : class {
199-
var commandType = Ensure.NotNull(command).GetType();
200-
201-
if (!_handlers.TryGetValue(commandType, out var registeredHandler)) {
202-
Log.CommandHandlerNotFound(commandType);
203-
var exception = new Exceptions.CommandHandlerNotFound(commandType);
198+
if (!_handlers.TryGet<TCommand>(out var registeredHandler)) {
199+
Log.CommandHandlerNotFound<TCommand>();
200+
var exception = new Exceptions.CommandHandlerNotFound<TCommand>();
204201
return new ErrorResult<TState>(exception);
205202
}
206203

207-
var hasGetIdFunction = _idMap.TryGetValue(commandType, out var getId);
204+
var hasGetIdFunction = _idMap.TryGet<TCommand>(out var getId);
208205

209206
if (!hasGetIdFunction || getId == null) {
210-
Log.CannotCalculateAggregateId(commandType);
211-
var exception = new Exceptions.CommandHandlerNotFound(commandType);
207+
Log.CannotCalculateAggregateId<TCommand>();
208+
var exception = new Exceptions.CommandHandlerNotFound<TCommand>();
212209
return new ErrorResult<TState>(exception);
213210
}
214211

@@ -238,37 +235,41 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
238235
if (result.Changes.Count == 0) return new OkResult<TState>(result.State, Array.Empty<Change>(), 0);
239236

240237
var storeResult = await Store.Store(
241-
streamName != default ? streamName : GetAggregateStreamName(),
238+
streamName != default
239+
? streamName
240+
: GetAggregateStreamName(),
242241
result,
243242
cancellationToken
244243
)
245244
.NoContext();
246245

247246
var changes = result.Changes.Select(x => new Change(x, _typeMap.GetTypeName(x)));
248247

249-
Log.CommandHandled(commandType);
248+
Log.CommandHandled<TCommand>();
250249

251250
return new OkResult<TState>(result.State, changes, storeResult.GlobalPosition);
252251
}
253252
catch (Exception e) {
254-
Log.ErrorHandlingCommand(commandType, e);
253+
Log.ErrorHandlingCommand<TCommand>(e);
255254

256-
return new ErrorResult<TState>($"Error handling command {commandType.Name}", e);
255+
return new ErrorResult<TState>($"Error handling command {typeof(TCommand).Name}", e);
257256
}
258257

259-
TAggregate Create() => _factoryRegistry.CreateInstance<TAggregate, TState>();
258+
TAggregate Create()
259+
=> _factoryRegistry.CreateInstance<TAggregate, TState>();
260260

261-
StreamName GetAggregateStreamName() => _streamNameMap.GetStreamName<TAggregate, TId>(aggregateId);
261+
StreamName GetAggregateStreamName()
262+
=> _streamNameMap.GetStreamName<TAggregate, TId>(aggregateId);
262263
}
263264

264-
async Task<Result> IApplicationService.Handle<TCommand>(TCommand command, CancellationToken cancellationToken)
265+
async Task<Result> ICommandService.Handle<TCommand>(TCommand command, CancellationToken cancellationToken)
265266
where TCommand : class {
266267
var result = await Handle(command, cancellationToken).NoContext();
267268

268269
return result switch {
269-
OkResult<TState>(var aggregateState, var enumerable, _) => new OkResult(aggregateState, enumerable),
270-
ErrorResult<TState> error => new ErrorResult(error.Message, error.Exception),
271-
_ => throw new ApplicationException("Unknown result type")
270+
OkResult<TState>(var state, var enumerable, _) => new OkResult(state, enumerable),
271+
ErrorResult<TState> error => new ErrorResult(error.Message, error.Exception),
272+
_ => throw new ApplicationException("Unknown result type")
272273
};
273274
}
274275

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (C) Ubiquitous AS. All rights reserved
2+
// Licensed under the Apache License, Version 2.0.
3+
4+
using System.Diagnostics.CodeAnalysis;
5+
6+
namespace Eventuous;
7+
8+
public delegate Task<TId> GetIdFromCommandAsync<TId, in TCommand>(TCommand command, CancellationToken cancellationToken)
9+
where TId : AggregateId where TCommand : class;
10+
11+
public delegate TId GetIdFromCommand<out TId, in TCommand>(TCommand command) where TId : AggregateId where TCommand : class;
12+
13+
delegate ValueTask<TId> GetIdFromUntypedCommand<TId>(object command, CancellationToken cancellationToken)
14+
where TId : AggregateId;
15+
16+
class IdMap<TId> where TId : AggregateId {
17+
readonly TypeMap<GetIdFromUntypedCommand<TId>> _typeMap = new();
18+
19+
public void AddCommand<TCommand>(GetIdFromCommand<TId, TCommand> getId) where TCommand : class
20+
=> _typeMap.Add<TCommand>((cmd, _) => new ValueTask<TId>(getId((TCommand)cmd)));
21+
22+
public void AddCommand<TCommand>(GetIdFromCommandAsync<TId, TCommand> getId) where TCommand : class
23+
=> _typeMap.Add<TCommand>(async (cmd, ct) => await getId((TCommand)cmd, ct));
24+
25+
internal bool TryGet<TCommand>([NotNullWhen(true)] out GetIdFromUntypedCommand<TId>? getId) where TCommand : class
26+
=> _typeMap.TryGetValue<TCommand>(out getId);
27+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright (C) Ubiquitous AS. All rights reserved
2+
// Licensed under the Apache License, Version 2.0.
3+
4+
using System.Diagnostics.CodeAnalysis;
5+
6+
namespace Eventuous;
7+
8+
public delegate StreamName GetStreamNameFromCommand<in TCommand>(TCommand command) where TCommand : class;
9+
10+
delegate ValueTask<StreamName> GetStreamNameFromUntypedCommand(object command, CancellationToken cancellationToken);
11+
12+
public class CommandToStreamMap {
13+
readonly TypeMap<GetStreamNameFromUntypedCommand> _typeMap = new();
14+
15+
public void AddCommand<TCommand>(GetStreamNameFromCommand<TCommand> getId) where TCommand : class
16+
=> _typeMap.Add<TCommand>((cmd, _) => new ValueTask<StreamName>(getId((TCommand)cmd)));
17+
18+
internal bool TryGet<TCommand>([NotNullWhen(true)] out GetStreamNameFromUntypedCommand? getId) where TCommand : class
19+
=> _typeMap.TryGetValue<TCommand>(out getId);
20+
}

src/Core/src/Eventuous.Application/Diagnostics/ApplicationEventSource.cs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,44 @@
22
// Licensed under the Apache License, Version 2.0.
33

44
using System.Diagnostics.Tracing;
5+
56
// ReSharper disable MemberCanBePrivate.Global
67

7-
namespace Eventuous.Diagnostics;
8+
namespace Eventuous.Diagnostics;
89

910
[EventSource(Name = $"{DiagnosticName.BaseName}.application")]
1011
class ApplicationEventSource : EventSource {
1112
public static ApplicationEventSource Log { get; } = new();
12-
13+
1314
const int CommandHandlerNotFoundId = 1;
1415
const int ErrorHandlingCommandId = 2;
1516
const int CommandHandledId = 3;
1617
const int CommandHandlerAlreadyRegisteredId = 4;
18+
const int CommandHandlerRegisteredId = 5;
1719
const int CannotGetAggregateIdFromCommandId = 11;
18-
20+
1921
[NonEvent]
20-
public void CommandHandlerNotFound(Type type) => CommandHandlerNotFound(type.Name);
22+
public void CommandHandlerNotFound<TCommand>() => CommandHandlerNotFound(typeof(TCommand).Name);
2123

2224
[NonEvent]
23-
public void CannotCalculateAggregateId(Type type) => CannotCalculateAggregateId(type.Name);
25+
public void CannotCalculateAggregateId<TCommand>() => CannotCalculateAggregateId(typeof(TCommand).Name);
2426

2527
[NonEvent]
26-
public void ErrorHandlingCommand(Type type, Exception e) => ErrorHandlingCommand(type.Name, e.ToString());
28+
public void ErrorHandlingCommand<TCommand>(Exception e) => ErrorHandlingCommand(typeof(TCommand).Name, e.ToString());
2729

2830
[NonEvent]
29-
public void CommandHandled(Type commandType) {
30-
if (IsEnabled(EventLevel.Verbose, EventKeywords.All)) CommandHandled(commandType.Name);
31+
public void CommandHandled<TCommand>() {
32+
if (IsEnabled(EventLevel.Verbose, EventKeywords.All)) CommandHandled(typeof(TCommand).Name);
3133
}
3234

3335
[NonEvent]
3436
public void CommandHandlerAlreadyRegistered<T>() => CommandHandlerAlreadyRegistered(typeof(T).Name);
3537

38+
[NonEvent]
39+
public void CommandHandlerRegistered<T>() {
40+
if (IsEnabled(EventLevel.Verbose, EventKeywords.All)) CommandHandlerRegistered(typeof(T).Name);
41+
}
42+
3643
[Event(CommandHandlerNotFoundId, Message = "Handler not found for command: '{0}'", Level = EventLevel.Error)]
3744
void CommandHandlerNotFound(string commandType) => WriteEvent(CommandHandlerNotFoundId, commandType);
3845

@@ -41,8 +48,7 @@ public void CommandHandled(Type commandType) {
4148
Message = "Cannot get aggregate id from command: '{0}'",
4249
Level = EventLevel.Error
4350
)]
44-
void CannotCalculateAggregateId(string commandType)
45-
=> WriteEvent(CannotGetAggregateIdFromCommandId, commandType);
51+
void CannotCalculateAggregateId(string commandType) => WriteEvent(CannotGetAggregateIdFromCommandId, commandType);
4652

4753
[Event(ErrorHandlingCommandId, Message = "Error handling command: '{0}' {1}", Level = EventLevel.Error)]
4854
void ErrorHandlingCommand(string commandType, string exception)
@@ -57,4 +63,7 @@ void ErrorHandlingCommand(string commandType, string exception)
5763
Level = EventLevel.Critical
5864
)]
5965
void CommandHandlerAlreadyRegistered(string type) => WriteEvent(CommandHandlerAlreadyRegisteredId, type);
60-
}
66+
67+
[Event(CommandHandlerRegisteredId, Message = "Command handler registered for {0}", Level = EventLevel.Verbose)]
68+
void CommandHandlerRegistered(string type) => WriteEvent(CommandHandlerRegisteredId, type);
69+
}

0 commit comments

Comments
 (0)