-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17348: Add custom produce request parsers #16812
KAFKA-17348: Add custom produce request parsers #16812
Conversation
Is there a jira or a KIP for this feature? |
I did not create one. Where do I find out how to do that? Thank you. |
I believe you need to create a KIP for this as it is a new configuration for the broker side and this should be under changing "public interfaces" that need a KIP. To create a KIP you need to follow the steps here https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-GettingStarted |
if (null != produceRequestParserClassName) { | ||
log.debug("ProduceRequestParser class {} from system property {}", produceRequestParserClassName, PRODUCE_REQUEST_PARSER_PROPERTY); | ||
} else { | ||
produceRequestParserClassName = System.getenv(PRODUCE_REQUEST_PARSER_ENV); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Majority of Kafka config doesn't use env variables. Please check KafkaConfig.scala
if you want to add a config to broker side
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking for best way to get KafkaConfig into this component. Thank you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an efficient way to use KafkaConfig in static context of a java class? I just scanned all the code and did not find a precedent. ProduceRequest.parse() is static, and all my changes are around delegating the behavior of that method to a separate class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not that I'm aware of, also if you need this config to be dynamic we have DynamicBrokerConfig which depends on KafkaConfig.
I would suggest that you open a KIP for discussion with a clear motivation for why you think this feature is important which is the most important part then the community can discuss the technical proposal of how would these config get loaded on the mailing list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. I submitted a request to create an account. Will bring this up as soon as it is created. Thank you for your guidance.
69a8bb6
to
818ee8a
Compare
Replaced by #17324
This PR adds ability to specify a custom produce request parser. A custom produce request parser would allow to intercept all incoming messages before they get into the broker and apply broker wide logic to the messages. This could be a trace , a filter, or a transform(such as lineage).
The inline new ProduceRequest(new ProduceRequestData(new ByteBufferAccessor(buffer), version), version) was moved out of ProduceRequest.java into its own class ProduceRequestParser.java, and ProduceRequest class was augmented with the dynamic loading of a specified parser class. When no class is specified, the default one is loaded maintaining full backward compatibilty.
JIRA: KAFKA-17348
KIP: KIP-1086
Committer Checklist (excluded from commit message)