Conversation

robobario
Member

This proposal outlines a new feature for the Kroxylicious filter framework: a mechanism to resolve topic names from their corresponding topic IDs. This will enable filters to implement business logic based on human-readable topic names, rather than the underlying Kafka implementation detail of topic IDs.

@robobario robobario requested a review from a team as a code owner August 15, 2025 04:54
@robobario
Member Author

Here's a rough prototype: kroxylicious/kroxylicious#2583


### 3. Performance and Caching

To avoid overwhelming the upstream cluster with frequent `Metadata` requests, the responses from this lookup should be cached. Equally, we do not wish to impede client and other internal requests, we

Member

How long will the cache be valid for? What effect would a client's metadata request have on the cache?

Member

What about actions that we know will make the cache stale (create topic, delete topic etc).

Member

I wonder if we should start off with a solution that doesn't cache, and add caching later if there is user demand for it. Users can always cache the results themselves, if they wish.

Member Author

Yep, very possible to implement it without caching first.

Mm, for cache invalidation we should probably also react to RPCs that fail due to a non-existent topic, like produce/consume.
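
As a rough sketch of that invalidation idea (all names here are hypothetical, not part of the proposal): a per-channel cache could evict any id the upstream reports as unknown, for example on a Fetch response:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;

// Hypothetical per-channel cache of topic id -> name mappings.
final class TopicNameCache {
    private final Map<Uuid, String> namesById = new ConcurrentHashMap<>();

    void put(Uuid id, String name) {
        namesById.put(id, name);
    }

    // Evict any topic id the upstream reports as unknown, so the next lookup re-resolves it.
    void invalidateFrom(FetchResponseData response) {
        for (FetchResponseData.FetchableTopicResponse topic : response.responses()) {
            boolean unknown = topic.partitions().stream()
                    .anyMatch(p -> p.errorCode() == Errors.UNKNOWN_TOPIC_ID.code()
                            || p.errorCode() == Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
            if (unknown) {
                namesById.remove(topic.topicId());
            }
        }
    }
}
```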

Member

I agree that we should see how it behaves without caching before adding a cache.

However, one thing to consider is a chain of N filters which sequentially handle the same response R containing a UUID:

  1. Filter N handles R, sees unknown UUID x, requests metadata, gets the metadata response, forwards response R to the client
  2. Filter N-1 handles R, sees unknown UUID x, requests metadata, gets the metadata response, forwards response R to the client
  3. Filter N-2 handles R, sees unknown UUID x, requests metadata, gets the metadata response, forwards response R to the client
  4. ...you get the idea

To avoid this I think we could safely cache the Metadata response at least until we actually send the response to the client.

However, it's also quite possible the client also doesn't know x (more likely for larger clusters, where there's a higher probability that it found out about x via a different connection). So slightly better would be to make use of R.correlation_id and keep the cached metadata response until we've handled request R.correlation_id+k, for some small k. Clients can pipeline requests, so while k=1 is (I think) perfectly safe, it might be beneficial to make k=5 (IIRC the default max in-flight requests), or make it even smarter by accounting for the number of in-flight requests we already know about. (We're returning R for correlation_id c; the highest correlation id we've observed for a request from this client is c+y, so keep the metadata response until we've handled request c+y+1. That should catch the case where request c+y+1 is the client's metadata request for x.) Might also want an upper bound on time-staleness too, just to be sure.
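
A minimal sketch of the bookkeeping this would need (class and field names invented for illustration): the snapshot stays usable until the client's correlation ids move past the horizon, with a time bound as a backstop.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;

import org.apache.kafka.common.Uuid;

// Illustrative only: a resolved id -> name snapshot kept until we've handled the
// request with correlation id c+y+1, or until a staleness bound elapses.
final class MetadataSnapshot {
    private final Map<Uuid, String> namesById;
    private final int expiryCorrelationId; // the c+y+1 from the comment above
    private final Instant expiresAt;       // upper bound on time-staleness

    MetadataSnapshot(Map<Uuid, String> namesById, int highestObservedCorrelationId, Duration maxStaleness) {
        this.namesById = namesById;
        this.expiryCorrelationId = highestObservedCorrelationId + 1;
        this.expiresAt = Instant.now().plus(maxStaleness);
    }

    // Usable while handling requests up to and including the expiry correlation id.
    boolean usableFor(int requestCorrelationId) {
        return requestCorrelationId <= expiryCorrelationId && Instant.now().isBefore(expiresAt);
    }

    Map<Uuid, String> namesById() {
        return namesById;
    }
}
```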

Member

> To avoid this I think we could safely cache the Metadata response at least until we actually send the response to the client.

I like this idea.

> So slightly better would be to make use of R.correlation_id and keep the cached metadata response until we've handled request R.correlation_id+k, for some small k

It is a clever idea but it sounds tricky. Do we really need it? My gut says wait until we see a use-case where the problem bites, then add the clever code.

Member Author

Have backed off including caching in the proposal, moving it to a future extensions section.


The 'edge' aspect is saying that we want to cache what the upstream said the topic names are, at the edge of the Proxy -> Upstream connection, but responses must then traverse the Filter chain to maintain composability.

We can implement a very long-lived cache invalidation strategy, as Kafka topics cannot be renamed. Inconsistencies will arise if the upstream topics are deleted, but we should be able to rely on the upstream failing if we forward any interactions with deleted topics.

Member

Aside: KIP-516 imagines a world where topics can be renamed. Obviously, we don't need to think about that right now.

Member

I, too, don't want to think about this. But we should.

Kafka already has a mechanism for deferred deletion. I don't recall if that's just soft-deleting the data, with the topic deleted from the metadata immediately (such that you can recreate the topic as soon as you have the deletion success response), or whether it imposes a delay before you can recreate the topic. If the latter, then knowing the timeout would at least give us an upper bound on how long to leave it before revalidating caches.

```java
/**
 * Asynchronously resolves a set of topic UUIDs to their corresponding topic names.
 *
 * @param topicUuids A Set of topic UUIDs to resolve.
 * @return A CompletionStage that will complete with an unmodifiable Map<Uuid, String>,
 *         failing exceptionally if any UUID cannot be resolved.
 */
```

Member

I question whether failing in the case where a Uuid is not found is the right behavior. The caller has no way to know which Uuid has been deleted, so I think you are really forcing them to query with singletons. I think in the case where a topic id no longer exists, returning a map with that key absent is better.

Member

I'm also a bit cautious about this. "cannot be resolved" covers things like the user not being authorized to see the topic, as well as races wrt topic creation and deletion. We need to be sure that what the client observes is consistent with Kafka behaviour.

Member Author

Have reworked it with a Result class that takes either the name or an exception, so we can have specific exception types for non-existence vs authorization or other error codes from Kafka.
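
Something along these lines, perhaps (a sketch only; the actual type and exception choices in the proposal may differ):

```java
// Sketch: each looked-up Uuid maps to either a resolved name or a typed failure,
// e.g. org.apache.kafka.common.errors.UnknownTopicIdException for a deleted/unknown id,
// or TopicAuthorizationException when the principal may not see the topic.
sealed interface TopicNameResult {
    record Resolved(String name) implements TopicNameResult {}

    record Failed(RuntimeException cause) implements TopicNameResult {}
}
```

The lookup would then complete with something like a Map<Uuid, TopicNameResult>, letting callers distinguish per-id outcomes without failing the whole batch.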


The change to `FilterContext` is **backwards compatible**, as it involves adding a new default method.

The underlying `Metadata` request will use the lowest API version capable of looking up topics by ID. The rationale is that if a client has received topic IDs, the upstream broker must be capable of resolving them via a `Metadata` request.

Member

@k-wall k-wall Aug 26, 2025

This approach might invite weird broker bugs to surface. The broker will be seeing the client's Metadata requests at one version and potentially the proxy's own requests at a different one, all down the same channel. I doubt that case gets any testing. I think it would be simpler to say that the API fails with an UnsupportedOperationException if the negotiated Metadata API version is too low.

Member Author

That might be a bit tricky: the client knows the ApiVersions range supported for Metadata requests, but it does not have to send a Metadata request on every connection. It might get Metadata on the bootstrap connection, then connect to a broker, get ApiVersions, then start sending Produce requests at it. The proxy won't know the actual version the client will send if it discovers the metadata needs to be updated.

We could expose the negotiated ApiVersions to this mechanism and use the highest version supported by proxy/broker, but we don't know the client's capabilities.

Member

> The proxy won't know the actual version the client will send if it discovers the metadata needs to be updated.

https://kafka.apache.org/protocol#api_versions says "If multiple versions of an API are supported by broker and client, clients are recommended to use the latest version supported by the broker and itself." That's a should rather than a must.

Member Author

@robobario robobario Sep 22, 2025

Happy to pick the highest version supported by proxy/broker for the out-of-band request; what I'm saying is that it will be hard to guarantee we send the same API version that the downstream client would pick, because the proxy doesn't know the downstream client's capabilities. But happy to default to the highest supported version; it will probably have more test coverage in apache/kafka.

Member Author

I think we'll also need to implement something for this case: something that can remember the API versions we sent to the downstream client for the channel, so that we can pick the highest supported version.
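
For the "highest version supported by proxy/broker" choice, the arithmetic is simple once the upstream's ApiVersions range is plumbed through. A sketch, with hypothetical names (a real implementation would also have to check the chosen version is new enough to look topics up by id):

```java
import org.apache.kafka.common.protocol.ApiKeys;

final class MetadataVersionChooser {

    // brokerMin/brokerMax would come from the upstream's ApiVersions response for METADATA.
    static short pick(short brokerMin, short brokerMax) {
        short chosen = (short) Math.min(ApiKeys.METADATA.latestVersion(), brokerMax);
        if (chosen < brokerMin) {
            throw new UnsupportedOperationException("No common Metadata version with the upstream broker");
        }
        return chosen;
    }
}
```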

```java
CompletionStage<Map<Uuid, String>> getTopicNames(Set<Uuid> topicUuids);
```

This method will internally send a `Metadata` request to the upstream Kafka cluster to fetch the required topic information.

Member

@k-wall k-wall Aug 26, 2025

So this must be sending a request for all topics rather than just the subset queried by the user? I am guessing too that includeClusterAuthorizedOperations etc. will be false, to minimize the amount of work the server has to do.

It is a pity one can't query for topics without pulling the partition information (which might be huge).

Member Author

I had imagined only querying the topic ids that we are immediately interested in. I guess doing them all at once would save incrementally building up the cache.
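
For the id-subset approach, the out-of-band request body might look something like this (a sketch; if memory serves, leaving the topic name null asks the broker to resolve by id, and the authorized-operations flags are switched off per the comment above):

```java
import java.util.Set;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.MetadataRequestData;

final class MetadataRequests {

    // Query only the ids we are immediately interested in, minimizing server-side work.
    static MetadataRequestData forTopicIds(Set<Uuid> topicIds) {
        MetadataRequestData data = new MetadataRequestData()
                .setAllowAutoTopicCreation(false)
                .setIncludeClusterAuthorizedOperations(false)
                .setIncludeTopicAuthorizedOperations(false);
        for (Uuid id : topicIds) {
            data.topics().add(new MetadataRequestData.MetadataRequestTopic()
                    .setTopicId(id)
                    .setName(null)); // resolve by id rather than by name
        }
        return data;
    }
}
```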


Kafka ACLs may restrict a user's access to topic metadata. We must not introduce a security vulnerability that leaks information about topics to unauthorized users.

Therefore, the initial implementation of the cache will be **per-channel**. This approach scopes the cache 1:1 with an upstream connection and its authenticated principal, ensuring that a user can only receive metadata they are authorized to see. In the future, we could explore a virtual-cluster-level cache, but this would require users to explicitly opt-in after being warned of the potential security trade-offs.

Member

I think we'd need the proxy to authenticate itself with credentials with sufficient privileges to list all the topics. Filtering the results would be hard to achieve.

Member

> hard to achieve.

except in the case where the proxy is the sole arbiter of authZ.
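
To make the per-channel scoping concrete (purely illustrative; the actual state-holding mechanism may differ), the cache could simply live alongside other per-connection state, bounded because connections can be long-lived:

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.Uuid;

// One instance per upstream channel, so entries never cross authenticated principals.
final class PerChannelTopicNames extends LinkedHashMap<Uuid, String> {
    private static final int MAX_ENTRIES = 10_000; // illustrative bound

    @Override
    protected boolean removeEldestEntry(Map.Entry<Uuid, String> eldest) {
        return size() > MAX_ENTRIES;
    }
}
```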

@k-wall
Member

k-wall commented Aug 26, 2025

Thanks for the write-up @robobario. I left some comments, but I think the shape is broadly good.
I'd mention this proposal on Slack and see if anyone in the community has any opinions.

Member

@tombentley tombentley left a comment


Thanks @robobario

@k-wall k-wall changed the title Add topic name lookup facility 008 - Add topic name lookup facility Sep 19, 2025
Signed-off-by: Robert Young <[email protected]>