chore(dbt Core): Bypass metric evaluation if final SQL is provided (#305)

* chore(dbt Core): Bypass metric evaluation if final SQL is provided
* Adding PR #302 back
Vitor-Avila authored Jun 28, 2024
1 parent 0fdb888 commit 2308536
Showing 4 changed files with 366 additions and 39 deletions.
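
The core of the change: a dbt metric can now declare its final Superset SQL directly under ``meta.superset``, in which case the CLI syncs that expression as-is and skips metric evaluation entirely. A minimal sketch of a manifest metric entry that would take the new bypass path (all names below are hypothetical):

# Hypothetical manifest entry: because ``meta.superset`` provides both the
# target model and the final expression, the legacy ``sql``/``type``
# definition below is never evaluated.
metric_config = {
    "name": "revenue",
    "label": "Revenue",
    "unique_id": "metric.example_project.revenue",
    "depends_on": {"nodes": ["model.example_project.orders"]},
    "sql": "amount",
    "type": "sum",
    "meta": {
        "superset": {
            "expression": "SUM(amount)",
            "model": "model.example_project.orders",
        },
    },
}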
src/preset_cli/cli/superset/sync/dbt/command.py (19 additions, 8 deletions)
@@ -15,7 +15,6 @@
from preset_cli.api.clients.dbt import (
DBTClient,
JobSchema,
MetricSchema,
MFMetricWithSQLSchema,
MFSQLEngine,
ModelSchema,
@@ -27,6 +26,7 @@
from preset_cli.cli.superset.sync.dbt.exposures import ModelKey, sync_exposures
from preset_cli.cli.superset.sync.dbt.lib import (
apply_select,
get_og_metric_from_config,
list_failed_models,
load_profiles,
)
@@ -214,14 +214,25 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many
else:
og_metrics = []
sl_metrics = []
metric_schema = MetricSchema()
for config in configs["metrics"].values():
if "calculation_method" in config or "sql" in config:
# conform to the same schema that dbt Cloud uses for metrics
config["dependsOn"] = config.pop("depends_on")["nodes"]
config["uniqueId"] = config.pop("unique_id")
config["dialect"] = dialect
og_metrics.append(metric_schema.load(config))
# First validate if metadata is already available
if config.get("meta", {}).get("superset", {}).get("model") and (
sql := config.get("meta", {}).get("superset", {}).pop("expression")
):
metric = get_og_metric_from_config(
config,
dialect,
depends_on=[],
sql=sql,
)
og_metrics.append(metric)

# dbt legacy schema
elif "calculation_method" in config or "sql" in config:
metric = get_og_metric_from_config(config, dialect)
og_metrics.append(metric)

# dbt semantic layer
# Only validate semantic layer metrics if MF dialect is specified
elif mf_dialect is not None and (
sl_metric := get_sl_metric(config, model_map, mf_dialect)
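The new per-metric dispatch order in ``dbt_core`` is: an explicit ``meta.superset`` expression bypasses evaluation; otherwise legacy metrics (``calculation_method`` or ``sql``) are processed as before; and semantic-layer metrics are only resolved when a MetricFlow dialect is available. Note that the walrus assignment ``pop``s the expression out of ``meta.superset``, so it is not applied a second time downstream. A condensed sketch of the branching, not the verbatim source:

# Condensed sketch of the dispatch above (helper name is hypothetical).
def classify_metric(config, mf_dialect):
    superset_meta = config.get("meta", {}).get("superset", {})
    if superset_meta.get("model") and superset_meta.get("expression"):
        return "bypass"  # final SQL provided; skip evaluation
    if "calculation_method" in config or "sql" in config:
        return "legacy"  # dbt legacy schema; evaluate as before
    if mf_dialect is not None:
        return "semantic_layer"  # resolve through MetricFlow
    return "skipped"  # nothing to sync for this metric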
src/preset_cli/cli/superset/sync/dbt/lib.py (29 additions, 1 deletion)
@@ -16,7 +16,7 @@
from sqlalchemy.engine.url import URL
from sqlalchemy.exc import NoSuchModuleError

from preset_cli.api.clients.dbt import ModelSchema
from preset_cli.api.clients.dbt import MetricSchema, ModelSchema
from preset_cli.exceptions import CLIError

_logger = logging.getLogger(__name__)
@@ -498,3 +498,31 @@ def list_failed_models(failed_models: List[str]) -> str:
error_message += f"\n - {failed_model}"

return error_message


def get_og_metric_from_config(
metric_config: Dict[str, Any],
dialect: str,
depends_on: Optional[List[str]] = None,
sql: Optional[str] = None,
) -> MetricSchema:
"""
Return an og metric from the config, adhering to the dbt Cloud schema.
"""
metric_schema = MetricSchema()
if depends_on is not None:
metric_config["dependsOn"] = depends_on
metric_config.pop("depends_on", None)
else:
metric_config["dependsOn"] = metric_config.pop("depends_on")["nodes"]

if sql is not None:
metric_config["expression"] = sql
metric_config["calculation_method"] = "derived"
metric_config.pop("type", None)
metric_config.pop("sql", None)

metric_config["uniqueId"] = metric_config.pop("unique_id")
metric_config["dialect"] = dialect

return metric_schema.load(metric_config)
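
For illustration, the two call sites in ``command.py`` reduce to the following usage (a sketch: the configs and dialect are hypothetical, and assume ``MetricSchema`` accepts the listed fields):

# Legacy path: ``dependsOn`` is derived from the manifest's
# ``depends_on.nodes`` and the metric is evaluated downstream as before.
legacy_config = {
    "name": "cnt",
    "unique_id": "metric.example_project.cnt",
    "depends_on": {"nodes": ["model.example_project.orders"]},
    "sql": "*",
    "calculation_method": "count",
}
legacy_metric = get_og_metric_from_config(legacy_config, "postgres")

# Bypass path: ``depends_on`` is cleared, the provided SQL is stored as a
# "derived" expression, and the target dataset comes from
# ``meta.superset.model``.
bypass_config = {
    "name": "revenue",
    "unique_id": "metric.example_project.revenue",
    "depends_on": {"nodes": []},
    "meta": {"superset": {"model": "model.example_project.orders"}},
}
bypass_metric = get_og_metric_from_config(
    bypass_config, "postgres", depends_on=[], sql="SUM(amount)",
)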
tests/cli/superset/sync/dbt/command_test.py (173 additions, 27 deletions)
@@ -3,9 +3,12 @@
"""
# pylint: disable=invalid-name, too-many-lines, line-too-long

import copy
import json
import os
from pathlib import Path
from subprocess import CalledProcessError
from unittest import mock

import pytest
import yaml
@@ -128,33 +131,6 @@
},
]

dbt_core_metrics = [
{
"label": "",
"sql": "*",
"depends_on": ["model.superset_examples.messages_channels"],
"meta": {},
"description": "",
"name": "cnt",
"type": "count",
"filters": [],
"unique_id": "metric.superset_examples.cnt",
"created_at": 1642630986.1942852,
"package_name": "superset_examples",
"sources": [],
"root_path": "/Users/beto/Projects/dbt-examples/superset_examples",
"path": "slack/schema.yml",
"resource_type": "metric",
"original_file_path": "models/slack/schema.yml",
"model": "ref('messages_channels')",
"timestamp": None,
"fqn": ["superset_examples", "slack", "cnt"],
"time_grains": [],
"tags": [],
"refs": [["messages_channels"]],
"dimensions": [],
},
]

superset_metrics = {
"model.superset_examples.messages_channels": [
@@ -2851,6 +2827,176 @@ def test_dbt_core_disallow_edits_superset(
)


def test_dbt_core_metrics_with_syntax_defined(
mocker: MockerFixture,
fs: FakeFilesystem,
) -> None:
"""
Test the ``dbt_core`` command with metrics that have the SQL to be used in
the Superset metric already defined and therefore don't require evaluation.
"""
updated_manifest = json.loads(copy.deepcopy(manifest_contents))

# Setting metrics in all schemas that have an ``expression`` and
# ``model`` already set. The model is different to purposefully test this.
updated_manifest["metrics"] = {
"metric.superset_examples.cnt": {
"depends_on": {"nodes": ["model.superset_examples.messages_channels"]},
"description": "",
"filters": [],
"label": "Metric from an old version",
"meta": {
"superset": {
"expression": "MAX(id)",
"model": "model.other_project.other_model",
},
},
"name": "old_dbt_version_metric",
"sql": "*",
"type": "count",
"unique_id": "metric.superset_examples.cnt",
},
"metric.superset_examples.other_metric": {
"depends_on": {"nodes": ["model.superset_examples.messages_channels"]},
"description": "",
"filters": [],
"label": "Metric in the legacy semantic layer",
"meta": {
"superset": {
"expression": "MIN(id)",
"model": "model.other_project.other_model",
},
},
"name": "legacy_semantic_layer_metric",
"expression": "quantity",
"calculation_method": "sum",
"unique_id": "metric.superset_examples.other_metric",
},
# Semantic layer metric as well
"metric.superset_examples.sl_metric": {
"name": "sl_metric",
"resource_type": "metric",
"package_name": "superset_examples",
"unique_id": "metric.superset_examples.sl_metric",
"description": "Semantic Layer Metric",
"label": "Semantic Layer Metric - label",
"type": "simple",
"type_params": {
"measure": {
"name": "customers_with_orders",
"filter": None,
"alias": None,
},
"input_measures": [
{"name": "customers_with_orders", "filter": None, "alias": None},
],
"metrics": [],
},
"filter": None,
"metadata": None,
"meta": {
"superset": {
"expression": "MAX(price_each) - MAX(cost)",
"model": "model.other_project.other_model",
},
},
"depends_on": {
"macros": [],
"nodes": ["semantic_model.jaffle_shop.orders"],
},
"refs": [],
"metrics": [],
},
}

# Create other model so that the logic still works
new_model = copy.deepcopy(
updated_manifest["nodes"]["model.superset_examples.messages_channels"],
)
new_model_id = "model.other_project.other_model"
new_model["unique_id"] = new_model_id
updated_manifest["nodes"][new_model_id] = new_model
updated_manifest["child_map"][new_model_id] = []

root = Path("/path/to/root")
fs.create_dir(root)
manifest = root / "default/target/manifest.json"
fs.create_file(manifest, contents=json.dumps(updated_manifest))
profiles = root / ".dbt/profiles.yml"
fs.create_file(profiles, contents=profiles_contents)

SupersetClient = mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.SupersetClient",
)
client = SupersetClient()
mocker.patch("preset_cli.cli.superset.main.UsernamePasswordAuth")
sync_database = mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.sync_database",
)
sync_datasets = mocker.patch(
"preset_cli.cli.superset.sync.dbt.command.sync_datasets",
return_value=([], []),
)

runner = CliRunner()
result = runner.invoke(
superset_cli,
[
"https://superset.example.org/",
"sync",
"dbt-core",
str(manifest),
"--profiles",
str(profiles),
"--project",
"default",
"--target",
"dev",
],
catch_exceptions=False,
)
assert result.exit_code == 0

# Testing only about the metric relationship/syntax in this test
sync_datasets.assert_called_with(
client,
mock.ANY,
{
"model.other_project.other_model": [
{
"description": "",
"expression": "MAX(id)",
"extra": "{}",
"metric_name": "old_dbt_version_metric",
"metric_type": "derived",
"verbose_name": "Metric from an old version",
},
{
"description": "",
"expression": "MIN(id)",
"extra": "{}",
"metric_name": "legacy_semantic_layer_metric",
"metric_type": "derived",
"verbose_name": "Metric in the legacy semantic layer",
},
{
"description": "Semantic Layer Metric",
"expression": "MAX(price_each) - MAX(cost)",
"extra": "{}",
"metric_name": "sl_metric",
"metric_type": "derived",
"verbose_name": "Semantic Layer Metric - label",
},
],
},
sync_database(),
False,
"",
reload_columns=True,
merge_metadata=False,
)
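
The expected payload above also documents the field mapping: each bypassed metric lands under the model named in ``meta.superset.model``, with the declared expression used verbatim and ``metric_type`` fixed to ``derived``. Roughly (illustrative helper, not part of the change):

# Reduces a bypassed manifest metric to the shape the assertion expects.
def to_superset_metric(config):
    return {
        "metric_name": config["name"],
        "verbose_name": config.get("label", config["name"]),
        "description": config.get("description", ""),
        "expression": config["meta"]["superset"]["expression"],
        "metric_type": "derived",  # bypassed metrics are always "derived"
        "extra": "{}",
    }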


def test_dbt_cloud(mocker: MockerFixture) -> None:
"""
Test the ``dbt-cloud`` command.
(diff for the fourth changed file not shown)
