Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: avoid virtual datasets whenever possible #295

Merged
merged 2 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/preset_cli/cli/superset/sync/dbt/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ def create_dataset(
Virtual datasets are created when the table database is different from the main
database, for systems that support cross-database queries (Trino, BigQuery, etc.)
"""
kwargs = {
"database": database["id"],
"catalog": model["database"],
"schema": model["schema"],
"table_name": model.get("alias") or model["name"],
}
try:
# try to create dataset with catalog
return client.create_dataset(**kwargs)
except SupersetError as ex:
if not no_catalog_support(ex):
raise ex

Copy link
Contributor

@Vitor-Avila Vitor-Avila May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could just delete the catalog key here (del kwargs["catalog"])

url = make_url(database["sqlalchemy_uri"])
if model_in_database(model, url):
Copy link
Contributor

@Vitor-Avila Vitor-Avila May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then avoid this if/else and have a single if not model_in_database(model, url) with kwargs["sql"] = f"SELECT * FROM {source}" (and the remaining logic that's specific for virtual datasets)

kwargs = {
Expand All @@ -91,6 +104,32 @@ def create_dataset(
return client.create_dataset(**kwargs)


def no_catalog_support(ex: SupersetError) -> bool:
"""
Return if the error is due to a lack of catalog support.

The errors payload looks like this:

[
{
"message": json.dumps({"message": {"catalog": ["Unknown field."]}}),
"error_type": "UNKNOWN_ERROR",
"level": ErrorLevel.ERROR,
},
]

"""
for error in ex.errors:
try:
message = json.loads(error["message"])
if "Unknown field." in message["message"]["catalog"]:
return True
except Exception: # pylint: disable=broad-except
pass

return False
betodealmeida marked this conversation as resolved.
Show resolved Hide resolved


def get_or_create_dataset(
client: SupersetClient,
model: ModelSchema,
Expand Down
137 changes: 120 additions & 17 deletions tests/cli/superset/sync/dbt/datasets_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Tests for ``preset_cli.cli.superset.sync.dbt.datasets``.
"""
# pylint: disable=invalid-name, too-many-lines
# pylint: disable=invalid-name, too-many-lines, redefined-outer-name

import json
from typing import Any, Dict, List, cast
Expand All @@ -25,9 +25,16 @@
get_certification_info,
get_or_create_dataset,
model_in_database,
no_catalog_support,
sync_datasets,
)
from preset_cli.exceptions import CLIError, ErrorLevel, ErrorPayload, SupersetError
from preset_cli.exceptions import (
CLIError,
DatabaseNotFoundError,
ErrorLevel,
ErrorPayload,
SupersetError,
)

metric_schema = MetricSchema()

Expand Down Expand Up @@ -147,6 +154,7 @@ def test_sync_datasets_new(mocker: MockerFixture) -> None:
)
client.create_dataset.assert_called_with(
database=1,
catalog="examples_dev",
schema="public",
table_name="messages_channels",
)
Expand Down Expand Up @@ -396,7 +404,12 @@ def test_sync_datasets_new_bq_error(mocker: MockerFixture) -> None:
)
client.create_dataset.assert_has_calls(
[
mock.call(database=1, schema="public", table_name="messages_channels"),
mock.call(
database=1,
catalog="examples_dev",
schema="public",
table_name="messages_channels",
),
],
)
client.update_dataset.assert_has_calls([])
Expand Down Expand Up @@ -741,41 +754,69 @@ def test_sync_datasets_no_columns(mocker: MockerFixture) -> None:
)


def test_create_dataset_physical(mocker: MockerFixture) -> None:
@pytest.fixture()
def no_catalog_support_client(mocker: MockerFixture) -> Any:
"""
Test ``create_dataset`` for physical datasets.
Fixture to return a mocked client with no catalog support.
"""
client = mocker.MagicMock()
client.create_dataset.side_effect = [
SupersetError(
errors=[
{
"message": json.dumps({"message": {"catalog": ["Unknown field."]}}),
"error_type": "UNKNOWN_ERROR",
"level": ErrorLevel.ERROR,
},
],
),
None,
]
mocker.patch(
"preset_cli.cli.superset.sync.dbt.datasets.no_catalog_support",
return_value=True,
)
return client


def test_create_dataset_physical_no_catalog(
no_catalog_support_client: MockerFixture,
) -> None:
"""
Test ``create_dataset`` for physical datasets.
"""
create_dataset(
client,
no_catalog_support_client,
{
"id": 1,
"catalog": "examples_dev",
"schema": "public",
"name": "Database",
"sqlalchemy_uri": "postgresql://user@host/examples_dev",
},
models[0],
)
client.create_dataset.assert_called_with(
no_catalog_support_client.create_dataset.assert_called_with(
database=1,
schema="public",
table_name="messages_channels",
)


def test_create_dataset_virtual(mocker: MockerFixture) -> None:
def test_create_dataset_virtual(
mocker: MockerFixture,
no_catalog_support_client: MockerFixture,
) -> None:
"""
Test ``create_dataset`` for virtual datasets.
"""
create_engine = mocker.patch(
"preset_cli.cli.superset.sync.dbt.lib.create_engine",
)
create_engine().dialect.identifier_preparer.quote = lambda token: token
client = mocker.MagicMock()

create_dataset(
client,
no_catalog_support_client,
{
"id": 1,
"schema": "public",
Expand All @@ -784,7 +825,7 @@ def test_create_dataset_virtual(mocker: MockerFixture) -> None:
},
models[0],
)
client.create_dataset.assert_called_with(
no_catalog_support_client.create_dataset.assert_called_with(
database=1,
schema="public",
table_name="messages_channels",
Expand All @@ -793,18 +834,15 @@ def test_create_dataset_virtual(mocker: MockerFixture) -> None:


def test_create_dataset_virtual_missing_dependency(
capsys: pytest.CaptureFixture[str],
mocker: MockerFixture,
no_catalog_support_client: MockerFixture,
) -> None:
"""
Test ``create_dataset`` for virtual datasets when the DB connection requires
an additional package.
"""
client = mocker.MagicMock()

with pytest.raises(NotImplementedError):
create_dataset(
client,
no_catalog_support_client,
{
"id": 1,
"schema": "public",
Expand All @@ -814,9 +852,18 @@ def test_create_dataset_virtual_missing_dependency(
models[0],
)


def test_create_dataset_virtual_missing_dependency_snowflake(
capsys: pytest.CaptureFixture[str],
no_catalog_support_client: MockerFixture,
) -> None:
"""
Test ``create_dataset`` for virtual datasets when the DB connection requires
an additional package.
"""
with pytest.raises(SystemExit) as excinfo:
create_dataset(
client,
no_catalog_support_client,
{
"id": 1,
"schema": "public",
Expand Down Expand Up @@ -1442,3 +1489,59 @@ def test_clean_metadata() -> None:
"to_be_kept": "To Be Kept",
"yeah": "sure",
}


def test_no_catalog_support() -> None:
"""
Test the ``no_catalog_support`` helper.
"""
assert no_catalog_support(DatabaseNotFoundError()) is False
assert (
no_catalog_support(
SupersetError(
errors=[
{
"message": json.dumps({"message": "Error"}),
"error_type": "UNKNOWN_ERROR",
"level": ErrorLevel.ERROR,
},
{
"message": json.dumps(
{"message": {"catalog": ["Cannot contain spaces."]}},
),
"error_type": "UNKNOWN_ERROR",
"level": ErrorLevel.ERROR,
},
{
"message": json.dumps(
{"message": {"catalog": ["Unknown field."]}},
),
"error_type": "UNKNOWN_ERROR",
"level": ErrorLevel.ERROR,
},
],
),
)
is True
)


def test_create_dataset_error(mocker: MockerFixture) -> None:
"""
Test that ``create_dataset`` surfaces errors.
"""
client = mocker.MagicMock()
client.create_dataset.side_effect = DatabaseNotFoundError()

with pytest.raises(DatabaseNotFoundError):
create_dataset(
client,
{
"id": 1,
"catalog": "examples_dev",
"schema": "public",
"name": "Database",
"sqlalchemy_uri": "postgresql://user@host/examples_dev",
},
models[0],
)
Loading