feat: Adjust DBT OpenLineage to Airflow 3 and improve logging #47500

Open
wants to merge 1 commit into
base: main

Conversation

kacpermuda
Contributor

The main change in this PR adjusts how the _get_logical_date function works: it should now mimic the OpenLineage listener's behavior. This is crucial because the OL run_id generated here must match the one generated by the listener for the parent-child relationship to be created correctly.
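
To illustrate why the date has to match, here is a minimal sketch of a deterministic run ID. It is not the OpenLineage provider's actual algorithm: the `sketch_run_id` helper, the namespace, and the uuid5 scheme are all assumptions made for this example. The only point is that an ID derived from the date changes whenever the two sides pick different dates.

```python
import uuid
from datetime import datetime, timezone

# Hypothetical namespace, used only for this illustration.
_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "example.invalid/openlineage-sketch")


def sketch_run_id(dag_id: str, task_id: str, logical_date: datetime, try_number: int) -> str:
    """Derive a deterministic ID from the task identity and a date (illustrative only)."""
    key = f"{dag_id}.{task_id}.{logical_date.isoformat()}.{try_number}"
    return str(uuid.uuid5(_NAMESPACE, key))


date_seen_by_listener = datetime(2025, 3, 7, 12, 0, tzinfo=timezone.utc)
listener_id = sketch_run_id("my_dag", "dbt_run", date_seen_by_listener, 1)

# Same date on the operator side: the IDs agree, so the dbt run can point at its parent.
matching_id = sketch_run_id("my_dag", "dbt_run", date_seen_by_listener, 1)

# A different date (for example a fallback chosen differently) yields a different ID.
other_date = datetime(2025, 3, 7, 12, 5, tzinfo=timezone.utc)
mismatched_id = sketch_run_id("my_dag", "dbt_run", other_date, 1)
```

If the operator and the listener resolve the date differently, the parent run referenced by the dbt events would not exist on the listener side.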

Additionally, some logs have been added that will be useful for debugging any issues with dbt OL extraction.

cc @mobuchowski



Comment on lines +38 to +40
if AIRFLOW_V_3_0_PLUS:
    dagrun = task_instance.get_template_context()["dag_run"]
    return dagrun.logical_date or dagrun.run_after
Contributor Author

@mobuchowski Does this look right? I was trying to mimic OL's listener behavior.

Contributor

Yes, given that you don't have to deal with tests without dag_run :)
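
For readers following this thread, the fallback in the snippet under review can be exercised with a small sketch. `FakeDagRun`, `FakeTaskInstance`, and `logical_date_for_airflow_3` are hypothetical stand-ins written for this example, not Airflow classes; the sketch assumes, as the reviewed code implies, that `logical_date` can be empty on Airflow 3 while `run_after` is always set.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class FakeDagRun:
    """Hypothetical stand-in for a DAG run with the two attributes used above."""

    logical_date: Optional[datetime]
    run_after: datetime


class FakeTaskInstance:
    """Hypothetical stand-in exposing only get_template_context()."""

    def __init__(self, dag_run: FakeDagRun) -> None:
        self._dag_run = dag_run

    def get_template_context(self) -> dict:
        return {"dag_run": self._dag_run}


def logical_date_for_airflow_3(task_instance) -> datetime:
    """Mirror the Airflow 3 branch from the diff: prefer logical_date, else run_after."""
    dagrun = task_instance.get_template_context()["dag_run"]
    return dagrun.logical_date or dagrun.run_after


run_after = datetime(2025, 3, 7, 12, 0, tzinfo=timezone.utc)
midnight = datetime(2025, 3, 7, tzinfo=timezone.utc)

scheduled = FakeTaskInstance(FakeDagRun(logical_date=midnight, run_after=run_after))
no_logical_date = FakeTaskInstance(FakeDagRun(logical_date=None, run_after=run_after))
```

The sketch always supplies a dag_run in the context, which matches the caveat above about not having to handle tests without one.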

@@ -286,6 +286,10 @@ def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:

        if isinstance(self.run_id, int) and self.wait_for_termination is True:
            return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
        self.log.info(
            "Extraction of OpenLineage events from DBT will be skipped: "
            "`run_id` is not set OR `self.wait_for_termination` is False. "
Contributor

If you're logging this, you might as well display which one of those is true
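
One possible shape for that, as a sketch only: the `skip_reasons` and `skip_message` helpers are hypothetical names introduced here, and the message wording is an assumption rather than what the PR ended up with.

```python
from typing import Any, List


def skip_reasons(run_id: Any, wait_for_termination: Any) -> List[str]:
    """Return the conditions that prevent extraction, in the order they are checked."""
    reasons = []
    if not isinstance(run_id, int):
        reasons.append(f"`run_id` is not an int (got {run_id!r})")
    if wait_for_termination is not True:
        reasons.append(f"`wait_for_termination` is {wait_for_termination!r}")
    return reasons


def skip_message(run_id: Any, wait_for_termination: Any) -> str:
    """Build a single log line naming every condition that actually failed."""
    reasons = skip_reasons(run_id, wait_for_termination)
    return "Extraction of OpenLineage events from DBT will be skipped: " + "; ".join(reasons) + "."
```

In the operator this would be called only when the `if` above does not return, so the list of reasons is never empty at that point.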

Comment on lines +120 to +125
    except Exception:  # type: ignore
        log.info(
            "OpenLineage could not find DBT catalog artifact, usually available when docs are generated. "
            "Proceeding with metadata extraction. "
            "If you see error logs above about `HTTP error: Not Found` it's safe to ignore them."
        )
Contributor

If that's true and the feature is optional, we should not log those errors and maybe display one collective error message at the end if possible.
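
A rough sketch of the "one collective message" idea, under stated assumptions: `fetch_artifacts` and `fake_fetch` are hypothetical helpers, not part of the dbt Cloud hook, and the sketch does not show how to silence errors that the hook itself logs during the request.

```python
import logging
from typing import Callable, Dict, List, Tuple

log = logging.getLogger(__name__)


def fetch_artifacts(
    fetch: Callable[[str], dict],
    required: List[str],
    optional: List[str],
) -> Tuple[Dict[str, dict], List[str]]:
    """Fetch artifacts; optional failures are collected and reported once at the end."""
    artifacts: Dict[str, dict] = {}
    missing_optional: List[str] = []
    for name in required:
        artifacts[name] = fetch(name)  # failures here propagate to the caller
    for name in optional:
        try:
            artifacts[name] = fetch(name)
        except Exception:
            missing_optional.append(name)
    if missing_optional:
        log.info(
            "Optional dbt artifacts not found, proceeding without them: %s",
            ", ".join(missing_optional),
        )
    return artifacts, missing_optional


def fake_fetch(name: str) -> dict:
    """Stand-in for a real download; pretends the catalog was never generated."""
    if name == "catalog.json":
        raise RuntimeError("HTTP error: Not Found")
    return {"name": name}


artifacts, missing = fetch_artifacts(
    fake_fetch, required=["manifest.json", "run_results.json"], optional=["catalog.json"]
)
```

Required artifacts still fail loudly, while the optional catalog produces a single informational line instead of an error followed by an explanation.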

2 participants