diff --git a/Source/EventFlow.SQLite/EventStores/SQLiteEventPersistence.cs b/Source/EventFlow.SQLite/EventStores/SQLiteEventPersistence.cs index 25d3a25dc..3e97d03a3 100644 --- a/Source/EventFlow.SQLite/EventStores/SQLiteEventPersistence.cs +++ b/Source/EventFlow.SQLite/EventStores/SQLiteEventPersistence.cs @@ -79,7 +79,7 @@ GlobalSequenceNumber ASC LIMIT @pageSize"; var eventDataModels = await _connection.QueryAsync( Label.Named("sqlite-fetch-events"), - string.Empty, + null, cancellationToken, sql, new @@ -137,7 +137,7 @@ INSERT INTO { ids = await _connection.InsertMultipleAsync( Label.Named("sqlite-insert-events"), - string.Empty, + null, cancellationToken, sql, eventDataModels) @@ -182,7 +182,7 @@ ORDER BY AggregateSequenceNumber ASC"; var eventDataModels = await _connection.QueryAsync( Label.Named("sqlite-fetch-events"), - string.Empty, + null, cancellationToken, sql, new @@ -194,14 +194,35 @@ ORDER BY return eventDataModels; } - public Task> LoadCommittedEventsAsync( + public async Task> LoadCommittedEventsAsync( IIdentity id, int fromEventSequenceNumber, int toEventSequenceNumber, CancellationToken cancellationToken) { - // TODO: Implement this! - throw new NotImplementedException(); + const string sql = @" + SELECT + GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber + FROM EventFlow + WHERE + AggregateId = @AggregateId AND + AggregateSequenceNumber >= @FromEventSequenceNumber AND + AggregateSequenceNumber <= @ToEventSequenceNumber + ORDER BY + AggregateSequenceNumber ASC"; + var eventDataModels = await _connection.QueryAsync( + Label.Named("sqlite-fetch-events"), + null, + cancellationToken, + sql, + new + { + AggregateId = id.Value, + FromEventSequenceNumber = fromEventSequenceNumber, + ToEventSequenceNumber = toEventSequenceNumber + }) + .ConfigureAwait(false); + return eventDataModels; } public async Task DeleteEventsAsync( @@ -211,7 +232,7 @@ public async Task DeleteEventsAsync( const string sql = "DELETE FROM EventFlow WHERE AggregateId = @AggregateId"; var affectedRows = await _connection.ExecuteAsync( Label.Named("sqlite-delete-aggregate"), - string.Empty, + null, cancellationToken, sql, new { AggregateId = id.Value })