diff --git a/README.md b/README.md index 1da293e..8953cb6 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,24 @@ offset 1000 on the topic "test_topic" you should run in this way: `java -jar kafka-message-seeker.jar --topic test_topic --offset 1000 --search-for hello` +If you'd like to scan the whole topic just use 0 as offset. Be aware that scanning an entire kafka topic +can take long time (depending on kafka message retention policy). + +If a message is found will be printed in the console the record metadata and the record value, +for example: + +``` +12:53:23.718 - I've found a match! + {Key: null + Offset: 3 + Partition: 0 + Value: hello} +``` + +value is the whole kafka message (not just the matched string). + +Every 20 seconds there will be an update about the current offset/partition. + ### Build your jar: You can build your own jar using sbt: diff --git a/src/main/scala/com/filipponi/seeker/MsgSeeker.scala b/src/main/scala/com/filipponi/seeker/MsgSeeker.scala index 4fed7a0..0dd1b41 100644 --- a/src/main/scala/com/filipponi/seeker/MsgSeeker.scala +++ b/src/main/scala/com/filipponi/seeker/MsgSeeker.scala @@ -22,7 +22,7 @@ object MsgSeeker extends App { OParser.parse(kafkaMsgSeekerArgsParser, args, Config.empty()) match { case Some(config) => - logger.info(s"Searching for string: ${config.stringToSeek}, from offset: ${config.offset} on topic: ${config.topic}") + logger.info(s"Searching for string: ${config.stringToSeek}, from offset: ${config.offset}, on topic: ${config.topic}, with broker: ${config.brokers}") val consumer = createConsumer(config.brokers) @@ -43,7 +43,7 @@ object MsgSeeker extends App { while (moreMessages) { - val records: ConsumerRecords[String, Array[Byte]] = consumer.poll(Duration.ofSeconds(1)) + val records: ConsumerRecords[String, Array[Byte]] = consumer.poll(Duration.ofSeconds(2)) if (records.isEmpty) moreMessages = false val iterator = records.iterator() while (iterator.hasNext) {