Skip to content

Commit 2f4b44b

Browse files
committed
Add positions to the result
1 parent aac753c commit 2f4b44b

File tree

8 files changed

+77
-31
lines changed

8 files changed

+77
-31
lines changed

src/Eventuous.EventStoreDB/EsdbEventStore.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ public class EsdbEventStore : IEventStore {
1616
public EsdbEventStore(EventStoreClientSettings clientSettings)
1717
: this(new EventStoreClient(Ensure.NotNull(clientSettings, nameof(clientSettings)))) { }
1818

19-
public async Task AppendEvents(
19+
public async Task<AppendEventsResult> AppendEvents(
2020
string stream,
2121
ExpectedStreamVersion expectedVersion,
2222
IReadOnlyCollection<StreamEvent> events,
2323
CancellationToken cancellationToken
2424
) {
2525
var proposedEvents = events.Select(ToEventData);
2626

27-
Task resultTask;
27+
Task<IWriteResult> resultTask;
2828

2929
if (expectedVersion == ExpectedStreamVersion.NoStream)
3030
resultTask = _client.AppendToStreamAsync(stream, StreamState.NoStream, proposedEvents, cancellationToken: cancellationToken);
@@ -38,7 +38,12 @@ CancellationToken cancellationToken
3838
cancellationToken: cancellationToken
3939
);
4040

41-
await resultTask.Ignore();
41+
var result = await resultTask.Ignore();
42+
43+
return new AppendEventsResult(
44+
result.LogPosition.CommitPosition,
45+
result.NextExpectedStreamRevision.ToInt64()
46+
);
4247

4348
static EventData ToEventData(StreamEvent streamEvent)
4449
=> new(

src/Eventuous/AggregateStore.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,24 @@ public AggregateStore(IEventStore eventStore, IEventSerializer serializer) {
1515
_serializer = Ensure.NotNull(serializer, nameof(serializer));
1616
}
1717

18-
public async Task Store<T>(T aggregate, CancellationToken cancellationToken)
18+
public async Task<AppendEventsResult> Store<T>(T aggregate, CancellationToken cancellationToken)
1919
where T : Aggregate {
2020
Ensure.NotNull(aggregate, nameof(aggregate));
2121

22-
if (aggregate.Changes.Count == 0) return;
22+
if (aggregate.Changes.Count == 0) return AppendEventsResult.NoOp;
2323

2424
var stream = StreamName.For<T>(aggregate.GetId());
2525
var expectedVersion = new ExpectedStreamVersion(aggregate.OriginalVersion);
2626

27-
await _eventStore.AppendEvents(
27+
var result = await _eventStore.AppendEvents(
2828
stream,
2929
expectedVersion,
3030
aggregate.Changes.Select(ToStreamEvent).ToArray(),
3131
cancellationToken
3232
).Ignore();
3333

34+
return result;
35+
3436
StreamEvent ToStreamEvent(object evt)
3537
=> new(TypeMap.GetTypeName(evt), _serializer.Serialize(evt), null, _serializer.ContentType);
3638
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
namespace Eventuous {
2+
public record AppendEventsResult(ulong GlobalPosition, long NextExpectedVersion) {
3+
public static AppendEventsResult NoOp = new(0, -1);
4+
}
5+
}

src/Eventuous/ApplicationService.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,9 @@ CancellationToken cancellationToken
232232

233233
var result = await registeredHandler.Handler(aggregate, command, cancellationToken).Ignore();
234234

235-
await Store.Store(result, cancellationToken).Ignore();
235+
var storeResult = await Store.Store(result, cancellationToken).Ignore();
236236

237-
return new OkResult<T, TState, TId>(result.State, result.Changes);
237+
return new OkResult<T, TState, TId>(result.State, result.Changes, storeResult.GlobalPosition);
238238

239239
Task<T> Load() => Store.Load<T, TState, TId>(id, cancellationToken);
240240

src/Eventuous/IAggregateStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public interface IAggregateStore {
1616
/// <param name="cancellationToken">Cancellation token</param>
1717
/// <typeparam name="T">Aggregate type</typeparam>
1818
/// <returns></returns>
19-
Task Store<T>(T aggregate, CancellationToken cancellationToken) where T : Aggregate;
19+
Task<AppendEventsResult> Store<T>(T aggregate, CancellationToken cancellationToken) where T : Aggregate;
2020

2121
/// <summary>
2222
/// Load the aggregate from the store for a given id

src/Eventuous/IEventStore.cs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,36 @@
22
using System.Collections.Generic;
33
using System.Threading;
44
using System.Threading.Tasks;
5+
using JetBrains.Annotations;
56

67
namespace Eventuous {
8+
/// <summary>
9+
/// Event Store is a place where events are stored. It is used by <see cref="AggregateStore"/> and
10+
/// <seealso cref="StateStore"/>
11+
/// </summary>
12+
[PublicAPI]
713
public interface IEventStore {
8-
Task AppendEvents(string stream, ExpectedStreamVersion expectedVersion, IReadOnlyCollection<StreamEvent> events, CancellationToken cancellationToken);
9-
10-
Task<StreamEvent[]> ReadEvents(string stream, StreamReadPosition start, int count, CancellationToken cancellationToken);
11-
14+
Task<AppendEventsResult> AppendEvents(
15+
string stream,
16+
ExpectedStreamVersion expectedVersion,
17+
IReadOnlyCollection<StreamEvent> events,
18+
CancellationToken cancellationToken
19+
);
20+
21+
Task<StreamEvent[]> ReadEvents(
22+
string stream,
23+
StreamReadPosition start,
24+
int count,
25+
CancellationToken cancellationToken
26+
);
27+
1228
Task<StreamEvent[]> ReadEventsBackwards(string stream, int count, CancellationToken cancellationToken);
13-
14-
Task ReadStream(string stream, StreamReadPosition start, Action<StreamEvent> callback, CancellationToken cancellationToken);
29+
30+
Task ReadStream(
31+
string stream,
32+
StreamReadPosition start,
33+
Action<StreamEvent> callback,
34+
CancellationToken cancellationToken
35+
);
1536
}
1637
}

src/Eventuous/Result.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ public abstract record Result<T, TState, TId>(TState State, IEnumerable<object>?
66
where TState : AggregateState<TState, TId>, new()
77
where TId : AggregateId;
88

9-
public record OkResult<T, TState, TId>(TState State, IEnumerable<object> Changes) : Result<T, TState, TId>(State, Changes)
10-
where T : Aggregate<TState, TId>
9+
public record OkResult<T, TState, TId>(TState State, IEnumerable<object> Changes, ulong StreamPosition)
10+
: Result<T, TState, TId>(State, Changes)
11+
where T : Aggregate<TState, TId>
1112
where TState : AggregateState<TState, TId>, new()
1213
where TId : AggregateId;
13-
14-
public record ErrorResult<T, TState, TId>() : Result<T, TState, TId>(new TState())
15-
where T : Aggregate<TState, TId>
14+
15+
public record ErrorResult<T, TState, TId>() : Result<T, TState, TId>(new TState())
16+
where T : Aggregate<TState, TId>
1617
where TState : AggregateState<TState, TId>, new()
1718
where TId : AggregateId;
18-
}
19+
}

test/Eventuous.Tests/Fakes/InMemoryEventStore.cs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,37 @@
77
namespace Eventuous.Tests.Fakes {
88
public class InMemoryEventStore : IEventStore {
99
readonly Dictionary<string, List<StreamEvent>> _storage = new();
10+
readonly List<StreamEvent> _global = new();
1011

11-
public Task AppendEvents(
12+
public Task<AppendEventsResult> AppendEvents(
1213
string stream,
1314
ExpectedStreamVersion expectedVersion,
1415
IReadOnlyCollection<StreamEvent> events,
1516
CancellationToken cancellationToken
1617
) {
17-
return _storage.TryGetValue(stream, out var existing) ? AddToExisting() : AddToNew();
18+
if (_storage.TryGetValue(stream, out var existing)) AddToExisting();
19+
else AddToNew();
1820

19-
Task AddToExisting() {
21+
_global.AddRange(events);
22+
23+
return Task.FromResult(new AppendEventsResult((ulong) (_global.Count - 1), _storage[stream].Count));
24+
25+
void AddToExisting() {
2026
if (existing.Count >= expectedVersion.Value)
2127
throw new WrongVersion(expectedVersion, existing.Count - 1);
2228

2329
existing.AddRange(events);
24-
return Task.CompletedTask;
2530
}
2631

27-
Task AddToNew() {
28-
_storage[stream] = events.ToList();
29-
return Task.CompletedTask;
30-
}
32+
void AddToNew() => _storage[stream] = events.ToList();
3133
}
3234

33-
public Task<StreamEvent[]> ReadEvents(string stream, StreamReadPosition start, int count, CancellationToken cancellationToken)
35+
public Task<StreamEvent[]> ReadEvents(
36+
string stream,
37+
StreamReadPosition start,
38+
int count,
39+
CancellationToken cancellationToken
40+
)
3441
=> Task.FromResult(FindStream(stream).Take(count).ToArray());
3542

3643
public Task<StreamEvent[]> ReadEventsBackwards(string stream, int count, CancellationToken cancellationToken) {
@@ -40,7 +47,12 @@ public Task<StreamEvent[]> ReadEventsBackwards(string stream, int count, Cancell
4047
return Task.FromResult(reversed.Take(count).ToArray());
4148
}
4249

43-
public Task ReadStream(string stream, StreamReadPosition start, Action<StreamEvent> callback, CancellationToken cancellationToken) {
50+
public Task ReadStream(
51+
string stream,
52+
StreamReadPosition start,
53+
Action<StreamEvent> callback,
54+
CancellationToken cancellationToken
55+
) {
4456
foreach (var streamEvent in FindStream(stream)) {
4557
callback(streamEvent);
4658
}

0 commit comments

Comments
 (0)