From 7dcdc256faf00978293bc3087d66979e9630642e Mon Sep 17 00:00:00 2001 From: Ben Pankow Date: Tue, 5 Nov 2024 14:56:42 -0800 Subject: [PATCH] [dagster-powerbi] Pull owners, additional asset metadata (#25676) ## Summary Pull owner metadata as well as columnar metadata from PowerBI, if available. ## How I Tested These Changes Update unit tests. ## Changelog > [dagster-powerbi] When using a full workspace scan, owner and column metadata is now attached automatically to assets. --- .../dagster_powerbi/translator.py | 86 ++++++++++++++----- .../dagster_powerbi_tests/conftest.py | 53 +++++++++++- .../dagster_powerbi_tests/test_translator.py | 42 +++++++++ 3 files changed, 159 insertions(+), 22 deletions(-) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py index f1200aa4ab03e..c7d8bbcacd3c6 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py @@ -9,8 +9,9 @@ ) from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec -from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet +from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet, TableMetadataSet from dagster._core.definitions.metadata.metadata_value import MetadataValue +from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.tags.tag_set import NamespacedTagSet from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes @@ -39,21 +40,20 @@ def _clean_asset_name(name: str) -> str: def _attempt_parse_m_query_source(sources: List[Dict[str, Any]]) -> Optional[AssetKey]: for source in sources: if "expression" in source: - if "Snowflake.Databases" in source["expression"]: - objects = PARSE_M_QUERY_OBJECT.findall(source["expression"]) - objects_by_kind = {obj[1]: obj[0].lower() for obj in objects} - - if "Schema" in objects_by_kind and "Table" in objects_by_kind: - if "Database" in objects_by_kind: - return AssetKey( - [ - objects_by_kind["Database"], - objects_by_kind["Schema"], - objects_by_kind["Table"], - ] - ) - else: - return AssetKey([objects_by_kind["Schema"], objects_by_kind["Table"]]) + objects = PARSE_M_QUERY_OBJECT.findall(source["expression"]) + objects_by_kind = {obj[1]: obj[0].lower() for obj in objects} + + if "Schema" in objects_by_kind and "Table" in objects_by_kind: + if "Database" in objects_by_kind: + return AssetKey( + [ + objects_by_kind["Database"], + objects_by_kind["Schema"], + objects_by_kind["Table"], + ] + ) + else: + return AssetKey([objects_by_kind["Schema"], objects_by_kind["Table"]]) @whitelist_for_serdes @@ -137,6 +137,18 @@ def namespace(cls) -> str: return "dagster-powerbi" +def _build_table_metadata(table: Dict[str, Any]) -> TableMetadataSet: + return TableMetadataSet( + table_name=table["name"], + column_schema=TableSchema( + columns=[ + TableColumn(name=column["name"].lower(), type=column.get("dataType")) + for column in table["columns"] + ] + ), + ) + + class DagsterPowerBITranslator: """Translator class which converts raw response data from the PowerBI API into AssetSpecs. Subclass this class to implement custom logic for each type of PowerBI content. @@ -170,6 +182,7 @@ def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey: ) def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: + dashboard_id = data.properties["id"] tile_report_ids = [ tile["reportId"] for tile in data.properties["tiles"] if "reportId" in tile ] @@ -177,7 +190,10 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: self.get_report_asset_key(self.workspace_data.reports_by_id[report_id]) for report_id in tile_report_ids ] - url = data.properties.get("webUrl") + url = ( + data.properties.get("webUrl") + or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/dashboards/{dashboard_id}" + ) return AssetSpec( key=self.get_dashboard_asset_key(data), @@ -191,10 +207,16 @@ def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey: return AssetKey(["report", _clean_asset_name(data.properties["name"])]) def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: + report_id = data.properties["id"] dataset_id = data.properties["datasetId"] dataset_data = self.workspace_data.semantic_models_by_id.get(dataset_id) dataset_key = self.get_semantic_model_asset_key(dataset_data) if dataset_data else None - url = data.properties.get("webUrl") + url = ( + data.properties.get("webUrl") + or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/reports/{report_id}" + ) + + owner = data.properties.get("createdBy") return AssetSpec( key=self.get_report_asset_key(data), @@ -202,35 +224,57 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)}, tags={**PowerBITagSet(asset_type="report")}, kinds={"powerbi", "report"}, + owners=[owner] if owner else None, ) def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey: return AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])]) def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: + dataset_id = data.properties["id"] source_ids = data.properties.get("sources", []) source_keys = [ self.get_data_source_asset_key(self.workspace_data.data_sources_by_id[source_id]) for source_id in source_ids ] - url = data.properties.get("webUrl") + url = ( + data.properties.get("webUrl") + or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/datasets/{dataset_id}" + ) for table in data.properties.get("tables", []): - source = table.get("source") + source = table.get("source", []) source_key = _attempt_parse_m_query_source(source) if source_key: source_keys.append(source_key) + owner = data.properties.get("configuredBy") + + tables = data.properties.get("tables") + table_meta = {} + if tables: + if len(tables) == 1: + table_meta = _build_table_metadata(tables[0]) + else: + table_meta = { + f"{table['name'].lower()}_column_schema": _build_table_metadata( + table + ).column_schema + for table in tables + } + return AssetSpec( key=self.get_semantic_model_asset_key(data), deps=source_keys, metadata={ **PowerBIMetadataSet( web_url=MetadataValue.url(url) if url else None, id=data.properties["id"] - ) + ), + **table_meta, }, tags={**PowerBITagSet(asset_type="semantic_model")}, kinds={"powerbi", "semantic model"}, + owners=[owner] if owner else None, ) def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey: diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py index 56147c9dacd01..46b9f6173b431 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/conftest.py @@ -39,6 +39,7 @@ "datasetWorkspaceId": "a2122b8f-d7e1-42e8-be2b-a5e636ca3221", "users": [], "subscriptions": [], + "createdBy": "ben@dagsterlabs.com", } SAMPLE_SEMANTIC_MODEL = { @@ -46,7 +47,6 @@ "name": "Sales & Returns Sample v201912", "webUrl": "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20", "addRowsAPIEnabled": False, - "configuredBy": "ben@elementl.com", "isRefreshable": True, "isEffectiveIdentityRequired": False, "isEffectiveIdentityRolesRequired": False, @@ -58,6 +58,19 @@ "upstreamDatasets": [], "users": [], "queryScaleOutSettings": {"autoSyncReadOnlyReplicas": True, "maxReadOnlyReplicas": 0}, + "configuredBy": "chris@dagsterlabs.com", + "tables": [ + { + "name": "sales", + "columns": [ + {"name": "order_id", "dataType": "Int64"}, + {"name": "product_id", "dataType": "Int64"}, + {"name": "quantity", "dataType": "Int64"}, + {"name": "price", "dataType": "Decimal"}, + {"name": "order_date", "dataType": "DateTime"}, + ], + } + ], } @@ -78,6 +91,26 @@ "upstreamDatasets": [], "users": [], "queryScaleOutSettings": {"autoSyncReadOnlyReplicas": True, "maxReadOnlyReplicas": 0}, + "tables": [ + { + "name": "sales", + "columns": [ + {"name": "order_id", "dataType": "Int64"}, + {"name": "product_id", "dataType": "Int64"}, + {"name": "quantity", "dataType": "Int64"}, + {"name": "price", "dataType": "Decimal"}, + {"name": "order_date", "dataType": "DateTime"}, + ], + }, + { + "name": "customers", + "columns": [ + {"name": "customer_id", "dataType": "Int64"}, + {"name": "customer_name", "dataType": "String"}, + {"name": "customer_email", "dataType": "String"}, + ], + }, + ], } @@ -266,3 +299,21 @@ def second_workspace_data_api_mocks_fixture( ) yield workspace_data_api_mocks + + +@pytest.fixture( + name="second_workspace_data", +) +def second_workspace_data_fixture(second_workspace_id: str) -> PowerBIWorkspaceData: + return PowerBIWorkspaceData( + workspace_id=second_workspace_id, + dashboards_by_id={}, + reports_by_id={}, + semantic_models_by_id={ + OTHER_SAMPLE_SEMANTIC_MODEL["id"]: PowerBIContentData( + content_type=PowerBIContentType.SEMANTIC_MODEL, + properties=OTHER_SAMPLE_SEMANTIC_MODEL, + ) + }, + data_sources_by_id={}, + ) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py index 9181c356035a6..b4ee772222c39 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py @@ -1,6 +1,7 @@ from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.metadata.metadata_value import MetadataValue +from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.tags import build_kind_tag from dagster_powerbi import DagsterPowerBITranslator from dagster_powerbi.translator import PowerBIContentData, PowerBIWorkspaceData @@ -48,6 +49,7 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None: **build_kind_tag("powerbi"), **build_kind_tag("report"), } + assert asset_spec.owners == ["ben@dagsterlabs.com"] def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None: @@ -66,12 +68,52 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20" ), "dagster-powerbi/id": "8e9c85a1-7b33-4223-9590-76bde70f9a20", + "dagster/table_name": "sales", + "dagster/column_schema": TableSchema( + columns=[ + TableColumn(name="order_id", type="Int64"), + TableColumn(name="product_id", type="Int64"), + TableColumn(name="quantity", type="Int64"), + TableColumn(name="price", type="Decimal"), + TableColumn(name="order_date", type="DateTime"), + ] + ), } assert asset_spec.tags == { "dagster-powerbi/asset_type": "semantic_model", **build_kind_tag("powerbi"), **build_kind_tag("semantic model"), } + assert asset_spec.owners == ["chris@dagsterlabs.com"] + + +def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWorkspaceData) -> None: + semantic_model = next(iter(second_workspace_data.semantic_models_by_id.values())) + + translator = DagsterPowerBITranslator(second_workspace_data) + asset_spec = translator.get_asset_spec(semantic_model) + assert asset_spec.metadata == { + "dagster-powerbi/web_url": MetadataValue.url( + "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20" + ), + "dagster-powerbi/id": "ae9c85a1-7b33-4223-9590-76bde70f9a20", + "sales_column_schema": TableSchema( + columns=[ + TableColumn(name="order_id", type="Int64"), + TableColumn(name="product_id", type="Int64"), + TableColumn(name="quantity", type="Int64"), + TableColumn(name="price", type="Decimal"), + TableColumn(name="order_date", type="DateTime"), + ] + ), + "customers_column_schema": TableSchema( + columns=[ + TableColumn(name="customer_id", type="Int64"), + TableColumn(name="customer_name", type="String"), + TableColumn(name="customer_email", type="String"), + ] + ), + } class MyCustomTranslator(DagsterPowerBITranslator):