Skip to content

Commit

Permalink
Implement loading aggregates events range
Browse files Browse the repository at this point in the history
  • Loading branch information
rasmus committed Dec 24, 2024
1 parent 6c9fc15 commit 4adec92
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions Source/EventFlow.SQLite/EventStores/SQLiteEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ GlobalSequenceNumber ASC
LIMIT @pageSize";
var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("sqlite-fetch-events"),
string.Empty,
null,
cancellationToken,
sql,
new
Expand Down Expand Up @@ -137,7 +137,7 @@ INSERT INTO
{
ids = await _connection.InsertMultipleAsync<long, EventDataModel>(
Label.Named("sqlite-insert-events"),
string.Empty,
null,
cancellationToken,
sql,
eventDataModels)
Expand Down Expand Up @@ -182,7 +182,7 @@ ORDER BY
AggregateSequenceNumber ASC";
var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("sqlite-fetch-events"),
string.Empty,
null,
cancellationToken,
sql,
new
Expand All @@ -194,14 +194,35 @@ ORDER BY
return eventDataModels;
}

public Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
// TODO: Implement this!
throw new NotImplementedException();
const string sql = @"
SELECT
GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber
FROM EventFlow
WHERE
AggregateId = @AggregateId AND
AggregateSequenceNumber >= @FromEventSequenceNumber AND
AggregateSequenceNumber <= @ToEventSequenceNumber
ORDER BY
AggregateSequenceNumber ASC";
var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("sqlite-fetch-events"),
null,
cancellationToken,
sql,
new
{
AggregateId = id.Value,
FromEventSequenceNumber = fromEventSequenceNumber,
ToEventSequenceNumber = toEventSequenceNumber
})
.ConfigureAwait(false);
return eventDataModels;
}

public async Task DeleteEventsAsync(
Expand All @@ -211,7 +232,7 @@ public async Task DeleteEventsAsync(
const string sql = "DELETE FROM EventFlow WHERE AggregateId = @AggregateId";
var affectedRows = await _connection.ExecuteAsync(
Label.Named("sqlite-delete-aggregate"),
string.Empty,
null,
cancellationToken,
sql,
new { AggregateId = id.Value })
Expand Down

0 comments on commit 4adec92

Please sign in to comment.