-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17620: Simplifying share partition acquire API (kip-932) #17283
Conversation
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet()); | ||
}); | ||
|
||
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the semantics of the exception handling are different now. If processFetchResponse throws, you'll skip the release of the locks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might be missing something here. There is releasePartitionLocks
call as well in catch
block for unknowns. Though I have moved removed them from both places and moved to finally block. Is that what you meant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving it to the finally block seems a bit odd because the lock isn’t set when topicPartitionData
is empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true but releasePartitionLocks
iterates on passed topicIdPartitions
. As they would be empty then no lock will be released, as expected.
But I think your comment is less related to behaviour but more related to that do we need to call the release
if topicIdPartitions
is empty, correct? Though I can add a check in releasePartitionLocks
for emptiness of topicIdPartitions
but that seemed to me redundant as anyways the forEach will not be iterated (also topicIdPartitions can never be null). Please let me know what you think.
Could you please clarify why you decided not to use |
Hi @chia7712, though my inclination was towards keeping the code with CompletableFuture as in future if we think about persisting state when records are acquired then it will give us flexibility. But given the code with CompletableFuture adds a lot more complexity in handling scenarios, as again pointed out during introducing purgatory, #17177 (comment). Hence it's better to remove it for now. Once we have concrete use case then we should think of adding it back. |
Thanks @AndrewJSchofield and @chia7712 for review, I have addressed the feedback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the PR. Left a few comments.
if (acquiredRecords.isEmpty()) { | ||
partitionData | ||
.setRecords(null) | ||
.setErrorCode(Errors.NONE.code()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to explicitly set errorCode since it defaults to 0? If so, it would be useful to explicitly set other fields like errorMessage too. Ditto in the else clause.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -46,59 +47,59 @@ public class ShareFetchUtils { | |||
|
|||
// Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data | |||
// from share partitions. | |||
static CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse( | |||
static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchResponse( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we remove futures in the comment above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
new SimpleRecord(null, "value".getBytes())); | ||
|
||
Map<TopicIdPartition, FetchPartitionData> responseData = Collections.singletonMap( | ||
tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, the purpose of the test is to test the part when no records can be acquired. However, by passing in an error response, we will skip the acquire
call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, I have added other scenrio with empty as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the updated PR. A couple of more comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the updated. The changes LGTM. Could you rebase?
Thanks @junrao for the review. I have rebased it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for rebasing. Just one minor comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 thanks for this patch. overall LGTM
assertTrue(resultData.get(tp0).acquiredRecords().isEmpty()); | ||
assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode()); | ||
|
||
Mockito.verify(sp0, times(1)).updateCacheAndOffsets(any(Long.class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: fetchOffsetForTimestamp
is mock, so we can verify the actual value 1
instead of any(Long.class)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the updated PR. LGTM
Simplified Share Partition Acquire API to not include CompletableFuture. The records do not need a future handling rather can be just acquired while updating internal data structure hence now need to of CompletableFuture.
Committer Checklist (excluded from commit message)