
All DAGS disappear on import failure #283

Open
skiedude opened this issue Nov 12, 2024 · 10 comments
Labels
bug Something isn't working priority/medium

Comments

@skiedude

from pathlib import Path

import dagfactory

for path in Path("/usr/local/airflow/dags/yamls/").rglob("*.yaml"):
    dag_factory = dagfactory.DagFactory(path)
    dag_factory.generate_dags(globals())

Using the code above, we import all of our YAML DAG files at once. In some areas Airflow treats this as a "single" DAG file, which means that if one of those YAML files fails to load, all of them disappear from the UI.

Is it possible to avoid that? Off the top of my head, we could create one Python file to import each DAG individually, but that seems very repetitive.
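The isolation we're after can be sketched generically (pure Python; `load_all` and `load_one` are hypothetical names, with `load_one` standing in for `dagfactory.DagFactory(path).generate_dags(globals())`), so one bad file no longer aborts the rest:

```python
# Sketch: load each config file independently, collecting failures
# instead of letting the first bad file abort the whole batch.
# `load_one` is a hypothetical stand-in for the real per-file loader.
def load_all(paths, load_one):
    loaded, failed = [], []
    for path in paths:
        try:
            load_one(path)
        except Exception as exc:
            failed.append((path, exc))  # remember the failure, keep going
        else:
            loaded.append(path)
    return loaded, failed
```

The open question in this thread is then what to do with `failed` so the errors still surface in the Airflow UI.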

@skiedude skiedude changed the title from "Possible to not failing import all dags on error?" to "All DAGS disappear on import failure" Nov 12, 2024
@tatiana
Collaborator

tatiana commented Nov 14, 2024

Hi @skiedude, could you please share which version of DAG Factory you're using?

We hoped that 0.20.0 would solve this problem:
https://github.com/astronomer/dag-factory/releases/tag/v0.20.0

In particular, this change:
#184

@skiedude
Author

> Hi @skiedude, could you please share which version of DAG Factory you're using?
>
> We hoped that 0.20.0 would solve this problem: https://github.com/astronomer/dag-factory/releases/tag/v0.20.0
>
> In particular, this change: #184

Hey @tatiana we are using

astro@airflow-worker-7c6b6f4489-4j6xn:/usr/local/airflow$ pip freeze | grep -i factory
dag-factory==0.20.0

Which appears to be your released version with the fix. Could it be because I'm not using your load_yaml_dags method?

@skiedude
Author

OK, I tried out the following code:

from airflow import DAG
from dagfactory import load_yaml_dags

load_yaml_dags(globals_dict=globals())

That does work: the bad DAGs disappear from the UI. However, I don't see any errors at the top showing that DAGs failed to import, and when I check

http://localhost:8080/api/v1/importErrors

{
  "import_errors": [],
  "total_entries": 0
}

Is there any way to know that DAGs are failing to import?

@tatiana
Collaborator

tatiana commented Nov 15, 2024

I see your point, @skiedude. The experience would be much better if we surfaced those invalid YAMLs as DAG import error messages in Airflow.

You should be able to identify invalid YAML files by checking the scheduler log messages. They will be similar to:

Failed to load dag from tests/fixtures/invalid_yaml.yml

https://github.com/quydx/airlfow-dagfactory/blob/a74253d1ae036b59d9fdfe25071f416b3dbc084f/dagfactory/dagfactory.py#L202

I suggest we add two follow-up tickets out of this one:

  1. Extend the behaviour of load_yaml_dags to dag_factory.generate_dags, so it also gracefully handles invalid YML files.
  2. Evaluate the possibility of DAG Factory raising an Airflow import error, so it can be displayed in the UI as it would be if the same happened to a Python DAG.

@skiedude
Author

Thanks for the follow-up, @tatiana. My overall understanding of the underlying architecture is rudimentary, but in my personal testing I copied the load_yaml_dags function into my import file and did the following:

import logging
from pathlib import Path

from dagfactory import DagFactory

# Same discovery as in my original snippet; failures are collected
# rather than aborting the loop.
candidate_dag_files = Path("/usr/local/airflow/dags/yamls/").rglob("*.yaml")
failed_imports = []
globals_dict = globals()

for config_file_path in candidate_dag_files:
    config_file_abs_path = str(config_file_path.absolute())
    logging.info("Loading %s", config_file_abs_path)
    try:
        factory = DagFactory(config_file_abs_path)
        factory.generate_dags(globals_dict)
    except Exception:  # pylint: disable=broad-except
        logging.exception("Failed to load dag from %s", config_file_path)
        failed_imports.append(config_file_path)
    else:
        factory.emit_telemetry("load_yaml_dags")
        logging.info("DAG loaded: %s", config_file_path)

if failed_imports:
    raise ImportError("Failed importing some dags")

From what I've observed, even if the raise happens after the successful imports are done, all DAGs are removed, because this file, which Airflow sees as the entry point for all of them, threw an error.

So it stands to reason that an import error would need to be inserted into the database in a different way, one that doesn't raise an exception. I've poked around the Airflow codebase a bit, but haven't found a method of inserting an import error without just raising one.
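For context, Airflow records these failures in an import_error table in its metadata database (roughly: filename, timestamp, stacktrace), and there is no public API for writing to it; real code would have to rely on internal models that may change between versions. The shape of the idea, modeled here with an in-memory sqlite3 table as a stand-in for Airflow's metadata database, is just:

```python
# Conceptual sketch only: models Airflow's import_error table with sqlite3.
# Airflow exposes no public API for inserting import errors directly, so
# anything real would depend on internal models, which may change.
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE import_error (filename TEXT, timestamp TEXT, stacktrace TEXT)"
)

def record_import_error(conn, filename, stacktrace):
    """Insert an error row without raising, so sibling DAGs keep loading."""
    conn.execute(
        "INSERT INTO import_error VALUES (?, ?, ?)",
        (filename, datetime.now(timezone.utc).isoformat(), stacktrace),
    )
    conn.commit()

record_import_error(conn, "dags/yamls/bad.yaml", "Failed to load dag")
rows = conn.execute("SELECT filename FROM import_error").fetchall()
```

The point of the sketch is only that recording the failure is decoupled from raising it, which is exactly what the entry-point file cannot currently do.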

@cmarteepants cmarteepants added bug Something isn't working priority/medium labels Nov 20, 2024
@cmarteepants
Collaborator

Hi @skiedude, that's a great point. I'm going to check with the Airflow team and get their thoughts on how we can log this so that the error shows up in the UI, without raising an exception and stopping the process. Stay tuned :)

@tatiana
Collaborator

tatiana commented Nov 21, 2024

@skiedude @cmarteepants My thought was to iterate through all the YML files, identify the ones that are problematic, and raise the import error at the end, printing out the list of YAMLs that were invalid. That way the DAGs that can be loaded would be loaded, but we'd still have visibility of the errors via the Airflow UI.

@ashb

ashb commented Nov 21, 2024

> @skiedude @cmarteepants My thought was to iterate through all the YML files, identify the ones that are problematic, and raise the import error at the end, printing out the list of YAMLs that were invalid. That way the DAGs that can be loaded would be loaded, but we'd still have visibility of the errors via the Airflow UI.

Does that work? My first thought is that any exception at parse time results in no DAGs being loaded from the Python file.

@kaxil
Collaborator

kaxil commented Nov 21, 2024

Also, all the import errors for a file are deleted if that file is parsed correctly:

https://github.com/apache/airflow/blob/440c224af5592f9007eef43d1dbe9025aa34e177/airflow/dag_processing/processor.py#L452-L457

@skiedude
Author

> @skiedude @cmarteepants My thought was to iterate through all the YML files, identify the ones that are problematic, and raise the import error at the end, printing out the list of YAMLs that were invalid. That way the DAGs that can be loaded would be loaded, but we'd still have visibility of the errors via the Airflow UI.
>
> Does that work? My first thought is that any exception at parse time results in no DAGs being loaded from the Python file.

This is what I have observed. Even if you raise the exception after the imports have taken place, since that file is the entry point for all of them, they all still disappear when it raises.
