-
Notifications
You must be signed in to change notification settings - Fork 81
Python wrappers for source creation #1016
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the source wrappers. Can you edit docs also please?
Given that there is already a way to specify stream config alongside the topic, can we skip the api change to add customJson?
For specifying streaming params - we use topicInfo:
// kafka://topic_name/schema=my_schema/host=X/port=Y should parse into TopicInfo(topic_name, kafka, {schema: my_schema, host: X, port Y}) |
case class TopicInfo(name: String, topicType: String, params: Map[String, String])
topic="kafka://topic_name/schema=my_schema/host=X/port=Y" will parse into
TopicInfo(topic_name, kafka, {schema: my_schema, host: X, port Y})
api/py/test/sample/production/group_bys/sample_team/event_sample_group_by.v1
Outdated
Show resolved
Hide resolved
Hey @nikhil-zlai , thanks for the review! TopicInfo has limited usage since it makes |
Updated docs |
I see. That definitely justifies the change. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one minor nit. but lgtm!
@hzding621, please take another look |
Ping @hzding621 |
/** | ||
* Any extra attributes can be stored here. Ie, Kafka bootstrap servers for a streaming source, or AWS IAM role for accessing Iceberg table | ||
**/ | ||
5: optional string customJson |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we change it to use the MetaData, because we already have customJson
in MetaData. It is easier to wrap everything in Metadata so we don't have to add new fields in the future
5: optional string customJson | |
5: optional MetaData metaData |
/** | ||
* Any extra attributes can be stored here. Ie, Kafka bootstrap servers for a streaming source, or AWS IAM role for accessing Iceberg table | ||
**/ | ||
5: optional string customJson |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto, use MetaData instead of creating a new customJson
field.
5: optional string customJson | |
5: optional MetaData metaData |
Summary
Adding functions to the Python SDK for source objects creation. This is a fully backward-compatible change. Users can continue to use both thrift-based classes and new Python wrappers.
Why / Goal
The primary motivation is to enable the addition of extra attributes at the source level. Similarly to how it's done in
GroupBy
andJoin
: all extra arguments are stored in thecustomJson
attribute in Thrift.Sources can have all sorts of metadata, ie
bootstrap.server
for Kafka source, which can be helpful for a streaming job.Additional benefits:
before:
after:
GroupBy
,Join
) use Pythonic snake case for parameter names, whereas code generated from Thrift uses camel case (ie,snapshotTable
inEntitySource
)Test Plan
Checklist
Reviewers