Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure ordering of offset commit requests #2941

Closed
wants to merge 3 commits into from

Conversation

prestona
Copy link
Member

Add locking to prevent concurrent calls to commit from potentially being re-ordered between determining which offsets to include in the offset commit request, and sending the request to Kafka.

Note: we think this can be improved upon (by not adding more locks). A potential improvement would be to use the broker lock, and only hold this until the request is sent (avoiding holding a lock for the entire network round-trip time). However, it looks like #2409 has made sendAndReceive hold the broker lock until a response is received from the broker, so we need to understand that a little better first.

Fixes: #2940

Add locking to prevent concurrent calls to commit from potentially being
re-ordered between determining which offsets to include in the offset
commit request, and sending the request to Kafka.

Move finding the coordinator before creating the request to avoid
holding the lock in the case the coordinator is unknown. This requires a
change to the "new offset manager" test, which previously didn't expect
the mock broker to receive a find coordinator request.

Fixes: IBM#2940
Co-authored-by: Michael Burgess <[email protected]>
Signed-off-by: Adrian Preston <[email protected]>
@dnwe dnwe added the fix label Jul 21, 2024
Copy link
Collaborator

@dnwe dnwe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix LGTM, though it would be helpful if we could possibly get a reproduce of the out-of-order commits into the FV suite in a followup PR?

@prestona
Copy link
Member Author

Fix LGTM, though it would be helpful if we could possibly get a reproduce of the out-of-order commits into the FV suite in a followup PR?

Yep. Good point. The manual reproduce needed quite a few repetitions before we hit the window, but we can take a look and see what an FV suite test would look like, and how often it can be made to detect the condition.

This test simulates multiple concurrent commits for a single group
running in a single process. The expected behavior is that commits are
delivered in order to the mock broker, with no offset in the
OffsetCommitRequest being for a lower value than previously committed
for the partition.

Signed-off-by: Adrian Preston <[email protected]>
@prestona
Copy link
Member Author

I've pushed an extra commit that adds a unit test for validating commit ordering. Compared to the FV suite, a unit test has the advantage that it can really hammer the offset commit code-path from multiple goroutines. YMMV but for me the test fails after about 10-20 offset commits.

func() {
mu.Lock()
defer mu.Unlock()
if outOfOrder == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer reordering this to an early return/guard condition:

if outOfOrder != "" {
	return
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Superseded by switch to sync/atomic.Pointer

Comment on lines 155 to 157
outOfOrder =
fmt.Sprintf("out of order commit to partition %d, current committed offset: %d, offset in request: %d",
partition, last, offset.offset)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using a sync/atomic.Value with a pointer to a struct{ partition, last, offset IntType } shape struct, to avoid the inherent serialization caused by mutex locking.

Copy link
Member Author

@prestona prestona Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I ended up using an atomic.Pointer, as it avoids the need to make a type assertion when calling Load().

const numPartitions = 10
const commitsPerPartition = 1000

wg := &sync.WaitGroup{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer simply declaring a variable var wg sync.WaitGroup rather than assigning a zero value / pointer to zero value.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

defer safeClose(t, testClient)
om, err := NewOffsetManagerFromClient("group", testClient)
if err != nil {
t.Error(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the test keep going after this? om is almost certainly a nil pointer, so the first use is going to nil pointer exception, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well spotted. Substituted with t.Fatal(...).

Comment on lines 208 to 209
mu.Lock()
defer mu.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lock should be unnecessary. The wg.Wait() ensures all writes should be complete, and performs all the synchronization necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Superseded by switch to sync/atomic.Pointer. The intent was to ensure that the error message assigned to outOfOrder (by the goroutine running the handler in the mock broker) was guaranteed to be observed by the goroutine running the test function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I figured it was added out of an abundance of caution. The information here (helpful apart from even this code review) is that the wg.Wait() itself is synchronizing, as all calls to wg.Done() happen before the return of wg.Wait(), the same as if you had used a mutex.

// Test that the correct sequence of offset commit messages is sent to a broker when
// multiple goroutines for a group are committing offsets at the same time
func TestOffsetManagerCommitSequence(t *testing.T) {
mu := &sync.Mutex{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as below, just go ahead and declare the variable instead: var mu sync.Mutex

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Superseded by switch to sync/atomic.Pointer - which I've declared as per your suggestion.

Specifically:
- Simplify variable declarations for zero-value values
- Use sync/atomic.Pointer instead of sync.Mutex
- t.Error -> t.Fatal to avoid continuing and de-referencing nil
- Remove unused named return values (copy and paste error)

Signed-off-by: Adrian Preston <[email protected]>
@prestona
Copy link
Member Author

@puellanivis, thanks for taking the time to review. I hope I've addressed your concerns in 21e6e58. I've also cherry-picked this change onto #2947 to keep that consistent.

Copy link
Contributor

@puellanivis puellanivis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks acceptable.

@dnwe
Copy link
Collaborator

dnwe commented Aug 7, 2024

Superseded by 2947

@dnwe dnwe closed this Aug 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Re-ordering of offset commit requests can cause committed offset to move "backwards".
3 participants