
Prefetched messages from revoked partition not dropped on Rebalance #755

jakobmerrild opened this issue Nov 10, 2021 · 3 comments
jakobmerrild commented Nov 10, 2021

It seems that in the following scenario, a consumer can end up handling messages from a partition it is no longer assigned to.

  1. Single consumer is assigned to all partitions (0-2)
  2. Due to high load an auto scaling system introduces a second consumer
  3. Rebalance happens; revoking partition 2 from the first consumer and assigning it to the new consumer
  4. The first consumer continues to handle messages from partition 2 (presumably due to prefetching)
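The steps above can be modeled with a small, self-contained sketch (plain Scala, no Kafka dependency; `Record` and the integer partition IDs are simplified stand-ins, not fs2-kafka internals). The point it illustrates: if the fetcher buffers records per partition and a rebalance only updates the assignment set without clearing that buffer, already-fetched records from the revoked partition still surface downstream.

```scala
// Hypothetical model of a consumer's prefetch buffer; NOT fs2-kafka internals.
final case class Record(partition: Int, key: String)

final class PrefetchingConsumer(var assigned: Set[Int]) {
  private var buffer: Vector[Record] = Vector.empty

  // Fetching only requests assigned partitions...
  def fetch(records: Seq[Record]): Unit =
    buffer = buffer ++ records.filter(r => assigned.contains(r.partition))

  // ...but a naive rebalance updates the assignment without touching the buffer.
  def rebalance(newAssignment: Set[Int]): Unit =
    assigned = newAssignment

  // Hands everything buffered so far to the processing stream.
  def poll(): Vector[Record] = {
    val out = buffer
    buffer = Vector.empty
    out
  }
}

// Partition 2 is prefetched, then revoked; the stale record still surfaces.
val consumer = new PrefetchingConsumer(Set(0, 1, 2))
consumer.fetch(Seq(Record(2, "key-3"), Record(0, "key-1")))
consumer.rebalance(Set(0, 1)) // partition 2 revoked
val leaked = consumer.poll().filter(_.partition == 2)
// leaked is non-empty: the revoked partition's record was still delivered
```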

It's a little difficult for me to write an exact test of the desired behavior, but here's a minimal example based on your test code (modified from an existing test; it uses printlns 😊):

     it("should handle rebalance") {
      withTopic { topic =>
        createCustomTopic(topic, partitions = 3)
        val produced1 = (0 until 100).map(n => s"key-$n" -> s"value->$n")
        val produced2 = (100 until 200).map(n => s"key-$n" -> s"value->$n")
        val producedTotal = produced1.size.toLong + produced2.size.toLong

        def startConsumer(
                           consumedQueue: Queue[IO, CommittableConsumerRecord[IO, String, String]],
                           stopSignal: SignallingRef[IO, Boolean],
                           i: Int
                         ): IO[FiberIO[Unit]] =
            {
              KafkaConsumer
                .stream(consumerSettings[IO].withMaxPrefetchBatches(0))
                .subscribeTo(topic)
                .flatMap(_.stream)
                .evalTap(consumedQueue.offer)
                .evalTap(committable => IO(println(s"$i handled ${committable.record.key} from part ${committable.record.partition}")))
                .evalMap(committable => IO.sleep(100.millis).as(committable.offset)) // Simulate work being done
                .through(commitBatchWithin(100, 1.second)) // Commit so any new consumers don't start from the beginning of their partition
                .interruptWhen(stopSignal)
                .compile
                .drain
            }
            .start

        (for {
          stopSignal <- SignallingRef[IO, Boolean](false)
          queue <- Queue.unbounded[IO, CommittableConsumerRecord[IO, String, String]]
          ref <- Ref.of[IO, Map[String, Int]](Map.empty)
          _ <- IO(publishToKafka(topic, produced1))
          _ <- startConsumer(queue, stopSignal, 0) // Start the first consumer
          _ <- IO.sleep(5.seconds) // simulate scale out event after a while
          _ <- IO(println("Scaled out..."))
          _ <- startConsumer(queue, stopSignal, 1) // Start the second consumer
          _ <- IO(publishToKafka(topic, produced2)) // Produce some more stuff
          _ <- Stream
            .fromQueueUnterminated(queue)
            .evalMap { committable =>
              // Construct map of how many times each message was consumed
              ref.modify { counts =>
                val key = committable.record.key
                val newCounts = counts.updated(key, counts.getOrElse(key, 0) + 1)
                (newCounts, newCounts)
              }
            }
            .takeWhile(_.size < producedTotal)
            .compile
            .drain
            .guarantee(stopSignal.set(true))
          keys <- ref.get
        } yield {
          assert {
            keys.size.toLong == producedTotal && {
              keys == (0 until producedTotal.toInt).map { n =>
                s"key-$n" -> 1 // Each message should have been handled once if first consumer doesn't keep polling old partition
              }.toMap
            }
          }
        }).unsafeRunSync()
      }
    }

Truncated output:

0 handled key-0 from part 1
...
0 handled key-2 from part 2
Scaled out...
0 handled key-3 from part 2
...
1 handled key-39 from part 2 <-- partition 2 assigned to new consumer
0 handled key-50 from part 2
1 handled key-41 from part 2
0 handled key-58 from part 2
1 handled key-46 from part 2
0 handled key-63 from part 2
1 handled key-48 from part 2
0 handled key-66 from part 2
1 handled key-49 from part 2
0 handled key-67 from part 2
1 handled key-50 from part 2
0 handled key-70 from part 2
1 handled key-58 from part 2
0 handled key-71 from part 2
1 handled key-63 from part 2
0 handled key-73 from part 2
1 handled key-66 from part 2
0 handled key-76 from part 2
1 handled key-67 from part 2
0 handled key-77 from part 2
1 handled key-70 from part 2
0 handled key-78 from part 2
1 handled key-71 from part 2
0 handled key-79 from part 2
1 handled key-73 from part 2
0 handled key-85 from part 2
1 handled key-76 from part 2
0 handled key-92 from part 2
1 handled key-77 from part 2
0 handled key-95 from part 2
1 handled key-78 from part 2
0 handled key-98 from part 2
1 handled key-79 from part 2
0 handled key-1 from part 0
1 handled key-85 from part 2
0 handled key-4 from part 0
1 handled key-92 from part 2
0 handled key-9 from part 0
1 handled key-95 from part 2
0 handled key-15 from part 0
1 handled key-98 from part 2
0 handled key-17 from part 0
1 handled key-101 from part 2
0 handled key-19 from part 0
1 handled key-112 from part 2
0 handled key-21 from part 0
1 handled key-113 from part 2
0 handled key-26 from part 0
1 handled key-116 from part 2
0 handled key-28 from part 0
1 handled key-118 from part 2
0 handled key-37 from part 0
1 handled key-121 from part 2
0 handled key-42 from part 0
1 handled key-125 from part 2
0 handled key-43 from part 0
1 handled key-128 from part 2
0 handled key-53 from part 0
1 handled key-129 from part 2
0 handled key-56 from part 0
1 handled key-133 from part 2
0 handled key-59 from part 0
1 handled key-134 from part 2
0 handled key-60 from part 0
1 handled key-140 from part 2
0 handled key-62 from part 0
1 handled key-141 from part 2
0 handled key-65 from part 0
1 handled key-142 from part 2
0 handled key-68 from part 0
1 handled key-148 from part 2
0 handled key-69 from part 0
1 handled key-150 from part 2
0 handled key-81 from part 0
1 handled key-159 from part 2
0 handled key-83 from part 0
1 handled key-162 from part 2
0 handled key-88 from part 0
1 handled key-166 from part 2
0 handled key-89 from part 0
1 handled key-175 from part 2
0 handled key-90 from part 0
1 handled key-177 from part 2
0 handled key-96 from part 0
1 handled key-178 from part 2
0 handled key-100 from part 1
1 handled key-182 from part 2
0 handled key-102 from part 1
1 handled key-184 from part 2
0 handled key-104 from part 1
1 handled key-186 from part 2
0 handled key-105 from part 1
1 handled key-188 from part 2
0 handled key-106 from part 1
1 handled key-189 from part 2
0 handled key-107 from part 1
1 handled key-190 from part 2
0 handled key-108 from part 1
1 handled key-192 from part 2
0 handled key-110 from part 1
1 handled key-199 from part 2
... partition 2 is empty at this point
0 handled key-115 from part 1
...
0 handled key-103 from part 0
...

Desired behavior:
Any messages from a revoked partition should be dropped, unless they have already been pulled downstream.
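A sketch of that desired behavior, in the same simplified model (plain Scala, hypothetical types, not a proposal for fs2-kafka's actual internals): on revocation, purge the prefetch buffer of records belonging to revoked partitions. Records already pulled downstream are out of reach, but everything still sitting in the buffer can be dropped.

```scala
// Hypothetical model of the desired fix; NOT fs2-kafka internals.
final case class Record(partition: Int, key: String)

final class Consumer(var assigned: Set[Int]) {
  private var buffer: Vector[Record] = Vector.empty

  def fetch(records: Seq[Record]): Unit =
    buffer = buffer ++ records.filter(r => assigned.contains(r.partition))

  // On rebalance, drop buffered records for every partition being revoked
  // before accepting the new assignment.
  def rebalance(newAssignment: Set[Int]): Unit = {
    val revoked = assigned -- newAssignment
    buffer = buffer.filterNot(r => revoked.contains(r.partition))
    assigned = newAssignment
  }

  def poll(): Vector[Record] = {
    val out = buffer
    buffer = Vector.empty
    out
  }
}

val c = new Consumer(Set(0, 1, 2))
c.fetch(Seq(Record(2, "key-3"), Record(0, "key-1")))
c.rebalance(Set(0, 1)) // revocation purges partition 2's buffered records
val remaining = c.poll()
// remaining holds only the partition-0 record; nothing stale from partition 2
```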

@jakobmerrild jakobmerrild changed the title Prefetched messages from unassigned partition not dropped on Rebalance Prefetched messages from revoked partition not dropped on Rebalance Nov 10, 2021
jakobmerrild (Author) commented:
Here's a proper test of the desired behavior (I think):

it("should handle rebalance") {
      withTopic { topic =>
        createCustomTopic(topic, partitions = 3)
        val produced1 = (0 until 100).map(n => s"key-$n" -> s"value->$n")
        val produced2 = (100 until 200).map(n => s"key-$n" -> s"value->$n")
        val producedTotal = produced1.size.toLong + produced2.size.toLong
        val commitBatchSize = 10

        def startConsumer(
                           consumedQueue: Queue[IO, CommittableConsumerRecord[IO, String, String]],
                           stopSignal: SignallingRef[IO, Boolean]
                         ): IO[FiberIO[Unit]] =
            {
              KafkaConsumer
                .stream(consumerSettings[IO].withMaxPrefetchBatches(0))
                .subscribeTo(topic)
                .flatMap(_.stream)
                .evalTap(consumedQueue.offer)
                .evalMap(committable => IO.sleep(100.millis).as(committable.offset)) // Simulate work being done
                .through(commitBatchWithin(commitBatchSize, 1.hour)) // Use unreasonably high time to ensure count determines commits
                .interruptWhen(stopSignal)
                .compile
                .drain
            }
            .start

        (for {
          stopSignal <- SignallingRef[IO, Boolean](false)
          queue <- Queue.unbounded[IO, CommittableConsumerRecord[IO, String, String]]
          ref <- Ref.of[IO, Map[String, Int]](Map.empty)
          _ <- IO(publishToKafka(topic, produced1))
          _ <- startConsumer(queue, stopSignal) // Start the first consumer
          _ <- IO.sleep(5.seconds) // simulate scale out event after a while
          _ <- startConsumer(queue, stopSignal) // Start the second consumer
          _ <- IO(publishToKafka(topic, produced2)) // Produce some more stuff
          _ <- Stream
            .fromQueueUnterminated(queue)
            .evalMap { committable =>
              // Construct map of how many times each message was consumed
              ref.modify { counts =>
                val key = committable.record.key
                val newCounts = counts.updated(key, counts.getOrElse(key, 0) + 1)
                (newCounts, newCounts)
              }
            }
            .takeWhile(_.size < producedTotal)
            .compile
            .drain
            .guarantee(stopSignal.set(true))
          keys <- ref.get
        } yield {
          assert {
            keys.size.toLong == producedTotal && {
              // Number of messages handled more than once shouldn't be higher than the commit threshold
              keys.values.filterNot(_ == 1).size < commitBatchSize
            }
          }
        }).unsafeRunSync()
      }
    }

jakobmerrild (Author) commented:
Note that I've been using .stream to consume, as that's what we use in our code base. It might be more prudent/easier to define the desired behavior with partitionedMapStream.

filosganga (Member) commented:
This issue is likely related to #127.
