Skip to content

Commit

Permalink
fix: skip reading from compacted records
Browse files Browse the repository at this point in the history
  • Loading branch information
niafly committed Nov 28, 2023
1 parent f568774 commit f4f748b
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions message_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,19 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB

func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readBytesFunc) (
offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
if err = r.readHeader(); err != nil {
return
for r.count == 0 {
// Keep reading new headers since in some cases when compacted logs is enabled, the broker may
// return a record header without any record value since the record was compacted (header.count=0).
//
// Trying to read from this compacted record would cause wrong data read and panic on markRead() since
// the reader count is actually 0.
if err = r.readHeader(); err != nil {
return
}
// Keep this as an infinite loop since in case the batch ended up with a compacted record, it will throws EOF
// and will be handled in the caller scope (batch).
}

if r.count == int(r.header.v2.count) { // first time reading this set, so check for compression headers.
var codec CompressionCodec
if codec, err = r.header.compression(); err != nil {
Expand Down

0 comments on commit f4f748b

Please sign in to comment.