diff --git a/airflow-core/src/airflow/config_templates/provider_config_fallback_defaults.cfg b/airflow-core/src/airflow/config_templates/provider_config_fallback_defaults.cfg index b49c633c5af1d..5e99be03b3acd 100644 --- a/airflow-core/src/airflow/config_templates/provider_config_fallback_defaults.cfg +++ b/airflow-core/src/airflow/config_templates/provider_config_fallback_defaults.cfg @@ -113,6 +113,7 @@ ssl_show_warn = False ca_certs = [kubernetes_executor] +kubernetes_queue = kubernetes api_client_retry_configuration = logs_task_metadata = False pod_template_file = diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 82711527803dc..0110f4ddf9555 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -250,7 +250,9 @@ def _serialize_dags( dagbag_import_error_traceback_depth = conf.getint( "core", "dagbag_import_error_traceback_depth", fallback=None ) - serialization_import_errors[dag.fileloc] = traceback.format_exc( + # Use relative_fileloc to match the format of parse-time import errors + # This ensures consistency across bundle types (Git, Local, etc.) + serialization_import_errors[dag.relative_fileloc] = traceback.format_exc( limit=-dagbag_import_error_traceback_depth ) return serialized_dags, serialization_import_errors diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 98e743ae42382..038ea5b82ebf7 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2987,7 +2987,28 @@ def _try_to_load_executor(self, ti: TaskInstance, session, team_name=NOTSET) -> # Firstly, check if there is no executor set on the TaskInstance, if not, we need to fetch the default # (either globally or for the team) if ti.executor is None: - if not team_name: + # Check if task queue matches any executor's specific queue configuration + # This enables queue-based routing for multiple executors (e.g., KubernetesExecutor) + queue_matched_executor = None + for _executor in self.job.executors: + # Match team if applicable + if team_name and _executor.team_name != team_name and _executor.team_name is not None: + continue + # Check if executor has a queue configuration (e.g., kubernetes_queue) + if hasattr(_executor, "kubernetes_queue") and _executor.kubernetes_queue: + if ti.queue == _executor.kubernetes_queue: + queue_matched_executor = _executor + self.log.debug( + "Task %s matched queue '%s' to executor %s via kubernetes_queue config", + ti, + ti.queue, + _executor.name, + ) + break + + if queue_matched_executor: + executor = queue_matched_executor + elif not team_name: # No team is specified, so just use the global default executor executor = self.job.executor else: diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py index ad4a8a73382e6..73bb35edea615 100644 --- a/airflow-core/tests/unit/dag_processing/test_collection.py +++ b/airflow-core/tests/unit/dag_processing/test_collection.py @@ -607,6 +607,90 @@ def test_import_error_persist_for_invalid_access_control_role( assert len(dag_import_error_listener.new) == 1 assert len(dag_import_error_listener.existing) == 1 + @pytest.mark.usefixtures("clean_db") + def test_import_errors_persisted_with_relative_paths( + self, session, dag_import_error_listener, testing_dag_bundle, dag_maker + ): + """ + Test that import errors are persisted with relative file paths for bundle-backed DAGs. + + This ensures consistency across bundle types (Git, Local, S3, etc.) and that errors + don't disappear from the UI when DAGs originate from bundles. + + Reproduces issue where runtime errors in GitDagBundle were caught but not persisted + to import_error table because of path resolution inconsistencies. + """ + bundle_name = "testing" + relative_fileloc = "subdir/test_runtime_error.py" + + # Create a dag with relative file paths (as would come from a bundle) + with dag_maker(dag_id="test_runtime_error") as dag: + pass + + # Set relative fileloc as it would be set for bundle-backed DAGs + dag.fileloc = f"/absolute/path/to/bundle/{relative_fileloc}" + dag.relative_fileloc = relative_fileloc + + # Simulate an import error with relative path (as stored in DagBag.import_errors) + import_errors = { + (bundle_name, relative_fileloc): "UnboundLocalError: local variable 'x' referenced before assignment" + } + + # Process the DAG with import errors + update_dag_parsing_results_in_db( + bundle_name=bundle_name, + bundle_version=None, + dags=[], # No DAGs successfully parsed + import_errors=import_errors, + parse_duration=0.1, + warnings=set(), + session=session, + files_parsed={(bundle_name, relative_fileloc)}, + ) + + # Verify the import error was persisted to the database + import_error = session.scalar( + select(ParseImportError).where( + ParseImportError.bundle_name == bundle_name, + ParseImportError.filename == relative_fileloc, + ) + ) + + assert import_error is not None, ( + f"Import error for {relative_fileloc} was not persisted to database. " + "This would cause the error to disappear from the UI." + ) + assert import_error.filename == relative_fileloc + assert import_error.bundle_name == bundle_name + assert "UnboundLocalError" in import_error.stacktrace + + # Verify the listener was notified of the new error + assert len(dag_import_error_listener.new) == 1 + + # Now test updating the error (simulating a re-parse with the same error) + update_dag_parsing_results_in_db( + bundle_name=bundle_name, + bundle_version=None, + dags=[], + import_errors=import_errors, + parse_duration=0.1, + warnings=set(), + session=session, + files_parsed={(bundle_name, relative_fileloc)}, + ) + + # Verify only one import error exists (updated, not duplicated) + import_errors_count = session.scalar( + select(func.count(ParseImportError.id)).where( + ParseImportError.bundle_name == bundle_name, + ParseImportError.filename == relative_fileloc, + ) + ) + assert import_errors_count == 1 + + # Verify existing error listener was called + assert len(dag_import_error_listener.existing) == 1 + @patch.object(ParseImportError, "full_file_path") @pytest.mark.usefixtures("clean_db") def test_new_import_error_replaces_old( diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index 1348a428d5fcb..a114117a199fb 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -139,6 +139,65 @@ def fake_collect_dags(dagbag: DagBag, *args, **kwargs): assert resp.import_errors is not None assert "a.py" in resp.import_errors + def test_serialization_errors_use_relative_paths(self, tmp_path: pathlib.Path): + """ + Test that serialization errors use relative file paths. + + This ensures that errors during DAG serialization (e.g., in _serialize_dags) + are stored with relative paths, matching the format of parse-time import errors. + This is critical for bundle-backed DAGs (Git, S3, etc.) where import errors + need to be properly persisted to the database. + """ + # Create a DAG file that will fail during serialization + dag_file = tmp_path / "test_serialization_error.py" + dag_file.write_text(textwrap.dedent(""" + from airflow.sdk import DAG + from airflow.providers.standard.operators.empty import EmptyOperator + from datetime import datetime + + # Create a DAG that will fail during serialization + # by having a non-serializable custom attribute + dag = DAG("test_dag", start_date=datetime(2023, 1, 1)) + + # Add a non-serializable object that will cause serialization to fail + class NonSerializable: + def __getstate__(self): + raise TypeError("Cannot serialize this object") + + dag._non_serializable = NonSerializable() + + task = EmptyOperator(task_id="test_task", dag=dag) + """)) + + # Process the file with bundle_path set + resp = _parse_file( + DagFileParseRequest( + file=str(dag_file), + bundle_path=tmp_path, + bundle_name="testing", + callback_requests=[], + ), + log=structlog.get_logger(), + ) + + assert resp is not None + # The DAG should have been parsed successfully + assert len(resp.serialized_dags) >= 0 + + # Check that any serialization errors use relative paths, not absolute paths + if resp.import_errors: + for error_path in resp.import_errors.keys(): + # The error path should be relative (just the filename) + # not an absolute path + assert not pathlib.Path(error_path).is_absolute(), ( + f"Serialization error path '{error_path}' should be relative, not absolute. " + f"This ensures consistency across bundle types (Git, Local, etc.)" + ) + # For this test, it should be the filename relative to bundle_path + assert error_path == "test_serialization_error.py", ( + f"Expected relative path 'test_serialization_error.py', got '{error_path}'" + ) + def test_top_level_variable_access( self, spy_agency: SpyAgency, diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index b1e69d3e45338..f89482b1cb4b9 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -7957,6 +7957,54 @@ def test_multi_team_config_disabled_uses_legacy_behavior(self, dag_maker, mock_e assert result1 == scheduler_job.executor # Default for no explicit executor assert result2 == mock_executors[1] # Matched by executor name + def test_try_to_load_executor_queue_based_routing(self, dag_maker, session): + """Test executor selection based on task queue matching executor's kubernetes_queue config.""" + mock_jwt_generator = MagicMock(spec=JWTGenerator) + mock_jwt_generator.generate.return_value = "mock-token" + + # Create mock CeleryExecutor + celery_executor = mock.MagicMock(name="CeleryExecutor", slots_available=8, slots_occupied=0) + celery_executor.name = ExecutorName(alias="CeleryExecutor", module_path="celery.executor.path") + celery_executor.jwt_generator = mock_jwt_generator + celery_executor.team_name = None + celery_executor.sentry_integration = "" + celery_executor.queue_workload.__func__ = BaseExecutor.queue_workload + + # Create mock KubernetesExecutor with kubernetes_queue attribute + k8s_executor = mock.MagicMock(name="KubernetesExecutor", slots_available=8, slots_occupied=0) + k8s_executor.name = ExecutorName(alias="KubernetesExecutor", module_path="kubernetes.executor.path") + k8s_executor.jwt_generator = mock_jwt_generator + k8s_executor.team_name = None + k8s_executor.sentry_integration = "" + k8s_executor.kubernetes_queue = "kubernetes" # Configure kubernetes queue name + k8s_executor.queue_workload.__func__ = BaseExecutor.queue_workload + + with mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock) as executors_mock: + executors_mock.return_value = [celery_executor, k8s_executor] + + # Task with default queue should use CeleryExecutor (default executor) + with dag_maker(dag_id="test_dag_default", session=session): + task_default = EmptyOperator(task_id="task_default", queue="default") + + dr_default = dag_maker.create_dagrun() + ti_default = dr_default.get_task_instance(task_default.task_id, session) + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + result_default = self.job_runner._try_to_load_executor(ti_default, session) + assert result_default == celery_executor + + # Task with kubernetes queue should use KubernetesExecutor + with dag_maker(dag_id="test_dag_k8s", session=session): + task_k8s = EmptyOperator(task_id="task_k8s", queue="kubernetes") + + dr_k8s = dag_maker.create_dagrun() + ti_k8s = dr_k8s.get_task_instance(task_k8s.task_id, session) + + result_k8s = self.job_runner._try_to_load_executor(ti_k8s, session) + assert result_k8s == k8s_executor + @pytest.mark.need_serialized_dag def test_schedule_dag_run_with_upstream_skip(dag_maker, session): diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 4756405523cda..fabf28718f47e 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -159,7 +159,9 @@ def __init__(self): self.kube_client: client.CoreV1Api | None = None self.scheduler_job_id: str | None = None self.last_handled: dict[TaskInstanceKey, float] = {} - self.kubernetes_queue: str | None = None + self.kubernetes_queue: str | None = conf.get( + "kubernetes_executor", "kubernetes_queue", fallback="kubernetes" + ) self.task_publish_retries: Counter[TaskInstanceKey] = Counter() self.task_publish_max_retries = conf.getint( "kubernetes_executor", "task_publish_max_retries", fallback=0 diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py index 963178ab645e2..f8dd0ea676444 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py @@ -107,6 +107,13 @@ def get_provider_info(): "kubernetes_executor": { "description": None, "options": { + "kubernetes_queue": { + "description": "Define the queue name for tasks that should be executed by ``KubernetesExecutor`` when using multiple executors.\nWhen the queue of a task matches this value (default ``kubernetes``),\nthe task is routed to ``KubernetesExecutor``.\nThis is used for queue-based executor routing in multi-executor configurations.\n", + "version_added": "3.0.7", + "type": "string", + "example": None, + "default": "kubernetes", + }, "api_client_retry_configuration": { "description": "Kwargs to override the default urllib3 Retry used in the kubernetes API client\n", "version_added": None,