Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1734,9 +1734,13 @@ def _run_scheduler_loop(self) -> None:
with create_session() as session:
num_finished_events = 0
for executor in self.executors:
num_finished_events += self._process_executor_events(
executor=executor, session=session
)
with stats.timer(
"scheduler.executor_events_duration",
tags={"executor": type(executor).__name__},
):
num_finished_events += self._process_executor_events(
executor=executor, session=session
)

for executor in self.executors:
try:
Expand Down
16 changes: 16 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,22 @@ def test_executor_heartbeat_emits_timer(self, mock_executors, configure_testing_
for executor, timer_call in zip(self.job_runner.executors, heartbeat_calls):
assert timer_call.kwargs.get("tags") == {"executor": type(executor).__name__}

def test_process_executor_events_emits_timer(self, mock_executors, configure_testing_dag_bundle):
with configure_testing_dag_bundle(os.devnull):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1)
with patch("airflow.jobs.scheduler_job_runner.stats.timer") as mock_timer:
self.job_runner._execute()

event_calls = [
timer_call
for timer_call in mock_timer.call_args_list
if timer_call.args and timer_call.args[0] == "scheduler.executor_events_duration"
]
assert len(event_calls) == len(self.job_runner.executors)
for executor, timer_call in zip(self.job_runner.executors, event_calls):
assert timer_call.kwargs.get("tags") == {"executor": type(executor).__name__}

def test_executor_events_processed(self, mock_executors, configure_testing_dag_bundle):
with configure_testing_dag_bundle(os.devnull):
scheduler_job = Job()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,13 @@ metrics:
legacy_name: "-"
name_variables: []

- name: "scheduler.executor_events_duration"
description: "Milliseconds spent in ``_process_executor_events()`` per scheduler loop iteration,
tagged by executor class name so each configured executor is reported separately."
type: "timer"
legacy_name: "-"
name_variables: []

- name: "dagrun.first_task_scheduling_delay"
description: "Milliseconds elapsed between first task start_date and dagrun expected start"
type: "timer"
Expand Down
Loading