From eb9d9ac57a1ec339de6c157f80d3c7b4916de469 Mon Sep 17 00:00:00 2001 From: Dennis Gloss Date: Wed, 17 Jan 2024 00:53:18 +0100 Subject: [PATCH] partition eof error in the reader --- error.go | 5 +++ reader.go | 91 +++++++++++++++++++++++++++++--------------------- reader_test.go | 25 ++++++++++++++ 3 files changed, 83 insertions(+), 38 deletions(-) diff --git a/error.go b/error.go index 4a7a8a278..25ff9ae02 100644 --- a/error.go +++ b/error.go @@ -119,6 +119,7 @@ const ( InconsistentClusterID Error = 104 TransactionalIDNotFound Error = 105 FetchSessionTopicIDError Error = 106 + PartitionEoF Error = 107 ) // Error satisfies the error interface. @@ -377,6 +378,8 @@ func (e Error) Title() string { return "Transactional ID Not Found" case FetchSessionTopicIDError: return "Fetch Session Topic ID Error" + case PartitionEoF: + return "Partition End of File" } return "" } @@ -586,6 +589,8 @@ func (e Error) Description() string { return "The transactionalId could not be found" case FetchSessionTopicIDError: return "The fetch session encountered inconsistent topic ID usage" + case PartitionEoF: + return "Consumer reached the end of partition" } return "" } diff --git a/reader.go b/reader.go index cfc7cb8f5..09d136b32 100644 --- a/reader.go +++ b/reader.go @@ -520,6 +520,10 @@ type ReaderConfig struct { // This flag is being added to retain backwards-compatibility, so it will be // removed in a future version of kafka-go. OffsetOutOfRangeError bool + + // EnablePartitionEoF makes the reader return a PartitionEoF error when + // it reaches the end of a partition. + EnablePartitionEoF bool } // Validate method validates ReaderConfig properties. 
@@ -1194,23 +1198,24 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { defer join.Done() (&reader{ - dialer: r.config.Dialer, - logger: r.config.Logger, - errorLogger: r.config.ErrorLogger, - brokers: r.config.Brokers, - topic: key.topic, - partition: int(key.partition), - minBytes: r.config.MinBytes, - maxBytes: r.config.MaxBytes, - maxWait: r.config.MaxWait, - readBatchTimeout: r.config.ReadBatchTimeout, - backoffDelayMin: r.config.ReadBackoffMin, - backoffDelayMax: r.config.ReadBackoffMax, - version: r.version, - msgs: r.msgs, - stats: r.stats, - isolationLevel: r.config.IsolationLevel, - maxAttempts: r.config.MaxAttempts, + dialer: r.config.Dialer, + logger: r.config.Logger, + errorLogger: r.config.ErrorLogger, + brokers: r.config.Brokers, + topic: key.topic, + partition: int(key.partition), + minBytes: r.config.MinBytes, + maxBytes: r.config.MaxBytes, + maxWait: r.config.MaxWait, + readBatchTimeout: r.config.ReadBatchTimeout, + backoffDelayMin: r.config.ReadBackoffMin, + backoffDelayMax: r.config.ReadBackoffMax, + version: r.version, + msgs: r.msgs, + stats: r.stats, + isolationLevel: r.config.IsolationLevel, + maxAttempts: r.config.MaxAttempts, + enablePartitionEoF: r.config.EnablePartitionEoF, // backwards-compatibility flags offsetOutOfRangeError: r.config.OffsetOutOfRangeError, @@ -1223,23 +1228,24 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { // used as a way to asynchronously fetch messages while the main program reads // them using the high level reader API. 
type reader struct { - dialer *Dialer - logger Logger - errorLogger Logger - brokers []string - topic string - partition int - minBytes int - maxBytes int - maxWait time.Duration - readBatchTimeout time.Duration - backoffDelayMin time.Duration - backoffDelayMax time.Duration - version int64 - msgs chan<- readerMessage - stats *readerStats - isolationLevel IsolationLevel - maxAttempts int + dialer *Dialer + logger Logger + errorLogger Logger + brokers []string + topic string + partition int + minBytes int + maxBytes int + maxWait time.Duration + readBatchTimeout time.Duration + backoffDelayMin time.Duration + backoffDelayMax time.Duration + version int64 + msgs chan<- readerMessage + stats *readerStats + isolationLevel IsolationLevel + maxAttempts int + enablePartitionEoF bool offsetOutOfRangeError bool } @@ -1376,6 +1382,10 @@ func (r *reader) run(ctx context.Context, offset int64) { }) r.stats.timeouts.observe(1) continue + case errors.Is(err, PartitionEoF): + errcount = 0 + r.sendError(ctx, err) + continue case errors.Is(err, OffsetOutOfRange): first, last, err := r.readOffsets(conn) @@ -1506,6 +1516,7 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err var err error var size int64 var bytes int64 + var newOffset = offset for { conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout)) @@ -1524,9 +1535,9 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err break } - offset = msg.Offset + 1 - r.stats.offset.observe(offset) - r.stats.lag.observe(highWaterMark - offset) + newOffset = msg.Offset + 1 + r.stats.offset.observe(newOffset) + r.stats.lag.observe(highWaterMark - newOffset) size++ bytes += n @@ -1538,7 +1549,11 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err r.stats.readTime.observeDuration(t2.Sub(t1)) r.stats.fetchSize.observe(size) r.stats.fetchBytes.observe(bytes) - return offset, err + + if r.enablePartitionEoF && newOffset == highWaterMark && newOffset > 
offset { + return offset, PartitionEoF + } + return newOffset, err } func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) { diff --git a/reader_test.go b/reader_test.go index f413d7429..0c202e5c4 100644 --- a/reader_test.go +++ b/reader_test.go @@ -67,6 +67,10 @@ func TestReader(t *testing.T) { scenario: "topic being recreated will return an error", function: testReaderTopicRecreated, }, + { + scenario: "reading with enabled partition eof config", + function: testReaderPartitionEoF, + }, } for _, test := range tests { @@ -1982,3 +1986,24 @@ func testReaderTopicRecreated(t *testing.T, ctx context.Context, r *Reader) { _, err = r.ReadMessage(ctx) require.ErrorIs(t, err, OffsetOutOfRange) } + +func testReaderPartitionEoF(t *testing.T, ctx context.Context, r *Reader) { + r.config.EnablePartitionEoF = true + + const N = 10 + prepareReader(t, ctx, r, makeTestSequence(N)...) + + var err error + for i := 0; i < N; i++ { + _, err = r.ReadMessage(ctx) + require.NoError(t, err) + } + _, err = r.ReadMessage(ctx) + require.True(t, errors.Is(err, PartitionEoF)) + + prepareReader(t, ctx, r, makeTestSequence(1)...) + _, err = r.ReadMessage(ctx) + require.NoError(t, err) + _, err = r.ReadMessage(ctx) + require.True(t, errors.Is(err, PartitionEoF)) +}