From 40191e0c4823a52b701d8cb7950554f4158b1062 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 3 Feb 2025 18:02:16 +0000 Subject: [PATCH] Raise an exception when users set imcompatible RenderConfig dbt_deps and operator_args install_deps --- cosmos/converter.py | 39 ++++++++++++++++++++++---------- cosmos/dbt/project.py | 9 +++----- tests/test_converter.py | 50 +++++++++++++++++++++++++++++++++++++---- 3 files changed, 76 insertions(+), 22 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 524864aa9..9dc3b93a0 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -17,8 +17,9 @@ from cosmos import cache, settings from cosmos.airflow.graph import build_airflow_graph from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import ExecutionMode +from cosmos.constants import ExecutionMode, LoadMode from cosmos.dbt.graph import DbtGraph +from cosmos.dbt.project import has_non_empty_dependencies_file from cosmos.dbt.selector import retrieve_by_label from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger @@ -67,11 +68,11 @@ def airflow_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]: def validate_arguments( - select: list[str], - exclude: list[str], + render_config: RenderConfig, profile_config: ProfileConfig, task_args: dict[str, Any], - execution_mode: ExecutionMode, + execution_config: ExecutionConfig, + project_config: ProjectConfig, ) -> None: """ Validate that mutually exclusive selectors filters have not been given. @@ -84,8 +85,8 @@ def validate_arguments( :param execution_mode: the current execution mode """ for field in ("tags", "paths"): - select_items = retrieve_by_label(select, field) - exclude_items = retrieve_by_label(exclude, field) + select_items = retrieve_by_label(render_config.select, field) + exclude_items = retrieve_by_label(render_config.exclude, field) intersection = {str(item) for item in set(select_items).intersection(exclude_items)} if intersection: raise CosmosValueError(f"Can't specify the same {field[:-1]} in `select` and `exclude`: " f"{intersection}") @@ -96,8 +97,21 @@ def validate_arguments( if profile_config.profile_mapping: profile_config.profile_mapping.profile_args["schema"] = task_args["schema"] - if execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]: + if execution_config.execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]: profile_config.validate_profiles_yml() + has_non_empty_dependencies = execution_config.project_path and has_non_empty_dependencies_file( + execution_config.project_path + ) + if ( + has_non_empty_dependencies + and ( + render_config.load_method == LoadMode.DBT_LS + or (render_config.load_method == LoadMode.AUTOMATIC and not project_config.is_manifest_available()) + ) + and (render_config.dbt_deps != task_args.get("install_deps", True)) + ): + err_msg = f"When using `LoadMode.DBT_LS` and {execution_config.execution_mode}, the value of `dbt_deps` in `RenderConfig` should be the same as the `operator_args['install_deps']` value." + raise CosmosValueError(err_msg) def validate_initial_user_config( @@ -283,12 +297,13 @@ def __init__( task_args["invocation_mode"] = execution_config.invocation_mode validate_arguments( - render_config.select, - render_config.exclude, - profile_config, - task_args, - execution_mode=execution_config.execution_mode, + execution_config=execution_config, + profile_config=profile_config, + render_config=render_config, + task_args=task_args, + project_config=project_config, ) + if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: task_args["virtualenv_dir"] = execution_config.virtualenv_dir diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index 2c9f9743a..4ee1aee49 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -25,16 +25,13 @@ def has_non_empty_dependencies_file(project_path: Path) -> bool: :returns: True or False """ project_dir = Path(project_path) - has_deps = False for filename in DBT_DEPENDENCIES_FILE_NAMES: filepath = project_dir / filename if filepath.exists() and filepath.stat().st_size > 0: - has_deps = True - break + return True - if not has_deps: - logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}") - return has_deps + logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}") + return False def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None: diff --git a/tests/test_converter.py b/tests/test_converter.py index 18e0fb165..46539d53a 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -17,25 +17,60 @@ SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/" SAMPLE_DBT_MANIFEST = Path(__file__).parent / "sample/manifest.json" MULTIPLE_PARENTS_TEST_DBT_PROJECT = Path(__file__).parent.parent / "dev/dags/dbt/multiple_parents_test/" +DBT_PROJECTS_PROJ_WITH_DEPS_DIR = Path(__file__).parent.parent / "dev/dags/dbt" / "jaffle_shop" @pytest.mark.parametrize("argument_key", ["tags", "paths"]) def test_validate_arguments_tags(argument_key): selector_name = argument_key[:-1] - select = [f"{selector_name}:a,{selector_name}:b"] - exclude = [f"{selector_name}:b,{selector_name}:c"] + project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST, project_name="xubiru") + render_config = RenderConfig( + select=[f"{selector_name}:a,{selector_name}:b"], exclude=[f"{selector_name}:b,{selector_name}:c"] + ) profile_config = ProfileConfig( profile_name="test", target_name="test", profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), ) + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL) task_args = {} with pytest.raises(CosmosValueError) as err: - validate_arguments(select, exclude, profile_config, task_args, execution_mode=ExecutionMode.LOCAL) + validate_arguments( + execution_config=execution_config, + profile_config=profile_config, + project_config=project_config, + render_config=render_config, + task_args=task_args, + ) expected = f"Can't specify the same {selector_name} in `select` and `exclude`: {{'b'}}" assert err.value.args[0] == expected +def test_validate_arguments_exception(): + render_config = RenderConfig(load_method=LoadMode.DBT_LS, dbt_deps=False) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), + ) + execution_config = ExecutionConfig( + execution_mode=ExecutionMode.LOCAL, dbt_project_path=DBT_PROJECTS_PROJ_WITH_DEPS_DIR + ) + project_config = ProjectConfig() + + task_args = {"install_deps": True} # this has to be the opposite of RenderConfig.dbt_deps + with pytest.raises(CosmosValueError) as err: + validate_arguments( + execution_config=execution_config, + profile_config=profile_config, + project_config=project_config, + render_config=render_config, + task_args=task_args, + ) + expected = "When using `LoadMode.DBT_LS` and ExecutionMode.LOCAL, the value of `dbt_deps` in `RenderConfig` should be the same as the `operator_args['install_deps']` value." + assert err.value.args[0] == expected + + @pytest.mark.parametrize( "execution_mode", (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV), @@ -110,14 +145,21 @@ def test_validate_user_config_fails_project_config_render_config_env_vars(): def test_validate_arguments_schema_in_task_args(): + execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL, dbt_project_path="/tmp/project-dir") + render_config = RenderConfig() profile_config = ProfileConfig( profile_name="test", target_name="test", profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}), ) task_args = {"schema": "abcd"} + project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST, project_name="something") validate_arguments( - select=[], exclude=[], profile_config=profile_config, task_args=task_args, execution_mode=ExecutionMode.LOCAL + execution_config=execution_config, + profile_config=profile_config, + render_config=render_config, + task_args=task_args, + project_config=project_config, ) assert profile_config.profile_mapping.profile_args["schema"] == "abcd"