KAFKA-17509: Introduce a delayed action queue to complete purgatory actions outside purgatory. #17177
base: trunk
Conversation
I've checked the 3 test failures. They are unrelated to the PR. I ran all of them locally and they all passed.
Thanks for the patch @adixitconfluent!
Here's my understanding of the current share fetch handling
- KafkaApis is calling into SPM to enqueue a share request
- SPM#maybeProcessFetchQueue runs recursively (! 🙀) until the queue is empty
- On each iteration, we get a share fetch request off the queue, do some validation and enqueue a DelayedShareFetch
Since adding the DelayedShareFetch to the purgatory is non-blocking, I'm pretty sure we are essentially not using the fetch queue any more. Or rather, we are now using the DelayedShareFetch purgatory as a fetch queue (which was the goal of the refactoring, after all).
For fetchQueue, I don't see too many remaining usages:
- Adding in SPM#fetchMessages (from KafkaApis)
- completeExceptionally in SPM#close
- Polling in SPM#maybeProcessFetchQueue
Since this closely matches our DelayedShareFetch usage, I'm wondering if we can remove the fetchQueue code in this PR.
WDYT?
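To make the recursion concrete, here is a minimal, self-contained Java sketch of the drain pattern described above (FetchQueueSketch and its Runnable payloads are hypothetical stand-ins; the real logic lives in SharePartitionManager#maybeProcessFetchQueue and watches a DelayedShareFetch instead of running a Runnable):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch only: each enqueued item stands in for a share fetch
// request plus the validation and purgatory hand-off done in the real code.
class FetchQueueSketch {
    private final Queue<Runnable> fetchQueue = new ConcurrentLinkedQueue<>();

    void enqueue(Runnable request) {
        fetchQueue.add(request);
        maybeProcessFetchQueue();
    }

    // Recurses until the queue is drained. In the real code each iteration
    // ends with a non-blocking tryCompleteElseWatch, so nothing ever waits
    // in the queue for long -- which is why it looks removable.
    private void maybeProcessFetchQueue() {
        Runnable request = fetchQueue.poll();
        if (request == null) {
            return;
        }
        request.run();
        if (!fetchQueue.isEmpty()) {
            maybeProcessFetchQueue();
        }
    }
}
```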
```java
// then we should check if there is a pending share fetch request for the topic-partition and complete it.
// We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
// we directly call delayedShareFetchPurgatory.checkAndComplete
delayedActionQueue.add(() -> {
```
@adixitconfluent I'm a little confused by the async code here. We are gathering some futures in ShareFetchUtils#processFetchResponse, but when I look down into SharePartition#acquire it's all synchronous/blocking code (it just returns a completed CompletableFuture).
Is this a leftover from the refactoring? Or do we intend to make SharePartition#acquire async?
I ask this because if we're not keeping the CompletableFuture return type in SharePartition#acquire, we can fix it in this PR and avoid some complexity here.
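For illustration, a hedged sketch of the two method shapes being compared (the names below are made up; this is not the actual SharePartition code):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical contrast of the two shapes discussed above.
class AcquireSketch {
    // Current shape as described: blocking work wrapped in an already-completed future.
    CompletableFuture<List<String>> acquireWrapped() {
        return CompletableFuture.completedFuture(doSynchronousAcquire());
    }

    // Simplified shape if the future return type is dropped: callers lose the
    // async plumbing around what is really synchronous code.
    List<String> acquireDirect() {
        return doSynchronousAcquire();
    }

    private List<String> doSynchronousAcquire() {
        return List.of("acquired-batch");
    }
}
```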
@mumrah, we created JIRA https://issues.apache.org/jira/browse/KAFKA-17522 earlier to track this issue. Yes, it makes sense that the share partition acquire functionality need not return a future. I am not sure whether I should cover it in this PR itself.
@apoorvmittal10, any thoughts on whether we should cover it in this PR? Or, since the JIRA is assigned to you and you may already be working on it, we could have another PR for the resolution.
Hi @mumrah,
You're right, we don't need the fetch queue. I have created JIRA https://issues.apache.org/jira/browse/KAFKA-17545 for it, and will prioritize it in the coming PRs.
@adixitconfluent : Thanks for the PR. Added a few comments.
1 comment for my knowledge.
```scala
 */
def addCompleteDelayedShareFetchPurgatoryAction(topicIdPartitions: Seq[TopicIdPartition],
                                                groupId: String,
                                                delayedShareFetchPurgatory: DelayedOperationPurgatory[DelayedShareFetch]): Unit = {
```
Just for my knowledge, would it not be better to declare delayedShareFetchPurgatory in the replica manager itself, like the other purgatories, and define methods there to append the requests? I see a lot of purgatories are already defined there.
Hi @apoorvmittal10, we can do it, but I'm not sure whether it is good to add more Scala code. If we declare it in replicaManager, we'll also have to write wrapper functions in replicaManager for delayedShareFetchPurgatory.tryCompleteElseWatch and delayedShareFetchPurgatory.checkAndComplete, which are going to be utilized in SharePartitionManager and SharePartition. Additionally, we'll need to pass a replicaManager object to the SharePartition class as well, so it's an added dependency. Considering all these factors, I feel it is better to declare it within SharePartitionManager. Let me know if my thoughts don't make sense.
I agree with your thoughts, and they do make sense.
But I am skeptical about having a single purgatory outside the replica manager; if we had other instances elsewhere then it would be fine. The replica manager already carries the metrics and the standard boilerplate to shut down purgatories and related instances. I know you could handle them separately as well, but it might be better to have them in a single place.
I can see the list offset purgatory was added there recently as well, so I think it might be OK to have the Scala code in the old classes. I am not an expert in the area and will leave it to @junrao or @mumrah to decide. I just found the API to pass the delayed purgatory into the replica manager a bit odd.
@adixitconfluent : Thanks for the updated PR. Added one more comment.
```java
// then we should check if there is a pending share fetch request for the topic-partition and complete it.
// We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
// we directly call delayedShareFetchPurgatory.checkAndComplete
replicaManager.addCompleteDelayedShareFetchPurgatoryAction(CollectionConverters.asScala(result.keySet()).toSeq(), shareFetchData.groupId(), delayedShareFetchPurgatory);
```
It's a bit weird to add the delayedAction through ReplicaManager. Perhaps we could create DelayedActionQueue in KafkaApis and pass it to both ReplicaManager and DelayedShareFetch.
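A hypothetical sketch of that wiring (the real KafkaApis and ReplicaManager are Scala; the record types below are illustrative stand-ins):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical wiring only: a single shared action queue is handed to both
// components, so DelayedShareFetch no longer schedules actions through
// ReplicaManager.
class SharedQueueWiring {
    record ReplicaManagerLike(Queue<Runnable> actionQueue) {}
    record DelayedShareFetchLike(Queue<Runnable> actionQueue) {}

    public static void main(String[] args) {
        Queue<Runnable> delayedActionQueue = new ConcurrentLinkedQueue<>();
        ReplicaManagerLike replicaManager = new ReplicaManagerLike(delayedActionQueue);
        DelayedShareFetchLike delayedShareFetch = new DelayedShareFetchLike(delayedActionQueue);
        // Both now enqueue into and drain from the same queue instance.
    }
}
```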
About
In reference to comment #16969 (comment), I have introduced a DelayedActionQueue to add purgatory actions and try to complete them. Actions are added to the DelayedActionQueue when partition locks are released after a fetch in forceComplete. Code has also been added to onExpiration to check the delayed actions queue and try to complete it. Since onExpiration serves as a callback for forceComplete, it should not lead to an infinite call stack. Also fixed the failures in DelayedShareFetchTest, which were occurring due to insufficient mocking.
Testing
The code has been tested with the help of unit tests.
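For context, a minimal sketch of the action-queue pattern the description refers to (a hypothetical stand-in, not the actual kafka.server.DelayedActionQueue API):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Stand-in for the delayed action queue: completions are enqueued rather than
// invoked inline, so completing one DelayedShareFetch never re-enters the
// purgatory on the same call stack.
class DelayedActionQueueSketch {
    private final Queue<Runnable> actions = new ConcurrentLinkedQueue<>();

    void add(Runnable action) {
        actions.add(action);
    }

    // Drained from a safe point, such as the onExpiration callback of
    // forceComplete, after the original stack has unwound.
    void tryCompleteActions() {
        Runnable action;
        while ((action = actions.poll()) != null) {
            action.run(); // e.g. purgatory.checkAndComplete(key)
        }
    }
}
```

Draining only from the onExpiration/forceComplete callback keeps checkAndComplete calls off the stack that produced them, which is exactly the infinite-call-stack concern raised in the review.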