[FLINK-31275] extend lineage listener with QueryOperationEvent #26089
@@ -27,6 +27,7 @@ Flink provides a pluggable interface for users to register their custom logic fo
This enables users to implement their own flink lineage reporter to send lineage info to third party data lineage systems for example Datahub and Openlineage.

The job status changed listeners are triggered every time status change happened for the application. The data lineage info is included in the JobCreatedEvent.
QueryOperationEvent can be used together with JobCreatedEvent to provide column level lineage for Flink SQL jobs.
and for Table API jobs?
I would like to see some tests showing the column lineage is produced as expected for different topologies.
some comments:
Reviewed by Chi on 30/01/2025. Go back to the submitter with review comments.
Signed-off-by: Pawel Leszczynski <[email protected]>
Thank you @davidradl for your attention and the comments provided.
The linked Google doc describes in detail the motivation behind this PR and shows that column level lineage metadata can be obtained with this change. It also contains my concern, as this change creates another instance of
This PR provides the ability to enrich the lineage metadata that is already collected. The enrichment can be done only through jobId. That allows correlating column level lineage extracted from Calcite RelNodes with lineage from DefaultJobCreatedEvent, as both events share the same jobId. Within this change, job status listeners are notified on all the
I don't think it's possible to test all the possible TableEnvironmentImpl method calls or all possible SQL statements. However, this change always notifies the listener whenever both a jobId and a queryOperation are available.
Although it may be useful to extract column level lineage from CompiledPlan, as it is fully optimized and one can directly execute the plan, I was not able to find the proper abstractions within CompiledPlan to achieve this. I found that Calcite's RelNode enables column level lineage (CLL) extraction, and the same approach has been chosen by another project, flink-sql-lineage: https://github.com/HamaWhiteGG/flink-sql-lineage. I think there should be in future another
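To illustrate the jobId correlation described above, here is a minimal sketch of how a listener could join the two kinds of lineage. It is not code from this PR: `LineageCorrelator`, `JobCreatedInfo` and `QueryOperationInfo` are hypothetical stand-ins rather than Flink classes, lineage is modelled as plain strings, and the sketch assumes the two events may arrive in either order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: none of these types are Flink classes. They only model
// "two events that share a jobId" so the correlation step can be shown.
final class LineageCorrelator {

    /** Stand-in for the table level lineage delivered with a job-created event. */
    public static final class JobCreatedInfo {
        final String jobId;
        final List<String> tableLineage;

        public JobCreatedInfo(String jobId, List<String> tableLineage) {
            this.jobId = jobId;
            this.tableLineage = tableLineage;
        }
    }

    /** Stand-in for the column level lineage derived from a query operation. */
    public static final class QueryOperationInfo {
        final String jobId;
        final List<String> columnLineage;

        public QueryOperationInfo(String jobId, List<String> columnLineage) {
            this.jobId = jobId;
            this.columnLineage = columnLineage;
        }
    }

    private final Map<String, List<String>> tablesByJob = new HashMap<>();
    private final Map<String, List<String>> columnsByJob = new HashMap<>();

    /** Remembers table level lineage under the job's id. */
    public void onJobCreated(JobCreatedInfo info) {
        tablesByJob.put(info.jobId, new ArrayList<>(info.tableLineage));
    }

    /** Accumulates column level lineage under the same job id. */
    public void onQueryOperation(QueryOperationInfo info) {
        columnsByJob.computeIfAbsent(info.jobId, id -> new ArrayList<>())
                .addAll(info.columnLineage);
    }

    /** Returns table lineage followed by column lineage once both are known for the job. */
    public Optional<List<String>> enrichedLineage(String jobId) {
        List<String> tables = tablesByJob.get(jobId);
        List<String> columns = columnsByJob.get(jobId);
        if (tables == null || columns == null) {
            return Optional.empty();
        }
        List<String> merged = new ArrayList<>(tables);
        merged.addAll(columns);
        return Optional.of(merged);
    }
}
```

Keying both maps on jobId means the sketch makes no assumption about which event is delivered first; a real reporter would also need to evict entries once a job finishes.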
What is the purpose of the change
This change should enable extracting column level lineage for Flink SQL jobs.
This can be achieved by JobStatusChangedListener if Flink exposes QueryOperation.
More details on the approach can be found in docs: https://docs.google.com/document/d/1XmbHy6XqBrMoH9rkSyOG0wbwQZgf0epz-07lr_NfikI/edit?usp=sharing
Brief change log
QueryOperationEvent which implements QueryOperationEvent
Does this pull request potentially affect one of the following parts:
@Public(Evolving): yes
Documentation