first simple implementation
Sep 29, 2019
commit 6e9f2f9
12 changed files with 537 additions and 0 deletions.
.gitignore
# sbt
# (may want to keep parts of 'project')

# eclipse

# intellij idea

# mac

# other?

# kafka-message-seeker

This application starts a Kafka consumer that scans a topic looking for a given string.
To run the application, build the jar (or download the pre-built one) and run it with these options:

Usage: kafkaMessage seeker [options]
-b, --brokers <value> The kafka brokers
-t, --topic <value> The topic for which seek the message
-s, --search-for <value>
The string that will be searched in topic
-o, --offset <value> The offset to start with (for every partition!)

For example, if you want to find all the messages that contain the string "hello", starting from
offset 1000 on the topic "test_topic" of a broker reachable at localhost:9092, run:

`java -jar kafka-message-seeker.jar --brokers localhost:9092 --topic test_topic --offset 1000 --search-for hello`

# Build your jar:

You can build your own jar using sbt:
sbt clean assembly

This will run tests as well (will take less than a minute).
// Project coordinates. The organization is spelled like the `com.filipponi`
// package used by the sources and by the logback logger name.
name := "kafka-message-seeker"
version := "0.1"
organization := "com.filipponi"
scalaVersion := "2.12.8"

// File name of the fat jar produced by `sbt assembly`: <name>-<version>.jar
assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

libraryDependencies ++= Seq(
  // runtime
  "org.apache.kafka" % "kafka-clients" % "2.3.0",
  "com.github.scopt" %% "scopt" % "4.0.0-RC2",
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  // test only
  "org.scalatest" %% "scalatest" % "3.0.8" % Test,
  "com.whisk" %% "docker-testkit-scalatest" % "0.9.8" % Test,
  "com.whisk" %% "docker-testkit-impl-spotify" % "0.9.8" % Test,
  "org.mockito" % "mockito-all" % "1.10.19" % Test
)

sbt.version = 1.3.2
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
<configuration>
    <!-- Console output: time of day followed by the raw message. -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <!-- An encoder (PatternLayoutEncoder by default) replaces the bare layout element. -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Everything outside the application packages is limited to errors. -->
    <root level="error">
        <appender-ref ref="CONSOLE"/>
    </root>

    <!-- Application loggers are verbose; additivity is off so lines are not printed twice via root. -->
    <logger name="com.filipponi" level="debug" additivity="false">
        <appender-ref ref="CONSOLE"/>
    </logger>
</configuration>
package com.filipponi.seeker

import scopt.{OParser, OParserBuilder}

/**
  * Command-line handling for the seeker: the [[CommandLineParser.Config]] that holds the
  * parsed arguments and the scopt parser that populates it.
  */
object CommandLineParser {

  /**
    * Simple configuration class for the input args.
    *
    * @param brokers      the kafka brokers.
    * @param topic        the topic to use.
    * @param stringToSeek the string to seek in the kafka messages.
    * @param offset       the offset to start with.
    */
  case class Config(brokers: String,
                    topic: String,
                    stringToSeek: String,
                    offset: Long)

  object Config {

    /** Initial value handed to the parser: blank strings and offset 0. */
    def empty(): Config = Config(brokers = "", topic = "", stringToSeek = "", offset = 0L)
  }

  val builder: OParserBuilder[Config] = OParser.builder[Config]

  /**
    * Parser for the application arguments.
    *
    * `--brokers`, `--topic` and `--search-for` are mandatory, so parsing yields `None`
    * when any of them is missing. `--offset` is optional (it keeps the value from the
    * initial [[Config]]) and must not be negative.
    */
  val kafkaMsgSeekerArgsParser: OParser[Unit, Config] = {
    import builder._
    OParser.sequence(
      programName("kafkaMessage seeker"),
      head("kafkaMessageSeeker", "0.1"),
      opt[String]('b', "brokers")
        .required()
        .action((value, config) => config.copy(brokers = value))
        .text("The kafka brokers"),
      opt[String]('t', "topic")
        .required()
        .action((value, config) => config.copy(topic = value))
        .text("The topic for which seek the message"),
      opt[String]('s', "search-for")
        .required()
        .action((value, config) => config.copy(stringToSeek = value))
        .text("The string that will be searched in topic"),
      opt[Long]('o', "offset")
        // Reject negative offsets here, before the value ever reaches the consumer.
        .validate(value => if (value >= 0L) success else failure("Option --offset must not be negative"))
        .action((value, config) => config.copy(offset = value))
        .text("The offset to start with (for every partition!)")
    )
  }
}
package com.filipponi.seeker

import java.time.Duration
import java.util.{Collections, Properties}

import com.filipponi.seeker.CommandLineParser.{Config, kafkaMsgSeekerArgsParser}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.slf4j.{Logger, LoggerFactory}
import scopt.OParser

import scala.collection.JavaConverters._

/**
  * Application entry point: parses the command line, reads the requested topic from the
  * requested offset on every partition and logs each record whose value contains the
  * searched string.
  */
object MsgSeeker extends App {

  private val logger = LoggerFactory.getLogger(getClass)

  seek(logger, args)

  /**
    * Parses `args` and, when they are valid, scans the topic for the searched string.
    * Nothing is consumed when the arguments cannot be parsed.
    *
    * @param logger the logger that receives matches and progress messages.
    * @param args   the raw command line arguments.
    */
  private[seeker] def seek(logger: Logger, args: Array[String]): Unit = {
    OParser.parse(kafkaMsgSeekerArgsParser, args, Config.empty()) match {
      case Some(config) =>
        logger.info(s"Searching for string: ${config.stringToSeek}, from offset: ${config.offset} on topic: ${config.topic}")

        val consumer = createConsumer(config.brokers)
        // Always release the consumer's sockets and buffers, even when the scan fails.
        try scanTopic(logger, consumer, config)
        finally consumer.close()

      case None => () // invalid arguments: nothing to scan
    }
  }

  /**
    * Positions the consumer at `config.offset` on every partition of the topic and polls
    * until a poll comes back empty, logging every record whose value contains the
    * searched string.
    */
  private def scanTopic(logger: Logger,
                        consumer: KafkaConsumer[String, Array[Byte]],
                        config: Config): Unit = {
    // Guard against a null partition list before converting it.
    val partitions: List[TopicPartition] =
      Option(consumer.partitionsFor(config.topic))
        .map(_.asScala.toList)
        .getOrElse(Nil)
        .map(info => new TopicPartition(info.topic(), info.partition()))

    if (partitions.isEmpty) {
      logger.warn(s"No partitions found for topic: ${config.topic}")
    } else {
      // Assign the partitions explicitly so that seek can be called right away,
      // instead of relying on a warm-up poll to obtain an assignment.
      consumer.assign(partitions.asJava)
      partitions.foreach(partition => consumer.seek(partition, config.offset))

      // Mutable on purpose: time of the last progress message.
      var lastProgressLog = System.currentTimeMillis()

      // The scan ends at the first poll that returns no records.
      val batches: Iterator[ConsumerRecords[String, Array[Byte]]] =
        Iterator.continually(consumer.poll(Duration.ofSeconds(1))).takeWhile(!_.isEmpty)

      batches.foreach { batch =>
        batch.iterator().asScala.foreach { record: ConsumerRecord[String, Array[Byte]] =>
          // Records without a value (null) cannot match; decode the rest as UTF-8
          // rather than with the platform default charset.
          Option(record.value())
            .map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
            .filter(_.contains(config.stringToSeek))
            .foreach { value =>
              logger.info(s"I've found a match! \n {Key: ${record.key()} \n Offset: ${record.offset()} \n Partition: ${record.partition()} \n Value: $value}")
            }

          //prints updates roughly every 20 seconds
          if (System.currentTimeMillis() - lastProgressLog > 20000) {
            logger.info(s"{Currently processing record at Offset: ${record.offset()} and partition: ${record.partition()} }")
            lastProgressLog = System.currentTimeMillis()
          }
        }
      }

      logger.info("No more messages!")
    }
  }

  /**
    * Builds a consumer with String keys and raw byte values, auto commit disabled and a
    * fresh group id per run.
    *
    * @param brokers the bootstrap servers to connect to.
    * @return a new, not yet assigned consumer; the caller is responsible for closing it.
    */
  private def createConsumer(brokers: String): KafkaConsumer[String, Array[Byte]] = {
    // Alphanumeric suffix: Random.nextString would produce arbitrary Unicode characters.
    val groupId = s"kafka-message-seeker-${scala.util.Random.alphanumeric.take(10).mkString}"

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    new KafkaConsumer[String, Array[Byte]](props)
  }
}

<configuration>
    <!-- Console output: time of day followed by the raw message. -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <!-- An encoder (PatternLayoutEncoder by default) replaces the bare layout element. -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="error">
        <appender-ref ref="CONSOLE"/>
    </root>

    <!-- Only CONSOLE is referenced: no appender named FILE is declared in this configuration. -->
    <logger name="com.filipponi" level="debug" additivity="false">
        <appender-ref ref="CONSOLE"/>
    </logger>
</configuration>
package com.filipponi.seeker

import com.filipponi.seeker.CommandLineParser.{Config,kafkaMsgSeekerArgsParser}
import org.scalatest.{FlatSpec, Matchers}
import scopt.OParser

/** Checks the argument parser with missing, complete and reordered options. */
class CommandLineParserTest extends FlatSpec with Matchers {

  // Configuration expected from the argument lists used in the positive cases.
  private val expectedConfig = Config("localhost:9092", "test", "string", 101010)

  "CommandLineParser" should "return a none when options are not correct" in {
    val args = Array.empty[String]

    OParser.parse(kafkaMsgSeekerArgsParser, args, Config.empty()) should be(None)
  }

  "CommandLineParser" should "return a Some(config) all required options are passed" in {
    val args = Array(
      "--topic", "test",
      "--search-for", "string",
      "--offset", "101010",
      "--brokers", "localhost:9092"
    )

    OParser.parse(kafkaMsgSeekerArgsParser, args, Config.empty()) should be(Some(expectedConfig))
  }

  "CommandLineParser" should "return a Some(config) regardless the order of options couple" in {
    // Same option/value pairs as above, listed in a different order.
    val args = Array(
      "--search-for", "string",
      "--topic", "test",
      "--brokers", "localhost:9092",
      "--offset", "101010"
    )

    OParser.parse(kafkaMsgSeekerArgsParser, args, Config.empty()) should be(Some(expectedConfig))
  }
}



