Monitoring
Consumers, producers, and readers all have states that can be monitored.
Monitoring the state is recommended because DotPulsar will retry and reconnect indefinitely in certain fault scenarios, first switching to the 'Disconnected' state. Other fault conditions are considered final: they cause DotPulsar to throw an exception and move to a final state, after which the consumer, producer, or reader should be disposed.
There are two ways to monitor the state:
- Set a StateChangedHandler
- Use OnStateChangeFrom/OnStateChangeTo or StateChangedFrom/StateChangedTo
Most users will, like the sample applications in the repository, just use the first option.
Let's have a look at the different states before we dive into monitoring them.
Consumer states:
- Active (All is well)
- Inactive (All is well. The subscription type is 'Failover' and we are not the active consumer)
- Closed (The consumer or PulsarClient has been disposed)
- Disconnected (The connection was lost and attempts are being made to reconnect)
- Faulted (An unrecoverable error has occurred)
- ReachedEndOfTopic (No more messages will be delivered)
Producer states:
- Closed (The producer or PulsarClient has been disposed)
- Connected (All is well)
- Disconnected (The connection was lost and attempts are being made to reconnect)
- Faulted (An unrecoverable error has occurred)
Reader states:
- Closed (The reader or PulsarClient has been disposed)
- Connected (All is well)
- Disconnected (The connection was lost and attempts are being made to reconnect)
- Faulted (An unrecoverable error has occurred)
- ReachedEndOfTopic (No more messages will be delivered)
Setting a StateChangedHandler is the easiest option. It can be used in three ways:
- Provide an implementation of IHandleStateChanged
- Provide a handler using Action<ConsumerStateChanged, CancellationToken>
- Provide a handler using Func<ConsumerStateChanged, CancellationToken, ValueTask>
The initial state is always 'Disconnected', and the handler is called for the first time when the state moves away from 'Disconnected'. Once the state is final, the handler is called for the last time.
The sample applications (Consuming, Reading, and Producing) use the option with Action<ConsumerStateChanged, CancellationToken>.
await using var consumer = client.NewConsumer()
    .StateChangedHandler(Monitor)
    .SubscriptionName("MySubscription")
    .Topic("persistent://public/default/mytopic")
    .Create();
private static void Monitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
{
    var stateMessage = stateChanged.ConsumerState switch
    {
        ConsumerState.Active => "is active",
        ConsumerState.Inactive => "is inactive",
        ConsumerState.Disconnected => "is disconnected",
        ConsumerState.Closed => "has closed",
        ConsumerState.ReachedEndOfTopic => "has reached end of topic",
        ConsumerState.Faulted => "has faulted",
        _ => $"has an unknown state '{stateChanged.ConsumerState}'"
    };
    var topic = stateChanged.Consumer.Topic;
    Console.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
}
private static void Monitor(ReaderStateChanged stateChanged, CancellationToken cancellationToken)
{
    var stateMessage = stateChanged.ReaderState switch
    {
        ReaderState.Connected => "is connected",
        ReaderState.Disconnected => "is disconnected",
        ReaderState.Closed => "has closed",
        ReaderState.ReachedEndOfTopic => "has reached end of topic",
        ReaderState.Faulted => "has faulted",
        _ => $"has an unknown state '{stateChanged.ReaderState}'"
    };
    var topic = stateChanged.Reader.Topic;
    Console.WriteLine($"The reader for topic '{topic}' " + stateMessage);
}
private static void Monitor(ProducerStateChanged stateChanged, CancellationToken cancellationToken)
{
    var stateMessage = stateChanged.ProducerState switch
    {
        ProducerState.Connected => "is connected",
        ProducerState.Disconnected => "is disconnected",
        ProducerState.Closed => "has closed",
        ProducerState.Faulted => "has faulted",
        _ => $"has an unknown state '{stateChanged.ProducerState}'"
    };
    var topic = stateChanged.Producer.Topic;
    Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
}
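The samples above all pass an Action. As a minimal sketch of the third option, a handler matching Func<ConsumerStateChanged, CancellationToken, ValueTask> could look like the following. The class name AsyncMonitoring and the helper Describe are hypothetical, and the using directive assumes that ConsumerState and ConsumerStateChanged live in the DotPulsar namespace.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using DotPulsar; // assumed namespace for ConsumerState and ConsumerStateChanged

public static class AsyncMonitoring // hypothetical name
{
    // Hypothetical helper: builds the log line for a topic and a state.
    public static string Describe(string topic, ConsumerState state)
    {
        var stateMessage = state switch
        {
            ConsumerState.Active => "is active",
            ConsumerState.Inactive => "is inactive",
            ConsumerState.Disconnected => "is disconnected",
            ConsumerState.Closed => "has closed",
            ConsumerState.ReachedEndOfTopic => "has reached end of topic",
            ConsumerState.Faulted => "has faulted",
            _ => $"has an unknown state '{state}'"
        };
        return $"The consumer for topic '{topic}' {stateMessage}";
    }

    // Matches Func<ConsumerStateChanged, CancellationToken, ValueTask>,
    // so the handler can await asynchronous work such as non-blocking logging.
    public static async ValueTask Monitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
    {
        var line = Describe(stateChanged.Consumer.Topic, stateChanged.ConsumerState);
        await Console.Out.WriteLineAsync(line);
    }
}
```

Such a handler would be registered in the same way as the synchronous one, for example with .StateChangedHandler(AsyncMonitoring.Monitor) on the consumer builder.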
There is a small difference between OnStateChange[From/To] and StateChanged[From/To].
- OnStateChangeFrom and OnStateChangeTo will return the new state.
- StateChangedFrom and StateChangedTo will return an object containing the new state and a reference to the Reader, Consumer, or Producer that changed state.
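To illustrate the first variant, the sketch below waits for a consumer to become 'Active' using OnStateChangeTo, which returns only the state. The class and method names are hypothetical, and the using directives are assumptions about where the types live. Because a final state also completes the wait, the result is compared against the requested state.

```csharp
using System.Threading;
using System.Threading.Tasks;
using DotPulsar;              // assumed namespace for ConsumerState
using DotPulsar.Abstractions; // assumed namespace for IConsumer

public static class ConsumerReadiness // hypothetical name
{
    // Completes when the consumer becomes 'Active' or enters a final state.
    // Returns true only if 'Active' was actually reached.
    public static async ValueTask<bool> WaitUntilActive(IConsumer consumer, CancellationToken cancellationToken)
    {
        // OnStateChangeTo returns the new state itself, not a wrapper object.
        ConsumerState state = await consumer.OnStateChangeTo(ConsumerState.Active, cancellationToken);
        return state == ConsumerState.Active;
    }
}
```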
Monitoring the state is easy: just call StateChangedFrom or StateChangedTo on a consumer, producer, or reader. Let's see how:
var stateChanged = await consumer.StateChangedFrom(ConsumerState.Active, cancellationToken);
The variable 'stateChanged' tells us both which consumer changed state and which state it changed to. Some states are final, meaning the state can no longer change. For consumers, 'Closed', 'Faulted', and 'ReachedEndOfTopic' are final states. When the consumer enters a final state, all monitoring tasks are completed. So if we are waiting for the consumer to become 'Disconnected' and it is closed instead, the task will complete and return 'Closed'.
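The final-state behavior can be sketched as follows: the code waits for 'Disconnected' with StateChangedTo and then uses IsFinalState to tell a recoverable disconnect apart from a final state that ended the wait. The class and method names are hypothetical, and the using directives (in particular DotPulsar.Extensions as the home of StateChangedTo) are assumptions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using DotPulsar;              // assumed namespace for ConsumerState
using DotPulsar.Abstractions; // assumed namespace for IConsumer
using DotPulsar.Extensions;   // assumed namespace for the StateChangedTo extension

public static class DisconnectWatcher // hypothetical name
{
    // Returns true if 'Disconnected' was reached (DotPulsar is trying to reconnect),
    // or false if a final state completed the wait instead.
    public static async ValueTask<bool> WaitForDisconnect(IConsumer consumer, CancellationToken cancellationToken)
    {
        var stateChanged = await consumer.StateChangedTo(ConsumerState.Disconnected, cancellationToken);
        var state = stateChanged.ConsumerState;
        var topic = stateChanged.Consumer.Topic;

        if (consumer.IsFinalState(state))
        {
            // The task completed because no further state changes are possible.
            Console.WriteLine($"The consumer for topic '{topic}' ended in the final state '{state}'");
            return false;
        }

        Console.WriteLine($"The consumer for topic '{topic}' is disconnected");
        return true;
    }
}
```

A caller could use the return value to decide whether to keep waiting for the consumer to recover or to dispose it.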
private static async ValueTask Monitor(IConsumer consumer, CancellationToken cancellationToken)
{
    var state = ConsumerState.Disconnected;
    while (!cancellationToken.IsCancellationRequested)
    {
        var stateChanged = await consumer.StateChangedFrom(state, cancellationToken);
        state = stateChanged.ConsumerState;
        var stateMessage = state switch
        {
            ConsumerState.Active => "The consumer is active",
            ConsumerState.Inactive => "The consumer is inactive",
            ConsumerState.Disconnected => "The consumer is disconnected",
            ConsumerState.Closed => "The consumer has closed",
            ConsumerState.ReachedEndOfTopic => "The consumer has reached end of topic",
            ConsumerState.Faulted => "The consumer has faulted",
            _ => $"The consumer has an unknown state '{state}'"
        };
        Console.WriteLine(stateMessage);
        if (consumer.IsFinalState(state))
            return;
    }
}
private static async ValueTask Monitor(IReader reader, CancellationToken cancellationToken)
{
    var state = ReaderState.Disconnected;
    while (!cancellationToken.IsCancellationRequested)
    {
        var stateChanged = await reader.StateChangedFrom(state, cancellationToken);
        state = stateChanged.ReaderState;
        var stateMessage = state switch
        {
            ReaderState.Connected => "The reader is connected",
            ReaderState.Disconnected => "The reader is disconnected",
            ReaderState.Closed => "The reader has closed",
            ReaderState.ReachedEndOfTopic => "The reader has reached end of topic",
            ReaderState.Faulted => "The reader has faulted",
            _ => $"The reader has an unknown state '{state}'"
        };
        Console.WriteLine(stateMessage);
        if (reader.IsFinalState(state))
            return;
    }
}
private static async ValueTask Monitor(IProducer producer, CancellationToken cancellationToken)
{
    var state = ProducerState.Disconnected;
    while (!cancellationToken.IsCancellationRequested)
    {
        var stateChanged = await producer.StateChangedFrom(state, cancellationToken);
        state = stateChanged.ProducerState;
        var stateMessage = state switch
        {
            ProducerState.Connected => "The producer is connected",
            ProducerState.Disconnected => "The producer is disconnected",
            ProducerState.Closed => "The producer has closed",
            ProducerState.Faulted => "The producer has faulted",
            _ => $"The producer has an unknown state '{state}'"
        };
        Console.WriteLine(stateMessage);
        if (producer.IsFinalState(state))
            return;
    }
}