Apache Airflow version
Other Airflow 3 version (please specify below)
If "Other Airflow 3 version" selected, which one?
3.1.1
What happened?
We have two tasks wired like this: [live_values_loaded, backfill_values_loaded] >> EmptyOperator(trigger_rule="all_done_min_one_success"). However, the EmptyOperator gets skipped when one of the two upstream tasks is skipped and the other one succeeds.
What you think should happen instead?
In the Airflow 3.1 release notes it says: "ALL_DONE_MIN_ONE_SUCCESS: This rule triggers when all upstream tasks are done (success, failed, or skipped) and at least one has succeeded, filling a gap between existing trigger rules for complex workflow patterns."
This is why we expect that if one of the two tasks [live_values_loaded, backfill_values_loaded] gets skipped but the other one succeeds, our EmptyOperator should run.
How to reproduce
We have a TaskFlow task load_values used like this:
# ...
live_values_loaded = load_values.override(task_id='live_values')()
backfill_values_loaded = load_values.override(task_id='backfill_values')()
get_live_values >> live_values_loaded
get_backfill_values >> backfill_values_loaded

If there are no backfill values to be loaded, backfill_values gets skipped.
If there are no new live values yet, live_values gets skipped.
So if their respective upstream is skipped, the downstream load is skipped.
We need to emit an asset event only if any new values were loaded, regardless of whether they are live values or backfill values.
So we added this:
[live_values_loaded, backfill_values_loaded] >> EmptyOperator(
    task_id="emit_asset_updates",
    trigger_rule="all_done_min_one_success",
    outlets=[
        Asset("tigerdata://db/table/new_data"),
    ],
)

So this should be enough to reproduce:
# Airflow 3 imports
from airflow.exceptions import AirflowSkipException
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import Asset, dag, task


@task
def load_values(skip: bool):
    if skip:
        raise AirflowSkipException()
    return


@dag
def sample_dag():
    live_values_loaded = load_values.override(task_id='live_values')(skip=True)
    backfill_values_loaded = load_values.override(task_id='backfill_values')(skip=False)
    [live_values_loaded, backfill_values_loaded] >> EmptyOperator(
        task_id="emit_asset_updates",
        trigger_rule="all_done_min_one_success",
        outlets=[
            Asset("tigerdata://db/table/new_data"),
        ],
    )


sample_dag()
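For completeness, this is roughly how we checked it locally (a sketch assuming dag.test() is available for in-process debugging, as in our Airflow 3 environment, and using the sample_dag defined above):

# Run the DAG in-process; emit_asset_updates ends up skipped even though
# backfill_values succeeds and live_values is merely skipped.
if __name__ == "__main__":
    sample_dag().test()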
Operating System
Astronomer Runtime 3.1-3
Versions of Apache Airflow Providers
No response
Deployment
Astronomer
Deployment details
Astronomer Runtime 3.1-3
Anything else?
This might be a documentation issue, because in the file trigger_rule_dep.py there is this code:
elif trigger_rule == TR.ALL_DONE_MIN_ONE_SUCCESS:
    # For this trigger rule, skipped tasks are not considered "done"
    non_skipped_done = success + failed + upstream_failed + removed
    non_skipped_upstream = upstream - skipped
    if skipped > 0:
        # There are skipped tasks, so not all tasks are "done" for this rule
        new_state = TaskInstanceState.SKIPPED

If this really is the intended behavior, then this is a documentation issue. Of course we would prefer it to be a code issue, because it would suit us if skipped tasks were counted here ;)
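To make the question concrete, here is a minimal, self-contained sketch of the semantics we expected (a hypothetical helper, not the actual Airflow implementation; the counter names only mirror the snippet above):

# Expected reading of all_done_min_one_success, per the release notes:
# all upstream tasks finished (success, failed, or skipped) AND at least one succeeded.
def expected_all_done_min_one_success(
    success: int, failed: int, upstream_failed: int,
    removed: int, skipped: int, upstream: int,
) -> bool:
    done = success + failed + upstream_failed + removed + skipped
    return done >= upstream and success >= 1

# Our repro: one upstream skipped, one successful -> we expected the rule to fire.
assert expected_all_done_min_one_success(
    success=1, failed=0, upstream_failed=0, removed=0, skipped=1, upstream=2,
)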
Are you willing to submit PR?
- Yes I am willing to submit a PR!
(Please let me know whether skipped should be counted in the code or whether only the documentation should be adjusted. Once that is decided, I am happy to contribute a PR :) )
Code of Conduct
- I agree to follow this project's Code of Conduct