-
Notifications
You must be signed in to change notification settings - Fork 14.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Adjust DBT OpenLineage to Airflow 3 and improve logging #47500
base: main
Are you sure you want to change the base?
feat: Adjust DBT OpenLineage to Airflow 3 and improve logging #47500
Conversation
if AIRFLOW_V_3_0_PLUS: | ||
dagrun = task_instance.get_template_context()["dag_run"] | ||
return dagrun.logical_date or dagrun.run_after |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mobuchowski Does this look right? I was trying to mimic OL's listener behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, given that you don't have to deal with tests without dag_run
:)
@@ -286,6 +286,10 @@ def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage: | |||
|
|||
if isinstance(self.run_id, int) and self.wait_for_termination is True: | |||
return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance) | |||
self.log.info( | |||
"Extraction of OpenLineage events from DBT will be skipped: " | |||
"`run_id` is not set OR `self.wait_for_termination` is False. " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you're logging this, you might as well display which one of those is true
except Exception: # type: ignore | ||
log.info( | ||
"Openlineage could not find DBT catalog artifact, usually available when docs are generated." | ||
"Proceeding with metadata extraction. " | ||
"If you see error logs above about `HTTP error: Not Found` it's safe to ignore them." | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If that's true and the feature is optional, we should not log those errors and maybe display one collective error message at the end if possible.
Main change in this PR is adjusting the way
_get_logical_date
function works, now it should mimic OpenLineage listener's behavior. This is crucial, as OL run_id generated here must match the one generated by the listener, so that the parent-child relationship is correctly created.Additionally, some logs have been added that will be useful for debugging any issues with dbt OL extraction.
cc @mobuchowski
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.