From a4bd9f9e20e353d2301a9e93ee13d4c79ae4b79a Mon Sep 17 00:00:00 2001 From: Anmol Mishra Date: Sun, 31 May 2026 17:37:03 +0530 Subject: [PATCH 1/4] fix: restore Airflow 2 Task Instances view semantics for mark-success Closes #67707 --- .../airflow/serialization/definitions/dag.py | 130 ++++++++++-------- airflow-core/tests/unit/models/test_dag.py | 69 +++++++++- 2 files changed, 136 insertions(+), 63 deletions(-) diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index 6fb7bf083cf76..f66f847f1c148 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -736,45 +736,49 @@ def set_task_instance_state( return altered # Clear downstream tasks that are in failed/upstream_failed state to resume them. - # Flush the session so that the tasks marked success are reflected in the db. - session.flush() - subset = self.partial_subset( - task_ids={task_id}, - include_downstream=True, - include_upstream=False, - ) - - # Raises an error if not found - dr_id, logical_date = session.execute( - select(DagRun.id, DagRun.logical_date).where( - DagRun.run_id == run_id, DagRun.dag_id == self.dag_id + # Only clear downstreams when ``downstream=True`` is explicitly passed, so that + # marking a single task instance as success (e.g. from the Task Instances view) + # does not unexpectedly resume downstream tasks — restoring Airflow 2 behavior. + if downstream: + # Flush the session so that the tasks marked success are reflected in the db. + session.flush() + subset = self.partial_subset( + task_ids={task_id}, + include_downstream=True, + include_upstream=False, ) - ).one() - # Now we want to clear downstreams of tasks that had their state set... - clear_kwargs = { - "only_failed": True, - "session": session, - # Exclude the task itself from being cleared. - "exclude_task_ids": frozenset((task_id,)), - } - if not future and not past: # Simple case 1: we're only dealing with exactly one run. - clear_kwargs["run_id"] = run_id - subset.clear(**clear_kwargs) - elif future and past: # Simple case 2: we're clearing ALL runs. - subset.clear(**clear_kwargs) - else: # Complex cases: we may have more than one run, based on a date range. - # Make 'future' and 'past' make some sense when multiple runs exist - # for the same logical date. We order runs by their id and only - # clear runs have larger/smaller ids. - exclude_run_id_stmt = select(DagRun.run_id).where(DagRun.logical_date == logical_date) - if future: - clear_kwargs["start_date"] = logical_date - exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id > dr_id) - else: - clear_kwargs["end_date"] = logical_date - exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id) - subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs) + # Raises an error if not found + dr_id, logical_date = session.execute( + select(DagRun.id, DagRun.logical_date).where( + DagRun.run_id == run_id, DagRun.dag_id == self.dag_id + ) + ).one() + + # Now we want to clear downstreams of tasks that had their state set... + clear_kwargs = { + "only_failed": True, + "session": session, + # Exclude the task itself from being cleared. + "exclude_task_ids": frozenset((task_id,)), + } + if not future and not past: # Simple case 1: we're only dealing with exactly one run. + clear_kwargs["run_id"] = run_id + subset.clear(**clear_kwargs) + elif future and past: # Simple case 2: we're clearing ALL runs. + subset.clear(**clear_kwargs) + else: # Complex cases: we may have more than one run, based on a date range. + # Make 'future' and 'past' make some sense when multiple runs exist + # for the same logical date. We order runs by their id and only + # clear runs have larger/smaller ids. + exclude_run_id_stmt = select(DagRun.run_id).where(DagRun.logical_date == logical_date) + if future: + clear_kwargs["start_date"] = logical_date + exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id > dr_id) + else: + clear_kwargs["end_date"] = logical_date + exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id) + subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs) return altered @provide_session @@ -851,32 +855,36 @@ def set_task_group_state( return altered # Clear downstream tasks that are in failed/upstream_failed state to resume them. - session.flush() - subset = self.partial_subset( - task_ids=task_ids, - include_downstream=True, - include_upstream=False, - ) + # Only clear downstreams when ``downstream=True`` is explicitly passed, so that + # marking a task group's tasks as success without downstream does not + # unexpectedly resume downstream tasks. + if downstream: + session.flush() + subset = self.partial_subset( + task_ids=task_ids, + include_downstream=True, + include_upstream=False, + ) - clear_kwargs: dict = { - "only_failed": True, - "session": session, - "exclude_task_ids": frozenset(task_ids), - } - if not future and not past: - clear_kwargs["run_id"] = run_id - subset.clear(**clear_kwargs) - elif future and past: - subset.clear(**clear_kwargs) - else: - exclude_run_id_stmt = select(DagRun.run_id).where(DagRun.logical_date == logical_date) - if future: - clear_kwargs["start_date"] = logical_date - exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id > dr_id) + clear_kwargs: dict = { + "only_failed": True, + "session": session, + "exclude_task_ids": frozenset(task_ids), + } + if not future and not past: + clear_kwargs["run_id"] = run_id + subset.clear(**clear_kwargs) + elif future and past: + subset.clear(**clear_kwargs) else: - clear_kwargs["end_date"] = logical_date - exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id) - subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs) + exclude_run_id_stmt = select(DagRun.run_id).where(DagRun.logical_date == logical_date) + if future: + clear_kwargs["start_date"] = logical_date + exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id > dr_id) + else: + clear_kwargs["end_date"] = logical_date + exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id) + subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs) return altered diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index 0a15c9ab09df5..0f40908486526 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -2999,7 +2999,7 @@ def test_count_number_queries(self, tasks_count, testing_dag_bundle): ["test-run-id"], ) def test_set_task_instance_state(run_id, session, dag_maker): - """Test that set_task_instance_state updates the TaskInstance state and clear downstream failed""" + """Test that set_task_instance_state updates the TaskInstance state""" start_date = datetime_tz(2020, 1, 1) with dag_maker( "test_set_task_instance_state", @@ -3037,6 +3037,8 @@ def get_ti_from_db(task): session.flush() + # When downstream=False (default), only the selected TI state is changed - + # downstream failed/upstream_failed tasks are NOT cleared (Airflow 2 semantics). altered = dag.set_task_instance_state( task_id=task_1.task_id, run_id=run_id, @@ -3050,6 +3052,68 @@ def get_ti_from_db(task): assert isinstance(inspect(ti1).attrs.dag_run.loaded_value, DagRun) # task_2 remains as SUCCESS assert get_ti_from_db(task_2).state == State.SUCCESS + # task_3 and task_4 remain in their FAILED/UPSTREAM_FAILED state because downstream=False + assert get_ti_from_db(task_3).state == State.UPSTREAM_FAILED + assert get_ti_from_db(task_4).state == State.FAILED + # task_5 remains as SKIPPED + assert get_ti_from_db(task_5).state == State.SKIPPED + + assert {tuple(t.key) for t in altered} == { + ("test_set_task_instance_state", "task_1", dagrun.run_id, 0, -1) + } + + +def test_set_task_instance_state_downstream_clears_failed(run_id, session, dag_maker): + """Test that set_task_instance_state with downstream=True clears downstream failed/upstream_failed""" + start_date = datetime_tz(2020, 1, 1) + with dag_maker( + "test_set_task_instance_state_downstream", + start_date=start_date, + session=session, + serialized=True, + ) as dag: + task_1 = EmptyOperator(task_id="task_1") + task_2 = EmptyOperator(task_id="task_2") + task_3 = EmptyOperator(task_id="task_3") + task_4 = EmptyOperator(task_id="task_4") + task_5 = EmptyOperator(task_id="task_5") + task_1 >> [task_2, task_3, task_4, task_5] + + dagrun = dag_maker.create_dagrun( + run_id=run_id, + state=State.FAILED, + run_type=DagRunType.SCHEDULED, + ) + + def get_ti_from_db(task): + return session.scalar( + select(TI).where( + TI.dag_id == dag.dag_id, + TI.task_id == task.task_id, + TI.run_id == dagrun.run_id, + ) + ) + + get_ti_from_db(task_1).state = State.FAILED + get_ti_from_db(task_2).state = State.SUCCESS + get_ti_from_db(task_3).state = State.UPSTREAM_FAILED + get_ti_from_db(task_4).state = State.FAILED + get_ti_from_db(task_5).state = State.SKIPPED + + session.flush() + + # When downstream=True, downstream failed/upstream_failed tasks ARE cleared. + altered = dag.set_task_instance_state( + task_id=task_1.task_id, + downstream=True, + run_id=run_id, + state=State.SUCCESS, + session=session, + ) + ti1 = get_ti_from_db(task_1) + assert ti1.state == State.SUCCESS + # task_2 remains as SUCCESS + assert get_ti_from_db(task_2).state == State.SUCCESS # task_3 and task_4 are cleared because they were in FAILED/UPSTREAM_FAILED state assert get_ti_from_db(task_3).state == State.NONE assert get_ti_from_db(task_4).state == State.NONE @@ -3060,7 +3124,7 @@ def get_ti_from_db(task): assert dagrun.get_state() == State.QUEUED assert {tuple(t.key) for t in altered} == { - ("test_set_task_instance_state", "task_1", dagrun.run_id, 0, -1) + ("test_set_task_instance_state_downstream", "task_1", dagrun.run_id, 0, -1) } @@ -3122,6 +3186,7 @@ def consumer(value): dag.set_task_instance_state( task_id=task_id, map_indexes=[1], + downstream=True, future=True, run_id=dr1.run_id, state=TaskInstanceState.SUCCESS, From 7d4256a70397b5ed0dd6d158405f03c219dafb08 Mon Sep 17 00:00:00 2001 From: anmolxlight Date: Mon, 1 Jun 2026 08:24:48 +0000 Subject: [PATCH 2/4] fix(ci): repair set_task_instance_state downstream handling + tests - dag.py: pass downstream=False to mark_tasks.set_state so it only mutates the targeted TIs; let the explicit 'if downstream:' block own downstream handling. Fixes both set_task_instance_state and set_task_group_state. - dag.py: apply ruff-format break on the long line in the clear path. - test_dag.py: add @pytest.mark.need_serialized_dag and the run_id parametrize to test_set_task_instance_state_downstream_clears_failed (was missing run_id fixture). - test_dag.py: drop the 'downstream=True' addition from test_set_task_instance_state_mapped and assert downstream stays FAILED (the test exercises the new 'do not clear downstream by default' behavior). Resolves the 9 CI check failures on PR #67790: - CI image checks / Static checks (ruff-format) - Postgres/MySQL/Sqlite tests (both Core..Serialization and API..CLI) - Low dep tests (both groups) --- .../airflow/serialization/definitions/dag.py | 20 ++++++++++++++++--- airflow-core/tests/unit/models/test_dag.py | 12 ++++++++--- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index f66f847f1c148..ed3599e1dc7df 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -720,11 +720,17 @@ def set_task_instance_state( else: tasks_to_set_state = [(task, map_index) for map_index in map_indexes] + # Only set the state on the targeted task instances here. We do not pass + # ``downstream`` through to ``set_state`` because that helper would mark + # downstream task instances as the same state, which would then prevent the + # explicit downstream-clearing block below from finding them in the + # FAILED/UPSTREAM_FAILED state. Downstream handling is therefore done + # explicitly in the ``if downstream:`` block below. altered = set_state( tasks=tasks_to_set_state, run_id=run_id, upstream=upstream, - downstream=downstream, + downstream=False, future=future, past=past, state=state, @@ -839,11 +845,17 @@ def set_task_group_state( dag_runs_query = dag_runs_query.where(DagRun.logical_date >= logical_date) with lock_rows(dag_runs_query, session): + # Only set the state on the targeted task group instances here. Do + # not pass ``downstream`` through to ``set_state``; that helper + # would mark downstream task instances as the same state and + # prevent the explicit downstream-clearing block below from + # finding them in FAILED/UPSTREAM_FAILED. Downstream handling is + # done explicitly in the ``if downstream:`` block below. altered = set_state( tasks=tasks_to_set_state, run_id=run_id, upstream=upstream, - downstream=downstream, + downstream=False, future=future, past=past, state=state, @@ -884,7 +896,9 @@ def set_task_group_state( else: clear_kwargs["end_date"] = logical_date exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id) - subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs) + subset.clear( + exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs + ) return altered diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index 0f40908486526..101afe267a29e 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -3063,6 +3063,11 @@ def get_ti_from_db(task): } +@pytest.mark.need_serialized_dag +@pytest.mark.parametrize( + "run_id", + ["test-run-id"], +) def test_set_task_instance_state_downstream_clears_failed(run_id, session, dag_maker): """Test that set_task_instance_state with downstream=True clears downstream failed/upstream_failed""" start_date = datetime_tz(2020, 1, 1) @@ -3183,10 +3188,11 @@ def consumer(value): (task_id, 1, dr2.run_id, TaskInstanceState.FAILED), ] + # When ``downstream`` is not passed, only the selected TI state is changed — + # downstream failed/upstream_failed tasks are NOT cleared (Airflow 2 semantics). dag.set_task_instance_state( task_id=task_id, map_indexes=[1], - downstream=True, future=True, run_id=dr1.run_id, state=TaskInstanceState.SUCCESS, @@ -3195,10 +3201,10 @@ def consumer(value): assert dr1 in session, "Check session is passed down all the way" assert session.execute(ti_query).all() == [ - ("downstream", -1, dr1.run_id, None), + ("downstream", -1, dr1.run_id, TaskInstanceState.FAILED), (task_id, 0, dr1.run_id, TaskInstanceState.FAILED), (task_id, 1, dr1.run_id, TaskInstanceState.SUCCESS), - ("downstream", -1, dr2.run_id, None), + ("downstream", -1, dr2.run_id, TaskInstanceState.FAILED), (task_id, 0, dr2.run_id, TaskInstanceState.FAILED), (task_id, 1, dr2.run_id, TaskInstanceState.SUCCESS), ] From cdfc26099bf26094f899e8d23d46c25b09644832 Mon Sep 17 00:00:00 2001 From: Anmol Mishra Date: Sat, 6 Jun 2026 21:19:36 +0000 Subject: [PATCH 3/4] fix: restore task group downstream state updates --- .../airflow/serialization/definitions/dag.py | 62 ++++++++----------- 1 file changed, 25 insertions(+), 37 deletions(-) diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index ed3599e1dc7df..f314e19c350ff 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -845,17 +845,11 @@ def set_task_group_state( dag_runs_query = dag_runs_query.where(DagRun.logical_date >= logical_date) with lock_rows(dag_runs_query, session): - # Only set the state on the targeted task group instances here. Do - # not pass ``downstream`` through to ``set_state``; that helper - # would mark downstream task instances as the same state and - # prevent the explicit downstream-clearing block below from - # finding them in FAILED/UPSTREAM_FAILED. Downstream handling is - # done explicitly in the ``if downstream:`` block below. altered = set_state( tasks=tasks_to_set_state, run_id=run_id, upstream=upstream, - downstream=False, + downstream=downstream, future=future, past=past, state=state, @@ -867,38 +861,32 @@ def set_task_group_state( return altered # Clear downstream tasks that are in failed/upstream_failed state to resume them. - # Only clear downstreams when ``downstream=True`` is explicitly passed, so that - # marking a task group's tasks as success without downstream does not - # unexpectedly resume downstream tasks. - if downstream: - session.flush() - subset = self.partial_subset( - task_ids=task_ids, - include_downstream=True, - include_upstream=False, - ) + session.flush() + subset = self.partial_subset( + task_ids=task_ids, + include_downstream=True, + include_upstream=False, + ) - clear_kwargs: dict = { - "only_failed": True, - "session": session, - "exclude_task_ids": frozenset(task_ids), - } - if not future and not past: - clear_kwargs["run_id"] = run_id - subset.clear(**clear_kwargs) - elif future and past: - subset.clear(**clear_kwargs) + clear_kwargs: dict = { + "only_failed": True, + "session": session, + "exclude_task_ids": frozenset(task_ids), + } + if not future and not past: + clear_kwargs["run_id"] = run_id + subset.clear(**clear_kwargs) + elif future and past: + subset.clear(**clear_kwargs) + else: + exclude_run_id_stmt = select(DagRun.run_id).where(DagRun.logical_date == logical_date) + if future: + clear_kwargs["start_date"] = logical_date + exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id > dr_id) else: - exclude_run_id_stmt = select(DagRun.run_id).where(DagRun.logical_date == logical_date) - if future: - clear_kwargs["start_date"] = logical_date - exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id > dr_id) - else: - clear_kwargs["end_date"] = logical_date - exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id) - subset.clear( - exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs - ) + clear_kwargs["end_date"] = logical_date + exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id) + subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs) return altered From 4c4866b93eafe67127abdda62b68bcb80ceb2527 Mon Sep 17 00:00:00 2001 From: Anmol Mishra Date: Wed, 10 Jun 2026 04:26:17 +0000 Subject: [PATCH 4/4] Trigger CI: re-run after Docker Hub timeout