KafkaConsumer closed prematurely on finite streams #1292

Open · ppodlovics opened this issue Jan 18, 2024 · 0 comments

ppodlovics commented Jan 18, 2024

Hello,

I've run into an issue where my app consumes a finite number of records from a topic, does some processing, and then commits the offsets. The problem is that if I call take(...) on the record stream and then pipe it through commitBatchWithin(...), the commits time out. If I flip the order, it works. I'm not sure exactly what's happening here, but my guess is that the consumer closes its connection as soon as the stream terminates, so the underlying Java consumer cannot finish the commit and just hangs.

Scala version: 2.12.18
fs2.kafka version: 3.1.0

An example code snippet:

import java.nio.charset.StandardCharsets

import scala.concurrent.duration._

import cats.effect.IO
import fs2.kafka._

{
  val bootstrapServers = kafkaHostPort // supplied by the test environment
  val topicName = "test-topic"

  val producerSettings: ProducerSettings[IO, String, String] =
    ProducerSettings(
      keySerializer = Serializer.string[IO](StandardCharsets.UTF_8),
      valueSerializer = Serializer.string[IO](StandardCharsets.UTF_8),
    )
      .withBootstrapServers(bootstrapServers)
      .withEnableIdempotence(true)
      .withRetries(3)

  val consumerSettings: ConsumerSettings[IO, String, String] =
    ConsumerSettings(
      Deserializer.string[IO],
      Deserializer.string[IO],
    )
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("test-group")

  val producerRecords =
    ProducerRecords(
      Vector(
        ProducerRecord(topicName, "k1", "v1"),
        ProducerRecord(topicName, "k2", "v2"),
      )
    )

  KafkaProducer.resource(producerSettings).use { producer =>
    for {
      // produce returns IO[IO[...]]; flatten waits for broker acknowledgement
      _ <- producer.produce(producerRecords).flatten
      _ <- KafkaConsumer
        .stream(consumerSettings)
        .subscribeTo(topicName)
        .records
        .take(1)
        .map(_.offset)
        .through(commitBatchWithin(1, 1.second))
        .compile
        .toVector
    } yield ()
  }
}

The above code throws fs2.kafka.CommitTimeoutException, but if take(1) is moved below .through(commitBatchWithin(1, 1.second)), it finishes and commits the offset.
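For reference, a minimal sketch of the reordered consumer pipeline that completes successfully (reusing the consumerSettings and topicName from the snippet above; only the position of take(1) differs, so the commit pipe runs while the consumer stream is still open):

// Working order: commitBatchWithin sits before take(1), so the commit
// completes before take(1) terminates the stream and closes the consumer.
KafkaConsumer
  .stream(consumerSettings)
  .subscribeTo(topicName)
  .records
  .map(_.offset)
  .through(commitBatchWithin(1, 1.second))
  .take(1)
  .compile
  .toVector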

P.S.: this other issue might be related: #1293
