@@ -113,6 +113,7 @@ ssl_show_warn = False
ca_certs =

[kubernetes_executor]
kubernetes_queue = kubernetes
api_client_retry_configuration =
logs_task_metadata = False
pod_template_file =
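For context, the new option is aimed at multi-executor deployments. A minimal sketch of such a configuration (assuming Airflow's comma-separated hybrid-executor syntax, where the first entry is the environment default) might look like:

```ini
[core]
# The first executor listed is the default; the others are available for routing
executor = CeleryExecutor,KubernetesExecutor

[kubernetes_executor]
# Tasks whose queue matches this value are routed to KubernetesExecutor
kubernetes_queue = kubernetes
```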
4 changes: 3 additions & 1 deletion airflow-core/src/airflow/dag_processing/processor.py
@@ -250,7 +250,9 @@ def _serialize_dags(
            dagbag_import_error_traceback_depth = conf.getint(
                "core", "dagbag_import_error_traceback_depth", fallback=None
            )
-            serialization_import_errors[dag.fileloc] = traceback.format_exc(
+            # Use relative_fileloc to match the format of parse-time import errors
+            # This ensures consistency across bundle types (Git, Local, etc.)
+            serialization_import_errors[dag.relative_fileloc] = traceback.format_exc(
                limit=-dagbag_import_error_traceback_depth
            )
    return serialized_dags, serialization_import_errors
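To illustrate what keying the error dict by `relative_fileloc` changes, here is a minimal sketch; the bundle root and file names are illustrative (borrowed from the tests below), not the processor's actual internals:

```python
from pathlib import Path

# Hypothetical bundle-backed DAG file
bundle_name = "testing"
bundle_root = Path("/absolute/path/to/bundle")
fileloc = bundle_root / "subdir/test_runtime_error.py"

# Parse-time import errors are keyed by the path relative to the bundle root...
relative_fileloc = fileloc.relative_to(bundle_root)  # subdir/test_runtime_error.py

# ...so serialization errors must use the same key to update the same
# ParseImportError row instead of creating an orphaned absolute-path entry.
serialization_import_errors = {str(relative_fileloc): "UnboundLocalError: ..."}
```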
23 changes: 22 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2987,7 +2987,28 @@ def _try_to_load_executor(self, ti: TaskInstance, session, team_name=NOTSET) ->
        # Firstly, check if there is no executor set on the TaskInstance, if not, we need to fetch the default
        # (either globally or for the team)
        if ti.executor is None:
-            if not team_name:
+            # Check if task queue matches any executor's specific queue configuration
+            # This enables queue-based routing for multiple executors (e.g., KubernetesExecutor)
+            queue_matched_executor = None
+            for _executor in self.job.executors:
+                # Match team if applicable
+                if team_name and _executor.team_name != team_name and _executor.team_name is not None:
+                    continue
+                # Check if executor has a queue configuration (e.g., kubernetes_queue)
+                if hasattr(_executor, "kubernetes_queue") and _executor.kubernetes_queue:
+                    if ti.queue == _executor.kubernetes_queue:
+                        queue_matched_executor = _executor
+                        self.log.debug(
+                            "Task %s matched queue '%s' to executor %s via kubernetes_queue config",
+                            ti,
+                            ti.queue,
+                            _executor.name,
+                        )
+                        break
+
+            if queue_matched_executor:
+                executor = queue_matched_executor
+            elif not team_name:
                # No team is specified, so just use the global default executor
                executor = self.job.executor
            else:
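In practice, the routing above is driven purely by the task's queue. A DAG like the following sketch (mirroring the new scheduler test further down; the DAG id is illustrative) would have one task fall through to the default executor and the other picked up by KubernetesExecutor:

```python
from datetime import datetime

from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG

with DAG("queue_routing_example", start_date=datetime(2023, 1, 1), schedule=None):
    # Uses the default executor (e.g. CeleryExecutor); queue does not match kubernetes_queue
    EmptyOperator(task_id="task_default", queue="default")

    # Matches [kubernetes_executor] kubernetes_queue and is routed to KubernetesExecutor
    EmptyOperator(task_id="task_k8s", queue="kubernetes")
```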
84 changes: 84 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_collection.py
@@ -607,6 +607,90 @@ def test_import_error_persist_for_invalid_access_control_role(
        assert len(dag_import_error_listener.new) == 1
        assert len(dag_import_error_listener.existing) == 1

@pytest.mark.usefixtures("clean_db")
def test_import_errors_persisted_with_relative_paths(
self, session, dag_import_error_listener, testing_dag_bundle, dag_maker
):
"""
Test that import errors are persisted with relative file paths for bundle-backed DAGs.
This ensures consistency across bundle types (Git, Local, S3, etc.) and that errors
don't disappear from the UI when DAGs originate from bundles.
Reproduces issue where runtime errors in GitDagBundle were caught but not persisted
to import_error table because of path resolution inconsistencies.
"""
bundle_name = "testing"
relative_fileloc = "subdir/test_runtime_error.py"

# Create a dag with relative file paths (as would come from a bundle)
with dag_maker(dag_id="test_runtime_error") as dag:
pass

# Set relative fileloc as it would be set for bundle-backed DAGs
dag.fileloc = f"/absolute/path/to/bundle/{relative_fileloc}"
dag.relative_fileloc = relative_fileloc

# Simulate an import error with relative path (as stored in DagBag.import_errors)
import_errors = {
(bundle_name, relative_fileloc): "UnboundLocalError: local variable 'x' referenced before assignment"
}

# Process the DAG with import errors
update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=None,
dags=[], # No DAGs successfully parsed
import_errors=import_errors,
parse_duration=0.1,
warnings=set(),
session=session,
files_parsed={(bundle_name, relative_fileloc)},
)

# Verify the import error was persisted to the database
import_error = session.scalar(
select(ParseImportError).where(
ParseImportError.bundle_name == bundle_name,
ParseImportError.filename == relative_fileloc,
)
)

assert import_error is not None, (
f"Import error for {relative_fileloc} was not persisted to database. "
"This would cause the error to disappear from the UI."
)
assert import_error.filename == relative_fileloc
assert import_error.bundle_name == bundle_name
assert "UnboundLocalError" in import_error.stacktrace

# Verify the listener was notified of the new error
assert len(dag_import_error_listener.new) == 1

# Now test updating the error (simulating a re-parse with the same error)
update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=None,
dags=[],
import_errors=import_errors,
parse_duration=0.1,
warnings=set(),
session=session,
files_parsed={(bundle_name, relative_fileloc)},
)

# Verify only one import error exists (updated, not duplicated)
import_errors_count = session.scalar(
select(func.count(ParseImportError.id)).where(
ParseImportError.bundle_name == bundle_name,
ParseImportError.filename == relative_fileloc,
)
)
assert import_errors_count == 1

# Verify existing error listener was called
assert len(dag_import_error_listener.existing) == 1

    @patch.object(ParseImportError, "full_file_path")
    @pytest.mark.usefixtures("clean_db")
    def test_new_import_error_replaces_old(
59 changes: 59 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_processor.py
@@ -139,6 +139,65 @@ def fake_collect_dags(dagbag: DagBag, *args, **kwargs):
        assert resp.import_errors is not None
        assert "a.py" in resp.import_errors

    def test_serialization_errors_use_relative_paths(self, tmp_path: pathlib.Path):
        """
        Test that serialization errors use relative file paths.
        This ensures that errors during DAG serialization (e.g., in _serialize_dags)
        are stored with relative paths, matching the format of parse-time import errors.
        This is critical for bundle-backed DAGs (Git, S3, etc.) where import errors
        need to be properly persisted to the database.
        """
        # Create a DAG file that will fail during serialization
        dag_file = tmp_path / "test_serialization_error.py"
        dag_file.write_text(textwrap.dedent("""
            from airflow.sdk import DAG
            from airflow.providers.standard.operators.empty import EmptyOperator
            from datetime import datetime
            # Create a DAG that will fail during serialization
            # by having a non-serializable custom attribute
            dag = DAG("test_dag", start_date=datetime(2023, 1, 1))
            # Add a non-serializable object that will cause serialization to fail
            class NonSerializable:
                def __getstate__(self):
                    raise TypeError("Cannot serialize this object")
            dag._non_serializable = NonSerializable()
            task = EmptyOperator(task_id="test_task", dag=dag)
        """))

        # Process the file with bundle_path set
        resp = _parse_file(
            DagFileParseRequest(
                file=str(dag_file),
                bundle_path=tmp_path,
                bundle_name="testing",
                callback_requests=[],
            ),
            log=structlog.get_logger(),
        )

        assert resp is not None
        # The DAG should have been parsed successfully
        assert len(resp.serialized_dags) >= 0

        # Check that any serialization errors use relative paths, not absolute paths
        if resp.import_errors:
            for error_path in resp.import_errors.keys():
                # The error path should be relative (just the filename),
                # not an absolute path
                assert not pathlib.Path(error_path).is_absolute(), (
                    f"Serialization error path '{error_path}' should be relative, not absolute. "
                    f"This ensures consistency across bundle types (Git, Local, etc.)"
                )
                # For this test, it should be the filename relative to bundle_path
                assert error_path == "test_serialization_error.py", (
                    f"Expected relative path 'test_serialization_error.py', got '{error_path}'"
                )

    def test_top_level_variable_access(
        self,
        spy_agency: SpyAgency,
48 changes: 48 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -7957,6 +7957,54 @@ def test_multi_team_config_disabled_uses_legacy_behavior(self, dag_maker, mock_e
        assert result1 == scheduler_job.executor  # Default for no explicit executor
        assert result2 == mock_executors[1]  # Matched by executor name

    def test_try_to_load_executor_queue_based_routing(self, dag_maker, session):
        """Test executor selection based on task queue matching executor's kubernetes_queue config."""
        mock_jwt_generator = MagicMock(spec=JWTGenerator)
        mock_jwt_generator.generate.return_value = "mock-token"

        # Create mock CeleryExecutor
        celery_executor = mock.MagicMock(name="CeleryExecutor", slots_available=8, slots_occupied=0)
        celery_executor.name = ExecutorName(alias="CeleryExecutor", module_path="celery.executor.path")
        celery_executor.jwt_generator = mock_jwt_generator
        celery_executor.team_name = None
        celery_executor.sentry_integration = ""
        celery_executor.queue_workload.__func__ = BaseExecutor.queue_workload

        # Create mock KubernetesExecutor with kubernetes_queue attribute
        k8s_executor = mock.MagicMock(name="KubernetesExecutor", slots_available=8, slots_occupied=0)
        k8s_executor.name = ExecutorName(alias="KubernetesExecutor", module_path="kubernetes.executor.path")
        k8s_executor.jwt_generator = mock_jwt_generator
        k8s_executor.team_name = None
        k8s_executor.sentry_integration = ""
        k8s_executor.kubernetes_queue = "kubernetes"  # Configure kubernetes queue name
        k8s_executor.queue_workload.__func__ = BaseExecutor.queue_workload

        with mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock) as executors_mock:
            executors_mock.return_value = [celery_executor, k8s_executor]

            # Task with default queue should use CeleryExecutor (default executor)
            with dag_maker(dag_id="test_dag_default", session=session):
                task_default = EmptyOperator(task_id="task_default", queue="default")

            dr_default = dag_maker.create_dagrun()
            ti_default = dr_default.get_task_instance(task_default.task_id, session)

            scheduler_job = Job()
            self.job_runner = SchedulerJobRunner(job=scheduler_job)

            result_default = self.job_runner._try_to_load_executor(ti_default, session)
            assert result_default == celery_executor

            # Task with kubernetes queue should use KubernetesExecutor
            with dag_maker(dag_id="test_dag_k8s", session=session):
                task_k8s = EmptyOperator(task_id="task_k8s", queue="kubernetes")

            dr_k8s = dag_maker.create_dagrun()
            ti_k8s = dr_k8s.get_task_instance(task_k8s.task_id, session)

            result_k8s = self.job_runner._try_to_load_executor(ti_k8s, session)
            assert result_k8s == k8s_executor


@pytest.mark.need_serialized_dag
def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
@@ -159,7 +159,9 @@ def __init__(self):
        self.kube_client: client.CoreV1Api | None = None
        self.scheduler_job_id: str | None = None
        self.last_handled: dict[TaskInstanceKey, float] = {}
-        self.kubernetes_queue: str | None = None
+        self.kubernetes_queue: str | None = conf.get(
+            "kubernetes_executor", "kubernetes_queue", fallback="kubernetes"
+        )
        self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
        self.task_publish_max_retries = conf.getint(
            "kubernetes_executor", "task_publish_max_retries", fallback=0
@@ -107,6 +107,13 @@ def get_provider_info():
"kubernetes_executor": {
"description": None,
"options": {
"kubernetes_queue": {
"description": "Define the queue name for tasks that should be executed by ``KubernetesExecutor`` when using multiple executors.\nWhen the queue of a task matches this value (default ``kubernetes``),\nthe task is routed to ``KubernetesExecutor``.\nThis is used for queue-based executor routing in multi-executor configurations.\n",
"version_added": "3.0.7",
"type": "string",
"example": None,
"default": "kubernetes",
},
"api_client_retry_configuration": {
"description": "Kwargs to override the default urllib3 Retry used in the kubernetes API client\n",
"version_added": None,