Skip to content

Commit 9f05a4e

Browse files
Report WrongExpectedVersion properly in SQL Server scripts (#376)
* Update check_stream * Added a test * Added try-catch to AppendEvents (SQL Server)
1 parent f13666a commit 9f05a4e

File tree

5 files changed

+85
-26
lines changed

5 files changed

+85
-26
lines changed

src/Core/test/Eventuous.Tests.Persistence.Base/Fixtures/Helpers.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,14 @@ public static Task<AppendEventsResult> AppendEvent(
3838

3939
return fixture.EventStore.AppendEvents(stream, version, [streamEvent], default);
4040
}
41+
42+
43+
public static Task<AppendEventsResult> StoreChanges(
44+
this StoreFixtureBase fixture,
45+
StreamName stream,
46+
object evt,
47+
ExpectedStreamVersion version
48+
) {
49+
return fixture.EventStore.Store(stream, version, [evt]);
50+
}
4151
}

src/Core/test/Eventuous.Tests.Persistence.Base/Store/Append.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,19 @@ public async Task ShouldFailOnWrongVersion() {
6767
var task = () => _fixture.AppendEvent(stream, evt, new(3));
6868
await task.Should().ThrowAsync<AppendToStreamException>();
6969
}
70+
71+
72+
[Fact]
73+
[Trait("Category", "Store")]
74+
public async Task ShouldFailOnWrongVersionWithOptimisticConcurrencyException() {
75+
var evt = _fixture.CreateEvent();
76+
var stream = _fixture.GetStreamName();
77+
78+
await _fixture.AppendEvent(stream, evt, ExpectedStreamVersion.NoStream);
79+
80+
evt = _fixture.CreateEvent();
81+
82+
var task = () => _fixture.StoreChanges(stream, evt, new(3));
83+
await task.Should().ThrowAsync<OptimisticConcurrencyException>();
84+
}
7085
}

src/SqlServer/src/Eventuous.SqlServer/Scripts/2_AppendEvents.sql

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ AS
77
BEGIN
88
DECLARE @current_version INT,
99
@stream_id INT,
10-
@position BIGINT
10+
@position BIGINT,
11+
@customErrorMessage NVARCHAR(200),
12+
@newMessagesCount INT,
13+
@expected_StreamVersionAfterUpdate INT,
14+
@actual_StreamVersionAfterUpdate INT
1115

1216
if @created is null
1317
BEGIN
@@ -16,10 +20,25 @@ BEGIN
1620

1721
EXEC [__schema__].[check_stream] @stream_name, @expected_version, @current_version = @current_version OUTPUT, @stream_id = @stream_id OUTPUT
1822

19-
INSERT INTO __schema__.Messages (MessageId, MessageType, StreamId, StreamPosition, JsonData, JsonMetadata, Created)
20-
SELECT message_id, message_type, @stream_id, @current_version + (ROW_NUMBER() OVER(ORDER BY (SELECT NULL))), json_data, json_metadata, @created
21-
FROM @messages
23+
BEGIN TRY
24+
INSERT INTO __schema__.Messages (MessageId, MessageType, StreamId, StreamPosition, JsonData, JsonMetadata, Created)
25+
SELECT message_id, message_type, @stream_id, @current_version + (ROW_NUMBER() OVER(ORDER BY (SELECT NULL))), json_data, json_metadata, @created
26+
FROM @messages
27+
END TRY
28+
BEGIN CATCH
29+
IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamIdAndStreamPosition', ERROR_MESSAGE())) > 0
30+
BEGIN
31+
DECLARE @streamIdFromError nvarchar(20) = SUBSTRING(ERROR_MESSAGE(), PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()), PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) - PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()))
32+
DECLARE @streamPositionFromError nvarchar(20) = SUBSTRING(ERROR_MESSAGE(), (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE())) + 2, PATINDEX(N'%).', ERROR_MESSAGE()) - (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) + 2))
2233

34+
-- TODO: There are multiple causes of OptimisticConcurrencyExceptions, but current client code is hard-coded to check for 'WrongExpectedVersion' in message and 50000 as error number.
35+
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion, another message has already been written at stream position %s on stream %s.', @streamIdFromError, @streamPositionFromError);
36+
THROW 50000, @customErrorMessage, 1;
37+
END
38+
ELSE
39+
THROW
40+
END CATCH
41+
2342
SELECT TOP 1 @current_version = StreamPosition, @position = GlobalPosition
2443
FROM __schema__.Messages
2544
WHERE StreamId = @stream_id
Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,43 @@
1-
CREATE OR ALTER PROCEDURE __schema__.check_stream
2-
@stream_name NVARCHAR(850),
3-
@expected_version int,
4-
@current_version INT OUTPUT,
5-
@stream_id INT OUTPUT
6-
AS
1+
CREATE OR ALTER PROCEDURE __schema__.check_stream @stream_name NVARCHAR(850),
2+
@expected_version int,
3+
@current_version INT OUTPUT,
4+
@stream_id INT OUTPUT
5+
AS
76
BEGIN
7+
DECLARE @customErrorMessage NVARCHAR(200)
88

9-
SELECT @current_version = [Version], @stream_id =StreamId
10-
FROM __schema__.Streams
9+
SELECT @current_version = [Version], @stream_id = StreamId
10+
FROM [__schema__].Streams
1111
WHERE StreamName = @stream_name
1212

13-
IF @stream_id is null
14-
BEGIN
15-
IF @expected_version = -2 -- Any
16-
OR @expected_version = -1 -- NoStream
13+
IF @stream_id is null
1714
BEGIN
18-
INSERT INTO __schema__.Streams (StreamName, Version) VALUES (@stream_name, -1);
19-
SELECT @current_version = Version, @stream_id = StreamId
20-
FROM __schema__.Streams
21-
WHERE StreamName = @stream_name
15+
IF @expected_version = -2 -- Any
16+
OR @expected_version = -1 -- NoStream
17+
BEGIN
18+
BEGIN TRY
19+
INSERT INTO [__schema__].Streams (StreamName, Version) VALUES (@stream_name, -1);
20+
SELECT @current_version = Version, @stream_id = StreamId
21+
FROM [__schema__].Streams
22+
WHERE StreamName = @stream_name
23+
END TRY
24+
BEGIN CATCH
25+
IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamName', ERROR_MESSAGE())) > 0
26+
BEGIN
27+
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, stream already exists', @expected_version);
28+
THROW 50000, @customErrorMessage, 1;
29+
END
30+
ELSE
31+
THROW
32+
END CATCH
33+
END
34+
ELSE
35+
THROW 50001, N'StreamNotFound', 1;
2236
END
2337
ELSE
24-
THROW 50001, 'StreamNotFound', 1;
25-
END
26-
ELSE IF @expected_version != -2 and @expected_version != @current_version
27-
THROW 50000, 'WrongExpectedVersion %, current version %', 1;
28-
38+
IF @expected_version != -2 and @expected_version != @current_version
39+
BEGIN
40+
SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, current version %i', @expected_version, @current_version);
41+
THROW 50000, @customErrorMessage, 1;
42+
END
2943
END

src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SqlContainer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ namespace Eventuous.Tests.SqlServer.Fixtures;
44

55
public static class SqlContainer {
66
public static SqlEdgeContainer Create() => new SqlEdgeBuilder()
7-
.WithImage("mcr.microsoft.com/azure-sql-edge:latest")
7+
// .WithImage("mcr.microsoft.com/azure-sql-edge:1.0.7")
8+
.WithImage("mcr.microsoft.com/mssql/server:2022-latest")
89
.Build();
910
}

0 commit comments

Comments
 (0)