From 7ab6f8af769d2eadd08a8945e5074845ca852a49 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Thu, 4 Jun 2026 02:11:25 +0530 Subject: [PATCH 1/2] Fix cursor encoding for column-form SortParam to_replace --- .../airflow/api_fastapi/common/parameters.py | 29 +++++++------ .../unit/api_fastapi/common/test_cursors.py | 36 ++++++++++++++++ .../api_fastapi/common/test_parameters.py | 24 ++++++++--- .../routes/public/test_task_instances.py | 42 ++++++++++++++++++- 4 files changed, 112 insertions(+), 19 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 685736ed2f43d..82645d87bcb5c 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -639,24 +639,29 @@ def row_value(self, row: Any, name: str) -> Any: Extract the sort-key value for ``name`` from a result row. Resolves the accessor through ``to_replace`` for string aliases - (e.g. ``{"dag_run_id": "run_id"}``); otherwise reads ``name`` directly. + (e.g. ``{"dag_run_id": "run_id"}``). For column-form mappings + (e.g. ``{"run_after": DagRun.run_after}``), resolves through the + primary model's attribute so association proxies can still be used + for cursor values. Raises ``NotImplementedError`` when the model + exposes no such attribute rather than emitting a ``None`` cursor token. """ if self.to_replace: replacement = self.to_replace.get(name) if isinstance(replacement, str): return getattr(row, replacement, None) if replacement is not None and not isinstance(replacement, list): - # TODO: Column-form ``to_replace`` (e.g. ``{"last_run_state": DagRun.state}``) - # isn't supported for cursor pagination — no endpoint that uses cursor - # pagination needs it today. When one does, decide how the row exposes the - # value (projected label on the SELECT, eagerly loaded relationship, etc.) - # and wire it up here. Raising loudly so a future caller doesn't silently - # get ``None`` cursor tokens. - raise NotImplementedError( - f"Cursor pagination does not support column-form ``to_replace`` mapping for " - f"``{name}``. Use a string alias in ``to_replace`` or sort by a primary-model " - f"attribute." - ) + # Column-form mapping resolves through the primary model's attribute, + # often an association proxy onto the joined entity + # (``TaskInstance.run_after`` -> ``dag_run.run_after``). Fail loudly if the + # model exposes no such attribute, rather than emitting a ``None`` cursor token. + try: + return getattr(row, name) + except AttributeError: + raise NotImplementedError( + f"Cursor pagination cannot resolve column-form ``to_replace`` for " + f"``{name}``: the primary model exposes no such attribute. Add an " + f"association proxy, use a string alias, or sort by a primary-model column." + ) # List-form replacements are expanded in _resolve() into individual entries # each using the column's own ORM key as attr_name, so ``name`` at this point # is already a concrete model attribute (e.g. ``_rendered_map_index`` or diff --git a/airflow-core/tests/unit/api_fastapi/common/test_cursors.py b/airflow-core/tests/unit/api_fastapi/common/test_cursors.py index 11db6ca5ba8e9..8bf8adbba3646 100644 --- a/airflow-core/tests/unit/api_fastapi/common/test_cursors.py +++ b/airflow-core/tests/unit/api_fastapi/common/test_cursors.py @@ -20,6 +20,7 @@ import base64 import uuid from datetime import datetime, timezone +from types import SimpleNamespace from unittest.mock import MagicMock import msgspec @@ -29,6 +30,7 @@ from airflow.api_fastapi.common.cursors import apply_cursor_filter, decode_cursor, encode_cursor from airflow.api_fastapi.common.parameters import SortParam +from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance @@ -85,6 +87,40 @@ def test_encode_cursor_works_without_prior_to_orm(self): decoded = decode_cursor(token) assert decoded == ["019462ab-1234-5678-9abc-def012345678"] + def test_encode_cursor_with_column_form_to_replace_falls_back_to_row_attr(self): + """Column-form ``to_replace`` should still allow cursor encoding via row attribute access.""" + sp = SortParam(["id", "run_after"], TaskInstance, {"run_after": DagRun.run_after}) + sp.set_value(["run_after"]) + + row = SimpleNamespace( + run_after="2026-06-04T10:00:00+00:00", + id="019462ab-1234-5678-9abc-def012345678", + ) + + token = encode_cursor(row, sp) + decoded = decode_cursor(token) + assert decoded == [ + "2026-06-04T10:00:00+00:00", + "019462ab-1234-5678-9abc-def012345678", + ] + + def test_encode_cursor_column_form_to_replace_raises_when_attribute_absent(self): + """ + ``encode_cursor`` must raise ``NotImplementedError`` (not silently encode ``None``) + when a column-form ``to_replace`` key has no corresponding attribute on the row object. + A ``None`` token would cause the next-page WHERE to compare against NULL and drop rows. + """ + sp = SortParam( + ["id", "data_interval_start"], TaskInstance, {"data_interval_start": DagRun.data_interval_start} + ) + sp.set_value(["data_interval_start"]) + + # Row without data_interval_start — TaskInstance does not expose this as an attribute. + row = SimpleNamespace(id="019462ab-1234-5678-9abc-def012345678") + + with pytest.raises(NotImplementedError, match="data_interval_start"): + encode_cursor(row, sp) + def test_apply_cursor_filter_wrong_value_count(self): sp = self._make_sort_param_with_resolved_columns(["start_date"]) token = _msgpack_cursor_token(["only-one-value"]) diff --git a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py index e2742d88a94a1..ecf20a47c6faa 100644 --- a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py +++ b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py @@ -135,17 +135,29 @@ def test_aliased_sort_resolves_row_value_via_to_replace(self): assert param.row_value(row, "dag_run_id") == "manual__2026-04-22" assert param.row_value(row, "id") == 42 - def test_row_value_raises_on_column_form_to_replace(self): + def test_row_value_column_form_to_replace_resolves_via_row_attribute(self): """ - Column-form ``to_replace`` is not supported by cursor encoding. The helper must - fail loudly so a future endpoint doesn't silently ship ``None`` cursor tokens. + Column-form ``to_replace`` resolves through the primary model's attribute so + association proxies (e.g. ``TaskInstance.run_after``) are usable for cursor encoding. """ param = SortParam(["dag_id"], DagModel, {"last_run_state": DagRun.state}).set_value( ["last_run_state"] ) - row = SimpleNamespace(id="test_dag") - with pytest.raises(NotImplementedError, match="column-form ``to_replace``"): - param.row_value(row, "last_run_state") + row = SimpleNamespace(id="test_dag", last_run_state="success") + assert param.row_value(row, "last_run_state") == "success" + + def test_row_value_column_form_to_replace_raises_when_attribute_absent(self): + """ + Column-form ``to_replace`` must raise ``NotImplementedError`` (not return ``None``) + when the primary model exposes no such attribute. A ``None`` cursor token would cause + the next-page ``WHERE`` to compare against ``NULL`` and silently drop rows. + """ + param = SortParam( + ["dag_id"], DagModel, {"data_interval_start": DagRun.data_interval_start} + ).set_value(["data_interval_start"]) + row = SimpleNamespace(id="test_dag") # deliberately no data_interval_start attribute + with pytest.raises(NotImplementedError, match="data_interval_start"): + param.row_value(row, "data_interval_start") def test_primary_key_is_not_duplicated_when_alias_maps_to_pk(self): """Sorting by an alias that resolves to the PK must not append the PK a second time.""" diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 6edae03cb090b..06cb447fb4b38 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -2117,7 +2117,47 @@ def test_cursor_pagination_invalid_token(self, test_client, session): ) assert response.status_code == 400 - def test_task_group_filter_uses_run_version_not_latest(self, test_client, dag_maker, session): + def test_cursor_pagination_order_by_run_after_roundtrips(self, test_client, session): + """ + Sorting by ``run_after`` (a column-form ``to_replace`` backed by an association proxy) + must not raise a 500 when ``has_next=true``. Regression for + https://github.com/apache/airflow/issues/67970. + + Verify the full cursor round-trip: the first page must include a ``next_cursor``, + and following that cursor must return the remaining TIs without overlap. + """ + dag_id = "example_python_operator" + total_tis = 5 + self.create_task_instances( + session, + task_instances=[ + {"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(total_tis) + ], + dag_id=dag_id, + ) + # First page — limit < total so next_cursor must be present + response1 = test_client.get( + "/dags/~/dagRuns/~/taskInstances", + params={"limit": 3, "order_by": ["-run_after"], "cursor": ""}, + ) + assert response1.status_code == 200, response1.json() + body1 = response1.json() + assert len(body1["task_instances"]) == 3 + next_cursor = body1["next_cursor"] + assert next_cursor is not None, "next_cursor must be present when more rows exist" + + # Second page — follow the cursor; must not 500 and must return remaining TIs + response2 = test_client.get( + "/dags/~/dagRuns/~/taskInstances", + params={"limit": 10, "order_by": ["-run_after"], "cursor": next_cursor}, + ) + assert response2.status_code == 200, response2.json() + body2 = response2.json() + ids1 = {ti["id"] for ti in body1["task_instances"]} + ids2 = {ti["id"] for ti in body2["task_instances"]} + assert ids1.isdisjoint(ids2), "Pages must not overlap" + assert len(ids1) + len(ids2) == total_tis + """ Task group lookup should use the DAG version from the run, not the latest version. From 061ebcde6faefdda3052044edc9b3417208b36c4 Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Wed, 10 Jun 2026 21:57:07 +0530 Subject: [PATCH 2/2] Fix: move uuid7 import to module level, use top-level update import --- .../routes/public/test_task_instances.py | 50 ++----------------- 1 file changed, 3 insertions(+), 47 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 06cb447fb4b38..3627ba62d6850 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -43,12 +43,12 @@ from airflow.models.dagbundle import DagBundleModel from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF from airflow.models.task_store import TaskStoreModel +from airflow.models.taskinstance import uuid7 from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.models.taskmap import TaskMap from airflow.models.team import Team from airflow.models.trigger import Trigger -from airflow.providers.standard.operators.empty import EmptyOperator -from airflow.sdk import BaseOperator, TaskGroup +from airflow.sdk import BaseOperator from airflow.state.metastore import MetastoreStoreBackend from airflow.utils.platform import getuser from airflow.utils.state import DagRunState, State, TaskInstanceState @@ -979,10 +979,6 @@ def test_rendered_map_index_order_stable_regardless_of_uuid_order(self, test_cli This verifies that even when UUIDs are assigned out of map_index order (as happens during retries), the response is still sorted 0, 1, 2, ... """ - from sqlalchemy import update as sa_update - - from airflow.models.taskinstance import uuid7 - self.create_dag_runs_with_mapped_tasks( dag_maker, session, @@ -994,7 +990,7 @@ def test_rendered_map_index_order_stable_regardless_of_uuid_order(self, test_cli # result must still follow integer map_index order, not UUID order. for map_index in [1, 3]: session.execute( - sa_update(TaskInstance) + update(TaskInstance) .where( TaskInstance.dag_id == "retry_dag", TaskInstance.task_id == "task_2", @@ -2158,46 +2154,6 @@ def test_cursor_pagination_order_by_run_after_roundtrips(self, test_client, sess assert ids1.isdisjoint(ids2), "Pages must not overlap" assert len(ids1) + len(ids2) == total_tis - """ - Task group lookup should use the DAG version from the run, not the latest version. - - When a task group is renamed between versions, clicking on a historical run's - task group in the grid should still resolve correctly against the version - that run was created with — not the latest version where the group may have - a different name, i.e serialized_dag might not have that taskgroup anymore. - """ - dag_id = "test_tg_version" - - # Version 1: task group named "process_data" - with dag_maker(dag_id, session=session): - with TaskGroup(group_id="process_data"): - EmptyOperator(task_id="step_1") - dag_maker.create_dagrun(run_id="run_v1") - session.commit() - - # Version 2: task group renamed to "process_data_v2" - with dag_maker(dag_id, session=session): - with TaskGroup(group_id="process_data_v2"): - EmptyOperator(task_id="step_1") - session.commit() - - # The run was created with v1 which had "process_data". - # Querying with the old group name must succeed. - response = test_client.get( - f"/dags/{dag_id}/dagRuns/run_v1/taskInstances", - params={"task_group_id": "process_data"}, - ) - assert response.status_code == 200, response.json() - assert response.json()["total_entries"] == 1 - assert response.json()["task_instances"][0]["task_id"] == "process_data.step_1" - - # The new group name should NOT be found in the old run's version. - response = test_client.get( - f"/dags/{dag_id}/dagRuns/run_v1/taskInstances", - params={"task_group_id": "process_data_v2"}, - ) - assert response.status_code == 404 - class TestGetTaskDependencies(TestTaskInstanceEndpoint): def setup_method(self):