Skip to content

Commit

Permalink
Updated postgres implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
aneshas committed Apr 26, 2023
1 parent d199195 commit f9c5727
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ namespace Tactical.DDD.EventSourcing.Integration.Tests
public abstract class IntegrationTest
{
protected readonly EventStore EventStore;

protected readonly PostgresOffsetTracker OffsetTracker;

protected IntegrationTest()
{
var conn = new NpgsqlConnection(CreateDb());
var conn = NpgsqlDataSource.Create(CreateDb());

new EventStoreMigrator(conn).EnsureEventStoreCreated();
new EventStoreMigrator(conn.CreateConnection()).EnsureEventStoreCreated();

EventStore = new EventStore(conn);
OffsetTracker = new PostgresOffsetTracker(conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<PackageReference Include="Dapper" Version="2.0.90" />
<PackageReference Include="FluentAssertions" Version="6.0.0-beta0001" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.7.1" />
<PackageReference Include="Npgsql" Version="5.0.7" />
<PackageReference Include="Npgsql" Version="7.0.4" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,25 @@
using System;
using Aperture.Core;
using Microsoft.Extensions.DependencyInjection;
using Npgsql;
using IPullEventStreamEventStore = Aperture.Core.IEventStore;

namespace Tactical.DDD.EventSourcing.Postgres.Aperture
{
public static class ApertureServiceCollectionExtensions
{
public static void AddPostgresEventStream(this IServiceCollection services) =>
services.AddScoped<IPullEventStreamEventStore, EventStore>();

public static void AddPostgresAperture(
this IServiceCollection services,
PullEventStream.Config config = default,
Func<ApertureAgent, ApertureAgent> configure = null)
string connString,
PullEventStream.Config config = default
)
{
services.AddScoped<PostgresOffsetTracker>();
services.AddTransient(_ => NpgsqlDataSource.Create(connString));
services.AddTransient<IPullEventStreamEventStore, EventStore>();
services.AddSingleton<ITrackOffset, PostgresOffsetTracker>();

services.AddScoped<IStreamEvents>(
services.AddTransient<IStreamEvents>(
ctx => new PullEventStream(
ctx.GetService<IPullEventStreamEventStore>(), config));

var agent = ApertureAgentBuilder
.CreateDefault();

services.AddSingleton(
ctx =>
{
foreach (var projection in ctx.GetServices<IProjectEvents>())
agent.AddProjection(projection);

if (configure != null)
agent = configure(agent);

return agent
.UseEventStream(ctx.GetService<IStreamEvents>());
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,31 @@
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using Aperture.Core;
using Dapper;
using Npgsql;

namespace Tactical.DDD.EventSourcing.Postgres.Aperture
{
public class PostgresOffsetTracker
public class PostgresOffsetTracker : ITrackOffset
{
// private readonly IDbConnection _conn;
private readonly NpgsqlConnection _conn;
private readonly NpgsqlDataSource _conn;

public PostgresOffsetTracker(NpgsqlConnection conn)
public PostgresOffsetTracker(NpgsqlDataSource conn)
{
_conn = conn;
}

public async Task SaveOffsetAsync(Type projection, int currentOffset)
{
var sql = @"UPDATE @TableName SET last_offset = @Offset;";
await using var conn = _conn.CreateConnection();

var sql = @"INSERT INTO @TableName (id, last_offset) VALUES (1, 0)
ON CONFLICT(id) DO UPDATE SET last_offset = @Offset";

sql = sql.Replace("@TableName", TableNameFor(projection));

await _conn.ExecuteAsync(
await conn.ExecuteAsync(
sql,
new
{
Expand All @@ -33,32 +36,41 @@ await _conn.ExecuteAsync(

public async Task<int> GetOffsetAsync(Type projection)
{
CreateTrackingTableFor(projection);
await using var conn = _conn.CreateConnection();

CreateTrackingTableFor(projection, conn);

var sql = @"SELECT last_offset LastOffset FROM public.@TableName LIMIT 1;";

sql = sql.Replace("@TableName", TableNameFor(projection));

var result = await _conn.QueryFirstAsync<OffsetEntry>(sql);
var results = await conn.QueryAsync<OffsetEntry>(sql);

var offsetEntries = results as OffsetEntry[] ?? results.ToArray();

if (!offsetEntries.Any())
{
return 0;
}

return (int) result.LastOffset;
return (int) offsetEntries.First().LastOffset;
}

private void CreateTrackingTableFor(Type projection)
private void CreateTrackingTableFor(Type projection, NpgsqlConnection conn)
{
var sql = @"
BEGIN;
CREATE TABLE IF NOT EXISTS @TableName
(
id bigint NOT NULL PRIMARY KEY,
last_offset bigint NOT NULL,
updated_on timestamptz NOT NULL DEFAULT (now() at time zone 'utc')
);
INSERT INTO @TableName (last_offset) VALUES (0);
COMMIT;";

sql = sql.Replace("@TableName", TableNameFor(projection));

_conn.Execute(sql);
conn.Execute(sql);
}

private static string TableNameFor(Type projection) =>
Expand Down
17 changes: 12 additions & 5 deletions Tactical.DDD.EventSourcing.Postgres/EventStore.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Threading.Tasks;
using Aperture.Core;
Expand All @@ -26,7 +27,7 @@ public EventStoreConcurrencyCheckException(string message) : base(message)

public class EventStore : IEventStore, IPullEventStreamEventStore
{
private readonly NpgsqlConnection _conn;
private readonly NpgsqlDataSource _conn;

private readonly JsonSerializerSettings _jsonSerializerSettings = new()
{
Expand All @@ -35,13 +36,15 @@ public class EventStore : IEventStore, IPullEventStreamEventStore
MetadataPropertyHandling = MetadataPropertyHandling.ReadAhead
};

public EventStore(NpgsqlConnection conn)
public EventStore(NpgsqlDataSource conn)
{
_conn = conn;
}

public async Task<IEnumerable<Event>> LoadEventsAsync(EntityId aggregateId)
{
await using var conn = _conn.CreateConnection();

const string sql = @"SELECT
id Id,
stream_id StreamId,
Expand All @@ -54,7 +57,7 @@ FROM public.events
WHERE stream_id::text = @AggregateId
ORDER BY id";

var storedEvents = await _conn.QueryAsync<StoredEvent>(
var storedEvents = await conn.QueryAsync<StoredEvent>(
sql,
new {AggregateId = aggregateId.ToString()});

Expand Down Expand Up @@ -82,6 +85,8 @@ public async Task SaveEventsAsync(
IEnumerable<DomainEvent> events,
Dictionary<string, string> meta)
{
await using var conn = _conn.CreateConnection();

const string sql = @"INSERT INTO
events(stream_id, stream_version, stream_name, data, meta)
VALUES (@stream_id::uuid, @stream_version, @stream_name, @data::jsonb, @meta::jsonb)";
Expand All @@ -97,7 +102,7 @@ public async Task SaveEventsAsync(

try
{
await _conn.ExecuteAsync(sql, data);
await conn.ExecuteAsync(sql, data);
}
catch (PostgresException e)
{
Expand All @@ -110,6 +115,8 @@ public async Task SaveEventsAsync(

public async Task<IEnumerable<EventData>> LoadEventsAsync(Type projection, int fromOffset, int count)
{
await using var conn = _conn.CreateConnection();

const string sql = @"SELECT
id Id,
stream_id StreamId,
Expand All @@ -123,7 +130,7 @@ WHERE id > @Offset
ORDER BY id
LIMIT @Count";

var storedEvents = await _conn.QueryAsync<StoredEvent>(
var storedEvents = await conn.QueryAsync<StoredEvent>(
sql,
new
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
<PackageReference Include="Aperture.Core" Version="0.3.6" />
<PackageReference Include="Dapper" Version="2.0.90" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Npgsql" Version="5.0.7" />
<PackageReference Include="Npgsql" Version="7.0.4" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit f9c5727

Please sign in to comment.