Skip to content

Conversation

pyalex
Copy link

@pyalex pyalex commented Jul 18, 2025

Summary

Adding functions to the Python SDK for source objects creation. This is a fully backward-compatible change. Users can continue to use both thrift-based classes and new Python wrappers.

Why / Goal

The primary motivation is to enable the addition of extra attributes at the source level. Similarly to how it's done in GroupBy and Join: all extra arguments are stored in the customJson attribute in Thrift.
Sources can have all sorts of metadata, ie bootstrap.server for Kafka source, which can be helpful for a streaming job.

Additional benefits:

  • Less verbose API
    before:
my_source = ttypes.Source(
    events=ttypes.EventSource(
          table=...
    )
)

after:

my_source = source.EventSource(
    table=...
)
  • Improving API consistency: existing Python wrappers (ie, GroupBy, Join) use Pythonic snake case for parameter names, whereas code generated from Thrift uses camel case (ie, snapshotTable in EntitySource)
  • Omitting a required attribute will produce a more meaningful error

Test Plan

  • Added Unit Tests
  • [ x ] Covered by existing CI
  • Integration tested

Checklist

  • Documentation update

Reviewers

@pyalex pyalex changed the title Python wrappers for source objects Python wrappers for source creation Jul 18, 2025
Oleksii Moskalenko added 3 commits July 18, 2025 13:32
@pyalex pyalex marked this pull request as ready for review July 18, 2025 17:59
Copy link
Collaborator

@nikhil-zlai nikhil-zlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the source wrappers. Can you edit docs also please?

Given that there is already a way to specify stream config alongside the topic, can we skip the api change to add customJson?

For specifying streaming params - we use topicInfo:

// kafka://topic_name/schema=my_schema/host=X/port=Y should parse into TopicInfo(topic_name, kafka, {schema: my_schema, host: X, port Y})

case class TopicInfo(name: String, topicType: String, params: Map[String, String])

 topic="kafka://topic_name/schema=my_schema/host=X/port=Y" will parse into 

TopicInfo(topic_name, kafka, {schema: my_schema, host: X, port Y})

@pyalex
Copy link
Author

pyalex commented Jul 18, 2025

Hey @nikhil-zlai , thanks for the review!
There's more use for those extra attributes, than just Kafka host and port. For example, I want to store the Avro JSON schema near the source definition and attach it to the source. Or specify all kinds of Kafka consumer properties.

TopicInfo has limited usage since it makes / and = special symbols, and if I were to add anything encoded with base64 to this topic string, it would simply break.

Oleksii Moskalenko added 2 commits July 18, 2025 16:09
@pyalex
Copy link
Author

pyalex commented Jul 18, 2025

Updated docs

@nikhil-zlai
Copy link
Collaborator

I want to store the Avro JSON schema near the source definition and attach it to the source. Or specify all kinds of Kafka consumer properties.

I see. That definitely justifies the change.

@pyalex pyalex requested review from hzding621 and nikhil-zlai July 21, 2025 14:58
Copy link
Collaborator

@nikhil-zlai nikhil-zlai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one minor nit. but lgtm!

@pyalex
Copy link
Author

pyalex commented Jul 23, 2025

@hzding621, please take another look

@pyalex
Copy link
Author

pyalex commented Aug 4, 2025

Ping @hzding621

/**
* Any extra attributes can be stored here. Ie, Kafka bootstrap servers for a streaming source, or AWS IAM role for accessing Iceberg table
**/
5: optional string customJson
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we change it to use the MetaData, because we already have customJson in MetaData. It is easier to wrap everything in Metadata so we don't have to add new fields in the future

Suggested change
5: optional string customJson
5: optional MetaData metaData

/**
* Any extra attributes can be stored here. Ie, Kafka bootstrap servers for a streaming source, or AWS IAM role for accessing Iceberg table
**/
5: optional string customJson
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto, use MetaData instead of creating a new customJson field.

Suggested change
5: optional string customJson
5: optional MetaData metaData

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants