Developer Notes
Currently many of the list-* scripts attempt to rewind to the beginning of a topic and replay all messages, exiting once the last known message is replayed. If the topic `cleanup.policy` is incorrectly set to `delete` instead of `compact`, it is possible for the log cleaner to delete the last message (highest offset). If this happens, the scripts may hang until a new message is received, since the highest-offset message no longer exists and will never be replayed (some scripts support waiting like this on purpose via the `--monitor` flag). Also, ensure `--from-beginning` is used with the command-line consumer.
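For reference, here is a minimal sketch of that rewind-and-replay pattern (not the actual scripts), assuming kafka-python, a single-partition topic, and placeholder topic/broker names:

```python
# Sketch of the rewind-and-replay pattern: seek to the beginning, note the
# current end offset, replay until that offset is reached. If the log cleaner
# (wrongly running with cleanup.policy=delete) has removed the highest-offset
# message, the loop never reaches it and only consumer_timeout_ms ends the wait.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092",  # placeholder
                         enable_auto_commit=False,
                         consumer_timeout_ms=5000)

tp = TopicPartition("example-topic", 0)  # placeholder, single partition assumed
consumer.assign([tp])
consumer.seek_to_beginning(tp)

end_offset = consumer.end_offsets([tp])[tp]  # offset of the *next* message

for record in consumer:
    print(record.offset, record.key, record.value)
    if record.offset >= end_offset - 1:
        break  # last known message replayed; exit (no --monitor behavior here)
```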
There are two opposing use cases to balance:
- Keep old messages around as long as necessary for the "audit/archiver" client to read them (and potentially to reboot, then re-connect and catch back up)
- Keep old messages around as short a time as possible to minimize the accumulation of not-yet-compacted messages and speed up client initialization (clients have to read and compact whatever Kafka doesn't)
We're using a dedicated Kafka cluster, so it seems reasonable to use the broker configs instead of the per-topic configs listed below.
Compact Configs:
topic / broker | note | value |
---|---|---|
cleanup.policy / log.cleanup.policy | Enable persistent compaction. | compact |
delete.retention.ms / log.cleaner.delete.retention.ms | How long to hold tombstones. For us this effectively sets the maximum time a materialized view (cache) can be offline without needing a complete re-read of the entire topic: compaction guarantees the latest value per key is retained EXCEPT tombstones, so after this limit a client that re-connects and resumes where it left off may miss tombstones. | 86400000 (1 day) |
max.compaction.lag.ms / log.cleaner.max.compaction.lag.ms | Maximum time a message remains ineligible for compaction. | 600000 (10 minutes) |
min.compaction.lag.ms / log.cleaner.min.compaction.lag.ms | Minimum time a message remains ineligible for compaction. | 300000 (5 minutes) |
min.cleanable.dirty.ratio / log.cleaner.min.cleanable.ratio | For messages older than min.compaction.lag.ms but younger than max.compaction.lag.ms, a log "dirtier" than this ratio is eligible for compaction (cleaning). The default of 0.5 means a log must be more than 50% duplicate messages (same keys) to trigger compaction. | 0.1 |
segment.ms / log.roll.ms | How frequently to roll a segment. NOTE: only inactive segments are eligible for compaction, and the default of 7 days means Kafka will rarely compact low-volume data (high-volume data may roll more frequently due to log size). | 300000 (5 minutes) |
message.timestamp.type / log.message.timestamp.type | How timestamps are assigned to messages. With CreateTime, the producer may supply bogus timestamps that break compaction (timestamps in the future interfere with log.roll.ms). | LogAppendTime |
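Since we set these at the broker level, per-topic overrides shouldn't normally be needed, but for illustration here is a rough sketch of applying the per-topic equivalents to a single topic with kafka-python's admin client (topic name and broker address are placeholders):

```python
# Sketch: per-topic equivalents of the broker-level compaction settings above,
# applied to one topic with kafka-python's admin client. Topic name and broker
# address are placeholders. Note AlterConfigs is not incremental, so any
# existing per-topic overrides not listed here would be cleared.
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")  # placeholder

compact_configs = {
    "cleanup.policy": "compact",
    "delete.retention.ms": "86400000",       # 1 day
    "max.compaction.lag.ms": "600000",       # 10 minutes
    "min.compaction.lag.ms": "300000",       # 5 minutes
    "min.cleanable.dirty.ratio": "0.1",
    "segment.ms": "300000",                  # 5 minutes
    "message.timestamp.type": "LogAppendTime",
}

admin.alter_configs([ConfigResource(ConfigResourceType.TOPIC, "example-topic",
                                    configs=compact_configs)])
admin.close()
```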
NOTE: Kafka will only consider rolling a log segment when a message is written to the log. This means that if you re-import a ton of messages all at once and then don't make any changes for a long time, you'll have an inflated log for a long time. This can happen with class/instance messages, for example. You need to wait max.compaction.lag.ms and then "touch" the log by writing a message (and perhaps immediately tombstone it).
Other Broker Configs:
option | note | value |
---|---|---|
auto.create.topics.enable | Create a topic automatically if it doesn't exist and a producer attempts to write to it. Sounds like a bad idea in a production environment. | false |
log.cleaner.backoff.ms | How long to sleep if there is nothing to clean at the moment (default is 15000, i.e. 15 seconds). | 5000 (5 seconds) |
log.cleaner.threads | Number of cleaner threads (default is 1). | 1 |
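To sanity-check what a broker is actually running with, one option is to ask it over the admin API; a rough sketch assuming kafka-python, a single-broker development setup, and placeholder broker id/address:

```python
# Sketch: read back a broker's live configuration (log.cleaner.backoff.ms,
# log.cleaner.threads, auto.create.topics.enable, ...) via the admin API.
# Assumes a single-broker development setup; broker id "0" and the address
# are placeholders. The raw DescribeConfigs response is printed as-is.
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
broker = ConfigResource(ConfigResourceType.BROKER, "0")
print(admin.describe_configs([broker]))
admin.close()
```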
There are lots of nuances with Docker builds and runs to think about, and many ways to work, each with its own trade-offs. After some trial and error I've come up with my own Docker Strategy.