Skip to content

Commit 368dc0a

Browse files
authored
topicctl bug fix for offsets (#171)
* topicctl bug fix for offsets * Bug fixes and test case fixtures * minor spelling fixture * Formatting code for bounds.go
1 parent 3d58ff7 commit 368dc0a

File tree

4 files changed

+36
-16
lines changed

4 files changed

+36
-16
lines changed

pkg/groups/groups.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ func GetMemberLags(
160160
GroupId: groupID,
161161
},
162162
)
163+
log.Debugf("Received consumerOffsets: %+v", offsets)
164+
163165
if err != nil {
164166
return nil, err
165167
}

pkg/groups/groups_test.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -289,10 +289,13 @@ func TestGetLags(t *testing.T) {
289289
require.NoError(t, err)
290290
require.Equal(t, 2, len(lags))
291291

292+
// We create topic with 2 partitions and send 10 messages. first, last offset for all partitions is 0
293+
// When we consume 4 messages, 5 is the latest/last/newest offset
294+
// We consume 2 messages for each partition. Hence member_offset(2) <= last_offset(5)
292295
for l, lag := range lags {
293296
assert.Equal(t, l, lag.Partition)
294-
assert.Equal(t, int64(4), lag.NewestOffset)
295-
assert.LessOrEqual(t, lag.MemberOffset, int64(4))
297+
assert.Equal(t, int64(5), lag.NewestOffset)
298+
assert.LessOrEqual(t, lag.MemberOffset, int64(5))
296299
}
297300
}
298301

@@ -330,10 +333,12 @@ func TestGetEarliestOrLatestOffset(t *testing.T) {
330333

331334
groupPartitions := groupDetails.Members[0].TopicPartitions[topicName]
332335

336+
// A topic with 2 partitions and produced 10 messages. first/earliest offset = 0, last/lastest offset = 5
337+
// Consume 8 messages. first/earliest offset = 0, member offset = 4, last/lastest offset = 5
333338
for _, partition := range groupPartitions {
334339
offset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, partition)
335340
require.NoError(t, err)
336-
assert.Equal(t, int64(4), offset)
341+
assert.Equal(t, int64(5), offset)
337342

338343
offset, err = GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, partition)
339344
require.NoError(t, err)
@@ -390,10 +395,10 @@ func TestResetOffsets(t *testing.T) {
390395
assert.Equal(t, int64(2), lags[0].MemberOffset)
391396
assert.Equal(t, int64(1), lags[1].MemberOffset)
392397

393-
// latest offset of partition 0
398+
// latest offset of partition 0 -> This should be 5. first offset = 0, last offset = 5 (total 10 messages)
394399
latestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, 0)
395400
require.NoError(t, err)
396-
// earliest offset of partition 1
401+
// earliest offset of partition 1 -> This should be 0. first offset = 0, last offset = 5 (total 10 messages)
397402
earliestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, 1)
398403
require.NoError(t, err)
399404

@@ -413,7 +418,10 @@ func TestResetOffsets(t *testing.T) {
413418
require.NoError(t, err)
414419

415420
require.Equal(t, 2, len(lags))
416-
assert.Equal(t, int64(4), lags[0].MemberOffset)
421+
// partiton 0, we reset offset to latestoffset which is 5
422+
assert.Equal(t, int64(5), lags[0].MemberOffset)
423+
424+
// partiton 1, we reset offset to earliestoffset which is 0
417425
assert.Equal(t, int64(0), lags[1].MemberOffset)
418426

419427
}

pkg/messages/bounds.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,10 @@ func GetPartitionBounds(
174174
}, nil
175175
}
176176

177-
if minOffset > firstOffset {
177+
// if minOffset is equal to lastOffset
178+
// We read message (firstMessage) from minOffset+1 Which can lead to invalid reads
179+
// Hence, We will not move first offset to match min offset if minOffset >= lastOffset
180+
if minOffset > firstOffset && minOffset < lastOffset {
178181
log.Debugf(
179182
"Moving first offset forward to match min offset (%d)",
180183
minOffset,
@@ -233,11 +236,18 @@ func GetPartitionBounds(
233236
)
234237
}
235238

239+
log.Debugf(
240+
"Final offsets for %d: %d->%d",
241+
partition,
242+
firstMessage.Offset,
243+
lastMessage.Offset,
244+
)
245+
236246
return Bounds{
237247
Partition: partition,
238248
FirstOffset: firstMessage.Offset,
239249
FirstTime: firstMessage.Time,
240-
LastOffset: lastMessage.Offset,
250+
LastOffset: lastMessage.Offset + 1,
241251
LastTime: lastMessage.Time,
242252
}, nil
243253
}

pkg/messages/bounds_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ func TestGetAllPartitionBounds(t *testing.T) {
6464
bounds, err := GetAllPartitionBounds(ctx, connector, topicName, nil)
6565
assert.NoError(t, err)
6666

67-
// The first partition gets 3 messages
67+
// The first partition gets 3 messages. (i.e) earliest/first offset is 0 and latest/last is 3
6868
assert.Equal(t, 4, len(bounds))
6969
assert.Equal(t, 0, bounds[0].Partition)
7070
assert.Equal(t, int64(0), bounds[0].FirstOffset)
71-
assert.Equal(t, int64(2), bounds[0].LastOffset)
71+
assert.Equal(t, int64(3), bounds[0].LastOffset)
7272

73-
// The last partition gets only 2 messages
73+
// The last partition gets only 2 messages. (i.e) earliest/first offset is 0 and latest/last is 2
7474
assert.Equal(t, 3, bounds[3].Partition)
7575
assert.Equal(t, int64(0), bounds[3].FirstOffset)
76-
assert.Equal(t, int64(1), bounds[3].LastOffset)
76+
assert.Equal(t, int64(2), bounds[3].LastOffset)
7777

7878
boundsWithOffsets, err := GetAllPartitionBounds(
7979
ctx,
@@ -87,13 +87,13 @@ func TestGetAllPartitionBounds(t *testing.T) {
8787

8888
assert.Equal(t, 4, len(boundsWithOffsets))
8989

90-
// Start of first partition is moved forward
90+
// Start of first partition is moved forward. First partition has earliest offset is 0 and latest is 3
9191
assert.Equal(t, 0, boundsWithOffsets[0].Partition)
9292
assert.Equal(t, int64(1), boundsWithOffsets[0].FirstOffset)
93-
assert.Equal(t, int64(2), boundsWithOffsets[0].LastOffset)
93+
assert.Equal(t, int64(3), boundsWithOffsets[0].LastOffset)
9494

95-
// Other partition bounds are unchanged
95+
// Other partition bounds are unchanged. Last partition has earliest offset is 0 and latest is 2
9696
assert.Equal(t, 3, boundsWithOffsets[3].Partition)
9797
assert.Equal(t, int64(0), boundsWithOffsets[3].FirstOffset)
98-
assert.Equal(t, int64(1), boundsWithOffsets[3].LastOffset)
98+
assert.Equal(t, int64(2), boundsWithOffsets[3].LastOffset)
9999
}

0 commit comments

Comments
 (0)