Skip to content

Commit 7e7f06b

Browse files
committed
feat: DJ sync
1 parent 693156e commit 7e7f06b

File tree

8 files changed

+1044
-4
lines changed

8 files changed

+1044
-4
lines changed

src/preset_cli/api/clients/superset.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,6 @@ class OwnershipType(TypedDict):
231231

232232

233233
class SupersetClient: # pylint: disable=too-many-public-methods
234-
235234
"""
236235
A client for running queries against Superset.
237236
"""
@@ -360,9 +359,11 @@ def get_data( # pylint: disable=too-many-locals, too-many-arguments
360359

361360
# and order bys
362361
processed_orderbys = [
363-
(orderby, not order_desc)
364-
if orderby in metric_names
365-
else (convert_to_adhoc_metric(orderby), not order_desc)
362+
(
363+
(orderby, not order_desc)
364+
if orderby in metric_names
365+
else (convert_to_adhoc_metric(orderby), not order_desc)
366+
)
366367
for orderby in (order_by or [])
367368
]
368369

src/preset_cli/cli/superset/sync/dj/__init__.py

Whitespace-only changes.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
"""
2+
A command to sync DJ cubes into a Superset instance.
3+
"""
4+
5+
from __future__ import annotations
6+
7+
import logging
8+
from uuid import UUID
9+
10+
import click
11+
from datajunction import DJClient
12+
from yarl import URL
13+
14+
from preset_cli.api.clients.superset import SupersetClient
15+
from preset_cli.cli.superset.sync.dj.lib import sync_cube
16+
from preset_cli.lib import split_comma
17+
18+
_logger = logging.getLogger(__name__)
19+
20+
21+
@click.command()
@click.option(
    "--database-uuid",
    required=True,
    help="Database UUID",
)
@click.option(
    "--schema",
    required=True,
    help="Schema where virtual dataset will be created",
)
@click.option(
    "--cubes",
    callback=split_comma,
    help="Comma-separated list of cubes to sync",
)
@click.option(
    "dj_url",
    "--dj-url",
    required=True,
    help="DJ URL",
    default="http://localhost:8000",
)
@click.option(
    "dj_username",
    "--dj-username",
    required=True,
    help="DJ username",
    default="dj",
)
@click.option(
    "dj_password",
    "--dj-password",
    required=True,
    help="DJ password",
    default="dj",
)
@click.option("--external-url-prefix", default="", help="Base URL for resources")
@click.pass_context
def dj(  # pylint: disable=invalid-name,too-many-arguments
    ctx: click.core.Context,
    database_uuid: str,
    schema: str,
    cubes: list[str],
    dj_url: str,
    dj_username: str,
    dj_password: str,
    external_url_prefix: str = "",
) -> None:
    """
    Sync DJ cubes to Superset.

    Builds a Superset client from the ``AUTH`` and ``INSTANCE`` entries of the
    click context object, logs into DJ with basic credentials, and calls
    ``sync_cube`` once for every cube name in ``cubes``.

    Raises ``click.BadParameter`` when ``database_uuid`` is not a valid UUID.
    """
    # Validate the UUID once, before any client is built or any login happens,
    # so a typo is reported as a normal CLI usage error instead of a traceback
    # raised halfway through the sync.
    try:
        parsed_database_uuid = UUID(database_uuid)
    except ValueError as ex:
        raise click.BadParameter(
            f"{database_uuid!r} is not a valid UUID",
            param_hint="--database-uuid",
        ) from ex

    superset_auth = ctx.obj["AUTH"]
    superset_url = URL(ctx.obj["INSTANCE"])
    superset_client = SupersetClient(superset_url, superset_auth)

    dj_client = DJClient(dj_url)
    dj_client.basic_login(dj_username, dj_password)

    # An empty prefix means "no external URL"; ``sync_cube`` receives ``None``.
    base_url = URL(external_url_prefix) if external_url_prefix else None

    for cube in cubes:
        sync_cube(
            parsed_database_uuid,
            schema,
            dj_client,
            superset_client,
            cube,
            base_url,
        )
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
"""
2+
Helper functions for DJ sync.
3+
"""
4+
5+
import json
6+
from typing import Any
7+
from uuid import UUID
8+
9+
from datajunction import DJClient
10+
from yarl import URL
11+
12+
from preset_cli.api.clients.superset import SupersetClient
13+
from preset_cli.api.operators import OneToMany
14+
15+
16+
_FIND_CUBES_QUERY = """
    query FindCubes($names:[String!], $tags: [String!]) {
        findNodes(names: $names, tags: $tags, nodeTypes: [CUBE]) {
            name
            current {
                description
                displayName
                cubeMetrics {
                    name
                    description
                    extractedMeasures {
                        derivedExpression
                    }
                }
                cubeDimensions {
                    name
                }
            }
        }
    }
"""

_MEASURES_SQL_QUERY = """
    query MeasuresSql($metrics: [String!]!, $dimensions: [String!]!) {
        measuresSql(
            cube: {metrics: $metrics, dimensions: $dimensions, filters: []}
            preaggregate: true
        ) {
            sql
        }
    }
"""


def _run_graphql(
    dj_client: DJClient,
    query: str,
    variables: dict[str, Any],
) -> dict[str, Any]:
    """
    POST a GraphQL query to DJ and return the ``data`` member of the response.

    Raises ``ValueError`` when the response carries no ``data``, including any
    ``errors`` reported by the server in the message.
    """
    response = dj_client._session.post(  # pylint: disable=protected-access
        "/graphql",
        json={"query": query, "variables": variables},
    )
    payload = response.json()
    data = payload.get("data")
    if data is None:
        raise ValueError(
            f"DJ GraphQL request returned no data: {payload.get('errors')}",
        )
    return data


def sync_cube(  # pylint: disable=too-many-arguments
    database_uuid: UUID,
    schema: str,
    dj_client: DJClient,
    superset_client: SupersetClient,
    cube: str,
    base_url: URL | None,
) -> None:
    """
    Sync a DJ cube to a Superset virtual dataset.

    Looks up the cube in DJ to collect its description, dimensions and
    metrics, asks DJ for the SQL that computes them, and then gets or creates
    the dataset named after the cube in ``schema`` and updates it with the
    SQL, metrics and description.

    Raises ``ValueError`` when the cube is not found in DJ, when DJ returns no
    SQL for it, or when the database UUID is unknown to Superset.
    """
    nodes = _run_graphql(dj_client, _FIND_CUBES_QUERY, {"names": [cube]})["findNodes"]
    if not nodes:
        raise ValueError(f"Cube {cube} not found in DJ.")

    current = nodes[0]["current"]
    description = current["description"]
    columns = [dimension["name"] for dimension in current["cubeDimensions"]]
    metrics = [
        {
            "metric_name": metric["name"],
            "expression": metric["extractedMeasures"]["derivedExpression"],
            "description": metric["description"],
        }
        for metric in current["cubeMetrics"]
    ]

    statements = _run_graphql(
        dj_client,
        _MEASURES_SQL_QUERY,
        {
            "metrics": [metric["metric_name"] for metric in metrics],
            "dimensions": columns,
        },
    )["measuresSql"]
    if not statements:
        raise ValueError(f"DJ returned no SQL for cube {cube}.")
    # Only the first statement is used for the virtual dataset.
    sql = statements[0]["sql"]

    database = get_database(superset_client, database_uuid)
    dataset = get_or_create_dataset(superset_client, database, schema, cube, sql)

    # First pass: sent with ``override_columns=True`` and an empty metric list.
    # NOTE(review): presumably this resets columns and drops stale metrics
    # before the real metrics are written below -- confirm against Superset.
    superset_client.update_dataset(
        dataset["id"],
        override_columns=True,
        metrics=[],
    )

    # Second pass: write the DJ metrics, description, SQL and certification.
    # NOTE(review): ``external_url`` is passed as a ``yarl.URL``; confirm that
    # ``update_dataset`` can serialize it, or convert it with ``str()``.
    superset_client.update_dataset(
        dataset["id"],
        override_columns=False,
        metrics=metrics,
        description=description,
        is_managed_externally=True,
        external_url=base_url / "nodes" / cube if base_url else None,
        extra=json.dumps(
            {
                "certification": {
                    "certified_by": "DJ",
                    "details": "This table is created by DJ.",
                },
            },
        ),
        sql=sql,
    )
117+
118+
119+
def get_database(superset_client: SupersetClient, uuid: UUID) -> dict[str, Any]:
    """
    Look up a Superset database by UUID and return the first match.

    Raises ``ValueError`` when Superset reports no database with that UUID.
    """
    matches = superset_client.get_databases(uuid=str(uuid))
    if matches:
        return matches[0]

    raise ValueError(f"Database with UUID {uuid} not found in Superset.")
128+
129+
130+
def get_or_create_dataset(
    superset_client: SupersetClient,
    database: dict[str, Any],
    schema: str,
    cube: str,
    sql: str,
) -> dict[str, Any]:
    """
    Return the dataset named after ``cube``, creating it when it is missing.

    Datasets are searched by database, schema and table name. When at least
    one matches, the first match is fetched by id and returned; otherwise a
    new dataset is created from ``sql``.
    """
    database_id = database["id"]
    matches = superset_client.get_datasets(
        database=OneToMany(database_id),  # type: ignore
        schema=schema,
        table_name=cube,
    )

    if not matches:
        return superset_client.create_dataset(
            database=database_id,
            catalog=None,
            schema=schema,
            table_name=cube,
            sql=sql,
        )

    return superset_client.get_dataset(matches[0]["id"])

src/preset_cli/cli/superset/sync/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import click
66

77
from preset_cli.cli.superset.sync.dbt.command import dbt_cloud, dbt_core
8+
from preset_cli.cli.superset.sync.dj.command import dj
89
from preset_cli.cli.superset.sync.native.command import native
910

1011

@@ -16,6 +17,7 @@ def sync() -> None:
1617

1718

1819
sync.add_command(native)
20+
sync.add_command(dj)
1921
sync.add_command(dbt_cloud, name="dbt-cloud")
2022
sync.add_command(dbt_core, name="dbt-core")
2123
# for backwards compatibility

tests/cli/superset/sync/dj/__init__.py

Whitespace-only changes.
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""
2+
Tests for the DJ sync command.
3+
"""
4+
5+
# pylint: disable=invalid-name
6+
7+
from uuid import UUID
8+
9+
from click.testing import CliRunner
10+
from pytest_mock import MockerFixture
11+
from yarl import URL
12+
13+
from preset_cli.cli.superset.main import superset_cli
14+
15+
16+
def test_dj_command(mocker: MockerFixture) -> None:
    """
    Check that the ``dj`` command builds both clients and syncs the given cube.
    """
    command_module = "preset_cli.cli.superset.sync.dj.command"
    superset_client_cls = mocker.patch(f"{command_module}.SupersetClient")
    auth_cls = mocker.patch("preset_cli.cli.superset.main.UsernamePasswordAuth")
    dj_client_cls = mocker.patch(f"{command_module}.DJClient")
    sync_cube_mock = mocker.patch(f"{command_module}.sync_cube")

    cli_args = [
        "https://superset.example.org/",
        "sync",
        "dj",
        "--cubes",
        "default.repair_orders_cube",
        "--database-uuid",
        "a1ad7bd5-b1a3-4d64-afb1-a84c2f4d7715",
        "--schema",
        "schema",
    ]
    result = CliRunner().invoke(superset_cli, cli_args, catch_exceptions=False)
    assert result.exit_code == 0

    # The Superset client is built from the instance URL and the auth object.
    superset_client_cls.assert_called_once_with(
        URL("https://superset.example.org/"),
        auth_cls.return_value,
    )

    # The DJ client uses the default URL and credentials of the command.
    dj_client_cls.assert_called_once_with("http://localhost:8000")
    dj_client_cls.return_value.basic_login.assert_called_once_with("dj", "dj")

    # No external URL prefix was given, so the base URL is ``None``.
    sync_cube_mock.assert_called_once_with(
        UUID("a1ad7bd5-b1a3-4d64-afb1-a84c2f4d7715"),
        "schema",
        dj_client_cls.return_value,
        superset_client_cls.return_value,
        "default.repair_orders_cube",
        None,
    )

0 commit comments

Comments
 (0)