From 230853641a642bd292d3e8a184414fc658edf916 Mon Sep 17 00:00:00 2001 From: Vitor Avila <96086495+Vitor-Avila@users.noreply.github.com> Date: Thu, 27 Jun 2024 21:37:24 -0300 Subject: [PATCH] chore(dbt Core): Bypass metric evaluation if final SQL is provided (#305) * chore(dbt Core): Bypass metric evaluation if final SQL is provided * Adding PR#302 back --- .../cli/superset/sync/dbt/command.py | 27 ++- src/preset_cli/cli/superset/sync/dbt/lib.py | 30 ++- tests/cli/superset/sync/dbt/command_test.py | 200 +++++++++++++++--- tests/cli/superset/sync/dbt/lib_test.py | 148 ++++++++++++- 4 files changed, 366 insertions(+), 39 deletions(-) diff --git a/src/preset_cli/cli/superset/sync/dbt/command.py b/src/preset_cli/cli/superset/sync/dbt/command.py index c4d2be49..1f56abe8 100644 --- a/src/preset_cli/cli/superset/sync/dbt/command.py +++ b/src/preset_cli/cli/superset/sync/dbt/command.py @@ -15,7 +15,6 @@ from preset_cli.api.clients.dbt import ( DBTClient, JobSchema, - MetricSchema, MFMetricWithSQLSchema, MFSQLEngine, ModelSchema, @@ -27,6 +26,7 @@ from preset_cli.cli.superset.sync.dbt.exposures import ModelKey, sync_exposures from preset_cli.cli.superset.sync.dbt.lib import ( apply_select, + get_og_metric_from_config, list_failed_models, load_profiles, ) @@ -214,14 +214,25 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many else: og_metrics = [] sl_metrics = [] - metric_schema = MetricSchema() for config in configs["metrics"].values(): - if "calculation_method" in config or "sql" in config: - # conform to the same schema that dbt Cloud uses for metrics - config["dependsOn"] = config.pop("depends_on")["nodes"] - config["uniqueId"] = config.pop("unique_id") - config["dialect"] = dialect - og_metrics.append(metric_schema.load(config)) + # First validate if metadata is already available + if config.get("meta", {}).get("superset", {}).get("model") and ( + sql := config.get("meta", {}).get("superset", {}).pop("expression") + ): + metric = get_og_metric_from_config( + config, + dialect, + depends_on=[], + sql=sql, + ) + og_metrics.append(metric) + + # dbt legacy schema + elif "calculation_method" in config or "sql" in config: + metric = get_og_metric_from_config(config, dialect) + og_metrics.append(metric) + + # dbt semantic layer # Only validate semantic layer metrics if MF dialect is specified elif mf_dialect is not None and ( sl_metric := get_sl_metric(config, model_map, mf_dialect) diff --git a/src/preset_cli/cli/superset/sync/dbt/lib.py b/src/preset_cli/cli/superset/sync/dbt/lib.py index 5757f58f..27c63c8b 100644 --- a/src/preset_cli/cli/superset/sync/dbt/lib.py +++ b/src/preset_cli/cli/superset/sync/dbt/lib.py @@ -16,7 +16,7 @@ from sqlalchemy.engine.url import URL from sqlalchemy.exc import NoSuchModuleError -from preset_cli.api.clients.dbt import ModelSchema +from preset_cli.api.clients.dbt import MetricSchema, ModelSchema from preset_cli.exceptions import CLIError _logger = logging.getLogger(__name__) @@ -498,3 +498,31 @@ def list_failed_models(failed_models: List[str]) -> str: error_message += f"\n - {failed_model}" return error_message + + +def get_og_metric_from_config( + metric_config: Dict[str, Any], + dialect: str, + depends_on: Optional[List[str]] = None, + sql: Optional[str] = None, +) -> MetricSchema: + """ + Return an og metric from the config, adhering to the dbt Cloud schema. + """ + metric_schema = MetricSchema() + if depends_on is not None: + metric_config["dependsOn"] = depends_on + metric_config.pop("depends_on", None) + else: + metric_config["dependsOn"] = metric_config.pop("depends_on")["nodes"] + + if sql is not None: + metric_config["expression"] = sql + metric_config["calculation_method"] = "derived" + metric_config.pop("type", None) + metric_config.pop("sql", None) + + metric_config["uniqueId"] = metric_config.pop("unique_id") + metric_config["dialect"] = dialect + + return metric_schema.load(metric_config) diff --git a/tests/cli/superset/sync/dbt/command_test.py b/tests/cli/superset/sync/dbt/command_test.py index af39ef98..b9e5d35f 100644 --- a/tests/cli/superset/sync/dbt/command_test.py +++ b/tests/cli/superset/sync/dbt/command_test.py @@ -3,9 +3,12 @@ """ # pylint: disable=invalid-name, too-many-lines, line-too-long +import copy +import json import os from pathlib import Path from subprocess import CalledProcessError +from unittest import mock import pytest import yaml @@ -128,33 +131,6 @@ }, ] -dbt_core_metrics = [ - { - "label": "", - "sql": "*", - "depends_on": ["model.superset_examples.messages_channels"], - "meta": {}, - "description": "", - "name": "cnt", - "type": "count", - "filters": [], - "unique_id": "metric.superset_examples.cnt", - "created_at": 1642630986.1942852, - "package_name": "superset_examples", - "sources": [], - "root_path": "/Users/beto/Projects/dbt-examples/superset_examples", - "path": "slack/schema.yml", - "resource_type": "metric", - "original_file_path": "models/slack/schema.yml", - "model": "ref('messages_channels')", - "timestamp": None, - "fqn": ["superset_examples", "slack", "cnt"], - "time_grains": [], - "tags": [], - "refs": [["messages_channels"]], - "dimensions": [], - }, -] superset_metrics = { "model.superset_examples.messages_channels": [ @@ -2851,6 +2827,176 @@ def test_dbt_core_disallow_edits_superset( ) +def test_dbt_core_metrics_with_syntax_defined( + mocker: MockerFixture, + fs: FakeFilesystem, +) -> None: + """ + Test the ``dbt_core`` command with metrics that have the SQL to be used in + the Superset metric already defined and therefore don't require evaluation. + """ + updated_manifest = json.loads(copy.deepcopy(manifest_contents)) + + # Setting metrics in all schemas that have an ``expression`` and + # ``model`` already set. The model is different to purposefully test this. + updated_manifest["metrics"] = { + "metric.superset_examples.cnt": { + "depends_on": {"nodes": ["model.superset_examples.messages_channels"]}, + "description": "", + "filters": [], + "label": "Metric from an old version", + "meta": { + "superset": { + "expression": "MAX(id)", + "model": "model.other_project.other_model", + }, + }, + "name": "old_dbt_version_metric", + "sql": "*", + "type": "count", + "unique_id": "metric.superset_examples.cnt", + }, + "metric.superset_examples.other_metric": { + "depends_on": {"nodes": ["model.superset_examples.messages_channels"]}, + "description": "", + "filters": [], + "label": "Metric in the legacy semantic layer", + "meta": { + "superset": { + "expression": "MIN(id)", + "model": "model.other_project.other_model", + }, + }, + "name": "legacy_semantic_layer_metric", + "expression": "quantity", + "calculation_method": "sum", + "unique_id": "metric.superset_examples.other_metric", + }, + # Semantic layer metric as well + "metric.superset_examples.sl_metric": { + "name": "sl_metric", + "resource_type": "metric", + "package_name": "superset_examples", + "unique_id": "metric.superset_examples.sl_metric", + "description": "Semantic Layer Metric", + "label": "Semantic Layer Metric - label", + "type": "simple", + "type_params": { + "measure": { + "name": "customers_with_orders", + "filter": None, + "alias": None, + }, + "input_measures": [ + {"name": "customers_with_orders", "filter": None, "alias": None}, + ], + "metrics": [], + }, + "filter": None, + "metadata": None, + "meta": { + "superset": { + "expression": "MAX(price_each) - MAX(cost)", + "model": "model.other_project.other_model", + }, + }, + "depends_on": { + "macros": [], + "nodes": ["semantic_model.jaffle_shop.orders"], + }, + "refs": [], + "metrics": [], + }, + } + + # Create other model so that the logic still works + new_model = copy.deepcopy( + updated_manifest["nodes"]["model.superset_examples.messages_channels"], + ) + new_model_id = "model.other_project.other_model" + new_model["unique_id"] = new_model_id + updated_manifest["nodes"][new_model_id] = new_model + updated_manifest["child_map"][new_model_id] = [] + + root = Path("/path/to/root") + fs.create_dir(root) + manifest = root / "default/target/manifest.json" + fs.create_file(manifest, contents=json.dumps(updated_manifest)) + profiles = root / ".dbt/profiles.yml" + fs.create_file(profiles, contents=profiles_contents) + + SupersetClient = mocker.patch( + "preset_cli.cli.superset.sync.dbt.command.SupersetClient", + ) + client = SupersetClient() + mocker.patch("preset_cli.cli.superset.main.UsernamePasswordAuth") + sync_database = mocker.patch( + "preset_cli.cli.superset.sync.dbt.command.sync_database", + ) + sync_datasets = mocker.patch( + "preset_cli.cli.superset.sync.dbt.command.sync_datasets", + return_value=([], []), + ) + + runner = CliRunner() + result = runner.invoke( + superset_cli, + [ + "https://superset.example.org/", + "sync", + "dbt-core", + str(manifest), + "--profiles", + str(profiles), + "--project", + "default", + "--target", + "dev", + ], + catch_exceptions=False, + ) + assert result.exit_code == 0 + + # Testing only about the metric relationship/syntax in this test + sync_datasets.assert_called_with( + client, + mock.ANY, + { + "model.other_project.other_model": [ + { + "description": "", + "expression": "MAX(id)", + "extra": "{}", + "metric_name": "old_dbt_version_metric", + "metric_type": "derived", + "verbose_name": "Metric from an old version", + }, + { + "description": "", + "expression": "MIN(id)", + "extra": "{}", + "metric_name": "legacy_semantic_layer_metric", + "metric_type": "derived", + "verbose_name": "Metric in the legacy semantic layer", + }, + { + "description": "Semantic Layer Metric", + "expression": "MAX(price_each) - MAX(cost)", + "extra": "{}", + "metric_name": "sl_metric", + "metric_type": "derived", + "verbose_name": "Semantic Layer Metric - label", + }, + ], + }, + sync_database(), + False, + "", + reload_columns=True, + merge_metadata=False, + ) + + def test_dbt_cloud(mocker: MockerFixture) -> None: """ Test the ``dbt-cloud`` command. diff --git a/tests/cli/superset/sync/dbt/lib_test.py b/tests/cli/superset/sync/dbt/lib_test.py index 4dde45bb..fd9f265a 100644 --- a/tests/cli/superset/sync/dbt/lib_test.py +++ b/tests/cli/superset/sync/dbt/lib_test.py @@ -3,10 +3,11 @@ """ # pylint: disable=invalid-name +import copy import json import math from pathlib import Path -from typing import List +from typing import Any, Dict, List import pytest from pyfakefs.fake_filesystem import FakeFilesystem @@ -21,11 +22,40 @@ create_engine_with_check, env_var, filter_models, + get_og_metric_from_config, list_failed_models, load_profiles, ) from preset_cli.exceptions import CLIError +base_metric_config: Dict[str, Any] = { + "name": "revenue_metric", + "expression": "price_each", + "description": "revenue.", + "calculation_method": "sum", + "unique_id": "metric.postgres.revenue_verbose_name_from_dbt", + "label": "Sales Revenue Metric and this is the dbt label", + "depends_on": { + "nodes": ["model.postgres.vehicle_sales"], + }, + "metrics": [], + "created_at": 1701101973.269536, + "resource_type": "metric", + "fqn": ["postgres", "revenue_verbose_name_from_dbt"], + "model": "ref('vehicle_sales')", + "path": "schema.yml", + "package_name": "postgres", + "original_file_path": "models/schema.yml", + "refs": [{"name": "vehicle_sales", "package": None, "version": None}], + "time_grains": [], + "model_unique_id": None, + "meta": { + "superset": { + "d3format": ",.2f", + }, + }, +} + def test_build_sqlalchemy_params_postgres(mocker: MockerFixture) -> None: """ @@ -658,7 +688,7 @@ def test_apply_select_using_path(fs: FakeFilesystem) -> None: def test_list_failed_models_single_model() -> None: """ - Test ``list_failed_models()`` with a single failed model + Test ``list_failed_models`` with a single failed model """ error_list = list_failed_models(["single_failure"]) assert error_list == "Below model(s) failed to sync:\n - single_failure" @@ -666,10 +696,122 @@ def test_list_failed_models_single_model() -> None: def test_list_failed_models_multiple_models() -> None: """ - Test ``list_failed_models()`` with multiple failed models + Test ``list_failed_models`` with multiple failed models """ error_list = list_failed_models(["single_failure", "another_failure"]) assert ( error_list == "Below model(s) failed to sync:\n - single_failure\n - another_failure" ) + + +def test_get_og_metric_from_config() -> None: + """ + Test ``get_og_metric_from_config`` method. + """ + metric_config = copy.deepcopy(base_metric_config) + assert get_og_metric_from_config(metric_config, "my_dialect") == { + "depends_on": ["model.postgres.vehicle_sales"], + "description": "revenue.", + "meta": { + "superset": { + "d3format": ",.2f", + }, + }, + "name": "revenue_metric", + "label": "Sales Revenue Metric and this is the dbt label", + "unique_id": "metric.postgres.revenue_verbose_name_from_dbt", + "calculation_method": "sum", + "expression": "price_each", + "dialect": "my_dialect", + "metrics": [], + "created_at": 1701101973.269536, + "resource_type": "metric", + "fqn": ["postgres", "revenue_verbose_name_from_dbt"], + "model": "ref('vehicle_sales')", + "path": "schema.yml", + "package_name": "postgres", + "original_file_path": "models/schema.yml", + "refs": [{"name": "vehicle_sales", "package": None, "version": None}], + "time_grains": [], + "model_unique_id": None, + } + + +def test_get_og_metric_from_config_older_dbt_version() -> None: + """ + Test ``get_og_metric_from_config`` when passing a metric built using an + older dbt version (< 1.3). + """ + metric_config = copy.deepcopy(base_metric_config) + metric_config["type"] = metric_config.pop("calculation_method") + metric_config["sql"] = metric_config.pop("expression") + assert get_og_metric_from_config(metric_config, "other_dialect") == { + "depends_on": ["model.postgres.vehicle_sales"], + "description": "revenue.", + "meta": { + "superset": { + "d3format": ",.2f", + }, + }, + "name": "revenue_metric", + "label": "Sales Revenue Metric and this is the dbt label", + "unique_id": "metric.postgres.revenue_verbose_name_from_dbt", + "type": "sum", + "sql": "price_each", + "dialect": "other_dialect", + "metrics": [], + "created_at": 1701101973.269536, + "resource_type": "metric", + "fqn": ["postgres", "revenue_verbose_name_from_dbt"], + "model": "ref('vehicle_sales')", + "path": "schema.yml", + "package_name": "postgres", + "original_file_path": "models/schema.yml", + "refs": [{"name": "vehicle_sales", "package": None, "version": None}], + "time_grains": [], + "model_unique_id": None, + } + + +def test_get_og_metric_from_config_ready_metric() -> None: + """ + Test ``get_og_metric_from_config`` when passing a metric that already + contains the model's ``unique_id`` and its ``sql``. + """ + metric_config = copy.deepcopy(base_metric_config) + metric_config["meta"]["superset"]["model"] = "model.postgres.vehicle_sales" + + # Make sure that: + # - depends_on is empty (so that ``skip_parsing`` gets later set to True) + # - ``expression`` gets the value of ``sql`` + # - ``calculation_method`` is set to ``derived`` + assert get_og_metric_from_config( + metric_config, + "preset_sql", + depends_on=[], + sql="SUM(price_each) - max(cost)", + ) == { + "depends_on": [], + "description": "revenue.", + "meta": { + "superset": {"d3format": ",.2f", "model": "model.postgres.vehicle_sales"}, + }, + "name": "revenue_metric", + "label": "Sales Revenue Metric and this is the dbt label", + "unique_id": "metric.postgres.revenue_verbose_name_from_dbt", + "calculation_method": "derived", + "expression": "SUM(price_each) - max(cost)", + "dialect": "preset_sql", + "metrics": [], + "created_at": 1701101973.269536, + "resource_type": "metric", + "fqn": ["postgres", "revenue_verbose_name_from_dbt"], + "model": "ref('vehicle_sales')", + "path": "schema.yml", + "package_name": "postgres", + "original_file_path": "models/schema.yml", + "refs": [{"name": "vehicle_sales", "package": None, "version": None}], + "time_grains": [], + "model_unique_id": None, + }