Skip to content

Commit

Permalink
Added postgres event store
Browse files Browse the repository at this point in the history
  • Loading branch information
aneshas committed Jul 5, 2021
1 parent 5c81a6b commit 7a40bd8
Show file tree
Hide file tree
Showing 24 changed files with 742 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ mono: none
dotnet: 5.0
script:
- dotnet restore
- dotnet test
- dotnet test ./Tactical.DDD/Tactical.DDD.csproj
6 changes: 6 additions & 0 deletions Tactical.DDD.EventSourcing.Integration.Tests/FooAggregate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Tactical.DDD.EventSourcing.Integration.Tests
{
public class FooAggregate : AggregateRoot<FooId>
{
}
}
6 changes: 6 additions & 0 deletions Tactical.DDD.EventSourcing.Integration.Tests/FooEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
using System;

namespace Tactical.DDD.EventSourcing.Integration.Tests
{
public record FooEvent(int Value, DateTime CreatedAt) : DomainEvent(CreatedAt);
}
10 changes: 10 additions & 0 deletions Tactical.DDD.EventSourcing.Integration.Tests/FooId.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;

namespace Tactical.DDD.EventSourcing.Integration.Tests
{
public record FooId : EntityId
{
private Guid _guid = Guid.NewGuid();
public override string ToString() => _guid.ToString();
}
}
40 changes: 40 additions & 0 deletions Tactical.DDD.EventSourcing.Integration.Tests/IntegrationTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using Dapper;
using Npgsql;
using Tactical.DDD.EventSourcing.Postgres;
using Tactical.DDD.EventSourcing.Postgres.Aperture;

namespace Tactical.DDD.EventSourcing.Integration.Tests
{
public abstract class IntegrationTest
{
protected readonly EventStore EventStore;

protected readonly PostgresOffsetTracker OffsetTracker;

protected IntegrationTest()
{
var conn = new NpgsqlConnection(CreateDb());

new EventStoreMigrator(conn).EnsureEventStoreCreated();

EventStore = new EventStore(conn);
OffsetTracker = new PostgresOffsetTracker(conn);
}

private static string CreateDb()
{
var conn = new NpgsqlConnection(
"Server=127.0.0.1;Port=5432;User Id=postgres;");

var dbName = $"db_{Guid.NewGuid().ToString().Replace("-", "_")}";

conn.Execute($"CREATE DATABASE {dbName};");

conn.Close();

return
$"Server=127.0.0.1;Port=5432;Database={dbName};User Id=postgres;";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dapper;
using FluentAssertions;
using Npgsql;
using Tactical.DDD.EventSourcing.Postgres;
using Xunit;

namespace Tactical.DDD.EventSourcing.Integration.Tests
{
public class PostgresEventStoreTests : IntegrationTest
{
[Fact]
public async Task ShouldLoadSavedEventsPerAggregate()
{
var aggregateAId = new FooId();
var aggregateAEvents = new List<DomainEvent>
{
new FooEvent(1, DateTime.Now),
new FooEvent(2, DateTime.Now),
new FooEvent(3, DateTime.Now)
};

var aggregateBId = new FooId();
var aggregateBEvents = new List<DomainEvent>
{
new FooEvent(4, DateTime.Now),
new FooEvent(5, DateTime.Now),
new FooEvent(6, DateTime.Now)
};

var meta = new Dictionary<string, string>
{
{"foo", "bar"},
{"client-ip", "127.0.0.1"}
};

await EventStore.SaveEventsAsync(
nameof(FooAggregate), aggregateAId, 0, aggregateAEvents, meta);

await EventStore.SaveEventsAsync(
nameof(FooAggregate), aggregateBId, 0, aggregateBEvents, meta);

var events = await EventStore.LoadEventsAsync(aggregateAId);

Action<Event> AssertEvent(int version, int value) =>
@event =>
{
@event.Id.Should().Be((uint) value);
@event.StreamId.Should().Be(aggregateAId);
@event.StreamVersion.Should().Be(version);
@event.StreamName.Should().Be(nameof(FooAggregate));
@event.Meta.Should().BeEquivalentTo(meta);

var domainEvent = @event.DomainEvent as FooEvent;

Assert.NotNull(domainEvent);

domainEvent.Value.Should().Be(value);
};

events
.Should()
.SatisfyRespectively(
AssertEvent(1, 1),
AssertEvent(2, 2),
AssertEvent(3, 3)
);
}

[Fact]
public async Task ShouldThrowNotFoundException() =>
await Assert.ThrowsAsync<AggregateNotFoundException>(() =>
EventStore.LoadEventsAsync(new FooId()));

[Fact]
public async Task ShouldThrowConcurrencyCheckException()
{
var fooId = new FooId();
var events = new List<DomainEvent>
{
new FooEvent(1, DateTime.Now),
new FooEvent(2, DateTime.Now),
new FooEvent(3, DateTime.Now)
};

await EventStore.SaveEventsAsync(
nameof(FooAggregate), fooId, 0, events, new Dictionary<string, string>());

await Assert.ThrowsAsync<EventStoreConcurrencyCheckException>(() =>
EventStore.SaveEventsAsync(
nameof(FooAggregate), fooId, 0, events, new Dictionary<string, string>()));
}

[Fact]
public async Task ShouldLoadEventsFromOffset()
{
var fooId = new FooId();
var events = new List<DomainEvent>
{
new FooEvent(1, DateTime.Now),
new FooEvent(2, DateTime.Now),
new FooEvent(3, DateTime.Now),
new FooEvent(4, DateTime.Now),
new FooEvent(5, DateTime.Now)
};

await EventStore.SaveEventsAsync(
nameof(FooAggregate), fooId, 0, events, new Dictionary<string, string>());

var loadedEvents = await EventStore.LoadEventsAsync(typeof(FooAggregate), 2, 2);

loadedEvents
.Should()
.SatisfyRespectively(
@event =>
{
@event.Offset.Should().Be(3);

var e = @event.Event as FooEvent;

e.Value.Should().Be(3);
e.CreatedAt.Should().BeSameDateAs(DateTime.Today);
},
@event =>
{
@event.Offset.Should().Be(4);

var e = @event.Event as FooEvent;

e.Value.Should().Be(4);
e.CreatedAt.Should().BeSameDateAs(DateTime.Today);
}
);
}

[Fact]
public async Task NoResultsShouldYieldAnEmptyEnumerable()
{
var events = await EventStore.LoadEventsAsync(typeof(FooAggregate), 0, 100);

events
.Should()
.BeEmpty();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;

namespace Tactical.DDD.EventSourcing.Integration.Tests
{
public class PostgresOffsetTrackerTests : IntegrationTest
{
[Fact]
public async Task InitialOffsetQueryShouldReturnZeroOffset()
{
var offset = await OffsetTracker.GetOffsetAsync(GetType());

offset.Should().Be(0);
}

[Fact]
public async Task ShouldLoadSavedOffset()
{
var offset = 100;

await OffsetTracker.GetOffsetAsync(GetType());
await OffsetTracker.SaveOffsetAsync(GetType(), 10);
await OffsetTracker.SaveOffsetAsync(GetType(), 50);
await OffsetTracker.SaveOffsetAsync(GetType(), offset);

var gotOffset = await OffsetTracker.GetOffsetAsync(GetType());

gotOffset.Should().Be(offset);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.90" />
<PackageReference Include="FluentAssertions" Version="6.0.0-beta0001" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.7.1" />
<PackageReference Include="Npgsql" Version="5.0.7" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="1.3.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Tactical.DDD.EventSourcing.Postgres\Tactical.DDD.EventSourcing.Postgres.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using Aperture.Core;
using Microsoft.Extensions.DependencyInjection;
using IPullEventStreamEventStore = Aperture.Core.IEventStore;

namespace Tactical.DDD.EventSourcing.Postgres.Aperture
{
public static class ApertureServiceCollectionExtensions
{
public static void AddPostgresEventStream(this IServiceCollection services) =>
services.AddScoped<IPullEventStreamEventStore, EventStore>();

public static void AddPostgresAperture(
this IServiceCollection services,
PullEventStream.Config config = default,
Func<ApertureAgent, ApertureAgent> configure = null)
{
services.AddScoped<PostgresOffsetTracker>();

services.AddScoped<IStreamEvents>(
ctx => new PullEventStream(
ctx.GetService<IPullEventStreamEventStore>(), config));

var agent = ApertureAgentBuilder
.CreateDefault();

services.AddSingleton(
ctx =>
{
foreach (var projection in ctx.GetServices<IProjectEvents>())
agent.AddProjection(projection);

if (configure != null)
agent = configure(agent);

return agent
.UseEventStream(ctx.GetService<IStreamEvents>());
});
}
}
}
7 changes: 7 additions & 0 deletions Tactical.DDD.EventSourcing.Postgres/Aperture/OffsetEntry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Tactical.DDD.EventSourcing.Postgres.Aperture
{
public class OffsetEntry
{
public uint LastOffset { get; set; }
}
}
Loading

0 comments on commit 7a40bd8

Please sign in to comment.