Skip to content

Commit b8bbcea

Browse files
committed
partition eof error in the reader
1 parent 1b64d17 commit b8bbcea

File tree

3 files changed

+74
-38
lines changed

3 files changed

+74
-38
lines changed

error.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ const (
119119
InconsistentClusterID Error = 104
120120
TransactionalIDNotFound Error = 105
121121
FetchSessionTopicIDError Error = 106
122+
PartitionEoF Error = 107
122123
)
123124

124125
// Error satisfies the error interface.
@@ -377,6 +378,8 @@ func (e Error) Title() string {
377378
return "Transactional ID Not Found"
378379
case FetchSessionTopicIDError:
379380
return "Fetch Session Topic ID Error"
381+
case PartitionEoF:
382+
return "Partition End of File"
380383
}
381384
return ""
382385
}
@@ -586,6 +589,8 @@ func (e Error) Description() string {
586589
return "The transactionalId could not be found"
587590
case FetchSessionTopicIDError:
588591
return "The fetch session encountered inconsistent topic ID usage"
592+
case PartitionEoF:
593+
return "Consumer reached the end of partition"
589594
}
590595
return ""
591596
}

reader.go

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,10 @@ type ReaderConfig struct {
520520
// This flag is being added to retain backwards-compatibility, so it will be
521521
// removed in a future version of kafka-go.
522522
OffsetOutOfRangeError bool
523+
524+
// EnablePartitionEoF will notify the reader by throwing PartitionEoF when
525+
// it reaches the end of partition.
526+
EnablePartitionEoF bool
523527
}
524528

525529
// Validate method validates ReaderConfig properties.
@@ -1194,23 +1198,24 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
11941198
defer join.Done()
11951199

11961200
(&reader{
1197-
dialer: r.config.Dialer,
1198-
logger: r.config.Logger,
1199-
errorLogger: r.config.ErrorLogger,
1200-
brokers: r.config.Brokers,
1201-
topic: key.topic,
1202-
partition: int(key.partition),
1203-
minBytes: r.config.MinBytes,
1204-
maxBytes: r.config.MaxBytes,
1205-
maxWait: r.config.MaxWait,
1206-
readBatchTimeout: r.config.ReadBatchTimeout,
1207-
backoffDelayMin: r.config.ReadBackoffMin,
1208-
backoffDelayMax: r.config.ReadBackoffMax,
1209-
version: r.version,
1210-
msgs: r.msgs,
1211-
stats: r.stats,
1212-
isolationLevel: r.config.IsolationLevel,
1213-
maxAttempts: r.config.MaxAttempts,
1201+
dialer: r.config.Dialer,
1202+
logger: r.config.Logger,
1203+
errorLogger: r.config.ErrorLogger,
1204+
brokers: r.config.Brokers,
1205+
topic: key.topic,
1206+
partition: int(key.partition),
1207+
minBytes: r.config.MinBytes,
1208+
maxBytes: r.config.MaxBytes,
1209+
maxWait: r.config.MaxWait,
1210+
readBatchTimeout: r.config.ReadBatchTimeout,
1211+
backoffDelayMin: r.config.ReadBackoffMin,
1212+
backoffDelayMax: r.config.ReadBackoffMax,
1213+
version: r.version,
1214+
msgs: r.msgs,
1215+
stats: r.stats,
1216+
isolationLevel: r.config.IsolationLevel,
1217+
maxAttempts: r.config.MaxAttempts,
1218+
enablePartitionEoF: r.config.EnablePartitionEoF,
12141219

12151220
// backwards-compatibility flags
12161221
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
@@ -1223,23 +1228,24 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
12231228
// used as a way to asynchronously fetch messages while the main program reads
12241229
// them using the high level reader API.
12251230
type reader struct {
1226-
dialer *Dialer
1227-
logger Logger
1228-
errorLogger Logger
1229-
brokers []string
1230-
topic string
1231-
partition int
1232-
minBytes int
1233-
maxBytes int
1234-
maxWait time.Duration
1235-
readBatchTimeout time.Duration
1236-
backoffDelayMin time.Duration
1237-
backoffDelayMax time.Duration
1238-
version int64
1239-
msgs chan<- readerMessage
1240-
stats *readerStats
1241-
isolationLevel IsolationLevel
1242-
maxAttempts int
1231+
dialer *Dialer
1232+
logger Logger
1233+
errorLogger Logger
1234+
brokers []string
1235+
topic string
1236+
partition int
1237+
minBytes int
1238+
maxBytes int
1239+
maxWait time.Duration
1240+
readBatchTimeout time.Duration
1241+
backoffDelayMin time.Duration
1242+
backoffDelayMax time.Duration
1243+
version int64
1244+
msgs chan<- readerMessage
1245+
stats *readerStats
1246+
isolationLevel IsolationLevel
1247+
maxAttempts int
1248+
enablePartitionEoF bool
12431249

12441250
offsetOutOfRangeError bool
12451251
}
@@ -1376,6 +1382,10 @@ func (r *reader) run(ctx context.Context, offset int64) {
13761382
})
13771383
r.stats.timeouts.observe(1)
13781384
continue
1385+
case errors.Is(err, PartitionEoF):
1386+
errcount = 0
1387+
r.sendError(ctx, err)
1388+
continue
13791389

13801390
case errors.Is(err, OffsetOutOfRange):
13811391
first, last, err := r.readOffsets(conn)
@@ -1506,6 +1516,7 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
15061516
var err error
15071517
var size int64
15081518
var bytes int64
1519+
var newOffset = offset
15091520

15101521
for {
15111522
conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout))
@@ -1524,9 +1535,9 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
15241535
break
15251536
}
15261537

1527-
offset = msg.Offset + 1
1528-
r.stats.offset.observe(offset)
1529-
r.stats.lag.observe(highWaterMark - offset)
1538+
newOffset = msg.Offset + 1
1539+
r.stats.offset.observe(newOffset)
1540+
r.stats.lag.observe(highWaterMark - newOffset)
15301541

15311542
size++
15321543
bytes += n
@@ -1538,7 +1549,11 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
15381549
r.stats.readTime.observeDuration(t2.Sub(t1))
15391550
r.stats.fetchSize.observe(size)
15401551
r.stats.fetchBytes.observe(bytes)
1541-
return offset, err
1552+
1553+
if r.enablePartitionEoF && newOffset == highWaterMark && newOffset > offset {
1554+
return offset, PartitionEoF
1555+
}
1556+
return newOffset, err
15421557
}
15431558

15441559
func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {

reader_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ func TestReader(t *testing.T) {
6767
scenario: "topic being recreated will return an error",
6868
function: testReaderTopicRecreated,
6969
},
70+
{
71+
scenario: "reading with enabled partition eof",
72+
function: testReaderPartitionEoF,
73+
},
7074
}
7175

7276
for _, test := range tests {
@@ -1982,3 +1986,15 @@ func testReaderTopicRecreated(t *testing.T, ctx context.Context, r *Reader) {
19821986
_, err = r.ReadMessage(ctx)
19831987
require.ErrorIs(t, err, OffsetOutOfRange)
19841988
}
1989+
1990+
func testReaderPartitionEoF(t *testing.T, ctx context.Context, r *Reader) {
1991+
r.config.EnablePartitionEoF = true
1992+
1993+
prepareReader(t, ctx, r, makeTestSequence(1)...)
1994+
1995+
_, err := r.ReadMessage(ctx)
1996+
require.NoError(t, err)
1997+
1998+
_, err = r.ReadMessage(ctx)
1999+
require.True(t, errors.Is(err, PartitionEoF))
2000+
}

0 commit comments

Comments
 (0)