Prefetched messages from revoked partition not dropped on Rebalance #755
jakobmerrild changed the title from "Prefetched messages from unassigned partition not dropped on Rebalance" to "Prefetched messages from revoked partition not dropped on Rebalance" on Nov 10, 2021.
Here's a proper test of the desired behavior (I think):

```scala
it("should handle rebalance") {
  withTopic { topic =>
    createCustomTopic(topic, partitions = 3)
    val produced1 = (0 until 100).map(n => s"key-$n" -> s"value->$n")
    val produced2 = (100 until 200).map(n => s"key-$n" -> s"value->$n")
    val producedTotal = produced1.size.toLong + produced2.size.toLong
    val commitBatchSize = 10

    def startConsumer(
      consumedQueue: Queue[IO, CommittableConsumerRecord[IO, String, String]],
      stopSignal: SignallingRef[IO, Boolean]
    ): IO[FiberIO[Unit]] =
      KafkaConsumer
        .stream(consumerSettings[IO].withMaxPrefetchBatches(0))
        .subscribeTo(topic)
        .flatMap(_.stream)
        .evalTap(consumedQueue.offer)
        .evalMap(committable => IO.sleep(100.millis).as(committable.offset)) // Simulate work being done
        .through(commitBatchWithin(commitBatchSize, 1.hour)) // Use an unreasonably high time so the count determines commits
        .interruptWhen(stopSignal)
        .compile
        .drain
        .start

    (for {
      stopSignal <- SignallingRef[IO, Boolean](false)
      queue <- Queue.unbounded[IO, CommittableConsumerRecord[IO, String, String]]
      ref <- Ref.of[IO, Map[String, Int]](Map.empty)
      _ <- IO(publishToKafka(topic, produced1))
      _ <- startConsumer(queue, stopSignal) // Start the first consumer
      _ <- IO.sleep(5.seconds) // Simulate a scale-out event after a while
      _ <- startConsumer(queue, stopSignal) // Start the second consumer
      _ <- IO(publishToKafka(topic, produced2)) // Produce some more records
      _ <- Stream
        .fromQueueUnterminated(queue)
        .evalMap { committable =>
          // Build a map of how many times each message was consumed
          ref.modify { counts =>
            val key = committable.record.key
            val newCounts = counts.updated(key, counts.getOrElse(key, 0) + 1)
            (newCounts, newCounts)
          }
        }
        .takeWhile(_.size < producedTotal)
        .compile
        .drain
        .guarantee(stopSignal.set(true))
      keys <- ref.get
    } yield {
      assert {
        keys.size.toLong == producedTotal && {
          // Number of messages handled more than once shouldn't exceed the commit threshold
          keys.values.count(_ != 1) < commitBatchSize
        }
      }
    }).unsafeRunSync()
  }
}
```
Note that I've been using
This issue appears to be related to #127.
It seems that in the following scenario a consumer can end up handling messages for a partition it is not assigned to.
It's a little difficult for me to write an exact test of the desired behavior, but here's a minimal example based on your test code (modified from an existing test). It uses printlns 😊
Truncated output:
Desired behavior:
Any messages from a revoked partition should be dropped unless they have already been pulled downstream.
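To make the desired behavior concrete, here is a minimal, self-contained sketch (not fs2-kafka's actual internals — the `PrefetchBuffer` type, `Record`, and `onPartitionsRevoked` are hypothetical names for illustration): model the consumer's prefetch buffer as a map from partition to buffered records, and on rebalance drop the buffers of every revoked partition. Records already pulled downstream are unaffected because they are no longer in the buffer.

```scala
object RevokeSketch {
  // Hypothetical stand-ins for Kafka's TopicPartition and a consumer record.
  final case class TopicPartition(topic: String, partition: Int)
  final case class Record(key: String, value: String)

  // Prefetched-but-not-yet-pulled records, keyed by partition.
  type PrefetchBuffer = Map[TopicPartition, Vector[Record]]

  /** On rebalance, discard the buffered records of every revoked partition;
    * buffers for partitions that remain assigned are kept untouched. */
  def onPartitionsRevoked(
    buffer: PrefetchBuffer,
    revoked: Set[TopicPartition]
  ): PrefetchBuffer =
    buffer -- revoked // drop all buffered records for revoked partitions

  def main(args: Array[String]): Unit = {
    val tp0 = TopicPartition("topic", 0)
    val tp1 = TopicPartition("topic", 1)
    val buffer: PrefetchBuffer = Map(
      tp0 -> Vector(Record("key-0", "value-0")),
      tp1 -> Vector(Record("key-1", "value-1"))
    )
    // Partition 1 is revoked in a rebalance: its prefetched records go away,
    // partition 0's records stay.
    val after = onPartitionsRevoked(buffer, Set(tp1))
    assert(after.contains(tp0) && !after.contains(tp1))
    println(after.keySet.map(_.partition))
  }
}
```

The point of the sketch is only the invariant: after a revocation, no record from a revoked partition should still be sitting in the prefetch buffer waiting to be emitted downstream.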