Skip to content

PartitionEoF error in the reader #1257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const (
InconsistentClusterID Error = 104
TransactionalIDNotFound Error = 105
FetchSessionTopicIDError Error = 106
PartitionEoF Error = 107
)

// Error satisfies the error interface.
Expand Down Expand Up @@ -377,6 +378,8 @@ func (e Error) Title() string {
return "Transactional ID Not Found"
case FetchSessionTopicIDError:
return "Fetch Session Topic ID Error"
case PartitionEoF:
return "Partition End of File"
}
return ""
}
Expand Down Expand Up @@ -586,6 +589,8 @@ func (e Error) Description() string {
return "The transactionalId could not be found"
case FetchSessionTopicIDError:
return "The fetch session encountered inconsistent topic ID usage"
case PartitionEoF:
return "Consumer reached the end of partition"
}
return ""
}
Expand Down
91 changes: 53 additions & 38 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,10 @@ type ReaderConfig struct {
// This flag is being added to retain backwards-compatibility, so it will be
// removed in a future version of kafka-go.
OffsetOutOfRangeError bool

// EnablePartitionEoF will notify the reader by throwing PartitionEoF when
// it reaches the end of partition.
EnablePartitionEoF bool
}

// Validate method validates ReaderConfig properties.
Expand Down Expand Up @@ -1194,23 +1198,24 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
defer join.Done()

(&reader{
dialer: r.config.Dialer,
logger: r.config.Logger,
errorLogger: r.config.ErrorLogger,
brokers: r.config.Brokers,
topic: key.topic,
partition: int(key.partition),
minBytes: r.config.MinBytes,
maxBytes: r.config.MaxBytes,
maxWait: r.config.MaxWait,
readBatchTimeout: r.config.ReadBatchTimeout,
backoffDelayMin: r.config.ReadBackoffMin,
backoffDelayMax: r.config.ReadBackoffMax,
version: r.version,
msgs: r.msgs,
stats: r.stats,
isolationLevel: r.config.IsolationLevel,
maxAttempts: r.config.MaxAttempts,
dialer: r.config.Dialer,
logger: r.config.Logger,
errorLogger: r.config.ErrorLogger,
brokers: r.config.Brokers,
topic: key.topic,
partition: int(key.partition),
minBytes: r.config.MinBytes,
maxBytes: r.config.MaxBytes,
maxWait: r.config.MaxWait,
readBatchTimeout: r.config.ReadBatchTimeout,
backoffDelayMin: r.config.ReadBackoffMin,
backoffDelayMax: r.config.ReadBackoffMax,
version: r.version,
msgs: r.msgs,
stats: r.stats,
isolationLevel: r.config.IsolationLevel,
maxAttempts: r.config.MaxAttempts,
enablePartitionEoF: r.config.EnablePartitionEoF,

// backwards-compatibility flags
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
Expand All @@ -1223,23 +1228,24 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
// used as a way to asynchronously fetch messages while the main program reads
// them using the high level reader API.
type reader struct {
dialer *Dialer
logger Logger
errorLogger Logger
brokers []string
topic string
partition int
minBytes int
maxBytes int
maxWait time.Duration
readBatchTimeout time.Duration
backoffDelayMin time.Duration
backoffDelayMax time.Duration
version int64
msgs chan<- readerMessage
stats *readerStats
isolationLevel IsolationLevel
maxAttempts int
dialer *Dialer
logger Logger
errorLogger Logger
brokers []string
topic string
partition int
minBytes int
maxBytes int
maxWait time.Duration
readBatchTimeout time.Duration
backoffDelayMin time.Duration
backoffDelayMax time.Duration
version int64
msgs chan<- readerMessage
stats *readerStats
isolationLevel IsolationLevel
maxAttempts int
enablePartitionEoF bool

offsetOutOfRangeError bool
}
Expand Down Expand Up @@ -1376,6 +1382,10 @@ func (r *reader) run(ctx context.Context, offset int64) {
})
r.stats.timeouts.observe(1)
continue
case errors.Is(err, PartitionEoF):
errcount = 0
r.sendError(ctx, err)
continue

case errors.Is(err, OffsetOutOfRange):
first, last, err := r.readOffsets(conn)
Expand Down Expand Up @@ -1506,6 +1516,7 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
var err error
var size int64
var bytes int64
var newOffset = offset

for {
conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout))
Expand All @@ -1524,9 +1535,9 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
break
}

offset = msg.Offset + 1
r.stats.offset.observe(offset)
r.stats.lag.observe(highWaterMark - offset)
newOffset = msg.Offset + 1
r.stats.offset.observe(newOffset)
r.stats.lag.observe(highWaterMark - newOffset)

size++
bytes += n
Expand All @@ -1538,7 +1549,11 @@ func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, err
r.stats.readTime.observeDuration(t2.Sub(t1))
r.stats.fetchSize.observe(size)
r.stats.fetchBytes.observe(bytes)
return offset, err

if r.enablePartitionEoF && newOffset == highWaterMark && newOffset > offset {
return offset, PartitionEoF
}
return newOffset, err
}

func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {
Expand Down
25 changes: 25 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func TestReader(t *testing.T) {
scenario: "topic being recreated will return an error",
function: testReaderTopicRecreated,
},
{
scenario: "reading with enabled partition eof config",
function: testReaderPartitionEoF,
},
}

for _, test := range tests {
Expand Down Expand Up @@ -1982,3 +1986,24 @@ func testReaderTopicRecreated(t *testing.T, ctx context.Context, r *Reader) {
_, err = r.ReadMessage(ctx)
require.ErrorIs(t, err, OffsetOutOfRange)
}

func testReaderPartitionEoF(t *testing.T, ctx context.Context, r *Reader) {
r.config.EnablePartitionEoF = true

const N = 10
prepareReader(t, ctx, r, makeTestSequence(N)...)

var err error
for i := 0; i < N; i++ {
_, err = r.ReadMessage(ctx)
require.NoError(t, err)
}
_, err = r.ReadMessage(ctx)
require.True(t, errors.Is(err, PartitionEoF))

prepareReader(t, ctx, r, makeTestSequence(1)...)
_, err = r.ReadMessage(ctx)
require.NoError(t, err)
_, err = r.ReadMessage(ctx)
require.True(t, errors.Is(err, PartitionEoF))
}