[FLINK-31275] extend lineage listener with QueryOperationEvent #26089

Open · pawel-big-lebowski wants to merge 1 commit into master

Conversation

@pawel-big-lebowski commented Jan 29, 2025

What is the purpose of the change

This change should enable extracting column-level lineage for Flink SQL jobs.
This can be achieved by a JobStatusChangedListener if Flink exposes the QueryOperation.
More details on the approach can be found in this doc: https://docs.google.com/document/d/1XmbHy6XqBrMoH9rkSyOG0wbwQZgf0epz-07lr_NfikI/edit?usp=sharing

Brief change log

  • Introduce a new QueryOperationEvent interface and an implementation of it (a hypothetical sketch of the event's shape follows below)
  • Notify the registered job status changed listeners about query operations from TableEnvironmentImpl
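
For orientation, the sketch below shows one possible shape of the proposed event. It is illustrative only: the actual interface name, package, and accessors are defined by this PR, and the assumption that it extends JobStatusChangedEvent (so that existing listeners receive it) is inferred from the description above.

```java
// Hypothetical sketch of the proposed event, not the PR's actual code.
// Assumption: the event extends the existing JobStatusChangedEvent so that it is
// delivered through the already existing JobStatusChangedListener interface.
import org.apache.flink.core.execution.JobStatusChangedEvent;
import org.apache.flink.table.operations.QueryOperation;

public interface QueryOperationEvent extends JobStatusChangedEvent {

    /** The query operation of the statement that was submitted for execution. */
    QueryOperation queryOperation();
}
```

A default implementation (mirroring the Default*Event naming used for DefaultJobCreatedEvent) would carry the job identification fields of JobStatusChangedEvent plus the QueryOperation itself.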

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? Added extra info on job listeners' page.

@pawel-big-lebowski marked this pull request as draft January 29, 2025 13:15
@flinkbot (Collaborator) commented Jan 29, 2025

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@pawel-big-lebowski marked this pull request as ready for review January 29, 2025 14:45
@@ -27,6 +27,7 @@ Flink provides a pluggable interface for users to register their custom logic fo
This enables users to implement their own flink lineage reporter to send lineage info to third party data lineage systems for example Datahub and Openlineage.

The job status changed listeners are triggered every time a status change happens for the application. The data lineage info is included in the JobCreatedEvent.
QueryOperationEvent can be used together with JobCreatedEvent to provide column level lineage for Flink SQL jobs.
A Contributor commented on this line:

and for Table API jobs?

@davidradl (Contributor) left a comment:

I would like to see some tests showing the column lineage is produced as expected for different topologies.

@davidradl (Contributor) commented:

some comments:

  • It would be great to get this very useful capability into Flink. Thanks for the contribution.
  • I see that the document pointed to has some side effects. Are these still the case, and if so, do we need to document them?
  • I see that Spark implements CLL at the optimization level. Are we doing CLL on the optimized logical plan? If not, what is the thinking around this?

@davidradl (Contributor) commented:

Reviewed by Chi on 30/01/2025. Go back to the submitter with review comments.

@pawel-big-lebowski (Author) commented:

Thank you @davidradl for your attention and the comments provided.

> I see that the document pointed to has some side effects. Are these still the case, and if so, do we need to document them?

The linked Google doc describes in detail the motivation behind this PR and demonstrates that column-level lineage metadata can be obtained with this change. It also contains my concern that this change creates another instance of JobStatusChangedListener: within TableEnvironmentImpl I was not able to reach the listeners from the executors (#24754 is the PR that introduced the listeners). I included a comment about this within the code.

> I would like to see some tests showing the column lineage is produced as expected for different topologies.

This PR provides the ability to enrich the lineage metadata that is already collected. The enrichment can be done only through the jobId: it allows correlating column-level lineage extracted from Calcite RelNodes with the lineage from DefaultJobCreatedEvent, as both events share the same jobId. Within this change, the job status listeners are notified on every execEnv.executeAsync call, when the job client is created.
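
To make the jobId-based correlation concrete, here is a rough sketch, not part of this PR, of a custom JobStatusChangedListener that keys query operations and table-level lineage by the same jobId. The QueryOperationEvent accessor follows the hypothetical shape sketched earlier; jobId(), onEvent() and JobCreatedEvent come from the existing listener API introduced by FLIP-314.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobCreatedEvent;
import org.apache.flink.core.execution.JobStatusChangedEvent;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.table.operations.QueryOperation;

/** Illustrative listener correlating the proposed QueryOperationEvent with JobCreatedEvent. */
public class CorrelatingLineageListener implements JobStatusChangedListener {

    // Both events carry the same jobId, so it serves as the correlation key.
    private final Map<JobID, QueryOperation> queryOperationsByJob = new ConcurrentHashMap<>();

    @Override
    public void onEvent(JobStatusChangedEvent event) {
        if (event instanceof QueryOperationEvent) {
            // Remember the query operation reported when the job client was created.
            queryOperationsByJob.put(
                    event.jobId(), ((QueryOperationEvent) event).queryOperation());
        } else if (event instanceof JobCreatedEvent) {
            QueryOperation queryOperation = queryOperationsByJob.get(event.jobId());
            if (queryOperation != null) {
                // Merge the table-level lineage from JobCreatedEvent with column-level
                // lineage derived from the query operation's relational tree here.
                System.out.println("Column-level lineage available for job " + event.jobId());
            }
        }
    }
}
```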

I don't think it's possible to test all possible TableEnvironmentImpl method calls or every possible SQL statement. However, this change always notifies the listener whenever a jobId and a queryOperation are both available.

> I see that Spark implements CLL at the optimization level. Are we doing CLL on the optimized logical plan? If not, what is the thinking around this?

Although it may be useful to extract column-level lineage from the CompiledPlan, as it is fully optimized and one can execute the plan directly, I was not able to find the proper abstractions within CompiledPlan to achieve this. I found that Calcite's RelNode enables CLL extraction, and the same approach has been chosen by another project, flink-sql-lineage (https://github.com/HamaWhiteGG/flink-sql-lineage).
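
For reference, the kind of column-origin extraction referred to here can be sketched with Calcite's metadata query on a RelNode. This is a generic Calcite snippet, not code from this PR or from flink-sql-lineage, and obtaining the RelNode for a given QueryOperation is planner-internal and not shown.

```java
import java.util.List;
import java.util.Set;

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;

public final class ColumnLineageExtractor {

    /** Prints, for every output column of the relational tree, the source columns it derives from. */
    public static void printColumnOrigins(RelNode relNode) {
        RelMetadataQuery mq = relNode.getCluster().getMetadataQuery();
        List<String> outputFields = relNode.getRowType().getFieldNames();
        for (int i = 0; i < outputFields.size(); i++) {
            Set<RelColumnOrigin> origins = mq.getColumnOrigins(relNode, i);
            if (origins == null) {
                continue; // origin could not be determined, e.g. a constant expression
            }
            for (RelColumnOrigin origin : origins) {
                RelOptTable table = origin.getOriginTable();
                String sourceColumn =
                        table.getRowType().getFieldNames().get(origin.getOriginColumnOrdinal());
                System.out.printf(
                        "%s <- %s.%s%n",
                        outputFields.get(i),
                        String.join(".", table.getQualifiedName()),
                        sourceColumn);
            }
        }
    }
}
```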

I think there should be, in the future, another CompiledPlanEvent that notifies the job listener as well. In that case, it would be up to the listener to decide whether to extract lineage metadata from RelNodes, the CompiledPlan, etc. However, at the moment I did not find a way to connect the CompiledPlan with the JobId.
