From 60b9673a96a7dc036be2f36dce3873cd175ccd2a Mon Sep 17 00:00:00 2001 From: Deepak Kumar Date: Sat, 6 Jun 2026 18:29:18 -0700 Subject: [PATCH] feat(metrics): emit scheduler.executor_events_duration per executor --- .../src/airflow/jobs/scheduler_job_runner.py | 10 +++++++--- .../tests/unit/jobs/test_scheduler_job.py | 16 ++++++++++++++++ .../observability/metrics/metrics_template.yaml | 7 +++++++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index a6496665e8b0d..37c0e729438be 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1734,9 +1734,13 @@ def _run_scheduler_loop(self) -> None: with create_session() as session: num_finished_events = 0 for executor in self.executors: - num_finished_events += self._process_executor_events( - executor=executor, session=session - ) + with stats.timer( + "scheduler.executor_events_duration", + tags={"executor": type(executor).__name__}, + ): + num_finished_events += self._process_executor_events( + executor=executor, session=session + ) for executor in self.executors: try: diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index cca74569bdf0b..c7a6f18f26647 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -1289,6 +1289,22 @@ def test_executor_heartbeat_emits_timer(self, mock_executors, configure_testing_ for executor, timer_call in zip(self.job_runner.executors, heartbeat_calls): assert timer_call.kwargs.get("tags") == {"executor": type(executor).__name__} + def test_process_executor_events_emits_timer(self, mock_executors, configure_testing_dag_bundle): + with configure_testing_dag_bundle(os.devnull): + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1) + with patch("airflow.jobs.scheduler_job_runner.stats.timer") as mock_timer: + self.job_runner._execute() + + event_calls = [ + timer_call + for timer_call in mock_timer.call_args_list + if timer_call.args and timer_call.args[0] == "scheduler.executor_events_duration" + ] + assert len(event_calls) == len(self.job_runner.executors) + for executor, timer_call in zip(self.job_runner.executors, event_calls): + assert timer_call.kwargs.get("tags") == {"executor": type(executor).__name__} + def test_executor_events_processed(self, mock_executors, configure_testing_dag_bundle): with configure_testing_dag_bundle(os.devnull): scheduler_job = Job() diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index f4fe52e529848..36e1edfaac705 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -666,6 +666,13 @@ metrics: legacy_name: "-" name_variables: [] + - name: "scheduler.executor_events_duration" + description: "Milliseconds spent in ``_process_executor_events()`` per scheduler loop iteration, + tagged by executor class name so each configured executor is reported separately." + type: "timer" + legacy_name: "-" + name_variables: [] + - name: "dagrun.first_task_scheduling_delay" description: "Milliseconds elapsed between first task start_date and dagrun expected start" type: "timer"