Skip to content

Commit f925631

Browse files
committed
Resolve conflicts
2 parents dc39d58 + 9d3abf1 commit f925631

File tree

59 files changed

+1167
-123
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1167
-123
lines changed

.editorconfig

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ resharper_int_align_assignments = true
2727
resharper_int_align_fields = true
2828
resharper_int_align_parameters = true
2929
resharper_int_align_properties = true
30+
resharper_max_attribute_length_for_same_line = 80
3031
resharper_nested_ternary_style = expanded
3132
resharper_outdent_binary_ops = true
3233
resharper_place_expr_method_on_single_line = if_owner_is_single_line

Eventuous.sln

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.RabbitMq",
6666
EndProject
6767
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{C2C31C86-828F-4DBF-8EDA-C312C7BBB54B}"
6868
EndProject
69-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Shovel.Tests", "src\Shovel\test\Eventuous.Shovel.Tests\Eventuous.Shovel.Tests.csproj", "{7DC476A6-9BEA-4F29-BFB2-6BBE10577029}"
69+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Gateway.Tests", "src\Shovel\test\Eventuous.Gateway.Tests\Eventuous.Gateway.Tests.csproj", "{7DC476A6-9BEA-4F29-BFB2-6BBE10577029}"
7070
EndProject
7171
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Tests.Subscriptions", "src\Core\test\Eventuous.Tests.Subscriptions\Eventuous.Tests.Subscriptions.csproj", "{8E74DA60-D1DA-45D1-83C5-2F7262E6D342}"
7272
EndProject
@@ -108,6 +108,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{0E2520E7-B4A
108108
EndProject
109109
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Spyglass", "src\Experimental\src\Eventuous.Spyglass\Eventuous.Spyglass.csproj", "{F63DCF76-908F-4F4C-B8A7-4CA4EC34F6C9}"
110110
EndProject
111+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.ElasticSearch", "src\Experimental\src\Eventuous.ElasticSearch\Eventuous.ElasticSearch.csproj", "{8B9741E1-EB6A-40C9-B30D-0549A1849B9D}"
112+
EndProject
113+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Connectors.EsdbElastic", "src\Experimental\src\Eventuous.Connectors.EsdbElastic\Eventuous.Connectors.EsdbElastic.csproj", "{17A2AEBE-F96F-4F14-A075-C6984303B1B1}"
114+
EndProject
115+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Eventuous.Connectors.Base", "src\Experimental\src\Eventuous.Connectors.Base\Eventuous.Connectors.Base.csproj", "{859AF2D3-3370-412A-A163-425C42C8A04C}"
116+
EndProject
111117
Global
112118
GlobalSection(SolutionConfigurationPlatforms) = preSolution
113119
Debug|Any CPU = Debug|Any CPU
@@ -238,6 +244,18 @@ Global
238244
{F63DCF76-908F-4F4C-B8A7-4CA4EC34F6C9}.Debug|Any CPU.Build.0 = Debug|Any CPU
239245
{F63DCF76-908F-4F4C-B8A7-4CA4EC34F6C9}.Release|Any CPU.ActiveCfg = Release|Any CPU
240246
{F63DCF76-908F-4F4C-B8A7-4CA4EC34F6C9}.Release|Any CPU.Build.0 = Release|Any CPU
247+
{8B9741E1-EB6A-40C9-B30D-0549A1849B9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
248+
{8B9741E1-EB6A-40C9-B30D-0549A1849B9D}.Debug|Any CPU.Build.0 = Debug|Any CPU
249+
{8B9741E1-EB6A-40C9-B30D-0549A1849B9D}.Release|Any CPU.ActiveCfg = Release|Any CPU
250+
{8B9741E1-EB6A-40C9-B30D-0549A1849B9D}.Release|Any CPU.Build.0 = Release|Any CPU
251+
{17A2AEBE-F96F-4F14-A075-C6984303B1B1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
252+
{17A2AEBE-F96F-4F14-A075-C6984303B1B1}.Debug|Any CPU.Build.0 = Debug|Any CPU
253+
{17A2AEBE-F96F-4F14-A075-C6984303B1B1}.Release|Any CPU.ActiveCfg = Release|Any CPU
254+
{17A2AEBE-F96F-4F14-A075-C6984303B1B1}.Release|Any CPU.Build.0 = Release|Any CPU
255+
{859AF2D3-3370-412A-A163-425C42C8A04C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
256+
{859AF2D3-3370-412A-A163-425C42C8A04C}.Debug|Any CPU.Build.0 = Debug|Any CPU
257+
{859AF2D3-3370-412A-A163-425C42C8A04C}.Release|Any CPU.ActiveCfg = Release|Any CPU
258+
{859AF2D3-3370-412A-A163-425C42C8A04C}.Release|Any CPU.Build.0 = Release|Any CPU
241259
EndGlobalSection
242260
GlobalSection(NestedProjects) = preSolution
243261
{151A0839-2B1F-49D6-B5DD-199A5FAAB610} = {C60C6094-2A03-45B6-AB33-C514C35DF823}
@@ -284,5 +302,8 @@ Global
284302
{E387CC89-42B6-40BD-800B-C369AE971FCC} = {2B7F84B7-C0E5-408F-ABAF-BF23C8305486}
285303
{0E2520E7-B4A6-47E7-AED8-662C88441A84} = {B64D59D5-7935-4DFC-8D90-D6EF3D6F2ABA}
286304
{F63DCF76-908F-4F4C-B8A7-4CA4EC34F6C9} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
305+
{8B9741E1-EB6A-40C9-B30D-0549A1849B9D} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
306+
{17A2AEBE-F96F-4F14-A075-C6984303B1B1} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
307+
{859AF2D3-3370-412A-A163-425C42C8A04C} = {0E2520E7-B4A6-47E7-AED8-662C88441A84}
287308
EndGlobalSection
288309
EndGlobal

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ services:
44

55
esdb:
66
container_name: eventuous-esdb
7-
image: eventstore/eventstore:latest #20.10.2-buster-slim
7+
image: ghcr.io/eventstore/eventstore:21.10.0-alpha-arm64v8
88
ports:
99
- '2113:2113'
1010
- '1113:1113'

src/Core/src/Eventuous.Subscriptions/Checkpoints/CheckpointCommitHandler.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ internal record CommitEvent(string Id, CommitPosition CommitPosition, CommitPosi
2424
public CheckpointCommitHandler(string subscriptionId, CommitCheckpoint commitCheckpoint, int batchSize = 1) {
2525
_subscriptionId = subscriptionId;
2626
_commitCheckpoint = commitCheckpoint;
27-
var channel = Channel.CreateBounded<CommitPosition>(batchSize * 10);
27+
var channel = Channel.CreateBounded<CommitPosition>(batchSize * 1000);
2828
_worker = new ChannelWorker<CommitPosition>(channel, Process, true);
2929

3030
[MethodImpl(MethodImplOptions.AggressiveInlining)]
3131
async ValueTask Process(CommitPosition position, CancellationToken cancellationToken) {
3232
_positions.Add(position);
3333
if (_positions.Count < batchSize) return;
3434

35-
await CommitInternal(cancellationToken).NoContext();
35+
await CommitInternal(false, cancellationToken).NoContext();
3636
}
3737
}
3838

@@ -54,11 +54,11 @@ public ValueTask Commit(CommitPosition position, CancellationToken cancellationT
5454
}
5555

5656
[MethodImpl(MethodImplOptions.AggressiveInlining)]
57-
async ValueTask CommitInternal(CancellationToken cancellationToken) {
57+
async ValueTask CommitInternal(bool force, CancellationToken cancellationToken) {
5858
try {
5959
switch (_lastCommit.Valid) {
6060
// There's a gap between the last committed position and the list head
61-
case true when _lastCommit.Sequence + 1 != _positions.Min.Sequence:
61+
case true when _lastCommit.Sequence + 1 != _positions.Min.Sequence && !force:
6262
// The list head is not at the very beginning
6363
case false when _positions.Min.Sequence != 0:
6464
return;
@@ -68,9 +68,11 @@ async ValueTask CommitInternal(CancellationToken cancellationToken) {
6868
if (!commitPosition.Valid) return;
6969

7070
await _commitCheckpoint(
71-
new Checkpoint(_subscriptionId, commitPosition.Position),
72-
cancellationToken
73-
).NoContext();
71+
new Checkpoint(_subscriptionId, commitPosition.Position),
72+
force,
73+
cancellationToken
74+
)
75+
.NoContext();
7476

7577
_lastCommit = commitPosition;
7678

@@ -84,7 +86,7 @@ await _commitCheckpoint(
8486

8587
public async ValueTask DisposeAsync() {
8688
Log.Stopping(nameof(CheckpointCommitHandler), "worker", "");
87-
await _worker.Stop(CommitInternal);
89+
await _worker.Stop(ct => CommitInternal(true, ct)).NoContext();
8890
_positions.Clear();
8991
}
9092
}
@@ -97,5 +99,6 @@ public record struct CommitPosition(ulong Position, ulong Sequence) {
9799

98100
public delegate ValueTask<Checkpoint> CommitCheckpoint(
99101
Checkpoint checkpoint,
102+
bool force,
100103
CancellationToken cancellationToken
101-
);
104+
);

src/Core/src/Eventuous.Subscriptions/Checkpoints/ICheckpointStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ public record Checkpoint(string Id, ulong? Position);
77
public interface ICheckpointStore {
88
ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken);
99

10-
ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, CancellationToken cancellationToken);
10+
ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, bool force, CancellationToken cancellationToken);
1111
}

src/Core/src/Eventuous.Subscriptions/Checkpoints/MeasuredCheckpointStore.cs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,20 @@ public class MeasuredCheckpointStore : ICheckpointStore {
1313

1414
readonly ICheckpointStore _checkpointStore;
1515

16-
public MeasuredCheckpointStore(ICheckpointStore checkpointStore)
17-
=> _checkpointStore = checkpointStore;
16+
public MeasuredCheckpointStore(ICheckpointStore checkpointStore) => _checkpointStore = checkpointStore;
1817

1918
public async ValueTask<Checkpoint> GetLastCheckpoint(
2019
string checkpointId,
2120
CancellationToken cancellationToken
2221
) {
2322
using var activity = EventuousDiagnostics.ActivitySource.CreateActivity(
24-
ReadOperationName,
25-
ActivityKind.Internal,
26-
parentContext: default,
27-
GetTags(checkpointId),
28-
idFormat: ActivityIdFormat.W3C
29-
)?.Start();
23+
ReadOperationName,
24+
ActivityKind.Internal,
25+
parentContext: default,
26+
GetTags(checkpointId),
27+
idFormat: ActivityIdFormat.W3C
28+
)
29+
?.Start();
3030

3131
var checkpoint = await _checkpointStore.GetLastCheckpoint(checkpointId, cancellationToken).NoContext();
3232

@@ -37,6 +37,7 @@ CancellationToken cancellationToken
3737
[MethodImpl(MethodImplOptions.AggressiveInlining)]
3838
public async ValueTask<Checkpoint> StoreCheckpoint(
3939
Checkpoint checkpoint,
40+
bool force,
4041
CancellationToken cancellationToken
4142
) {
4243
using var activity = EventuousDiagnostics.ActivitySource.CreateActivity(
@@ -45,15 +46,16 @@ CancellationToken cancellationToken
4546
parentContext: default,
4647
GetTags(checkpoint.Id),
4748
idFormat: ActivityIdFormat.W3C
48-
)?
49+
)
50+
?
4951
.AddBaggage(CheckpointBaggage, checkpoint.Position?.ToString())
5052
.Start();
5153

52-
return await _checkpointStore.StoreCheckpoint(checkpoint, cancellationToken).NoContext();
54+
return await _checkpointStore.StoreCheckpoint(checkpoint, force, cancellationToken).NoContext();
5355
}
5456

5557
static KeyValuePair<string, object?>[] GetTags(string checkpointId)
5658
=> EventuousDiagnostics.CombineWithDefaultTags(
5759
new KeyValuePair<string, object?>(SubscriptionIdTag, checkpointId)
5860
);
59-
}
61+
}

src/Core/src/Eventuous.Subscriptions/Checkpoints/NoOpCheckpointStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public ValueTask<Checkpoint> GetLastCheckpoint(string checkpointId, Cancellation
1212
return new ValueTask<Checkpoint>(_start);
1313
}
1414

15-
public ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, CancellationToken cancellationToken) {
15+
public ValueTask<Checkpoint> StoreCheckpoint(Checkpoint checkpoint, bool force, CancellationToken cancellationToken) {
1616
SubscriptionsEventSource.Log.CheckpointStored(this, checkpoint);
1717
return new ValueTask<Checkpoint>(checkpoint);
1818
}

src/Core/src/Eventuous.Subscriptions/Context/IMessageConsumeContext.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ public interface IBaseConsumeContext {
1010
string MessageType { get; }
1111
string ContentType { get; }
1212
string Stream { get; }
13+
long StreamPosition { get; }
14+
ulong GlobalPosition { get; }
1315
DateTime Created { get; }
1416
Metadata? Metadata { get; }
1517
ContextItems Items { get; }

src/Core/src/Eventuous.Subscriptions/Context/MessageConsumeContext.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ public MessageConsumeContext(
88
string eventType,
99
string contentType,
1010
string stream,
11+
long streamPosition,
12+
ulong globalPosition,
1113
ulong sequence,
1214
DateTime created,
1315
object? message,
@@ -19,6 +21,8 @@ CancellationToken cancellationToken
1921
MessageType = eventType;
2022
ContentType = contentType;
2123
Stream = stream;
24+
StreamPosition = streamPosition;
25+
GlobalPosition = globalPosition;
2226
Created = created;
2327
Metadata = metadata;
2428
Sequence = sequence;
@@ -31,6 +35,8 @@ CancellationToken cancellationToken
3135
public string MessageType { get; }
3236
public string ContentType { get; }
3337
public string Stream { get; }
38+
public long StreamPosition { get; }
39+
public ulong GlobalPosition { get; }
3440
public DateTime Created { get; }
3541
public Metadata? Metadata { get; }
3642
public object? Message { get; }

src/Core/src/Eventuous.Subscriptions/Context/WrappedConsumeContext.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ public abstract class WrappedConsumeContext : IMessageConsumeContext {
1111
public string MessageType => InnerContext.MessageType;
1212
public string ContentType => InnerContext.ContentType;
1313
public string Stream => InnerContext.Stream;
14+
public long StreamPosition => InnerContext.StreamPosition;
15+
public ulong GlobalPosition => InnerContext.GlobalPosition;
1416
public DateTime Created => InnerContext.Created;
1517
public object? Message => InnerContext.Message;
1618
public Metadata? Metadata => InnerContext.Metadata;

0 commit comments

Comments
 (0)