Skip to content

Monitoring

Daniel Blankensteiner edited this page Oct 2, 2019 · 8 revisions

Monitoring

Consumers, producers and readers all have states that can be monitored.

Monitoring the state is recommended because DotPulsar will retry/reconnect indefinitely on certain fault scenarios (while first switching to the 'Disconnected' state). Some fault conditions are considered final and they will cause DotPulsar to throw an exception and move to a final state, after which it should be disposed.

Let's have a look at what states they can have.

Consumer states

  • Active (All is well)
  • Inactive (All is well. The subscription type is 'Failover' and you are not the active consumer)
  • Closed (The consumer or PulsarClient has been disposed)
  • Disconnected (The connection was lost and attempts are being made to reconnect)
  • Faulted (An unrecoverable error has occurred)
  • ReachedEndOfTopic (No more messages will be delivered)

Producer states

  • Closed (The producer or PulsarClient has been disposed)
  • Connected (All is well)
  • Disconnected (The connection was lost and attempts are being made to reconnect)
  • Faulted (An unrecoverable error has occurred)

Reader states

  • Closed (The reader or PulsarClient has been disposed)
  • Connected: (All is well)
  • Disconnected (The connection was lost and attempts are being made to reconnect)
  • Faulted (An unrecoverable error has occurred)
  • ReachedEndOfTopic (No more messages will be delivered)

How to monitor the state

Monitoring the state is easy, just call StateChangedFrom or StateChangedTo on a consumer, producer or reader. Let's see how:

var state = await consumer.StateChangedFrom(ConsumerState.Active, cancellationToken);

Here the variable 'state' will contain the new state. You can monitor going from (StateChangedFrom) and to (StateChangedTo) a specific state. Some states are final, meaning the state can no longer change. For consumers 'Closed', 'Faulted' and 'ReachedEndOfTopic' are final states. When the consumer enters a final state, all monitoring tasks are completed. So if you are monitoring going to 'Disconnected' and the consumer is 'Closed', then your task will complete and return 'Closed'.

Monitoring consumer state

private static async ValueTask Monitor(IConsumer consumer, CancellationToken cancellationToken)
{
    var state = ConsumerState.Disconnected;

    while (!cancellationToken.IsCancellationRequested)
    {
        state = await consumer.StateChangedFrom(state, cancellationToken);

        var stateMessage = state switch
        {
            ConsumerState.Active => "The consumer is active",
            ConsumerState.Inactive => "The consumer is inactive",
            ConsumerState.Disconnected => "The consumer is disconnected",
            ConsumerState.Closed => "The consumer has closed",
            ConsumerState.ReachedEndOfTopic => "The consumer has reached end of topic",
            ConsumerState.Faulted => "The consumer has faulted",
            _ => $"The consumer has an unknown state '{state}'"
        };

        Console.WriteLine(stateMessage);

        if (consumer.IsFinalState(state))
            return;
    }
}

Monitoring reader state

private static async ValueTask Monitor(IReader reader, CancellationToken cancellationToken)
{
    var state = ReaderState.Disconnected;

    while (!cancellationToken.IsCancellationRequested)
    {
        state = await reader.StateChangedFrom(state, cancellationToken);

        var stateMessage = state switch
        {
            ReaderState.Connected => "The reader is connected",
            ReaderState.Disconnected => "The reader is disconnected",
            ReaderState.Closed => "The reader has closed",
            ReaderState.ReachedEndOfTopic => "The reader has reached end of topic",
            ReaderState.Faulted => "The reader has faulted",
            _ => $"The reader has an unknown state '{state}'"
        };

        Console.WriteLine(stateMessage);

        if (reader.IsFinalState(state))
            return;
    }
}

Monitoring producer state

private static async ValueTask Monitor(IProducer producer, CancellationToken cancellationToken)
{
    var state = ProducerState.Disconnected;

    while (!cancellationToken.IsCancellationRequested)
    {
        state = await producer.StateChangedFrom(state, cancellationToken);

        var stateMessage = state switch
        {
            ProducerState.Connected => $"The producer is connected",
            ProducerState.Disconnected => $"The producer is disconnected",
            ProducerState.Closed => $"The producer has closed",
            ProducerState.Faulted => $"The producer has faulted",
            _ => $"The producer has an unknown state '{state}'"
        };

        Console.WriteLine(stateMessage);

        if (producer.IsFinalState(state))
            return;
    }
}
Clone this wiki locally