LIKAFKA-41423: Kafka federation proof of concept using dual-cluster integration test (ProxyBasedFederationTest) #275
Conversation
…ntegration test (ProxyBasedFederationTest). Main changes live in KafkaController, ControllerChannelManager, KafkaApis, MetadataCache, and UpdateMetadataRequest itself (both .java and .json).
…ntegration test. (Fix for overlooked merge bug/typo.)
79232d8 to fa534d4
…ntegration test. (Fix for infinite loop in testBasicMultiClusterSetup(), plus indentation fix for ControllerChannelManager and cleanup of obsolete debug noise in KafkaController.)
clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
Outdated
// note that our caller already updated umr's controllerEpoch field (as well as others), so no need for that here
val updateMetadataRequestBuilder = new UpdateMetadataRequest.WrappingBuilder(umr)
updateMetadataRequestBrokerSet.intersect(controllerContext.liveOrShuttingDownBrokerIds).foreach {
It seems updateMetadataRequestBrokerSet and controllerContext.liveOrShuttingDownBrokerIds would always be the same, so I don't quite understand the rationale for the "intersect" operation here.
Also, there might be issues with concurrent access to the ControllerContext.
The existing requests are generated and enqueued by the ControllerEventThread, which is the only thread that accesses state in the ControllerContext. Here we are accessing ControllerContext state from a request-handler thread.
Can you please check whether we can follow the existing model by enqueuing an event to the KafkaController's eventManager, similar to how this API is implemented: https://github.com/groelofs/kafka/blob/9bd81c19cceae503d994fa6c66c44b1ae59bbc28/core/src/main/scala/kafka/controller/KafkaController.scala#L2027 ?
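For reference, a rough sketch of the event-based shape being suggested; the event name, the wiring, and the exact ControllerEvent members are assumptions for illustration, not code from this PR:

```scala
// Sketch only (names are hypothetical): the goal is that ControllerContext is read
// exclusively on the ControllerEventThread, never from a request-handler thread.
case class RemoteUpdateMetadataReceived(umr: UpdateMetadataRequest) extends ControllerEvent {
  override def state: ControllerState = ControllerState.UpdateMetadataResponseReceived // placeholder state
}

// Public entry point on KafkaController, invoked from KafkaApis on the request-handler
// thread: it only enqueues the event and returns immediately.
def forwardRemoteUpdateMetadata(umr: UpdateMetadataRequest): Unit =
  eventManager.put(RemoteUpdateMetadataReceived(umr))

// KafkaController.process(RemoteUpdateMetadataReceived(umr)) then runs on the
// ControllerEventThread, where intersecting with
// controllerContext.liveOrShuttingDownBrokerIds is safe.
```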
This is a condensed version of the previous method (sendRequestsToBrokers()), which calls sendUpdateMetadataRequests(), which does the exact same intersection. If there's an issue with mine, then isn't there just as much of an issue with the existing code?
data.setRoutingClusterId(routingClusterId);
data.setControllerId(controllerId);
data.setControllerEpoch(controllerEpoch);
// brokerEpoch apparently gets rewritten by the controller for every receiving broker (somewhere...):
To answer this comment: the brokerEpoch field can be left at its default value of -1 when the maxBrokerEpoch field is set. See https://github.com/groelofs/kafka/blob/4ba532d2b214f494cfe985ed08a18525146efe0d/core/src/main/scala/kafka/server/KafkaApis.scala#L3069 for details on how these two fields are used.
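Roughly speaking, the check being referenced behaves like the sketch below (a paraphrase for illustration only; the method name and the exact comparison are assumptions, and the real logic lives in the linked KafkaApis code):

```scala
// Paraphrase only: illustrates why brokerEpoch can stay at its -1 default once
// maxBrokerEpoch is populated. Not copied from KafkaApis; see the link above.
def isBrokerEpochStale(requestBrokerEpoch: Long,
                       requestMaxBrokerEpoch: Long,
                       currentBrokerEpoch: Long): Boolean = {
  if (requestBrokerEpoch == -1L)                // per-broker epoch not set by the sender
    currentBrokerEpoch > requestMaxBrokerEpoch  // rely on the cluster-wide bound instead
  else
    requestBrokerEpoch < currentBrokerEpoch     // legacy per-broker comparison
}
```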
It seems this rewriteRemoteRequest will only be called once from a single thread (which I think should be the ControllerEventThread as suggested above), before it goes into the queues of individual RequestSendThreads.
Thus, I think there is no need to hold the structLock, and the struct field should always be null since the UMR has just been deserialized from its wire format.
Thanks for the link, but that's how it's used after the request is received. My comment was more about wherever it gets modified (in the request) prior to getting sent to any given broker, since broker epochs are per-broker, right? Or am I misunderstanding something fundamental? Is "broker epoch" not actually about brokers? (There's a separate controller epoch, so I didn't think that was the case.)
As to the lock, the method is only setting four simple fields in data and clearing the struct variable, so it's not like the lock will be held for long in any case. Is it really worth the risk? Upstream code could change at some point, and then there would be a hard-to-find race condition lurking. Seems a tad dangerous to me, don't you think?
core/src/main/scala/kafka/controller/ControllerChannelManager.scala
Outdated
…ntegration test. (Code-review fixes: refactor MetadataCache into legacy and federated-subclass variants, and merge ControllerChannelManager's brokerStateInfo and remoteControllerStateInfo maps in favor of simple remoteControllerIds hashset to mark remote controllers.)
…ntegration test. (Code-review fixes: rename UMR 'ClusterId' to 'OriginClusterId', and this time include new FederatedMetadataCache class, doh...)
Obsoleted by #326.
First, a quick design note (relevant to the MetadataCache): the topics/partitions and nodes content of UpdateMetadataRequests (UMRs) remains unchanged, i.e., any given update contains all the nodes for that controller's set of brokers (only); controllers don't include other clusters' nodes in their own requests. The relevance to MetadataCache is that both the original version and the new one assume this and completely replace all the cached nodes for the single cluster that originated the request (NOT all the nodes in the federation).
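To make that concrete, here is a minimal sketch (not the PR's actual MetadataCache/FederatedMetadataCache code; the class and method names are illustrative) of a cache that keys its alive-node snapshot by origin cluster, so a UMR from one cluster replaces only that cluster's nodes:

```scala
import scala.collection.concurrent.TrieMap
import org.apache.kafka.common.Node

// Illustrative sketch only: keeps a per-origin-cluster node map so that an
// UpdateMetadataRequest from cluster A replaces A's nodes without touching B's.
class FederatedNodeCacheSketch {
  // originClusterId -> (brokerId -> Node)
  private val nodesByCluster = TrieMap.empty[String, Map[Int, Node]]

  def updateFromUmr(originClusterId: String, liveNodes: Map[Int, Node]): Unit =
    // full replacement, but only for the cluster that originated the request,
    // matching the design note above
    nodesByCluster.put(originClusterId, liveNodes)

  def allAliveNodes: Iterable[Node] = nodesByCluster.values.flatMap(_.values)
}
```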
The main changes live in:
- KafkaController
- ControllerChannelManager
- KafkaApis
- MetadataCache
- UpdateMetadataRequest itself (both the .java and .json files)
The majority of the other files have only trivial changes, mostly the addition of a clusterId argument to certain constructors (e.g., UpdateMetadataRequest.Builder). There are also a few log-message tweaks that make it easier to highlight and/or grep for the clusterId uniformly in test output and other logging. I also still have some leftover comments in KafkaApis about what I thought might be bugs in the LCCR code; Lucas and I have discussed them, but I haven't gotten around to removing the comments yet.
Testing-wise, everything lives in a single new test case at the bottom of ProxyBasedFederationTest. (Apologies for the huge work-in-progress TODO/FIXME/etc. comment...) The test still needs real assertions related to the presence or absence of various topics, but it does at least verify that a consumer can connect to a broker in either physical cluster and successfully read a given topic. In the interim it still has the old assertions from the PlaintextConsumerTest or whatever it was called; these are placeholders only.
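As a rough illustration of the kind of end-state assertion still to be added, something along these lines could replace the placeholder checks (the helper names here are hypothetical, not from the actual ProxyBasedFederationTest):

```scala
import java.time.Duration
import java.util.Collections
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.Assert.assertEquals

// Sketch only: produce through a broker in physical cluster A, consume through a
// broker in physical cluster B, and assert that the records arrive.
val topic = "federated-topic"
val producer = createProducerConnectedTo(clusterA)   // hypothetical helpers
val consumer = createConsumerConnectedTo(clusterB)

(0 until 10).foreach { i =>
  producer.send(new ProducerRecord(topic, s"key-$i", s"value-$i")).get()
}

consumer.subscribe(Collections.singletonList(topic))

// Poll until the expected count arrives or we time out, to avoid a flaky single poll.
var received = 0
val deadline = System.currentTimeMillis() + 15000
while (received < 10 && System.currentTimeMillis() < deadline) {
  received += consumer.poll(Duration.ofMillis(500)).count()
}
assertEquals(10, received)

// Real topic presence/absence checks would also go here, e.g. that metadata fetched
// from a cluster-B broker lists the topic even though it physically lives in cluster A.
```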