Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,24 +639,29 @@ def row_value(self, row: Any, name: str) -> Any:
Extract the sort-key value for ``name`` from a result row.

Resolves the accessor through ``to_replace`` for string aliases
(e.g. ``{"dag_run_id": "run_id"}``); otherwise reads ``name`` directly.
(e.g. ``{"dag_run_id": "run_id"}``). For column-form mappings
(e.g. ``{"run_after": DagRun.run_after}``), resolves through the
primary model's attribute so association proxies can still be used
for cursor values. Raises ``NotImplementedError`` when the model
exposes no such attribute rather than emitting a ``None`` cursor token.
"""
if self.to_replace:
replacement = self.to_replace.get(name)
if isinstance(replacement, str):
return getattr(row, replacement, None)
if replacement is not None and not isinstance(replacement, list):
# TODO: Column-form ``to_replace`` (e.g. ``{"last_run_state": DagRun.state}``)
# isn't supported for cursor pagination — no endpoint that uses cursor
# pagination needs it today. When one does, decide how the row exposes the
# value (projected label on the SELECT, eagerly loaded relationship, etc.)
# and wire it up here. Raising loudly so a future caller doesn't silently
# get ``None`` cursor tokens.
raise NotImplementedError(
f"Cursor pagination does not support column-form ``to_replace`` mapping for "
f"``{name}``. Use a string alias in ``to_replace`` or sort by a primary-model "
f"attribute."
)
# Column-form mapping resolves through the primary model's attribute,
# often an association proxy onto the joined entity
# (``TaskInstance.run_after`` -> ``dag_run.run_after``). Fail loudly if the
# model exposes no such attribute, rather than emitting a ``None`` cursor token.
try:
return getattr(row, name)
except AttributeError:
raise NotImplementedError(
f"Cursor pagination cannot resolve column-form ``to_replace`` for "
f"``{name}``: the primary model exposes no such attribute. Add an "
f"association proxy, use a string alias, or sort by a primary-model column."
)
# List-form replacements are expanded in _resolve() into individual entries
# each using the column's own ORM key as attr_name, so ``name`` at this point
# is already a concrete model attribute (e.g. ``_rendered_map_index`` or
Expand Down
36 changes: 36 additions & 0 deletions airflow-core/tests/unit/api_fastapi/common/test_cursors.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import base64
import uuid
from datetime import datetime, timezone
from types import SimpleNamespace
from unittest.mock import MagicMock

import msgspec
Expand All @@ -29,6 +30,7 @@

from airflow.api_fastapi.common.cursors import apply_cursor_filter, decode_cursor, encode_cursor
from airflow.api_fastapi.common.parameters import SortParam
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance


Expand Down Expand Up @@ -85,6 +87,40 @@ def test_encode_cursor_works_without_prior_to_orm(self):
decoded = decode_cursor(token)
assert decoded == ["019462ab-1234-5678-9abc-def012345678"]

def test_encode_cursor_with_column_form_to_replace_falls_back_to_row_attr(self):
"""Column-form ``to_replace`` should still allow cursor encoding via row attribute access."""
sp = SortParam(["id", "run_after"], TaskInstance, {"run_after": DagRun.run_after})
sp.set_value(["run_after"])

row = SimpleNamespace(
run_after="2026-06-04T10:00:00+00:00",
id="019462ab-1234-5678-9abc-def012345678",
)

token = encode_cursor(row, sp)
decoded = decode_cursor(token)
assert decoded == [
"2026-06-04T10:00:00+00:00",
"019462ab-1234-5678-9abc-def012345678",
]

def test_encode_cursor_column_form_to_replace_raises_when_attribute_absent(self):
"""
``encode_cursor`` must raise ``NotImplementedError`` (not silently encode ``None``)
when a column-form ``to_replace`` key has no corresponding attribute on the row object.
A ``None`` token would cause the next-page WHERE to compare against NULL and drop rows.
"""
sp = SortParam(
["id", "data_interval_start"], TaskInstance, {"data_interval_start": DagRun.data_interval_start}
)
sp.set_value(["data_interval_start"])

# Row without data_interval_start — TaskInstance does not expose this as an attribute.
row = SimpleNamespace(id="019462ab-1234-5678-9abc-def012345678")

with pytest.raises(NotImplementedError, match="data_interval_start"):
encode_cursor(row, sp)

def test_apply_cursor_filter_wrong_value_count(self):
sp = self._make_sort_param_with_resolved_columns(["start_date"])
token = _msgpack_cursor_token(["only-one-value"])
Expand Down
24 changes: 18 additions & 6 deletions airflow-core/tests/unit/api_fastapi/common/test_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,29 @@ def test_aliased_sort_resolves_row_value_via_to_replace(self):
assert param.row_value(row, "dag_run_id") == "manual__2026-04-22"
assert param.row_value(row, "id") == 42

def test_row_value_raises_on_column_form_to_replace(self):
def test_row_value_column_form_to_replace_resolves_via_row_attribute(self):
"""
Column-form ``to_replace`` is not supported by cursor encoding. The helper must
fail loudly so a future endpoint doesn't silently ship ``None`` cursor tokens.
Column-form ``to_replace`` resolves through the primary model's attribute so
association proxies (e.g. ``TaskInstance.run_after``) are usable for cursor encoding.
"""
param = SortParam(["dag_id"], DagModel, {"last_run_state": DagRun.state}).set_value(
["last_run_state"]
)
row = SimpleNamespace(id="test_dag")
with pytest.raises(NotImplementedError, match="column-form ``to_replace``"):
param.row_value(row, "last_run_state")
row = SimpleNamespace(id="test_dag", last_run_state="success")
assert param.row_value(row, "last_run_state") == "success"
Comment thread
GayathriSrividya marked this conversation as resolved.

def test_row_value_column_form_to_replace_raises_when_attribute_absent(self):
"""
Column-form ``to_replace`` must raise ``NotImplementedError`` (not return ``None``)
when the primary model exposes no such attribute. A ``None`` cursor token would cause
the next-page ``WHERE`` to compare against ``NULL`` and silently drop rows.
"""
param = SortParam(
["dag_id"], DagModel, {"data_interval_start": DagRun.data_interval_start}
).set_value(["data_interval_start"])
row = SimpleNamespace(id="test_dag") # deliberately no data_interval_start attribute
with pytest.raises(NotImplementedError, match="data_interval_start"):
param.row_value(row, "data_interval_start")

def test_primary_key_is_not_duplicated_when_alias_maps_to_pk(self):
"""Sorting by an alias that resolves to the PK must not append the PK a second time."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@
from airflow.models.dagbundle import DagBundleModel
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.models.task_store import TaskStoreModel
from airflow.models.taskinstance import uuid7
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.models.taskmap import TaskMap
from airflow.models.team import Team
from airflow.models.trigger import Trigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import BaseOperator, TaskGroup
from airflow.sdk import BaseOperator
from airflow.state.metastore import MetastoreStoreBackend
from airflow.utils.platform import getuser
from airflow.utils.state import DagRunState, State, TaskInstanceState
Expand Down Expand Up @@ -979,10 +979,6 @@ def test_rendered_map_index_order_stable_regardless_of_uuid_order(self, test_cli
This verifies that even when UUIDs are assigned out of map_index order
(as happens during retries), the response is still sorted 0, 1, 2, ...
"""
from sqlalchemy import update as sa_update

from airflow.models.taskinstance import uuid7

self.create_dag_runs_with_mapped_tasks(
dag_maker,
session,
Expand All @@ -994,7 +990,7 @@ def test_rendered_map_index_order_stable_regardless_of_uuid_order(self, test_cli
# result must still follow integer map_index order, not UUID order.
for map_index in [1, 3]:
session.execute(
sa_update(TaskInstance)
update(TaskInstance)
.where(
TaskInstance.dag_id == "retry_dag",
TaskInstance.task_id == "task_2",
Expand Down Expand Up @@ -2117,46 +2113,46 @@ def test_cursor_pagination_invalid_token(self, test_client, session):
)
assert response.status_code == 400

def test_task_group_filter_uses_run_version_not_latest(self, test_client, dag_maker, session):
def test_cursor_pagination_order_by_run_after_roundtrips(self, test_client, session):
"""
Task group lookup should use the DAG version from the run, not the latest version.
Sorting by ``run_after`` (a column-form ``to_replace`` backed by an association proxy)
must not raise a 500 when ``has_next=true``. Regression for
https://github.com/apache/airflow/issues/67970.

When a task group is renamed between versions, clicking on a historical run's
task group in the grid should still resolve correctly against the version
that run was created with — not the latest version where the group may have
a different name, i.e serialized_dag might not have that taskgroup anymore.
Verify the full cursor round-trip: the first page must include a ``next_cursor``,
and following that cursor must return the remaining TIs without overlap.
"""
dag_id = "test_tg_version"

# Version 1: task group named "process_data"
with dag_maker(dag_id, session=session):
with TaskGroup(group_id="process_data"):
EmptyOperator(task_id="step_1")
dag_maker.create_dagrun(run_id="run_v1")
session.commit()

# Version 2: task group renamed to "process_data_v2"
with dag_maker(dag_id, session=session):
with TaskGroup(group_id="process_data_v2"):
EmptyOperator(task_id="step_1")
session.commit()

# The run was created with v1 which had "process_data".
# Querying with the old group name must succeed.
response = test_client.get(
f"/dags/{dag_id}/dagRuns/run_v1/taskInstances",
params={"task_group_id": "process_data"},
dag_id = "example_python_operator"
total_tis = 5
self.create_task_instances(
session,
task_instances=[
{"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(total_tis)
],
dag_id=dag_id,
)
assert response.status_code == 200, response.json()
assert response.json()["total_entries"] == 1
assert response.json()["task_instances"][0]["task_id"] == "process_data.step_1"
# First page — limit < total so next_cursor must be present
response1 = test_client.get(
"/dags/~/dagRuns/~/taskInstances",
params={"limit": 3, "order_by": ["-run_after"], "cursor": ""},
)
assert response1.status_code == 200, response1.json()
body1 = response1.json()
assert len(body1["task_instances"]) == 3
next_cursor = body1["next_cursor"]
assert next_cursor is not None, "next_cursor must be present when more rows exist"

# The new group name should NOT be found in the old run's version.
response = test_client.get(
f"/dags/{dag_id}/dagRuns/run_v1/taskInstances",
params={"task_group_id": "process_data_v2"},
# Second page — follow the cursor; must not 500 and must return remaining TIs
response2 = test_client.get(
"/dags/~/dagRuns/~/taskInstances",
params={"limit": 10, "order_by": ["-run_after"], "cursor": next_cursor},
)
assert response.status_code == 404
assert response2.status_code == 200, response2.json()
body2 = response2.json()
ids1 = {ti["id"] for ti in body1["task_instances"]}
ids2 = {ti["id"] for ti in body2["task_instances"]}
assert ids1.isdisjoint(ids2), "Pages must not overlap"
assert len(ids1) + len(ids2) == total_tis


class TestGetTaskDependencies(TestTaskInstanceEndpoint):
Expand Down
Loading