Skip to content

Improve parse chunk consumer side #423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ jobs:
name: build/test on windows-latest
runs-on: windows-latest
steps:
- name: Clone repository
uses: actions/checkout@v4
- name: Setup .NET SDK
uses: actions/setup-dotnet@v4
with:
global-json-file: global.json
- uses: actions/checkout@v4
- uses: actions/cache@v4
with:
Expand Down Expand Up @@ -35,6 +41,12 @@ jobs:
name: build/test on ubuntu-latest
runs-on: ubuntu-latest
steps:
- name: Clone repository
uses: actions/checkout@v4
- name: Setup .NET SDK
uses: actions/setup-dotnet@v4
with:
global-json-file: global.json
- uses: actions/checkout@v4
- uses: actions/setup-dotnet@v4
with:
Expand Down
12 changes: 6 additions & 6 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,11 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable

private readonly Channel<Chunk> _chunksBuffer;
private readonly ushort _initialCredits;

// _completeSubscription is used to notify the ProcessChunks task
// that the subscription is completed and so it can start to process the chunks
// this is needed because the socket starts to receive the chunks before the subscription_id is
// assigned.
private readonly TaskCompletionSource _completeSubscription = new();

private readonly TaskCompletionSource _completeSubscription = new(TaskCreationOptions.RunContinuationsAsynchronously);
protected sealed override string DumpEntityConfiguration()
{
var superStream = string.IsNullOrEmpty(_config.SuperStream)
Expand All @@ -161,6 +159,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration());
Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier, null);
// _chunksBuffer is a channel that is used to buffer the chunks

_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
{
AllowSynchronousContinuations = false,
Expand Down Expand Up @@ -266,16 +265,16 @@ Message MessageFromSequence(ref ReadOnlySequence<byte> unCompressedData, ref int
var slice = unCompressedData.Slice(compressOffset, 4);
compressOffset += WireFormatting.ReadUInt32(ref slice, out var len);
Debug.Assert(len > 0);
slice = unCompressedData.Slice(compressOffset, len);
Debug.Assert(slice.Length >= len);
var sliceMsg = unCompressedData.Slice(compressOffset, len);
Debug.Assert(sliceMsg.Length == len);
compressOffset += (int)len;

// Here we use the Message.From(ref ReadOnlySequence<byte> seq ..) method to parse the message
// instead of the Message From(ref SequenceReader<byte> reader ..) method
// Since the ParseChunk is async and we cannot use the ref SequenceReader<byte> reader
// See https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 for more details

var message = Message.From(ref slice, len);
var message = Message.From(ref sliceMsg, len);
return message;
}
catch (Exception e)
Expand Down Expand Up @@ -461,6 +460,7 @@ private void ProcessChunks()
// need to wait the subscription is completed
// else the _subscriberId could be incorrect
_completeSubscription.Task.Wait();

try
{
while (!Token.IsCancellationRequested &&
Expand Down
3 changes: 2 additions & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"sdk": {
"allowPrerelease": false
"allowPrerelease": false,
"version": "9.0.202"
}
}