How do I commit offsets? #18

Open
shawjef3 opened this issue Mar 12, 2018 · 1 comment

Comments

@shawjef3

I found that after running a stream to get a nonempty collection of messages, the offsets weren't incremented. Is there a way in this client to commit offsets while the stream is running?

I used code like the following, which threw NoOffsetForPartitionException. Both clients use the same group ID, and the topic has only one partition.

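      import cats.effect.IO
      import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
      import org.apache.kafka.common.TopicPartition
      import org.apache.kafka.common.serialization.ByteArrayDeserializer
      import scala.collection.JavaConverters._

      val topicPartition = new TopicPartition(topic, 0)

      // Drain the expected number of messages through the stream.
      val receivedMessages = t.getKafkaMessages[IO]().take(messages.size).compile.toVector.unsafeRunSync()

      // Plain Kafka consumer with the same group ID, used only to inspect the offset afterwards.
      val consumerAfter =
        new KafkaConsumer(
          Map[String, AnyRef](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
            ConsumerConfig.RETRY_BACKOFF_MS_CONFIG -> "1000",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "none",
            ConsumerConfig.GROUP_ID_CONFIG -> "increments-offsets"
          ).asJava,
          new ByteArrayDeserializer(),
          Generic.kafkaDeserializer
        )
      consumerAfter.assign(List(topicPartition).asJava)
      // Throws NoOffsetForPartitionException: no offset is committed for the group and the reset policy is "none".
      val endOffset = consumerAfter.position(topicPartition)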
@pchlupacek
Member

@shawjef3 we do not commit any offsets. The client from fs-kafka is not durable, so you need to commit offsets to a separate persistent store, or alternatively you may want to use a compacted Kafka topic to do so.
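
As a rough illustration of tracking offsets outside the client, the sketch below computes the offset to resume from for each partition and keeps it in a store. Everything named here is hypothetical: `Received` is a stand-in for whatever message type the stream emits, and `InMemoryOffsetStore` is a placeholder for a real persistent store (a database table or a compacted Kafka topic). It follows the usual Kafka convention that the stored offset is the last processed offset plus one.

```scala
object OffsetTracking {
  // Hypothetical message shape; the client's real message type may differ.
  final case class Received(partition: Int, offset: Long)

  // For each partition, the offset to resume from: highest processed offset + 1.
  def nextOffsets(received: Seq[Received]): Map[Int, Long] =
    received
      .groupBy(_.partition)
      .map { case (partition, msgs) => partition -> (msgs.map(_.offset).max + 1L) }

  // Placeholder store (assumption): a real one would write to durable storage.
  final class InMemoryOffsetStore {
    private var offsets = Map.empty[Int, Long]

    // Merge the new resume positions over the previously saved ones.
    def save(updates: Map[Int, Long]): Unit =
      offsets = offsets ++ updates

    // Where to start reading the given partition, if anything was saved.
    def resumeFrom(partition: Int): Option[Long] =
      offsets.get(partition)
  }
}
```

After a batch has been processed, one would call `store.save(nextOffsets(batch))`, and on restart use `store.resumeFrom(partition)` to pick the starting offset for the stream. How that starting offset is passed to the client depends on its subscribe API, which is not shown here.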
