
LIKAFKA-41423: Kafka federation proof of concept using dual-cluster integration test (ProxyBasedFederationTest) #275

Closed
wants to merge 5 commits

Conversation

@groelofs commented Feb 9, 2022

First, a quick design note (relevant to the MetadataCache): the topics/partitions and nodes content of UpdateMetadataRequests (UMRs) remains unchanged, i.e., any given update contains all the nodes for that controller's own set of brokers, and only those; controllers don't include other clusters' nodes in their own requests. The relevance to MetadataCache is that both the original version and the new one assume this and completely replace all the cached nodes for the single cluster that originated the request (NOT all the nodes in the federation).
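To make the replacement rule concrete, here's a minimal sketch (illustrative names only, not the PR's actual classes; the real structure is the MultiClusterMetadataSnapshot described below):

import org.apache.kafka.common.Node
import scala.collection.mutable

// Illustrative sketch: a two-level map (clusterId -> brokerId -> Node).
// On receiving a UMR, replace the originating cluster's entire node set
// and leave every other cluster's nodes untouched. This relies on the
// assumption above that a UMR carries ALL nodes for its origin cluster.
class MultiClusterNodeCacheSketch {
  private val aliveNodesByCluster = mutable.Map.empty[String, Map[Int, Node]]

  def updateFromUmr(originClusterId: String, nodesInUmr: Map[Int, Node]): Unit =
    synchronized {
      aliveNodesByCluster(originClusterId) = nodesInUmr // per-cluster full replace
    }
}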

The main changes live in:

  • KafkaController - new clusterId argument, passed down to ControllerChannelManager and ControllerBrokerRequestBatch constructors; new "pass-through" methods getBrokerNode() and addRemoteController() (probably temporary/test-only?); new rewriteAndForwardRemoteUpdateMetadataRequest() and sendRemoteUpdateMetadataRequest() methods (invoked when the "broker half" receives a fresh UMR from a remote cluster and hands it off to the "controller half" to be rewritten and rerouted to its local brokers)
  • ControllerChannelManager - new remoteControllerIds set of remote/foreign controller nodes' broker IDs (to allow picking just the remote controllers out of the existing brokerStateInfo map, which is now shared by both local brokers and remote controllers); new getBrokerNode() method to tell remote controller(s) about the local controller and addRemoteController() method to receive and store such info; modifications to addNewBroker() to support both local brokers (existing case) and remote controllers (new/federation case); modified sendUpdateMetadataRequests() method to support clusterId in requests and to send local UpdateMetadataRequests to remote controllers (if federation enabled); new sendRemoteRequestToBrokers() method to send remote topic metadata to local brokers
  • KafkaApis - "broker half" that receives both normal UMRs and (if we're the lead controller node) UMRs from remote clusters in the federation; has the new "mini-router" in doHandleUpdateMetadataRequest() to distinguish the two cases and either process the UMR normally or, if federation is enabled and the UMR is a remote one that hasn't already been rewritten, send it to the "controller half" to be rewritten and forwarded to local brokers (see the sketch after this list)
  • MetadataCache - conversion of MetadataSnapshot to a trait/interface plus a single-cluster implementation ("SingleClusterMetadataSnapshot"); refactor of updateMetadata() to pull common code out into two new helper methods for sharing with FederatedMetadataCache; conversion of various methods to protected for same reason
  • FederatedMetadataCache - new federated MetadataSnapshot variant ("MultiClusterMetadataSnapshot" with an extra level of hashmap to support aliveNodes and aliveBrokers for multiple clusters); modifications to updateMetadata() to update only the relevant cluster's data in the new multi-cluster hashmaps
  • UpdateMetadataRequest.java - new clusterId argument and getters; new WrappingBuilder inner class to support the rerouting of remote requests, which are already "built" and otherwise don't easily work with the existing networking stack, which assumes Builder objects; new rewrite method for incoming remote requests (prior to rerouting them to the local brokers)
  • UpdateMetadataRequest.json - new tagged (optional) fields "ClusterId" and "RoutingClusterId"
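
To give reviewers a feel for the mini-router mentioned above, the dispatch looks roughly like this (a sketch only: federationEnabled, localClusterId, and handleUpdateMetadataRequestLocally are stand-ins, and the real method keeps the usual authorization and error handling):

// Sketch of the KafkaApis "mini-router"; stand-in names noted below.
def doHandleUpdateMetadataRequest(request: RequestChannel.Request): Unit = {
  val umr = request.body[UpdateMetadataRequest]
  // A UMR needs rerouting if it originated in another physical cluster and
  // hasn't yet been rewritten for local routing (RoutingClusterId unset).
  val isUnrewrittenRemoteUmr = federationEnabled &&        // stand-in config flag
    umr.originClusterId != localClusterId &&               // stand-in local id
    umr.routingClusterId == null
  if (isUnrewrittenRemoteUmr)
    controller.rewriteAndForwardRemoteUpdateMetadataRequest(umr) // "controller half"
  else
    handleUpdateMetadataRequestLocally(request)            // stand-in for the normal path
}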

The majority of the other files have only trivial changes, mostly the addition of a clusterId argument to certain constructors (e.g., UpdateMetadataRequest.Builder). There are also a few log-message tweaks to make it easier to highlight and/or grep for the clusterId uniformly in test output and other logging. I also left some comments in KafkaApis about what I thought might be bugs in the LCCR code, which Lucas and I have discussed; I haven't gotten around to removing them yet.

Testing-wise, everything lives in a single new test case at the bottom of ProxyBasedFederationTest. (Apologies for the huge work-in-progress TODO/FIXME/etc. comment...) The test still needs real assertions about the presence or absence of various topics, but it does at least verify that a consumer can connect to a broker in either physical cluster and successfully read a given topic. In the interim it retains the old assertions from PlaintextConsumerTest (or whatever it was called); those are placeholders only.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@groelofs requested a review from gitlw February 9, 2022 06:50
…ntegration test (ProxyBasedFederationTest). Main changes live in KafkaController, ControllerChannelManager, KafkaApis, MetadataCache, and UpdateMetadataRequest itself (both .java and .json).
…ntegration test. (Fix for overlooked merge bug/typo.)
…ntegration test. (Fix for infinite loop in testBasicMultiClusterSetup(), plus indentation fix for ControllerChannelManager and cleanup of obsolete debug noise in KafkaController.)

// note that our caller already updated umr's controllerEpoch field (as well as others), so no need for that here
val updateMetadataRequestBuilder = new UpdateMetadataRequest.WrappingBuilder(umr)
updateMetadataRequestBrokerSet.intersect(controllerContext.liveOrShuttingDownBrokerIds).foreach {
@gitlw commented:
It seems the updateMetadataRequestBrokerSet and the controllerContext.liveOrShuttingDownBrokerIds would always be the same. I don't quite understand the rationale for the "intersect" operation here.

Also, there might be issues with concurrent access to the ControllerContext. The existing requests are generated and enqueued by the ControllerEventThread, which is the only thread that accesses state in the ControllerContext; here we are accessing ControllerContext state from a request-handler thread.
Can you please check whether we can follow the existing model by enqueuing an event to the KafkaController's eventManager, similar to how this API is implemented: https://github.com/groelofs/kafka/blob/9bd81c19cceae503d994fa6c66c44b1ae59bbc28/core/src/main/scala/kafka/controller/KafkaController.scala#L2027 ?
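
For concreteness, the pattern would look something like this (a rough sketch, not this PR's code: it assumes the older ControllerEvent style where each event carries its own process() method, and the event name, placeholder state, and the sendRemoteRequestToBrokers call are schematic):

// Sketch: move the ControllerContext access onto the ControllerEventThread
// by wrapping the work in a ControllerEvent and enqueuing it.
case class ForwardRemoteUpdateMetadata(umr: UpdateMetadataRequest)
    extends ControllerEvent {
  override def state = ControllerState.ControllerChange // placeholder state

  override def process(): Unit = {
    // Safe: process() runs only on the single ControllerEventThread, the
    // same thread that mutates controllerContext everywhere else.
    val targets = updateMetadataRequestBrokerSet
      .intersect(controllerContext.liveOrShuttingDownBrokerIds)
    sendRemoteRequestToBrokers(targets, umr) // schematic call
  }
}

// From the request-handler thread (KafkaApis), just enqueue:
eventManager.put(ForwardRemoteUpdateMetadata(umr))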

@groelofs (Author) replied:

This is a condensed version of the previous method (sendRequestsToBrokers()), which calls sendUpdateMetadataRequests(), which does the exact same intersection. If there's an issue with mine, then isn't there just as much of an issue with the existing code?

data.setRoutingClusterId(routingClusterId);
data.setControllerId(controllerId);
data.setControllerEpoch(controllerEpoch);
// brokerEpoch apparently gets rewritten by the controller for every receiving broker (somewhere...):
@gitlw commented:

To answer this comment: the brokerEpoch field can be left at its default value of -1 when the maxBrokerEpoch field is set. See https://github.com/groelofs/kafka/blob/4ba532d2b214f494cfe985ed08a18525146efe0d/core/src/main/scala/kafka/server/KafkaApis.scala#L3069 for details on how these two fields are used.
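
Paraphrasing the linked check (hedged: the linked KafkaApis code is authoritative, and the exact comparison in this fork may differ), the interplay is roughly:

// Rough paraphrase, NOT the actual KafkaApis code: a brokerEpoch of -1
// (AbstractControlRequest.UNKNOWN_BROKER_EPOCH) means "no per-broker epoch
// in this request", in which case the staleness check falls back to the
// cluster-wide maxBrokerEpoch instead of a per-broker value.
def isBrokerEpochStale(currentBrokerEpoch: Long,
                       requestBrokerEpoch: Long,
                       requestMaxBrokerEpoch: Long): Boolean =
  if (requestBrokerEpoch == -1L)
    requestMaxBrokerEpoch < currentBrokerEpoch
  else
    requestBrokerEpoch < currentBrokerEpoch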

@gitlw commented:

It seems this rewriteRemoteRequest will only be called once, from a single thread (which I think should be the ControllerEventThread, as suggested above), before the request goes into the queues of the individual RequestSendThreads.
Thus, I think there is no need to hold the structLock, and the struct field should always be null, since the UMR has just been deserialized from its wire format.

@groelofs (Author) replied:

Thanks for the link, but that's how it's used after the request is received. My comment was more about wherever it gets modified (in the request) prior to getting sent to any given broker, since broker epochs are per-broker, right? Or am I misunderstanding something fundamental? Is "broker epoch" not actually about brokers? (There's a separate controller epoch, so I didn't think that was the case.)

As to the lock, the method is only setting four simple fields in data and clearing the struct variable, so it's not as if the lock will be held for long in any case. Is skipping it really worth the risk? Upstream code could change at some point, and then there would be a hard-to-find race condition lurking. Seems a tad dangerous to me, don't you think?

…ntegration test. (Code-review fixes: refactor MetadataCache into legacy and federated-subclass variants, and merge ControllerChannelManager's brokerStateInfo and remoteControllerStateInfo maps in favor of simple remoteControllerIds hashset to mark remote controllers.)
…ntegration test. (Code-review fixes: rename UMR 'ClusterId' to 'OriginClusterId', and this time include new FederatedMetadataCache class, doh...)
@groelofs (Author) commented:

Obsoleted by #326.

@groelofs closed this Mar 31, 2022