From 17e81a0bf11bd2ead2862026cae3bdb2f1d22373 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 17 Jun 2025 15:34:22 +0200 Subject: [PATCH 1/4] wip Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index f34b367f..5de1bdeb 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -132,13 +132,11 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable private readonly Channel _chunksBuffer; private readonly ushort _initialCredits; - // _completeSubscription is used to notify the ProcessChunks task // that the subscription is completed and so it can start to process the chunks // this is needed because the socket starts to receive the chunks before the subscription_id is // assigned. - private readonly TaskCompletionSource _completeSubscription = new(); - + private readonly TaskCompletionSource _completeSubscription = new(TaskCreationOptions.RunContinuationsAsynchronously); protected sealed override string DumpEntityConfiguration() { var superStream = string.IsNullOrEmpty(_config.SuperStream) @@ -161,6 +159,8 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration()); Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier, null); // _chunksBuffer is a channel that is used to buffer the chunks + + _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(_initialCredits) { AllowSynchronousContinuations = false, @@ -461,6 +461,7 @@ private void ProcessChunks() // need to wait the subscription is completed // else the _subscriberId could be incorrect _completeSubscription.Task.Wait(); + try { while (!Token.IsCancellationRequested && From dc853ae323a9f662a2a1258dae136a39f3b8d6c4 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 17 Jun 2025 16:34:25 +0200 Subject: [PATCH 2/4] wip Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 5 ++--- global.json | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 5de1bdeb..9fbfed71 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -159,8 +159,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration()); Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier, null); // _chunksBuffer is a channel that is used to buffer the chunks - - + _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(_initialCredits) { AllowSynchronousContinuations = false, @@ -461,7 +460,7 @@ private void ProcessChunks() // need to wait the subscription is completed // else the _subscriberId could be incorrect _completeSubscription.Task.Wait(); - + try { while (!Token.IsCancellationRequested && diff --git a/global.json b/global.json index 47a7fa60..af8d7f46 100644 --- a/global.json +++ b/global.json @@ -1,5 +1,6 @@ { "sdk": { - "allowPrerelease": false + "allowPrerelease": false, + "version": "9.0.202" } } From 3dccb3c3170599fb3a9cf6e05a11dc0487ba2f45 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 17 Jun 2025 16:44:49 +0200 Subject: [PATCH 3/4] pin version Signed-off-by: Gabriele Santomaggio --- .github/workflows/build-test.yaml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 6fccf392..f4a12a5f 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -8,6 +8,12 @@ jobs: name: build/test on windows-latest runs-on: windows-latest steps: + - name: Clone repository + uses: actions/checkout@v4 + - name: Setup .NET SDK + uses: actions/setup-dotnet@v4 + with: + global-json-file: global.json - uses: actions/checkout@v4 - uses: actions/cache@v4 with: @@ -35,6 +41,12 @@ jobs: name: build/test on ubuntu-latest runs-on: ubuntu-latest steps: + - name: Clone repository + uses: actions/checkout@v4 + - name: Setup .NET SDK + uses: actions/setup-dotnet@v4 + with: + global-json-file: global.json - uses: actions/checkout@v4 - uses: actions/setup-dotnet@v4 with: From ef91bd8a3f9eac38cde092d09250cf79f2522109 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 17 Jun 2025 22:13:37 +0200 Subject: [PATCH 4/4] refactor Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 9fbfed71..0ab619ff 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -265,8 +265,8 @@ Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int var slice = unCompressedData.Slice(compressOffset, 4); compressOffset += WireFormatting.ReadUInt32(ref slice, out var len); Debug.Assert(len > 0); - slice = unCompressedData.Slice(compressOffset, len); - Debug.Assert(slice.Length >= len); + var sliceMsg = unCompressedData.Slice(compressOffset, len); + Debug.Assert(sliceMsg.Length == len); compressOffset += (int)len; // Here we use the Message.From(ref ReadOnlySequence seq ..) method to parse the message @@ -274,7 +274,7 @@ Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int // Since the ParseChunk is async and we cannot use the ref SequenceReader reader // See https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 for more details - var message = Message.From(ref slice, len); + var message = Message.From(ref sliceMsg, len); return message; } catch (Exception e)