Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ def _serialize_dags(
dagbag_import_error_traceback_depth = conf.getint(
"core", "dagbag_import_error_traceback_depth", fallback=None
)
serialization_import_errors[dag.fileloc] = traceback.format_exc(
# Use relative_fileloc to match the format of parse-time import errors
# This ensures consistency across bundle types (Git, Local, etc.)
serialization_import_errors[dag.relative_fileloc] = traceback.format_exc(
limit=-dagbag_import_error_traceback_depth
)
return serialized_dags, serialization_import_errors
Expand Down
84 changes: 84 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,90 @@ def test_import_error_persist_for_invalid_access_control_role(
assert len(dag_import_error_listener.new) == 1
assert len(dag_import_error_listener.existing) == 1

@pytest.mark.usefixtures("clean_db")
def test_import_errors_persisted_with_relative_paths(
    self, session, dag_import_error_listener, testing_dag_bundle, dag_maker
):
    """
    Import errors keyed by a bundle-relative path are stored once, then updated.

    The first ``update_dag_parsing_results_in_db`` call must create a
    ``ParseImportError`` row whose ``filename`` is the relative path and report
    it to the listener as a new error. A second, identical call must leave
    exactly one matching row and report it to the listener as an existing error.

    The DAG built with ``dag_maker`` is only given bundle-style ``fileloc`` and
    ``relative_fileloc`` values; both update calls are made with ``dags=[]``.
    """
    bundle = "testing"
    rel_path = "subdir/test_runtime_error.py"
    error_key = (bundle, rel_path)

    # Build a DAG and give it the absolute/relative location pair that a
    # bundle-backed DAG would carry.
    with dag_maker(dag_id="test_runtime_error") as dag:
        pass
    dag.fileloc = f"/absolute/path/to/bundle/{rel_path}"
    dag.relative_fileloc = rel_path

    # Import errors are keyed by (bundle name, path relative to the bundle).
    reported_errors = {
        error_key: "UnboundLocalError: local variable 'x' referenced before assignment"
    }

    def record_parse_results():
        """Persist one parse round in which no DAG loaded and the file errored."""
        update_dag_parsing_results_in_db(
            bundle_name=bundle,
            bundle_version=None,
            dags=[],
            import_errors=reported_errors,
            parse_duration=0.1,
            warnings=set(),
            session=session,
            files_parsed={error_key},
        )

    # Criteria shared by the row lookup and the row count below.
    same_file = (
        ParseImportError.bundle_name == bundle,
        ParseImportError.filename == rel_path,
    )

    # First round: the error has to land in the import_error table.
    record_parse_results()
    stored = session.scalar(select(ParseImportError).where(*same_file))

    assert stored is not None, (
        f"Import error for {rel_path} was not persisted to database. "
        "This would cause the error to disappear from the UI."
    )
    assert stored.filename == rel_path
    assert stored.bundle_name == bundle
    assert "UnboundLocalError" in stored.stacktrace
    assert len(dag_import_error_listener.new) == 1

    # Second round with the same error: the row is updated, not duplicated.
    record_parse_results()
    row_count = session.scalar(select(func.count(ParseImportError.id)).where(*same_file))

    assert row_count == 1
    assert len(dag_import_error_listener.existing) == 1

@patch.object(ParseImportError, "full_file_path")
@pytest.mark.usefixtures("clean_db")
def test_new_import_error_replaces_old(
Expand Down
59 changes: 59 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,65 @@ def fake_collect_dags(dagbag: DagBag, *args, **kwargs):
assert resp.import_errors is not None
assert "a.py" in resp.import_errors

def test_serialization_errors_use_relative_paths(self, tmp_path: pathlib.Path):
    """
    Test that errors reported for a bundle-backed DAG file use relative paths.

    A DAG file is written under ``tmp_path`` and parsed with ``bundle_path`` set
    to ``tmp_path``. Every key in the returned ``import_errors`` mapping must be
    relative to the bundle root (here, just the file name), which is the format
    used for parse-time import errors.

    NOTE(review): this test does not establish that the private
    ``_non_serializable`` attribute actually makes serialization fail. The
    per-path checks therefore run only when errors were reported. The
    unconditional assertion only guarantees that the file produced *some*
    outcome: a serialized DAG or an import error.
    """
    # DAG file carrying an attribute whose ``__getstate__`` raises.
    dag_file = tmp_path / "test_serialization_error.py"
    dag_file.write_text(
        textwrap.dedent(
            """
            from airflow.sdk import DAG
            from airflow.providers.standard.operators.empty import EmptyOperator
            from datetime import datetime

            # Create a DAG that will fail during serialization
            # by having a non-serializable custom attribute
            dag = DAG("test_dag", start_date=datetime(2023, 1, 1))

            # Add a non-serializable object that will cause serialization to fail
            class NonSerializable:
                def __getstate__(self):
                    raise TypeError("Cannot serialize this object")

            dag._non_serializable = NonSerializable()

            task = EmptyOperator(task_id="test_task", dag=dag)
            """
        )
    )

    # Parse with the bundle root set so that relative paths can be computed.
    resp = _parse_file(
        DagFileParseRequest(
            file=str(dag_file),
            bundle_path=tmp_path,
            bundle_name="testing",
            callback_requests=[],
        ),
        log=structlog.get_logger(),
    )

    assert resp is not None
    # The previous ``len(resp.serialized_dags) >= 0`` check could never fail.
    # Require instead that the file was not silently dropped.
    assert resp.serialized_dags or resp.import_errors, (
        "Parsing produced neither a serialized DAG nor an import error"
    )

    # Any reported error must be keyed by the path relative to the bundle root.
    for error_path in resp.import_errors or ():
        assert not pathlib.Path(error_path).is_absolute(), (
            f"Serialization error path '{error_path}' should be relative, not absolute. "
            f"This ensures consistency across bundle types (Git, Local, etc.)"
        )
        # The file sits directly in the bundle root, so the key is its name.
        assert error_path == "test_serialization_error.py", (
            f"Expected relative path 'test_serialization_error.py', got '{error_path}'"
        )

def test_top_level_variable_access(
self,
spy_agency: SpyAgency,
Expand Down
Loading