Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68180.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Restore the ``[scheduler] ignore_first_depends_on_past_by_default`` option, which was silently ignored in Airflow 3. The Task SDK hardcoded the default to ``False``, so a new task added to an existing DAG with ``depends_on_past=True`` was blocked by ``PrevDagrunDep`` forever and the DAG run never completed. The default is now read from the config again (``True`` by default), as it was in Airflow 2.
3 changes: 2 additions & 1 deletion airflow-core/tests/unit/models/test_mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from airflow.models.taskmap import TaskMap
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG, BaseOperator, TaskGroup, setup, task, task_group, teardown
from airflow.sdk.definitions._internal.abstractoperator import DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST
from airflow.serialization.definitions.baseoperator import SerializedBaseOperator
from airflow.task.trigger_rule import TriggerRule
from airflow.utils.state import TaskInstanceState
Expand Down Expand Up @@ -1536,7 +1537,7 @@ def test_properties(
assert not op.is_setup
assert not op.is_teardown
assert not op.depends_on_past
assert op.ignore_first_depends_on_past == bool(SerializedBaseOperator.ignore_first_depends_on_past)
assert op.ignore_first_depends_on_past == DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST
assert not op.wait_for_downstream
assert op.retries == SerializedBaseOperator.retries
assert op.queue == SerializedBaseOperator.queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def _operator_defaults(overrides):
VAR = Encoding.VAR
serialized_simple_dag_ground_truth = {
"__version": 3,
"client_defaults": {"tasks": {"ignore_first_depends_on_past": True}},
"dag": {
"default_args": {
"__type": "dict",
Expand Down Expand Up @@ -1633,7 +1634,7 @@ def test_no_new_fields_added_to_base_operator(self):
"has_on_skipped_callback": False,
"has_on_success_callback": False,
"has_retry_policy": False,
"ignore_first_depends_on_past": False,
"ignore_first_depends_on_past": True,
"is_setup": False,
"is_teardown": False,
"inlets": [],
Expand Down
59 changes: 59 additions & 0 deletions airflow-core/tests/unit/ti_deps/deps/test_prev_dagrun_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,65 @@ def test_first_task_run_of_new_task(self, testing_dag_bundle):
assert dep.is_met(ti=ti, dep_context=dep_context)
mock_has_any_prior_tis.assert_called_once_with(ti, session=ANY)

def test_first_task_run_of_new_task_uses_config_default(self, testing_dag_bundle):
"""
A new task added to an existing DAG should pass its first run without an
explicit ``ignore_first_depends_on_past`` because the default is driven by
``[scheduler] ignore_first_depends_on_past_by_default`` (``True``).

This is the behavior that regressed when the config became dead: with a
hardcoded ``False`` default, the new task stayed blocked forever.
"""
dag = DAG("test_dag", schedule=timedelta(days=1), start_date=START_DATE)
old_task = BaseOperator(
task_id="test_task",
dag=dag,
depends_on_past=True,
start_date=START_DATE,
wait_for_downstream=False,
)
scheduler_dag = sync_dag_to_db(dag)
# Old DAG run will include only TaskInstance of old_task
scheduler_dag.create_dagrun(
run_id="old_run",
state=TaskInstanceState.SUCCESS,
logical_date=old_task.start_date,
run_type=DagRunType.SCHEDULED,
data_interval=(old_task.start_date, old_task.start_date),
run_after=old_task.start_date,
triggered_by=DagRunTriggeredByType.TEST,
)

# New task relies on the config default rather than setting the flag explicitly.
new_task = BaseOperator(
task_id="new_task",
dag=dag,
depends_on_past=True,
start_date=old_task.start_date,
)
assert new_task.ignore_first_depends_on_past is True

logical_date = convert_to_utc(datetime(2016, 1, 2))
dr = create_scheduler_dag(dag).create_dagrun(
run_id="new_run",
state=DagRunState.RUNNING,
logical_date=logical_date,
run_type=DagRunType.SCHEDULED,
data_interval=(logical_date, logical_date),
run_after=logical_date,
triggered_by=DagRunTriggeredByType.TEST,
)

ti = dr.get_task_instance(new_task.task_id)
ti.task = new_task

dep_context = DepContext(ignore_depends_on_past=False)
dep = PrevDagrunDep()

with patch.object(dep, "_has_any_prior_tis", Mock(return_value=False)) as mock_has_any_prior_tis:
assert dep.is_met(ti=ti, dep_context=dep_context)
mock_has_any_prior_tis.assert_called_once_with(ti, session=ANY)


@pytest.mark.parametrize(
"kwargs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@
"downstream_task_ids": "{{ 'task_2' in result and 'section_1.section_2.section_3.task_7' in result }}",
"execution_timeout": "456.0 seconds",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"multiple_outputs": false,
Expand Down Expand Up @@ -503,7 +503,7 @@
"downstream_task_ids": "{{ 'task_2' in result and 'section_1.section_2.section_3.task_7' in result }}",
"execution_timeout": "456.0 seconds",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"multiple_outputs": false,
Expand Down Expand Up @@ -675,7 +675,7 @@
"depends_on_past": false,
"downstream_task_ids": "{{ 'task_3' in result and 'check_events' in result }}",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[{'uri': 'file://host1/dir2/file2.txt', 'extra': {}, 'type': 'asset'}, {'uri': 'unknown_scheme://host1/dir2/file3.txt', 'extra': {}, 'type': 'asset'}]",
"mapped": false,
"max_retry_delay": "42.0 seconds",
Expand Down Expand Up @@ -852,7 +852,7 @@
"depends_on_past": false,
"downstream_task_ids": "{{ 'task_3' in result and 'check_events' in result }}",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[{'uri': 'file://host1/dir2/file2.txt', 'extra': {}, 'type': 'asset'}, {'uri': 'unknown_scheme://host1/dir2/file3.txt', 'extra': {}, 'type': 'asset'}]",
"mapped": false,
"max_retry_delay": "42.0 seconds",
Expand Down Expand Up @@ -1023,7 +1023,7 @@
"depends_on_past": false,
"downstream_task_ids": "{{ 'task_4' in result and 'section_1.task_5' in result }}",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"multiple_outputs": false,
Expand Down Expand Up @@ -1193,7 +1193,7 @@
"depends_on_past": false,
"downstream_task_ids": "{{ 'task_4' in result and 'section_1.task_5' in result }}",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"multiple_outputs": false,
Expand Down Expand Up @@ -1360,7 +1360,7 @@
"executor_config": {},
"external_dag_id": "external_dag_id",
"external_task_id": "external_task_id",
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"max_active_tis_per_dag": 7,
Expand Down Expand Up @@ -1519,7 +1519,7 @@
"executor_config": {},
"external_dag_id": "external_dag_id",
"external_task_id": "external_task_id",
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"max_active_tis_per_dag": 7,
Expand Down Expand Up @@ -1675,7 +1675,7 @@
"depends_on_past": false,
"downstream_task_ids": "['section_1.section_2.task_6']",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": "{{ result == true }}",
"multiple_outputs": false,
Expand Down Expand Up @@ -1838,7 +1838,7 @@
"depends_on_past": false,
"downstream_task_ids": "['section_1.section_2.task_6']",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": "{{ result == true }}",
"multiple_outputs": false,
Expand Down Expand Up @@ -2001,7 +2001,7 @@
"depends_on_past": false,
"downstream_task_ids": "['check_events']",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"multiple_outputs": false,
Expand Down Expand Up @@ -2174,7 +2174,7 @@
"depends_on_past": false,
"downstream_task_ids": "['check_events']",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"multiple_outputs": false,
Expand Down Expand Up @@ -2347,7 +2347,7 @@
"depends_on_past": false,
"downstream_task_ids": "['check_events']",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"multiple_outputs": false,
Expand Down Expand Up @@ -2519,7 +2519,7 @@
"depends_on_past": false,
"downstream_task_ids": "['check_events']",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"multiple_outputs": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
"depends_on_past": false,
"downstream_task_ids": "['check_events']",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"multiple_outputs": false,
Expand Down Expand Up @@ -277,7 +277,7 @@
"depends_on_past": false,
"downstream_task_ids": "['check_events']",
"executor_config": {},
"ignore_first_depends_on_past": false,
"ignore_first_depends_on_past": true,
"inlets": "[]",
"mapped": false,
"multiple_outputs": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3197,7 +3197,10 @@ def __init__(self, *args, **kwargs):
"execution_timeout": None,
"executor_config": {},
"hitl_summary": "hitl_summary",
"ignore_first_depends_on_past": False,
# Default comes from the installed core's [scheduler]
# ignore_first_depends_on_past_by_default, so derive it rather than
# hardcoding (True on Airflow 3.2+, False on older 3.x).
"ignore_first_depends_on_past": task_10.ignore_first_depends_on_past,
"inlets": "[{'uri': 'uri1', 'extra': {'a': 1}, 'type': 'asset'}]",
"mapped": False,
"max_active_tis_per_dag": None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@
MAXIMUM_PRIORITY_WEIGHT: int = 2147483647
DEFAULT_EXECUTOR: str | None = None
DEFAULT_QUEUE: str = conf.get_mandatory_value("operators", "default_queue")
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = False
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST: bool = conf.getboolean(
"scheduler", "ignore_first_depends_on_past_by_default"
)
DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING: bool = False
DEFAULT_RETRIES: int = conf.getint("core", "default_task_retries", fallback=0)
DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(
Expand Down
31 changes: 31 additions & 0 deletions task-sdk/tests/task_sdk/bases/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,37 @@ def test_email_on_actions(self):
assert test_task.email_on_retry is False
assert test_task.email_on_failure is True

def test_default_ignore_first_depends_on_past(self):
"""The default comes from ``[scheduler] ignore_first_depends_on_past_by_default`` (``True``)."""
test_task = BaseOperator(task_id="test_default_ignore_first_depends_on_past")
assert test_task.ignore_first_depends_on_past is True

def test_ignore_first_depends_on_past_override(self):
test_task = BaseOperator(task_id="test_ignore_first_dop", ignore_first_depends_on_past=False)
assert test_task.ignore_first_depends_on_past is False

def test_default_ignore_first_depends_on_past_follows_config(self):
"""
The module-level default must be read from config, not hardcoded.

Guards against the regression where ``[scheduler]
ignore_first_depends_on_past_by_default`` became a dead config that no
code read (the default was hardcoded, silently ignoring the option).
"""
import importlib

from airflow.sdk.definitions._internal import abstractoperator

with mock.patch.dict(
"os.environ", {"AIRFLOW__SCHEDULER__IGNORE_FIRST_DEPENDS_ON_PAST_BY_DEFAULT": "False"}
):
try:
reloaded = importlib.reload(abstractoperator)
assert reloaded.DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST is False
finally:
# Restore the module-level constant to its config default for other tests.
importlib.reload(abstractoperator)

def test_incorrect_default_args(self):
default_args = {"test_param": True, "extra_param": True}
op = FakeOperator(default_args=default_args)
Expand Down
Loading