Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully error when users set imcompatible RenderConfig.dbt_deps and operator_args install_deps #1505

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from cosmos import cache, settings
from cosmos.airflow.graph import build_airflow_graph
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode
from cosmos.constants import ExecutionMode, LoadMode
from cosmos.dbt.graph import DbtGraph
from cosmos.dbt.project import has_non_empty_dependencies_file
from cosmos.dbt.selector import retrieve_by_label
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
Expand Down Expand Up @@ -67,11 +68,11 @@ def airflow_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]:


def validate_arguments(
select: list[str],
exclude: list[str],
render_config: RenderConfig,
profile_config: ProfileConfig,
task_args: dict[str, Any],
execution_mode: ExecutionMode,
execution_config: ExecutionConfig,
project_config: ProjectConfig,
) -> None:
"""
Validate that mutually exclusive selectors filters have not been given.
Expand All @@ -84,8 +85,8 @@ def validate_arguments(
:param execution_mode: the current execution mode
"""
for field in ("tags", "paths"):
select_items = retrieve_by_label(select, field)
exclude_items = retrieve_by_label(exclude, field)
select_items = retrieve_by_label(render_config.select, field)
exclude_items = retrieve_by_label(render_config.exclude, field)
intersection = {str(item) for item in set(select_items).intersection(exclude_items)}
if intersection:
raise CosmosValueError(f"Can't specify the same {field[:-1]} in `select` and `exclude`: " f"{intersection}")
Expand All @@ -96,8 +97,21 @@ def validate_arguments(
if profile_config.profile_mapping:
profile_config.profile_mapping.profile_args["schema"] = task_args["schema"]

if execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]:
if execution_config.execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]:
profile_config.validate_profiles_yml()
has_non_empty_dependencies = execution_config.project_path and has_non_empty_dependencies_file(
execution_config.project_path
)
if (
has_non_empty_dependencies
and (
render_config.load_method == LoadMode.DBT_LS
or (render_config.load_method == LoadMode.AUTOMATIC and not project_config.is_manifest_available())
)
and (render_config.dbt_deps != task_args.get("install_deps", True))
Copy link
Contributor

@pankajkoti pankajkoti Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please reconfirm the case when the user has not explicitly set install_deps in their task, what value comes here for install_deps?

I am thinking that when user has not specified install_deps in their task_args, assuming it would be False by default and at the same time if they have specified render_config.dbt_deps=False, we would raise this error and this should not be the case, no?

I am thinking if we could rather change this to,

if render_config.dbt_deps and task_args.get("install_deps") ==  False

Copy link
Collaborator Author

@tatiana tatiana Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pankajkoti the current "default" value of install_deps is True at

self.install_deps = install_deps and has_non_empty_dependencies_file(Path(self.project_dir))

I agree we should improve this; perhaps I could isolate and unify the logic as a function, e.g. "calculate_default_install_deps`, and use it in both places?

We need render_config.dbt_deps and the default task_args.get("install_deps") behaviour in the operators to either be + or -, and it seems the proposed alternative wouldn't give us this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current "default" value of install_deps is True at

Wouldn’t the current default value of install_deps be False? I mean at

install_deps: bool = False,

Yes, since we’re validating arguments before initializing the operators, it looks like setting the default value is deferred until the operators are actually instantiated. This could cause some confusion because if the user hasn’t explicitly set install_deps in their DAG operators, the key won’t exist in task_args at this stage, meaning task_args.get("install_deps") would return None.

I agree that having a common function to determine the default value would be beneficial, as it could be used in both places. Additionally, we’d need to account for cases where the user provides a custom value for a specific operator, ensuring that we correctly handle overrides while still computing the default value when necessary.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn’t the current default value of install_deps be False? I mean at

From a function signature definition, yes, False is the default value. But from an initialization perspective, we're changing the "default" behaviour

self.install_deps = install_deps and has_non_empty_dependencies_file(Path(self.project_dir))

That means that even if users explicitly set True, the install_ops may become False.

Additionally, we’d need to account for cases where the user provides a custom value for a specific operator, ensuring that we correctly handle overrides while still computing the default value when necessary.

Yes, I believe we should not allow users -as of now - to override this property. Otherwise they will end up seeing the original exception reported by our customer. WDYT?

Copy link
Contributor

@pankajkoti pankajkoti Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, probably we could brainstorm this more together over a call when possible?

):
err_msg = f"When using `LoadMode.DBT_LS` and {execution_config.execution_mode}, the value of `dbt_deps` in `RenderConfig` should be the same as the `operator_args['install_deps']` value."
raise CosmosValueError(err_msg)


def validate_initial_user_config(
Expand Down Expand Up @@ -283,12 +297,13 @@ def __init__(
task_args["invocation_mode"] = execution_config.invocation_mode

validate_arguments(
render_config.select,
render_config.exclude,
profile_config,
task_args,
execution_mode=execution_config.execution_mode,
execution_config=execution_config,
profile_config=profile_config,
render_config=render_config,
task_args=task_args,
project_config=project_config,
)

if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
task_args["virtualenv_dir"] = execution_config.virtualenv_dir

Expand Down
9 changes: 3 additions & 6 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ def has_non_empty_dependencies_file(project_path: Path) -> bool:
:returns: True or False
"""
project_dir = Path(project_path)
has_deps = False
for filename in DBT_DEPENDENCIES_FILE_NAMES:
filepath = project_dir / filename
if filepath.exists() and filepath.stat().st_size > 0:
has_deps = True
break
return True

if not has_deps:
logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}")
return has_deps
logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}")
return False


def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None:
Expand Down
50 changes: 46 additions & 4 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,60 @@
SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/"
SAMPLE_DBT_MANIFEST = Path(__file__).parent / "sample/manifest.json"
MULTIPLE_PARENTS_TEST_DBT_PROJECT = Path(__file__).parent.parent / "dev/dags/dbt/multiple_parents_test/"
DBT_PROJECTS_PROJ_WITH_DEPS_DIR = Path(__file__).parent.parent / "dev/dags/dbt" / "jaffle_shop"


@pytest.mark.parametrize("argument_key", ["tags", "paths"])
def test_validate_arguments_tags(argument_key):
selector_name = argument_key[:-1]
select = [f"{selector_name}:a,{selector_name}:b"]
exclude = [f"{selector_name}:b,{selector_name}:c"]
project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST, project_name="xubiru")
render_config = RenderConfig(
select=[f"{selector_name}:a,{selector_name}:b"], exclude=[f"{selector_name}:b,{selector_name}:c"]
)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL)
task_args = {}
with pytest.raises(CosmosValueError) as err:
validate_arguments(select, exclude, profile_config, task_args, execution_mode=ExecutionMode.LOCAL)
validate_arguments(
execution_config=execution_config,
profile_config=profile_config,
project_config=project_config,
render_config=render_config,
task_args=task_args,
)
expected = f"Can't specify the same {selector_name} in `select` and `exclude`: {{'b'}}"
assert err.value.args[0] == expected


def test_validate_arguments_exception():
render_config = RenderConfig(load_method=LoadMode.DBT_LS, dbt_deps=False)
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
execution_config = ExecutionConfig(
execution_mode=ExecutionMode.LOCAL, dbt_project_path=DBT_PROJECTS_PROJ_WITH_DEPS_DIR
)
project_config = ProjectConfig()

task_args = {"install_deps": True} # this has to be the opposite of RenderConfig.dbt_deps
with pytest.raises(CosmosValueError) as err:
validate_arguments(
execution_config=execution_config,
profile_config=profile_config,
project_config=project_config,
render_config=render_config,
task_args=task_args,
)
expected = "When using `LoadMode.DBT_LS` and ExecutionMode.LOCAL, the value of `dbt_deps` in `RenderConfig` should be the same as the `operator_args['install_deps']` value."
assert err.value.args[0] == expected


@pytest.mark.parametrize(
"execution_mode",
(ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV),
Expand Down Expand Up @@ -110,14 +145,21 @@ def test_validate_user_config_fails_project_config_render_config_env_vars():


def test_validate_arguments_schema_in_task_args():
execution_config = ExecutionConfig(execution_mode=ExecutionMode.LOCAL, dbt_project_path="/tmp/project-dir")
render_config = RenderConfig()
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
task_args = {"schema": "abcd"}
project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST, project_name="something")
validate_arguments(
select=[], exclude=[], profile_config=profile_config, task_args=task_args, execution_mode=ExecutionMode.LOCAL
execution_config=execution_config,
profile_config=profile_config,
render_config=render_config,
task_args=task_args,
project_config=project_config,
)
assert profile_config.profile_mapping.profile_args["schema"] == "abcd"

Expand Down