Skip to content

Commit

Permalink
[dagster-powerbi] Pull owners, additional asset metadata (#25676)
Browse files Browse the repository at this point in the history
## Summary

Pull owner metadata as well as columnar metadata from PowerBI, if
available.

## How I Tested These Changes

Update unit tests.

## Changelog

> [dagster-powerbi] When using a full workspace scan, owner and column
metadata is now attached automatically to assets.
  • Loading branch information
benpankow authored Nov 5, 2024
1 parent 010b63e commit 7dcdc25
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
)
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet, TableMetadataSet
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.tags.tag_set import NamespacedTagSet
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
Expand Down Expand Up @@ -39,21 +40,20 @@ def _clean_asset_name(name: str) -> str:
def _attempt_parse_m_query_source(sources: List[Dict[str, Any]]) -> Optional[AssetKey]:
for source in sources:
if "expression" in source:
if "Snowflake.Databases" in source["expression"]:
objects = PARSE_M_QUERY_OBJECT.findall(source["expression"])
objects_by_kind = {obj[1]: obj[0].lower() for obj in objects}

if "Schema" in objects_by_kind and "Table" in objects_by_kind:
if "Database" in objects_by_kind:
return AssetKey(
[
objects_by_kind["Database"],
objects_by_kind["Schema"],
objects_by_kind["Table"],
]
)
else:
return AssetKey([objects_by_kind["Schema"], objects_by_kind["Table"]])
objects = PARSE_M_QUERY_OBJECT.findall(source["expression"])
objects_by_kind = {obj[1]: obj[0].lower() for obj in objects}

if "Schema" in objects_by_kind and "Table" in objects_by_kind:
if "Database" in objects_by_kind:
return AssetKey(
[
objects_by_kind["Database"],
objects_by_kind["Schema"],
objects_by_kind["Table"],
]
)
else:
return AssetKey([objects_by_kind["Schema"], objects_by_kind["Table"]])


@whitelist_for_serdes
Expand Down Expand Up @@ -137,6 +137,18 @@ def namespace(cls) -> str:
return "dagster-powerbi"


def _build_table_metadata(table: Dict[str, Any]) -> TableMetadataSet:
return TableMetadataSet(
table_name=table["name"],
column_schema=TableSchema(
columns=[
TableColumn(name=column["name"].lower(), type=column.get("dataType"))
for column in table["columns"]
]
),
)


class DagsterPowerBITranslator:
"""Translator class which converts raw response data from the PowerBI API into AssetSpecs.
Subclass this class to implement custom logic for each type of PowerBI content.
Expand Down Expand Up @@ -170,14 +182,18 @@ def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey:
)

def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
dashboard_id = data.properties["id"]
tile_report_ids = [
tile["reportId"] for tile in data.properties["tiles"] if "reportId" in tile
]
report_keys = [
self.get_report_asset_key(self.workspace_data.reports_by_id[report_id])
for report_id in tile_report_ids
]
url = data.properties.get("webUrl")
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/dashboards/{dashboard_id}"
)

return AssetSpec(
key=self.get_dashboard_asset_key(data),
Expand All @@ -191,46 +207,74 @@ def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(["report", _clean_asset_name(data.properties["name"])])

def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
report_id = data.properties["id"]
dataset_id = data.properties["datasetId"]
dataset_data = self.workspace_data.semantic_models_by_id.get(dataset_id)
dataset_key = self.get_semantic_model_asset_key(dataset_data) if dataset_data else None
url = data.properties.get("webUrl")
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/reports/{report_id}"
)

owner = data.properties.get("createdBy")

return AssetSpec(
key=self.get_report_asset_key(data),
deps=[dataset_key] if dataset_key else None,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="report")},
kinds={"powerbi", "report"},
owners=[owner] if owner else None,
)

def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])])

def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
dataset_id = data.properties["id"]
source_ids = data.properties.get("sources", [])
source_keys = [
self.get_data_source_asset_key(self.workspace_data.data_sources_by_id[source_id])
for source_id in source_ids
]
url = data.properties.get("webUrl")
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/datasets/{dataset_id}"
)

for table in data.properties.get("tables", []):
source = table.get("source")
source = table.get("source", [])
source_key = _attempt_parse_m_query_source(source)
if source_key:
source_keys.append(source_key)

owner = data.properties.get("configuredBy")

tables = data.properties.get("tables")
table_meta = {}
if tables:
if len(tables) == 1:
table_meta = _build_table_metadata(tables[0])
else:
table_meta = {
f"{table['name'].lower()}_column_schema": _build_table_metadata(
table
).column_schema
for table in tables
}

return AssetSpec(
key=self.get_semantic_model_asset_key(data),
deps=source_keys,
metadata={
**PowerBIMetadataSet(
web_url=MetadataValue.url(url) if url else None, id=data.properties["id"]
)
),
**table_meta,
},
tags={**PowerBITagSet(asset_type="semantic_model")},
kinds={"powerbi", "semantic model"},
owners=[owner] if owner else None,
)

def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
"datasetWorkspaceId": "a2122b8f-d7e1-42e8-be2b-a5e636ca3221",
"users": [],
"subscriptions": [],
"createdBy": "[email protected]",
}

SAMPLE_SEMANTIC_MODEL = {
"id": "8e9c85a1-7b33-4223-9590-76bde70f9a20",
"name": "Sales & Returns Sample v201912",
"webUrl": "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20",
"addRowsAPIEnabled": False,
"configuredBy": "[email protected]",
"isRefreshable": True,
"isEffectiveIdentityRequired": False,
"isEffectiveIdentityRolesRequired": False,
Expand All @@ -58,6 +58,19 @@
"upstreamDatasets": [],
"users": [],
"queryScaleOutSettings": {"autoSyncReadOnlyReplicas": True, "maxReadOnlyReplicas": 0},
"configuredBy": "[email protected]",
"tables": [
{
"name": "sales",
"columns": [
{"name": "order_id", "dataType": "Int64"},
{"name": "product_id", "dataType": "Int64"},
{"name": "quantity", "dataType": "Int64"},
{"name": "price", "dataType": "Decimal"},
{"name": "order_date", "dataType": "DateTime"},
],
}
],
}


Expand All @@ -78,6 +91,26 @@
"upstreamDatasets": [],
"users": [],
"queryScaleOutSettings": {"autoSyncReadOnlyReplicas": True, "maxReadOnlyReplicas": 0},
"tables": [
{
"name": "sales",
"columns": [
{"name": "order_id", "dataType": "Int64"},
{"name": "product_id", "dataType": "Int64"},
{"name": "quantity", "dataType": "Int64"},
{"name": "price", "dataType": "Decimal"},
{"name": "order_date", "dataType": "DateTime"},
],
},
{
"name": "customers",
"columns": [
{"name": "customer_id", "dataType": "Int64"},
{"name": "customer_name", "dataType": "String"},
{"name": "customer_email", "dataType": "String"},
],
},
],
}


Expand Down Expand Up @@ -266,3 +299,21 @@ def second_workspace_data_api_mocks_fixture(
)

yield workspace_data_api_mocks


@pytest.fixture(
name="second_workspace_data",
)
def second_workspace_data_fixture(second_workspace_id: str) -> PowerBIWorkspaceData:
return PowerBIWorkspaceData(
workspace_id=second_workspace_id,
dashboards_by_id={},
reports_by_id={},
semantic_models_by_id={
OTHER_SAMPLE_SEMANTIC_MODEL["id"]: PowerBIContentData(
content_type=PowerBIContentType.SEMANTIC_MODEL,
properties=OTHER_SAMPLE_SEMANTIC_MODEL,
)
},
data_sources_by_id={},
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.tags import build_kind_tag
from dagster_powerbi import DagsterPowerBITranslator
from dagster_powerbi.translator import PowerBIContentData, PowerBIWorkspaceData
Expand Down Expand Up @@ -48,6 +49,7 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None:
**build_kind_tag("powerbi"),
**build_kind_tag("report"),
}
assert asset_spec.owners == ["[email protected]"]


def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None:
Expand All @@ -66,12 +68,52 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None
"https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20"
),
"dagster-powerbi/id": "8e9c85a1-7b33-4223-9590-76bde70f9a20",
"dagster/table_name": "sales",
"dagster/column_schema": TableSchema(
columns=[
TableColumn(name="order_id", type="Int64"),
TableColumn(name="product_id", type="Int64"),
TableColumn(name="quantity", type="Int64"),
TableColumn(name="price", type="Decimal"),
TableColumn(name="order_date", type="DateTime"),
]
),
}
assert asset_spec.tags == {
"dagster-powerbi/asset_type": "semantic_model",
**build_kind_tag("powerbi"),
**build_kind_tag("semantic model"),
}
assert asset_spec.owners == ["[email protected]"]


def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWorkspaceData) -> None:
semantic_model = next(iter(second_workspace_data.semantic_models_by_id.values()))

translator = DagsterPowerBITranslator(second_workspace_data)
asset_spec = translator.get_asset_spec(semantic_model)
assert asset_spec.metadata == {
"dagster-powerbi/web_url": MetadataValue.url(
"https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20"
),
"dagster-powerbi/id": "ae9c85a1-7b33-4223-9590-76bde70f9a20",
"sales_column_schema": TableSchema(
columns=[
TableColumn(name="order_id", type="Int64"),
TableColumn(name="product_id", type="Int64"),
TableColumn(name="quantity", type="Int64"),
TableColumn(name="price", type="Decimal"),
TableColumn(name="order_date", type="DateTime"),
]
),
"customers_column_schema": TableSchema(
columns=[
TableColumn(name="customer_id", type="Int64"),
TableColumn(name="customer_name", type="String"),
TableColumn(name="customer_email", type="String"),
]
),
}


class MyCustomTranslator(DagsterPowerBITranslator):
Expand Down

0 comments on commit 7dcdc25

Please sign in to comment.