Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17620: Simplifying share partition acquire API (kip-932) #17283

Merged
merged 12 commits into from
Oct 10, 2024

Conversation

apoorvmittal10
Copy link
Collaborator

Simplified Share Partition Acquire API to not include CompletableFuture. The records do not need a future handling rather can be just acquired while updating internal data structure hence now need to of CompletableFuture.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added the core Kafka Broker label Sep 26, 2024
@apoorvmittal10 apoorvmittal10 added the KIP-932 Queues for Kafka label Sep 26, 2024
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
});

Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the semantics of the exception handling are different now. If processFetchResponse throws, you'll skip the release of the locks.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be missing something here. There is releasePartitionLocks call as well in catch block for unknowns. Though I have moved removed them from both places and moved to finally block. Is that what you meant?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving it to the finally block seems a bit odd because the lock isn’t set when topicPartitionData is empty.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true but releasePartitionLocks iterates on passed topicIdPartitions. As they would be empty then no lock will be released, as expected.

But I think your comment is less related to behaviour but more related to that do we need to call the release if topicIdPartitions is empty, correct? Though I can add a check in releasePartitionLocks for emptiness of topicIdPartitions but that seemed to me redundant as anyways the forEach will not be iterated (also topicIdPartitions can never be null). Please let me know what you think.

@chia7712
Copy link
Contributor

Could you please clarify why you decided not to use CompletableFuture (#16274 (comment))?

@apoorvmittal10
Copy link
Collaborator Author

Could you please clarify why you decided not to use CompletableFuture (#16274 (comment))?

Hi @chia7712, though my inclination was towards keeping the code with CompletableFuture as in future if we think about persisting state when records are acquired then it will give us flexibility. But given the code with CompletableFuture adds a lot more complexity in handling scenarios, as again pointed out during introducing purgatory, #17177 (comment). Hence it's better to remove it for now. Once we have concrete use case then we should think of adding it back.

@apoorvmittal10
Copy link
Collaborator Author

Thanks @AndrewJSchofield and @chia7712 for review, I have addressed the feedback.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the PR. Left a few comments.

if (acquiredRecords.isEmpty()) {
partitionData
.setRecords(null)
.setErrorCode(Errors.NONE.code())
Copy link
Contributor

@junrao junrao Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to explicitly set errorCode since it defaults to 0? If so, it would be useful to explicitly set other fields like errorMessage too. Ditto in the else clause.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -46,59 +47,59 @@ public class ShareFetchUtils {

// Process the replica manager fetch response to update share partitions and futures. We acquire the fetched data
// from share partitions.
static CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse(
static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchResponse(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove futures in the comment above?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

new SimpleRecord(null, "value".getBytes()));

Map<TopicIdPartition, FetchPartitionData> responseData = Collections.singletonMap(
tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the purpose of the test is to test the part when no records can be acquired. However, by passing in an error response, we will skip the acquire call.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I have added other scenrio with empty as well.

@apoorvmittal10
Copy link
Collaborator Author

@junrao @chia7712 Sorry for the delay, I was away. Can you please review once again, I have addressed the comments.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the updated PR. A couple of more comments.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the updated. The changes LGTM. Could you rebase?

@apoorvmittal10
Copy link
Collaborator Author

@apoorvmittal10 : Thanks for the updated. The changes LGTM. Could you rebase?

Thanks @junrao for the review. I have rebased it.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for rebasing. Just one minor comment.

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 thanks for this patch. overall LGTM

assertTrue(resultData.get(tp0).acquiredRecords().isEmpty());
assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode());

Mockito.verify(sp0, times(1)).updateCacheAndOffsets(any(Long.class));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fetchOffsetForTimestamp is mock, so we can verify the actual value 1 instead of any(Long.class)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the updated PR. LGTM

@junrao junrao merged commit b98aee3 into apache:trunk Oct 10, 2024
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Kafka Broker KIP-932 Queues for Kafka
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants