Skip to content

Consume from Kafka

Alex edited this page Feb 21, 2020 · 8 revisions

Trubka is a general purpose Kafka consumer which can be used to consume plain text, protocol buffer or any arbitrary stream of bytes from Kafka.

Consuming Plain Text

consume plain command can be used to consume plain text or any byte stream from Kafka regardless of the serialisation algorithm of the content.

$> trubka consume plain <topic name> --brokers <broker:port>

Consuming Protobuf

consume proto command on the other hand is super opinionated about the bytes it consumes. It only accepts the probuf bytes of a specific proto contract to come through. In order to consume protocol buffer events from Kafka you need to tell Trubka:

  • Where your protocol buffer files (*.proto) live (--proto-root flag)
  • What topic you want to consume from (the first argument of the command)
  • And what contract it needs to use to deserialise the bytes into.
$> trubka consume proto <topic name> <contract> --proto-root <dir> --brokers <broker:port>

Keep in mind that you need the specify the fully qualified name of the contract. For example, if you have a protocol buffer message called EventHappened which lives in the contracts package and gets published to a topic named Events on your local machine, you must use the following command to start consuming:

$> trubka consume proto Events contracts.EventHappened \
--proto-root /some_dir_on_your_disk --brokers localhost:9092

Common consumer flags

By default Trubka starts consuming from the latest offset of the specified topic and writes the bytes into your terminal's buffer (stdout) by default. If you need to store the bytes into a file you can either redirect the output or use the consume command's --output-dir flag to specify the directory in which a single file per topic will be created to store the stream content.

By default, Trubka starts consuming from the latest offset of each topic and the offsets are maintained locally on the disk. So, there will be no consumer group for Trubka maintained by your Kafka cluster.

Interactive mode

Trubka can also be executed in interactive mode using the -i flag. Interactive mode walks you though the steps of picking the topic and the proto message type (if applicable) from a list of existing topics, fetched from the server, and a list of protocol buffer messages, living in the --proto-root directory (if applicable). If you have too many topics on the server, the list can be narrowed down using --topic-filter flag.

For proto consumer, the message type could also be filtered using —proto-filter flag.

Proto Consumer
$> trubka consume proto --proto-root /protocol_buffers_dir --brokers localhost:9092 \ 
--topic-filter Notifications --proto-filter EmailSent -i
Plain Text Consumer
$> trubka consume plain --brokers localhost:9092 -i --topic-filter Notifications

Note

--topic-filter and --proto-filter flags are regular expressions.

Searching Messages

You can optionally define a regular expression using the -q flag to filter the messages consumed from Kafka. It's simply a string match on the string representation of the de-serialised message content.

Clone this wiki locally