Skip to content

Commit 9097683

Browse files
committed
Improved SqlServerMessageStore append message deadlock handling and refactored class
1 parent 17afbdb commit 9097683

File tree

1 file changed

+17
-25
lines changed

1 file changed

+17
-25
lines changed

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerMessageStore.cs

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,21 @@
22
using System.Collections.Generic;
33
using System.Threading.Tasks;
44
using Cleipnir.ResilientFunctions.Domain;
5-
using Cleipnir.ResilientFunctions.Domain.Exceptions;
65
using Cleipnir.ResilientFunctions.Messaging;
76
using Cleipnir.ResilientFunctions.Storage;
87
using Microsoft.Data.SqlClient;
98

109
namespace Cleipnir.ResilientFunctions.SqlServer;
1110

12-
public class SqlServerMessageStore : IMessageStore
11+
public class SqlServerMessageStore(string connectionString, string tablePrefix = "") : IMessageStore
1312
{
14-
private readonly string _connectionString;
15-
private readonly string _tablePrefix;
16-
17-
public SqlServerMessageStore(string connectionString, string tablePrefix = "")
18-
{
19-
_connectionString = connectionString;
20-
_tablePrefix = tablePrefix;
21-
}
22-
2313
private string? _initializeSql;
2414
public async Task Initialize()
2515
{
2616
await using var conn = await CreateConnection();
2717

2818
_initializeSql ??= @$"
29-
CREATE TABLE {_tablePrefix}_Messages (
19+
CREATE TABLE {tablePrefix}_Messages (
3020
FlowType INT,
3121
FlowInstance UNIQUEIDENTIFIER,
3222
Position INT NOT NULL,
@@ -46,23 +36,26 @@ PRIMARY KEY (FlowType, FlowInstance, Position)
4636
public async Task TruncateTable()
4737
{
4838
await using var conn = await CreateConnection();
49-
_truncateTableSql ??= $"TRUNCATE TABLE {_tablePrefix}_Messages;";
39+
_truncateTableSql ??= $"TRUNCATE TABLE {tablePrefix}_Messages;";
5040
var command = new SqlCommand(_truncateTableSql, conn);
5141
await command.ExecuteNonQueryAsync();
5242
}
5343

54-
private string? _appendMessageSql;
5544
public async Task<FunctionStatus?> AppendMessage(StoredId storedId, StoredMessage storedMessage)
45+
=> await AppendMessage(storedId, storedMessage, depth: 0);
46+
47+
private string? _appendMessageSql;
48+
private async Task<FunctionStatus?> AppendMessage(StoredId storedId, StoredMessage storedMessage, int depth)
5649
{
5750
await using var conn = await CreateConnection();
5851

5952
_appendMessageSql ??= @$"
60-
INSERT INTO {_tablePrefix}_Messages
53+
INSERT INTO {tablePrefix}_Messages
6154
(FlowType, FlowInstance, Position, MessageJson, MessageType, IdempotencyKey)
6255
VALUES (
6356
@FlowType,
6457
@FlowInstance,
65-
(SELECT COALESCE(MAX(position), -1) + 1 FROM {_tablePrefix}_Messages WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance),
58+
(SELECT COALESCE(MAX(position), -1) + 1 FROM {tablePrefix}_Messages WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance),
6659
@MessageJson, @MessageType, @IdempotencyKey
6760
);";
6861

@@ -78,14 +71,13 @@ INSERT INTO {_tablePrefix}_Messages
7871
}
7972
catch (SqlException e)
8073
{
81-
if (e.Number == SqlError.UNIQUENESS_INDEX_VIOLATION) //idempotency key already exists
82-
return await GetSuspensionStatus(storedId, conn);
83-
if (e.Number != SqlError.DEADLOCK_VICTIM && e.Number != SqlError.UNIQUENESS_VIOLATION)
74+
if (depth == 10 || e.Number != SqlError.DEADLOCK_VICTIM)
8475
throw;
8576

77+
// ReSharper disable once DisposeOnUsingVariable
8678
await conn.DisposeAsync();
8779
await Task.Delay(Random.Shared.Next(50, 250));
88-
return await AppendMessage(storedId, storedMessage);
80+
return await AppendMessage(storedId, storedMessage, depth + 1);
8981
}
9082

9183
return await GetSuspensionStatus(storedId, conn);
@@ -97,7 +89,7 @@ public async Task<bool> ReplaceMessage(StoredId storedId, int position, StoredMe
9789
await using var conn = await CreateConnection();
9890

9991
_replaceMessageSql ??= @$"
100-
UPDATE {_tablePrefix}_Messages
92+
UPDATE {tablePrefix}_Messages
10193
SET MessageJson = @MessageJson, MessageType = @MessageType, IdempotencyKey = @IdempotencyKey
10294
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance AND Position = @Position";
10395

@@ -118,7 +110,7 @@ public async Task Truncate(StoredId storedId)
118110
{
119111
await using var conn = await CreateConnection();
120112
_truncateSql ??= @$"
121-
DELETE FROM {_tablePrefix}_Messages
113+
DELETE FROM {tablePrefix}_Messages
122114
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance";
123115

124116
await using var command = new SqlCommand(_truncateSql, conn);
@@ -134,7 +126,7 @@ public async Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, i
134126
await using var conn = await CreateConnection();
135127
_getMessagesSql ??= @$"
136128
SELECT MessageJson, MessageType, IdempotencyKey
137-
FROM {_tablePrefix}_Messages
129+
FROM {tablePrefix}_Messages
138130
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance AND Position >= @Position
139131
ORDER BY Position ASC;";
140132

@@ -168,7 +160,7 @@ public async Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, i
168160

169161
private async Task<SqlConnection> CreateConnection()
170162
{
171-
var conn = new SqlConnection(_connectionString);
163+
var conn = new SqlConnection(connectionString);
172164
await conn.OpenAsync();
173165
return conn;
174166
}
@@ -178,7 +170,7 @@ private async Task<SqlConnection> CreateConnection()
178170
{
179171
_getSuspensionStatusSql ??= @$"
180172
SELECT Epoch, Status
181-
FROM {_tablePrefix}
173+
FROM {tablePrefix}
182174
WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance";
183175
await using var command = new SqlCommand(_getSuspensionStatusSql, connection);
184176
command.Parameters.AddWithValue("@FlowType", storedId.Type.Value);

0 commit comments

Comments
 (0)