Stale consumers after JVM shutdown #1341

Open
misja-alma opened this issue Aug 8, 2024 · 3 comments

Comments


misja-alma commented Aug 8, 2024

Our application runs on 2 Kubernetes pods and uses fs2-kafka for several consumers; most of the time we have 8 consumers in total per topic, 4 on each pod, with every consumer assigned its own partition.
When our pods are scaled down and up again, i.e. when our application is restarted, we often find that for one topic some 'zombie' consumers are still hanging around on the broker after the restart. For that topic the broker shows a mix of new consumers from both restarted pods plus a few consumers from one of the pods from before the restart. At least, that is what the broker reports; in reality the old pod has long since died and those consumers no longer exist. However, since the broker is unaware of this, the stale consumers remain assigned to their partitions, which means new messages on those partitions are no longer consumed and lag builds up.

The topic where this happens is not always the same, and sometimes a restart causes no problems at all. The workaround is simply to restart our pods once more; usually the stale consumers are gone after that.

We suspect that the consumer might not be unsubscribing properly during JVM shutdown, but this is just a guess: we don't do any unsubscribing ourselves and rely entirely on fs2-kafka for this.
We usually subscribe like this:

    KafkaConsumer
      .stream(consumerSettings)
      .evalTap(_.subscribeTo(topic))
      .flatMap(_.stream)
      .evalTap(x => log(show"Received ${x.record.value} from $topic", logLevel))
      .groupWithin(groupWithin.messages, groupWithin.duration)
      .evalMap(processRecords)
      .evalMap(CommittableOffsetBatch.fromFoldable(_).commit.whenA(commitOffset))

and we don't have any code to explicitly shut down or unsubscribe on JVM exit.
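
To illustrate, here is a minimal sketch of how a pipeline like the one above is typically run under cats-effect's IOApp (placeholder names, not our actual code); our understanding was that on SIGTERM the stream gets cancelled and fs2-kafka's resource finalizer closes the consumer, so no explicit unsubscribe should be needed:

    import cats.effect.{IO, IOApp}
    import fs2.Stream

    // Minimal sketch, assuming a cats-effect IOApp entry point.
    object Main extends IOApp.Simple {

      // Placeholder for the KafkaConsumer.stream(...) pipeline shown above.
      def consumerPipeline: Stream[IO, Unit] = Stream.empty

      // IOApp installs a JVM shutdown hook: on SIGTERM/SIGINT it cancels `run`,
      // cancellation propagates into the stream, and the consumer resource
      // acquired by KafkaConsumer.stream is released, closing the client.
      val run: IO[Unit] =
        consumerPipeline.compile.drain
    }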

Could it be that we are not using fs2-kafka in the correct way, and that this is causing the random stale consumers? Or is this perhaps a known issue?

aartigao (Contributor) commented

Even if FS2 is failing to unsubscribe, this is mostly a Kafka Broker issue.

Which value do you have set for session.timeout.ms?

Regardless of the lib used (FS2, raw KafkaConsumer, etc.), the broker should free those partitions and reassign them to another live instance in the Consumer Group.
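
For reference, a sketch of where that would be configured with fs2-kafka's ConsumerSettings (values below are just illustrative):

    import cats.effect.IO
    import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
    import scala.concurrent.duration._

    // Illustrative values only: session.timeout.ms is how long the broker waits
    // for heartbeats before evicting a member and reassigning its partitions.
    val consumerSettings: ConsumerSettings[IO, String, String] =
      ConsumerSettings[IO, String, String]
        .withBootstrapServers("localhost:9092")
        .withGroupId("example-group")
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withSessionTimeout(45.seconds)    // session.timeout.ms
        .withHeartbeatInterval(3.seconds)  // heartbeat.interval.ms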

misja-alma (Author) commented

We left session.timeout.ms at its default, so 45000 (45s).

We have reason to suspect that the problem is on our side, because other applications that use Akka Kafka clients against the same broker do not have this problem.

aartigao (Contributor) commented Sep 5, 2024

You can try setting the org.apache.kafka loggers to DEBUG level; there you'll see whether the lib is unsubscribing or not. If it's not, I'd bet on some resource shutdown-order issue in your app. We have multiple projects using FS2 at $work and they all shut down gracefully.
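
As a rough, hypothetical sketch of what I mean by shutdown order (`consumerPipeline` and `otherResources` are placeholders): if the consumer fiber is detached from the app's resource scope it may never be cancelled on shutdown, whereas tying it to the scope lets fs2-kafka close the consumer before the rest of the app is torn down:

    import cats.effect.{IO, Resource}
    import fs2.Stream

    // Placeholders for the app's real components (hypothetical names).
    def consumerPipeline: Stream[IO, Unit] = Stream.empty
    def otherResources: Resource[IO, Unit] = Resource.unit[IO]

    // Problematic: `start` detaches the fiber from the resource scope, so on
    // shutdown nothing cancels it and the KafkaConsumer may never be closed
    // before the JVM exits.
    val detached: Resource[IO, Unit] =
      for {
        _ <- Resource.eval(consumerPipeline.compile.drain.start)
        _ <- otherResources
      } yield ()

    // Safer: `background` ties the fiber to this scope; resources are released
    // in reverse order of acquisition, so the consumer stream is cancelled (and
    // the underlying client closed) before the rest of the app shuts down.
    val supervised: Resource[IO, Unit] =
      for {
        _ <- otherResources
        _ <- consumerPipeline.compile.drain.background.void
      } yield ()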
