
Zipping finite KafkaConsumer-based streams doesn't behave as expected (first stream gets recycled after first 1000 elements) #1293

Open
ppodlovics opened this issue Jan 25, 2024 · 1 comment


@ppodlovics

Hello,

I've run into an issue with zipping finite fs2-kafka streams. It seems that if I take more than 1000 elements from default-configured streams and then zip them, the first stream's elements get recycled after the 1000th one, starting over from the 0th. The code snippet below makes this clearer.

Scala version: 2.12.18
fs2.kafka version: 3.1.0

import java.nio.charset.StandardCharsets

import cats.effect.IO
import fs2.kafka._

val bootstrapServers = kafkaHostPort // provided by the test environment
val topic1Name = "test-topic-1"
val topic2Name = "test-topic-2"

val producerSettings: ProducerSettings[IO, String, String] =
  ProducerSettings(
    keySerializer = Serializer.string[IO](StandardCharsets.UTF_8),
    valueSerializer = Serializer.string[IO](StandardCharsets.UTF_8),
  )
    .withBootstrapServers(bootstrapServers)
    .withEnableIdempotence(true)
    .withRetries(3)

val consumerSettings: ConsumerSettings[IO, String, String] =
  ConsumerSettings(
    Deserializer.string[IO],
    Deserializer.string[IO],
  )
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("test-group")

// 1011 records per topic, so take(1005) crosses the 1000-element boundary.
val records = (0 to 1010).flatMap { i =>
  Vector(
    ProducerRecord(topic1Name, s"k${i}", s"v${i}"),
    ProducerRecord(topic2Name, s"k${i}", s"v${i}"),
  )
}.toVector
val producerRecords = ProducerRecords(records)

KafkaProducer.resource(producerSettings).use { producer =>
  for {
    // produce returns IO[IO[ProducerResult]]; flatten awaits the broker
    // acks so all records are persisted before the consumers start.
    _ <- producer.produce(producerRecords).flatten
    topic1Stream = KafkaConsumer
      .stream(consumerSettings)
      .subscribeTo(topic1Name)
      .records
      .take(1005)
    topic2Stream = KafkaConsumer
      .stream(consumerSettings)
      .subscribeTo(topic2Name)
      .records
      .take(1005)
    result <- (topic1Stream zip topic2Stream)
      .evalTap { case (lhs, rhs) => IO.println(lhs) >> IO.println(rhs) }
      .compile
      .toVector
  } yield {
    result.foreach { case (lhs, rhs) =>
      assert(lhs.record.key == rhs.record.key && lhs.record.value == rhs.record.value)
    }
  }
}
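
The snippet assumes a Kafka broker reachable at kafkaHostPort (started by the test environment, e.g. an embedded Kafka or a container); apart from that it is self-contained.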

This code fails on the assert with "k[]0" did not equal "k[100]0" (ScalaTest's diff notation for "k0" vs. "k1000"). I added some logging to see more clearly what is happening. Note how test-topic-2 jumps from offset 999 back to offset 0, while test-topic-1 continues on to offset 1000:

CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 997, key = k997, value = v997, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 998, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 997, key = k997, value = v997, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 998, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 998, key = k998, value = v998, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 999, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 998, key = k998, value = v998, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 999, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 999, key = k999, value = v999, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 1000, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 999, key = k999, value = v999, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 4, serializedValueSize = 4, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1000, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 0, key = k0, value = v0, timestamp = Timestamp(createTime = 1706178880476), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 1, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1000, key = k1000, value = v1000, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1001, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 1, key = k1, value = v1, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 2, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1001, key = k1001, value = v1001, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1002, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 2, key = k2, value = v2, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 3, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1002, key = k1002, value = v1002, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1003, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 3, key = k3, value = v3, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 4, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1003, key = k1003, value = v1003, timestamp = Timestamp(createTime = 1706178880598), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1004, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-2, partition = 0, offset = 4, key = k4, value = v4, timestamp = Timestamp(createTime = 1706178880477), serializedKeySize = 2, serializedValueSize = 2, leaderEpoch = 0), CommittableOffset(test-topic-2-0 -> 5, test-group))
CommittableConsumerRecord(ConsumerRecord(topic = test-topic-1, partition = 0, offset = 1004, key = k1004, value = v1004, timestamp = Timestamp(createTime = 1706178880599), serializedKeySize = 5, serializedValueSize = 5, leaderEpoch = 0), CommittableOffset(test-topic-1-0 -> 1005, test-group))
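
For what it's worth, zipping plain finite fs2 streams behaves as expected, which makes me think the recycling comes from the KafkaConsumer-backed streams rather than from zip itself. A minimal standalone sanity check (no Kafka involved; ZipSanityCheck is just an illustrative name):

import cats.effect.{IO, IOApp}
import fs2.Stream

object ZipSanityCheck extends IOApp.Simple {
  // Two plain in-memory streams of 1005 elements each: zip pairs them
  // strictly in order, with no wraparound after the 1000th element.
  val run: IO[Unit] = {
    val lhs = Stream.range(0, 1005).covary[IO]
    val rhs = Stream.range(0, 1005).covary[IO]
    (lhs zip rhs)
      .forall { case (a, b) => a == b }
      .compile
      .lastOrError
      .flatMap(allEqual => IO.println(s"all pairs equal: $allEqual")) // prints true
  }
}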

Could you help me resolve this?

P.S.: this other issue might be related: #1292

@aartigao
Contributor

What happens if, instead of zipping, you use concatenation (++) of the streams? Does the second topic go back to 0 too?
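
For clarity, a sketch of that diagnostic (untested), reusing consumerSettings and the topic names from the reproduction above:

import cats.effect.IO
import fs2.kafka._

// Run the two finite streams one after another instead of zipping them,
// to see whether test-topic-2 still restarts from offset 0 after 1000
// elements when no zip is involved.
val concatenated =
  KafkaConsumer
    .stream(consumerSettings)
    .subscribeTo(topic1Name)
    .records
    .take(1005) ++
    KafkaConsumer
      .stream(consumerSettings)
      .subscribeTo(topic2Name)
      .records
      .take(1005)

concatenated
  .evalTap(record => IO.println(record))
  .compile
  .drain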
