single active consumer is not consuming messages #412
-
Hi there! Got an issue when enabling a single active consumer in a stream consumers. using System.Text;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
var streamSystem = await StreamSystem.Create(new StreamSystemConfig()).ConfigureAwait(false);
await streamSystem.CreateStream(new StreamSpec("my-sac-stream")).ConfigureAwait(false);
var producer = await Producer.Create(new ProducerConfig(streamSystem, "my-sac-stream")).ConfigureAwait(false);
//var messages = new List<Message>(); enable when sending in batch
for (int i = 0; i < 50; i++)
{
var body = Encoding.UTF8.GetBytes($"Message #{i}");
var message = new Message(body);
await producer.Send(message).ConfigureAwait(false);
}
Console.WriteLine("Sending 50 messages to my-sac-stream");
await producer.Close().ConfigureAwait(false);
await streamSystem.Close().ConfigureAwait(false); Consumers: using System.Buffers;
using System.Text;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
var streamSystem = await StreamSystem.Create(new StreamSystemConfig()).ConfigureAwait(false);
var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, "my-sac-stream")
{
Reference = "sac_consumer",
OffsetSpec = new OffsetTypeFirst(),
IsSingleActiveConsumer = true,
MessageHandler = async (_, consumer, context, message) =>
{
var text = Encoding.UTF8.GetString(message.Data.Contents.ToArray());
Console.WriteLine($"The message {text} was received");
await Task.CompletedTask.ConfigureAwait(false);
},
ConsumerUpdateListener = async (consumerRef, stream, isActive) =>
{
var offset = await streamSystem.QueryOffset(consumerRef, stream).ConfigureAwait(false);
return new OffsetTypeOffset(offset);
}
}).ConfigureAwait(false);
Console.WriteLine("Consumer is running. Press [enter] to exit.");
Console.ReadLine();
await consumer.Close().ConfigureAwait(false);
await streamSystem.Close().ConfigureAwait(false); when i launch my producer, everything is ok:
when i launch my consumers, the single active consumer works fine, but the consumption process doesn't:
Console doesn't print eveything else. The message above occurs in every consumer. Did i forget something in my code ? Thanks in advance |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 2 replies
-
@blendereru the API You need to store and offset before querying it. See Here is a working example: var streamSystem = await StreamSystem.Create(new StreamSystemConfig()).ConfigureAwait(false);
var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, "my-sac-stream")
{
Reference = "sac_consumer",
OffsetSpec = new OffsetTypeFirst(),
IsSingleActiveConsumer = true,
MessageHandler = async (_, consumer, context, message) =>
{
var text = Encoding.UTF8.GetString(message.Data.Contents.ToArray());
Console.WriteLine($"The message {text} was received");
// Store the offset of the message.
// store offset for each message is not a good practice
//This is only for demo purpose
await consumer.StoreOffset(context.Offset).ConfigureAwait(false);
await Task.CompletedTask.ConfigureAwait(false);
},
ConsumerUpdateListener = async (consumerRef, stream, isActive) =>
{
try
{
var offset = await streamSystem.QueryOffset(consumerRef, stream).ConfigureAwait(false);
return new OffsetTypeOffset(offset);
}
catch (OffsetNotFoundException)
{
Console.WriteLine(
$"Offset not found for stream {stream} and consumer {consumerRef}. Will use the first offset");
return new OffsetTypeFirst();
}
}
}).ConfigureAwait(false);
Console.WriteLine("Consumer is running. Press [enter] to exit.");
Console.ReadLine();
await consumer.Close().ConfigureAwait(false);
await streamSystem.Close().ConfigureAwait(false); |
Beta Was this translation helpful? Give feedback.
-
Thanks! |
Beta Was this translation helpful? Give feedback.
@blendereru the API
streamSystem.QueryOffset
could raise an exception if the offset does not exist.( maybe it is time to implement #370 )
You need to store and offset before querying it. See
await consumer.StoreOffset(context.Offset).ConfigureAwait(false);
Here is a working example: