
Records are missing when second rebalance happens. #1051

Open
MrKustra94 opened this issue Aug 9, 2022 · 6 comments

MrKustra94 commented Aug 9, 2022

Hey,
during deployments we have noticed a strange issue with record consumption.

Versions

FS2-Kafka Version = 3.0.0-M7 (observed also for M4)
Kafka client version = 3.2.0

Background

Let's assume that we are working with a topic called topic, which has 100 partitions.
Our consumer is running as a single pod (pod-1). It is consuming all 100 partitions. During Kubernetes rolling deployments another instance is created; let's call it pod-2. A first rebalance is triggered, leaving pod-1 and pod-2 consuming 50 partitions each.
Let's assume that:
pod-1 is consuming partitions 0, 1, 2, 3, ..., 49
pod-2 is consuming partitions 50, 51, 52, 53, ..., 99
After a few seconds, pod-1 gets shut down and pod-2 is the only working pod. A second rebalance gets triggered and now pod-2 is consuming all partitions.
What we have noticed is that, for the partitions which were previously assigned, some messages are not consumed just after the second re-assignment.
More examples below.

Code

This issue is really hard to reproduce. It happens very rarely.

val settings =
  ConsumerSettings[F, Option[String], Array[Byte]](
    keyDeserializer = Deserializer[F, Option[String]],
    valueDeserializer = Deserializer[F, Array[Byte]],
  ).withBootstrapServers("<configuration>")
   .withGroupId("TestGroup")
   .withAutoOffsetReset("earliest")
   .withClientId("TestClientId")
   .withEnableAutoCommit(false)
   .withPollInterval(50 millis)
  // Important note: the following assignment strategies are included:
  // partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
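
Not part of the original report: if one wanted to pin the consumer to a single assignor instead of relying on the default list shown in the comment above, ConsumerSettings allows passing the underlying client property directly. A hedged sketch (the pinnedSettings name is made up):

import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor}

// Hypothetical tweak, not taken from the issue: set the assignment strategy
// explicitly instead of relying on the default [RangeAssignor, CooperativeStickyAssignor].
val pinnedSettings =
  settings.withProperty(
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    classOf[CooperativeStickyAssignor].getName
  )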

def recordProcessor(
  records: Stream[F, CommittableConsumerRecord[F, Option[String], Array[Byte]]]
): Stream[F, CommittableOffset[F]] =
  records.evalMap { record =>
    val recordFormatted =
      s"Published record: ${record.record.topic}-${record.record.partition}:${record.record.offset}"
    Sync[F].delay(println(recordFormatted)) >> Sync[F].pure(record.offset)
  }

KafkaConsumer
  .stream(settings)
  .evalTap(_.subscribeTo("topic"))
  .flatMap(_.partitionedStream.map(recordProcessor))
  .map(_
    .through(
      _.groupWithin(100, 50 millis)
        .evalMap(CommittableOffsetBatch.fromFoldable(_).commit)
    )
  )
  .parJoinUnbounded
  .scope

Example output showing the issue

//pod-2 has been started. Consumer is being prepared...
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000028, offsetEpoch=Optional.empty, ...}
//pod-1 has been shutdown. Rebalance has started again and now all partitions are assigned to pod-2
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000032, offsetEpoch=Optional.empty, ...}
Published record: topic-59:1000037 // Offset is not moved back to 1000032, but it should be, since the Range assignor uses the EAGER protocol
// Commit happens, setting offset to 1000038.
Published record: topic-59:1000040 // Missing records: 1000038 and 1000039, which exist in the topic but were skipped by the consumer.
Published record: topic-59:1000041
Published record: topic-59:1000042
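
Not part of the original report, but a minimal diagnostic sketch, assuming cats-effect 3 and fs2-kafka 3.x (the detectGaps name is made up), that would surface gaps like the 1000038/1000039 one above by tracking the last offset seen per partition:

import cats.effect.{IO, Ref}
import fs2.Stream
import fs2.kafka.CommittableConsumerRecord

// Hypothetical helper: warns whenever a record's offset is not contiguous with
// the last offset seen for its partition. Compacted topics or transactional
// markers can produce legitimate gaps, so this is only a debugging aid.
def detectGaps[K, V](
  records: Stream[IO, CommittableConsumerRecord[IO, K, V]]
): Stream[IO, CommittableConsumerRecord[IO, K, V]] =
  Stream.eval(Ref.of[IO, Map[Int, Long]](Map.empty)).flatMap { lastOffsets =>
    records.evalTap { record =>
      val partition = record.record.partition
      val offset    = record.record.offset
      lastOffsets
        .modify { seen =>
          val gap = seen.get(partition).exists(last => offset > last + 1)
          (seen.updated(partition, offset), gap)
        }
        .flatMap(gap => IO.whenA(gap)(IO.println(s"Offset gap before partition $partition at offset $offset")))
    }
  }

// It could be wired into the pipeline above as records.through(detectGaps) before recordProcessor.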

MrKustra94 commented Aug 12, 2022

One more observation:
this issue appears when a partition has been paused just before the rebalance.
It does not occur for all partitions, but whenever it happens, the affected partition was paused just before the rebalance.

MrKustra94 commented:

I think this may be related:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-13463


MrKustra94 commented Aug 18, 2022

I think I found the potential issue.
I will try to clarify it via a few examples.

I've noticed that the above issues appear when some of the partitions were paused before the rebalance. In Kafka 3.2.0 there is a special INFO message that logs them:

INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] The pause flag in partitions [topic-59, topic-67, ...] will be removed due to revocation.

Those partitions were then re-assigned, but if we take a look at the example above:

//pod-2 has been started. Consumer is being prepared...
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000028, offsetEpoch=Optional.empty, ...}
//pod-1 has been shutdown. Rebalance has started again and now all partitions are assigned to pod-2
INFO o.a.k.c.c.i.ConsumerCoordinator [Consumer clientId=TestClientId, groupId=TestGroup] Setting offset for partition topic-59 to the committed offset FetchPosition{offset=1000032, offsetEpoch=Optional.empty, ...}
Published record: topic-59:1000037 // Offset is not moved back to 1000032, but it should be, since the Range assignor uses the EAGER protocol
// Commit happens, setting offset to 1000038. I think they were just delivered late, due to too many concurrent streams going on. That's why they appear as if they were delivered after the rebalance.
Published record: topic-59:1000040 // Missing records: 1000038 and 1000039, which exist in the topic but were skipped by the consumer.
Published record: topic-59:1000041
Published record: topic-59:1000042

Fetching for those partitions after the rebalance did not start from 1000032, but from 1000040.
This is caused by the issue mentioned below:
https://issues.apache.org/jira/browse/KAFKA-13463

What I think happened is that the missing records were dropped too eagerly during the revocation, as part of the callback defined in KafkaConsumerActor.
They might have been placed there because poll returned them even though the partitions were paused.
I prepared a potential MR with a fix for that.
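
Not part of the original comment, but a minimal sketch against the plain Java client (not fs2-kafka internals), illustrating the KAFKA-13463 behaviour referenced above, namely that the pause flag is dropped on revocation; topic and group names are reused from the report:

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.jdk.CollectionConverters._

// Illustration of KAFKA-13463: a partition that was explicitly paused can start
// returning records again after a rebalance, because revocation clears the pause
// flag ("The pause flag in partitions [...] will be removed due to revocation").
val props = new Properties()
props.put("bootstrap.servers", "<configuration>")
props.put("group.id", "TestGroup")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[String, Array[Byte]](props)
consumer.subscribe(java.util.List.of("topic"))
consumer.poll(Duration.ofMillis(500))  // join the group and receive an assignment
consumer.pause(consumer.assignment())  // pause every partition we own

// If a rebalance revokes and re-assigns these partitions between the two polls,
// consumer.paused() no longer contains them, and the next poll() returns records
// for partitions the caller still believes are paused. fs2-kafka's revocation
// callback has to account for those already-fetched records instead of dropping them.
val records = consumer.poll(Duration.ofMillis(500))
records.partitions().asScala.foreach { tp =>
  println(s"Fetched ${records.records(tp).size} records from supposedly paused $tp")
}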


LMnet commented Aug 18, 2022

@MrKustra94 nice research! Thank you.

Also, I wonder why we need these pause calls inside fs2-kafka. Maybe we could just get rid of them? Just use poll and that's it. What do you think, @bplommer @vlovgr?


MrKustra94 commented Aug 18, 2022

I think it was introduced in order to increase general fairness and throughput with slow consumers,
so that each partition appears to be consumed in a fair fashion: if there is an outstanding fetch request for a partition and records for it are already available, fetching new records for that partition is skipped, giving other partitions a chance.
I see that this is quite a popular approach in Java/Scala wrappers around the Java Kafka client.
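
A rough, hypothetical sketch of that pattern against the plain Java client (not fs2-kafka's actual implementation; pollOnce, pending and maxPending are made-up names): partitions whose records are still waiting to be processed downstream are paused, so the next poll only fetches for partitions that have caught up.

import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.mutable
import scala.jdk.CollectionConverters._

// Pause partitions with a large unprocessed backlog and resume the drained ones,
// then poll; slow partitions stop accumulating records while fast ones keep going.
def pollOnce[K, V](
  consumer: KafkaConsumer[K, V],
  pending: mutable.Map[TopicPartition, Int], // records handed out but not yet processed
  maxPending: Int
): Unit = {
  val assigned = consumer.assignment().asScala
  val (toPause, toResume) =
    assigned.partition(tp => pending.getOrElse(tp, 0) >= maxPending)

  consumer.pause(toPause.asJava)   // backlog too large: stop fetching for these
  consumer.resume(toResume.asJava) // drained: allow fetching again

  val records = consumer.poll(Duration.ofMillis(50))
  records.partitions().asScala.foreach { tp =>
    pending.updateWith(tp)(count => Some(count.getOrElse(0) + records.records(tp).size))
    // hand records.records(tp) to the per-partition processor here; it decrements
    // pending(tp) as it finishes each batch
  }
}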


simonpetty commented Dec 11, 2023

I know this issue is quite old now, but I think we're facing the same issue.
We added logging that shows the last offset processed when a partition is revoked; later on, when the partition arrives back at the same node, the first offset processed is some amount ahead of that, and no commits were performed in between.

We also see lots of FS2 Kafka logs similar to the OP's about pausing/revoking etc.
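
Not from the original comment, but a hedged sketch of that kind of logging on top of fs2-kafka's partitionedStream (the logLastProcessed name is made up): each inner partition stream completes when its partition is revoked, so the last offset recorded before the finalizer runs is the last offset processed before revocation.

import cats.effect.{IO, Ref}
import fs2.Stream
import fs2.kafka.CommittableConsumerRecord

// Illustrative only: remember the last record seen on this partition stream and
// log it when the stream finishes, which happens when the partition is revoked.
def logLastProcessed[K, V](
  partition: Stream[IO, CommittableConsumerRecord[IO, K, V]]
): Stream[IO, CommittableConsumerRecord[IO, K, V]] =
  Stream.eval(Ref.of[IO, Option[(String, Int, Long)]](None)).flatMap { last =>
    partition
      .evalTap { record =>
        last.set(Some((record.record.topic, record.record.partition, record.record.offset)))
      }
      .onFinalize {
        last.get.flatMap {
          case Some((topic, p, offset)) =>
            IO.println(s"Partition stream for $topic-$p finished; last processed offset: $offset")
          case None => IO.unit
        }
      }
  }

// With the pipeline from the original report this could be plugged in as
// .flatMap(_.partitionedStream.map(p => recordProcessor(logLastProcessed(p))))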
