StackdriverRemoteLogIO: three bugs in AF3 supervisor context — empty labels, broken read filter, unguarded send crash #68240

@leestro

Summary

Three bugs found in StackdriverRemoteLogIO (introduced in #65198) when running against
Airflow 3.2.2 with apache-airflow-providers-google==22.1.0rc1 on CeleryExecutor. All
three stem from the same root assumption: the implementation was written as if it runs in
the task subprocess, but in AF3's supervisor model REMOTE_TASK_LOG runs in the
supervisor process instead.

Reproduced on APC 1.1.2 / GKE / CeleryExecutor / Airflow 3.2.2.


Bug 1 — proc() ships empty labels to Cloud Logging

Location: StackdriverRemoteLogIO.processors → the proc closure.

What happens: Every log entry arrives in Cloud Logging with an empty labels dict.
The proc function reads record.task_instance to populate dag_id, task_id, etc.
but record.task_instance is never set in supervisor context — it is a task-subprocess
concept. The attribute is None (or absent) on every record the supervisor emits.

Fix: Parse labels from the structured log path that relative_path_from_logger()
returns in supervisor context. AF3's log path template is:

dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log

All four label fields (dag_id, run_id, task_id, and try_number, the last taken from the
attempt=<N> segment) can be extracted from this path with no DB access and no dependency
on record.task_instance.
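
For illustration, a minimal sketch of that path parsing. The function name and the regex are assumptions, not code from the provider; the workaround below references a _labels_from_path helper without showing its body, and this is only one possible shape for it. The optional map_index segment is tolerated on the assumption that mapped tasks add one to the path.

```python
import re

# Hypothetical pattern mirroring the log path template quoted above.
# The optional map_index segment is an assumption for mapped tasks.
_LOG_PATH_RE = re.compile(
    r"dag_id=(?P<dag_id>[^/]+)"
    r"/run_id=(?P<run_id>[^/]+)"
    r"/task_id=(?P<task_id>[^/]+)"
    r"/(?:map_index=[^/]+/)?"
    r"attempt=(?P<try_number>\d+)\.log$"
)


def labels_from_path(rel_path) -> dict[str, str]:
    """Return the four label fields parsed from a relative log path.

    Returns an empty dict when the path does not follow the template,
    so callers can skip shipping the record instead of raising.
    """
    match = _LOG_PATH_RE.search(str(rel_path))
    if match is None:
        return {}
    return dict(match.groupdict())
```

Returning an empty dict on a non-matching path keeps the processor from raising on records that do not belong to a task log.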


Bug 2 — read() filters on logical_date, which supervisors cannot derive

Location: StackdriverRemoteLogIO.read() → call to prepare_log_filter.

What happens: read() constructs the Cloud Logging filter using logical_date
(derived from ti.execution_date). But the supervisor has no DB connection to convert
run_id → logical_date, so read() either raises or builds a filter that matches no entries.

Fix: Filter on run_id instead. Both the write path (log path template, see Bug 1)
and the read path (ti.run_id) expose run_id without any DB lookup. The filter becomes:

{
    "dag_id": ti.dag_id,
    "task_id": ti.task_id,
    "run_id": ti.run_id,
    "try_number": str(ti.try_number),
}

This is consistent with how run_id is used on the write side via path-based labeling.
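
As a rough sketch of what the resulting label clauses could look like in Cloud Logging's query language: the helper below is hypothetical and is not the provider's prepare_log_filter, which may add further clauses (resource type, log name) that are not shown here.

```python
def build_label_filter(labels: dict[str, str]) -> str:
    """Join label equality clauses with AND, quoting each value."""

    def quote(value: str) -> str:
        # Escape backslashes first, then double quotes, before wrapping.
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    return " AND ".join(f"labels.{key}={quote(val)}" for key, val in labels.items())
```

Because every value here comes from the ti object or the log path, the filter can be built without a DB round trip.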


Bug 3 — transport.send() is unguarded, crashes supervised components on IAM/gRPC errors

Location: StackdriverRemoteLogIO.processors → proc closure → transport.send().

What happens: Any IAM permission error, gRPC connectivity failure, or quota exception
from transport.send() propagates uncaught. Because REMOTE_TASK_LOG applies to all
supervised components (scheduler, dag-processor, triggerer, workers), a single IAM
misconfiguration crashes the entire process. Observed: dag-processor pod enters
CrashLoopBackOff on every log emit when the Kubernetes Service Account lacks the
logging.logEntries.create IAM binding. This is a startup-time misconfiguration that is
easy to hit and should degrade gracefully.

Fix: Wrap transport.send() in a try/except Exception and emit a logging.warning
instead of propagating. Log delivery is best-effort; a Cloud Logging (GCL) error should
never kill a supervised process, whether it is a worker, the scheduler, the
dag-processor, or the triggerer.


Workaround

A subclass of StackdriverRemoteLogIO patching all three issues:

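import logging
from functools import cached_property
from logging import getLogRecordFactory

# StackdriverRemoteLogIO and relative_path_from_logger come from the Google
# provider and Airflow respectively; their import lines are omitted here.
_log = logging.getLogger(__name__)


class _GCLRemoteLogIO(StackdriverRemoteLogIO):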
    @cached_property
    def processors(self):
        transport = self.transport
        resource = self.resource
        log_record_factory = getLogRecordFactory()

        def proc(logger, method_name, event):
            rel = relative_path_from_logger(logger) if logger else None
            if not rel:
                return event
            labels = _labels_from_path(rel)  # parse dag_id/run_id/task_id/try_number from path
            ...
            try:
                transport.send(record, str(msg.get("event", "")), resource=resource, labels=labels)
            except Exception as exc:
                _log.warning("GCL send failed (%s): %s", type(exc).__name__, exc)
            return event

        return (proc,)

    def read(self, relative_path, ti):
        log_filter = self.prepare_log_filter({
            "dag_id": ti.dag_id,
            "task_id": ti.task_id,
            "run_id": ti.run_id,
            "try_number": str(ti.try_number),
        })
        messages, _, _ = self.read_logs(log_filter, next_page_token=None, all_pages=True)
        return (
            [f"Reading remote log from Stackdriver for {relative_path}"],
            [messages] if messages else [],
        )

End-to-end validated on Airflow 3.2.2 / CeleryExecutor / GKE. GCL entries carry all four
labels; read() returns correct log lines; no supervisor crashes on IAM errors.
