
all_done_min_one_success task getting skipped when one upstream task is skipped, contradicting documented behavior #59925

@DataCerealz

Description


Apache Airflow version

Other Airflow 3 version (please specify below)

If "Other Airflow 3 version" selected, which one?

3.1.1

What happened?

We have two tasks wired like this: [live_values_loaded, backfill_values_loaded] >> EmptyOperator(trigger_rule="all_done_min_one_success"). However, the EmptyOperator gets skipped when one of the two upstream tasks is skipped and the other succeeds.

What you think should happen instead?

In the airflow 3.1 patch notes it says: ALL_DONE_MIN_ONE_SUCCESS: This rule triggers when all upstream tasks are done (success, failed, or skipped) and at least one has succeeded, filling a gap between existing trigger rules for complex workflow patterns.

This is why we expect if any of the two tasks [live_values_loaded, backfill_values_loaded] gets skipped but the other one succeeds, our EmptyOperator should run.

How to reproduce

We have a TaskFlow task load_values used like this:

# ...
live_values_loaded = load_values.override(task_id='live_values')()
backfill_values_loaded = load_values.override(task_id='backfill_values')()

get_live_values >> live_values_loaded
get_backfill_values >> backfill_values_loaded

If there are no backfill values to be loaded, backfill_values gets skipped.
If there are no new live values yet, live_values gets skipped.
So if their respective upstream is skipped, the downstream load is skipped.
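The cascade described above is just the default all_success trigger rule at work. A minimal stand-alone model of that rule (illustrative only, not the actual Airflow implementation) for a single downstream task:

```python
def all_success(success: int, skipped: int, failed: int, upstream: int) -> str:
    """Simplified model of Airflow's default `all_success` trigger rule."""
    if success == upstream:
        return "RUN"
    if skipped > 0:
        # Any skipped upstream task makes the downstream task skip as well,
        # which is how the skip cascades from get_*_values to load_values.
        return "SKIPPED"
    if failed > 0:
        return "UPSTREAM_FAILED"
    return "WAITING"

# get_backfill_values was skipped, so backfill_values (the load) skips too:
print(all_success(success=0, skipped=1, failed=0, upstream=1))  # SKIPPED
```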

We need to emit an asset event only if any new values were loaded - it doesn't matter whether they are live values or backfill values.

So we added this:

[live_values_loaded, backfill_values_loaded] >> EmptyOperator(
	task_id="emit_asset_updates",
	trigger_rule="all_done_min_one_success",
	outlets=[
		Asset("tigerdata://db/table/new_data"),
	],
)

So this should be enough to reproduce:

from airflow.exceptions import AirflowSkipException
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import Asset, dag, task


@task
def load_values(skip: bool):
    if skip:
        raise AirflowSkipException()


@dag
def sample_dag():
    live_values_loaded = load_values.override(task_id='live_values')(skip=True)
    backfill_values_loaded = load_values.override(task_id='backfill_values')(skip=False)

    [live_values_loaded, backfill_values_loaded] >> EmptyOperator(
        task_id="emit_asset_updates",
        trigger_rule="all_done_min_one_success",
        outlets=[
            Asset("tigerdata://db/table/new_data"),
        ],
    )


sample_dag()

Operating System

Astronomer Runtime 3.1-3

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

Astronomer Runtime 3.1-3

Anything else?

This might be a documentation issue, because the code file trigger_rule_dep.py contains this:

elif trigger_rule == TR.ALL_DONE_MIN_ONE_SUCCESS:
    # For this trigger rule, skipped tasks are not considered "done"
    non_skipped_done = success + failed + upstream_failed + removed
    non_skipped_upstream = upstream - skipped

    if skipped > 0:
        # There are skipped tasks, so not all tasks are "done" for this rule
        new_state = TaskInstanceState.SKIPPED

If this really is the intended behavior, then this is a documentation issue. Of course we would prefer it to be a code issue, since it would be convenient for us if skipped tasks were counted as "done" here ;)
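To make the effect of that branch concrete, here is a stand-alone model (a sketch, not the actual Airflow implementation) of how ALL_DONE_MIN_ONE_SUCCESS evaluates in our scenario:

```python
def all_done_min_one_success(success: int, failed: int, upstream_failed: int,
                             removed: int, skipped: int, upstream: int) -> str:
    """Simplified model of the trigger_rule_dep.py excerpt quoted above."""
    done = success + failed + upstream_failed + removed + skipped
    if done < upstream:
        return "WAITING"  # some upstream tasks have not finished yet
    if skipped > 0:
        # Current behavior: any skipped upstream task skips the downstream
        # task, no matter how many other upstream tasks succeeded.
        return "SKIPPED"
    return "RUN" if success > 0 else "SKIPPED"

# Our case: live_values skipped, backfill_values succeeded.
print(all_done_min_one_success(success=1, failed=0, upstream_failed=0,
                               removed=0, skipped=1, upstream=2))  # SKIPPED
```

With the behavior we expected from the patch notes, the skipped > 0 branch would not fire as long as success > 0.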

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
    (Please let me know whether skipped should be added to the code or whether just the documentation should be adjusted. Once that is decided, I am happy to contribute a PR :) )
