Summary
Three bugs found in StackdriverRemoteLogIO (introduced in #65198) when running against
Airflow 3.2.2 with apache-airflow-providers-google==22.1.0rc1 on CeleryExecutor. All
three stem from the same root assumption: the implementation was written as if it runs in
the task subprocess, but in AF3's supervisor model REMOTE_TASK_LOG runs in the
supervisor process instead.
Reproduced on APC 1.1.2 / GKE / CeleryExecutor / Airflow 3.2.2.
Bug 1 — proc() ships empty labels to Cloud Logging
Location: StackdriverRemoteLogIO.processors → the proc closure.
What happens: Every log entry arrives in Cloud Logging with an empty labels dict.
The proc function reads record.task_instance to populate dag_id, task_id, etc.
but record.task_instance is never set in supervisor context — it is a task-subprocess
concept. The attribute is None (or absent) on every record the supervisor emits.
Fix: Parse labels from the structured log path that relative_path_from_logger()
returns in supervisor context. AF3's log path template is:
dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log
All four label fields (dag_id, run_id, task_id, try_number) can be extracted from
this path with zero DB access and zero dependency on record.task_instance.
Bug 2 — read() filters on logical_date, which supervisors cannot derive
Location: StackdriverRemoteLogIO.read() → call to prepare_log_filter.
What happens: read() builds the Cloud Logging filter from logical_date
(derived from ti.execution_date), but the supervisor has no DB connection with which to
convert run_id → logical_date. As a result, read() either raises or returns no results.
Fix: Filter on run_id instead. Both the write path (log path template, see Bug 1)
and the read path (ti.run_id) expose run_id without any DB lookup. The filter becomes:
    {
        "dag_id": ti.dag_id,
        "task_id": ti.task_id,
        "run_id": ti.run_id,
        "try_number": str(ti.try_number),
    }
This is consistent with how run_id is used on the write side via path-based labeling.
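To show what such a label dict amounts to on the Cloud Logging side, here is a rough sketch that renders it as a query string of `labels.<key>="<value>"` comparisons joined with `AND`. The function name `build_label_filter` is hypothetical, and how `prepare_log_filter` actually renders the filter is not shown in this report.

```python
def build_label_filter(labels):
    """Render a dict of labels as a Cloud Logging query string (illustrative only)."""

    def quote(value):
        # Escape backslashes and double quotes so the value stays one quoted literal.
        escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    return " AND ".join(f"labels.{key}={quote(value)}" for key, value in labels.items())


if __name__ == "__main__":
    print(
        build_label_filter(
            {"dag_id": "example_dag", "task_id": "extract", "run_id": "manual__1", "try_number": "1"}
        )
    )
```

Because every key in the dict is available both from the log path and from `ti`, the same filter can be built with no DB lookup.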
Bug 3 — transport.send() is unguarded, crashes supervised components on IAM/gRPC errors
Location: StackdriverRemoteLogIO.processors → proc closure → transport.send().
What happens: Any IAM permission error, gRPC connectivity failure, or quota exception
from transport.send() propagates uncaught. Because REMOTE_TASK_LOG applies to all
supervised components (scheduler, dag-processor, triggerer, workers), a single IAM
misconfiguration crashes the entire process. Observed: dag-processor pod enters
CrashLoopBackOff on every log emit when the Kubernetes Service Account lacks the
logging.logEntries.create IAM binding. This is a startup-time misconfiguration that is
easy to hit and should degrade gracefully.
Fix: Wrap transport.send() in try/except Exception and log a warning instead of
propagating the error. Log delivery is best-effort; a GCL error should never kill the
process that emits the log.
Workaround
A subclass of StackdriverRemoteLogIO patching all three issues:
    import logging
    from functools import cached_property
    from logging import getLogRecordFactory

    _log = logging.getLogger(__name__)

    class _GCLRemoteLogIO(StackdriverRemoteLogIO):
        @cached_property
        def processors(self):
            transport = self.transport
            resource = self.resource
            log_record_factory = getLogRecordFactory()

            def proc(logger, method_name, event):
                rel = relative_path_from_logger(logger) if logger else None
                if not rel:
                    return event
                # Bug 1: parse dag_id/run_id/task_id/try_number from the path
                labels = _labels_from_path(rel)
                ...
                # Bug 3: a failed send must not crash the supervised process
                try:
                    transport.send(record, str(msg.get("event", "")), resource=resource, labels=labels)
                except Exception as exc:
                    _log.warning("GCL send failed (%s): %s", type(exc).__name__, exc)
                return event

            return (proc,)

        def read(self, relative_path, ti):
            # Bug 2: filter on run_id, which needs no DB lookup
            log_filter = self.prepare_log_filter({
                "dag_id": ti.dag_id,
                "task_id": ti.task_id,
                "run_id": ti.run_id,
                "try_number": str(ti.try_number),
            })
            messages, _, _ = self.read_logs(log_filter, next_page_token=None, all_pages=True)
            return (
                [f"Reading remote log from Stackdriver for {relative_path}"],
                [messages] if messages else [],
            )
End-to-end validated on Airflow 3.2.2 / CeleryExecutor / GKE. GCL entries carry all four
labels; read() returns correct log lines; no supervisor crashes on IAM errors.