Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ def proc(
method_name: str,
event: structlog.typing.EventDict,
):
if not logger or not relative_path_from_logger(logger):
relative = relative_path_from_logger(logger) if logger else None
if not logger or not relative:
return event

name = event.get("logger_name") or event.get("logger", "")
Expand Down Expand Up @@ -171,6 +172,13 @@ def proc(
labels.update(self.labels)
if ti:
labels.update(_task_instance_to_labels(ti))
else:
# In AF3 supervisor context record.task_instance is not set.
# Parse dag_id / task_id / try_number from the structured
# log path so Cloud Logging entries carry labels even when
# the handler runs in the supervisor process.
path_labels = _labels_from_path(str(relative))
labels.update(path_labels)
_transport.send(record, str(msg.get("event", "")), resource=self.resource, labels=labels)
return event

Expand Down Expand Up @@ -266,6 +274,39 @@ def _read_single_logs_page(self, log_filter: str, page_token: str | None = None)
return "\n".join(messages), page.next_page_token


def _labels_from_path(relative_path: str) -> dict[str, str]:
"""Parse AF3 log path into Stackdriver labels.

AF3's log path template is::

dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log

All four label fields are extracted with zero DB access. When the path
does not match the expected format the function returns an empty dict
so callers can fall back to other label sources.
"""
# Strip the trailing file extension (.log) and split into segments
stem = relative_path.rsplit(".", 1)[0] if "." in relative_path else relative_path
segments = stem.split("/")
labels: dict[str, str] = {}
for segment in segments:
if "=" not in segment:
continue
key, _, value = segment.partition("=")
if key == "dag_id":
labels[LABEL_DAG_ID] = value
elif key == "task_id":
labels[LABEL_TASK_ID] = value
elif key == "attempt":
labels[LABEL_TRY_NUMBER] = value
elif key == "run_id":
# run_id is NOT a standard Stackdriver label yet, but it is used
# on the write side via the log path template. Store it so the
# read path can filter on it (Bug 2 will wire this up).
pass
return labels


def _task_instance_to_labels(ti) -> dict[str, str]:
"""Convert a task instance to Stackdriver labels."""
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,52 @@ def test_processors_skips_non_task_logger(self, mock_client, mock_get_creds_and_
assert result is event
mock_transport_type.return_value.send.assert_not_called()

@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="airflow.sdk.log only exists in Airflow 3+")
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client")
def test_processors_extracts_labels_from_path_in_supervisor_context(
self, mock_client, mock_get_creds_and_project_id
):
"""Labels are parsed from the AF3 log path when task_instance is not set.

In AF3 supervisor context ``record.task_instance`` is None because it is
a task-subprocess concept. The handler should fall back to extracting
``dag_id``, ``task_id``, and ``try_number`` from the structured log path.
"""
mock_get_creds_and_project_id.return_value = ("creds", "project_id")

mock_transport_type = mock.MagicMock()
af3_path = "dag_id=my_dag/run_id=scheduled__2026-06-08T00:00:00+00:00/task_id=print_date/attempt=3.log"
with mock.patch("airflow.sdk.log.relative_path_from_logger", return_value=af3_path):
io = StackdriverRemoteLogIO(
base_log_folder=self.local_log_location,
gcp_log_name="airflow",
transport_type=mock_transport_type,
)
proc = io.processors[0]

event = {
"event": "task log line",
"logger_name": "airflow.task",
"timestamp": "2026-06-08T10:30:00+00:00",
}
proc(mock.MagicMock(), "info", event)

mock_transport = mock_transport_type.return_value
mock_transport.send.assert_called_once()
labels = mock_transport.send.call_args[1]["labels"]
assert labels["dag_id"] == "my_dag"
assert labels["task_id"] == "print_date"
assert labels["try_number"] == "3"

def test_labels_from_path_returns_empty_on_unexpected_format(self):
"""_labels_from_path returns an empty dict when the path doesn't match."""
from airflow.providers.google.cloud.log.stackdriver_task_handler import _labels_from_path

assert _labels_from_path("") == {}
assert _labels_from_path("no_equals_here.log") == {}
assert _labels_from_path("random/path/without/keys.log") == {}


@pytest.mark.usefixtures("clean_stackdriver_handlers")
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
Expand Down