-
Notifications
You must be signed in to change notification settings - Fork 96
Azure Purview Tips
This is a collection of "tips" for working with Azure Purview types.
The Schema of a Resource Set is actually a tabular_schema entity that contains zero or more column entities.
- Resource Set has a "tabular_schema" relationship attribute that must be a tabular_schema entity.
- Column has a "composeSchema" relationship attribute that is REQUIRED and must be a "tabular_schema" entity.
+--------+ +-------+ +------+
|Resource| |Tabular| |Column|
| Set +----->+Schema +---->+ |
+--------+ +-------+ +------+
"tabular_schema" "composeSchema"
import json
import os

from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import AtlasClient, AtlasEntity, TypeCategory
# Fix: PurviewClient is used below but was never imported.
from pyapacheatlas.core.client import PurviewClient
from pyapacheatlas.readers import ExcelConfiguration, ExcelReader

if __name__ == "__main__":
    # Authentication and client setup -- see the pyapacheatlas samples for
    # the full constructor arguments (tenant id, client id, secret, account).
    oauth = ServicePrincipalAuthentication()  # See samples for instantiation
    atlas_client = PurviewClient()  # See samples for instantiation

    # Create a Tabular Schema entity. Negative guids are temporary,
    # client-side ids that the service resolves to real guids on upload.
    ts = AtlasEntity(
        name="demotabschema",
        typeName="tabular_schema",
        qualified_name="pyapache://demotabschema",
        guid=-2
    )

    # Create a Column entity that references your tabular schema.
    # Per the tip above, "composeSchema" is a REQUIRED relationship
    # attribute on column and must point at a tabular_schema entity.
    # NOTE(review): "pyapche" looks like a typo for "pyapache" but the
    # same spelling is used in the update example below, so it is kept.
    col01 = AtlasEntity(
        name="democolumn",
        typeName="column",
        qualified_name="pyapche://mycolumn",
        guid=-3,
        attributes={
            "type": "Blah",
            "description": "blabh blah"
        },
        relationshipAttributes={
            # to_json(minimum=True) produces a small "header" reference
            # (typeName + qualifiedName) instead of the full entity.
            "composeSchema": ts.to_json(minimum=True)
        }
    )

    # Create a resource set that references the tabular schema through
    # its "tabular_schema" relationship attribute.
    rs = AtlasEntity(
        name="demoresourceset",
        typeName="azure_datalake_gen2_resource_set",
        qualified_name="pyapache://demors",
        guid=-1,
        relationshipAttributes={
            "tabular_schema": ts.to_json(minimum=True)
        }
    )

    # Upload all three entities in a single batch.
    results = atlas_client.upload_entities(
        [rs.to_json(), ts.to_json(), col01.to_json()]
    )

    # Print out results to see the guid assignments
    print(json.dumps(results, indent=2))
# Omitting imports as seen in the previous step

# Re-instantiate a "dummy" of the tabular schema that already exists in
# Purview. Because the column references it by header (to_json(minimum=True)
# yields typeName + qualifiedName), the schema entity must also be included
# in the upload batch below.
ts = AtlasEntity(
name="demotabschema",
typeName="tabular_schema",
qualified_name="pyapache://demotabschema",
guid = -1
)

# Update the existing column: same qualified name and type as the original
# upload, so this overwrites its attributes rather than creating a new entity.
# NOTE(review): "pyapche" looks like a typo for "pyapache", but it matches the
# qualified name used when the column was first created, so it is consistent.
updated_col01 = AtlasEntity(
name="democolumn",
typeName="column",
qualified_name="pyapche://mycolumn",
guid=-2,
attributes={
"type":"Blah2",
"description": "blabh2 blah23"
},
relationshipAttributes = {
"composeSchema": ts.to_json(minimum=True)
# Alternatively, if you know the guid, your reference is simpler
# And you don't need to upload / instantiate a "dummy" of your existing tabular_schema.
# You can also get an entity by name using atlas_client.get_entity method
# ts_guid = atlas_client.get_entity(qualifiedName="pyapache://demotabschema", typeName="tabular_schema")
#"composeSchema": {"guid": "cdd919a0-your-guid-a896-18f6f6f60000"}
}
)
# When updating the column, you need the tabular schema entity ONLY if you are using the "header"
# method (AtlasEntity.to_json(minimum=True)). If you are using the guid in the column's composeSchema
# you don't need to upload the tabular schema object.
results = atlas_client.upload_entities( [updated_col01.to_json(), ts.to_json() ] )
print(json.dumps(results, indent=2))
At time of writing, PowerBI lineage seems to not always include the source table but instead points to a generic database / table entity.
You can "hook up" the lineage to an existing entity with the script below. However, an "Incremental Scan" will overwrite the changes.
# The name / type / qualified name of the Power BI process whose lineage
# you want to change.
LINEAGE_TO_CHANGE_QUALIFIED_NAME = ""
LINEAGE_TO_CHANGE_NAME = ""
LINEAGE_TO_CHANGE_TYPE = "powerbi_dataset_process"

# Update these to be the existing source entity that should
# link to your Power BI dataset as above.
SOURCE_QUALIFIED_NAME = ""
SOURCE_NAME = ""
SOURCE_TYPE = ""

# A second scenario would have us appending to an existing process.
# To do that, we need to query for the existing entity so its current
# inputs can be preserved when we add one more.
dummy_existing_process = AtlasProcess(
    name=LINEAGE_TO_CHANGE_NAME,
    typeName=LINEAGE_TO_CHANGE_TYPE,
    qualified_name=LINEAGE_TO_CHANGE_QUALIFIED_NAME,
    inputs=None,  # Set to None so no update will occur
    outputs=None,  # Outputs are left untouched
    guid=-104
)

# Fetch the real process so we can read its existing inputs.
real_existing_process = client.get_entity(
    typeName=LINEAGE_TO_CHANGE_TYPE,
    qualifiedName=LINEAGE_TO_CHANGE_QUALIFIED_NAME
)["entities"][0]

# Create one more input to be added.
one_more_input = AtlasEntity(
    name=SOURCE_NAME,
    typeName=SOURCE_TYPE,
    qualified_name=SOURCE_QUALIFIED_NAME,
    guid=-103
)

# Add the existing and new inputs to the dummy process. Assigning only
# [one_more_input] would REPLACE the process's current lineage inputs;
# appending to the fetched inputs preserves them.
existing_inputs = real_existing_process["attributes"]["inputs"]
dummy_existing_process.inputs = existing_inputs + [one_more_input]

complex_results = client.upload_entities( batch=[dummy_existing_process, one_more_input] )
print(json.dumps(complex_results, indent=2))
At this time, Purview does not provide column level information for Power BI Datasets. If you'd like to add that column level lineage, you can do so by using the tabular_schema relationship attribute and associating a tabular schema entity with the powerbi dataset and one or more column entities.
The below sample creates a "dummy" power bi dataset (that I assume already exists), creates a new tabular schema, and one column for the schema.
import json
import os
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core.client import PurviewClient
from pyapacheatlas.core import AtlasEntity
from pyapacheatlas.core.util import GuidTracker

# Authenticate and connect -- see the pyapacheatlas samples for the
# full constructor arguments.
oauth = ServicePrincipalAuthentication()  # See samples for instantiation
client = PurviewClient()  # See samples for instantiation

# Hands out sequential temporary (negative) guids for the batch.
guid_tracker = GuidTracker()

# A "dummy" of the powerbi_dataset that already exists in Purview;
# set the name / qualified name to match your own dataset.
dataset = AtlasEntity(
    "Stocks2",
    "powerbi_dataset",
    "pyapacheatlas://pbi/demo_dataset_with_schema",
    guid=guid_tracker.get_guid()
)

# A brand new tabular_schema to hang the columns off of.
tab_schema = AtlasEntity(
    "mytabschema",
    "tabular_schema",
    "pyapacheatlas://pbi/demo_dataset_with_schema/ts",
    guid=guid_tracker.get_guid()
)

# Columns require a "type" attribute.
column1 = AtlasEntity(
    "c1",
    "column",
    "pyapacheatlas://pbi/demo_dataset_with_schema#c1",
    guid=guid_tracker.get_guid()
)
column1.attributes["type"] = "int"

# Wire up the relationships: the schema owns the columns, and the
# dataset points at the schema (and, if created from scratch, a workspace).
tab_schema.addRelationship(columns=[column1.to_json(minimum=True)])
dataset.addRelationship(tabular_schema=tab_schema)

# To create a dataset from scratch you would also need a workspace
# relationship, then upload everything together:
# workspace = AtlasEntity("MyPBIWorkspace", "powerbi_workspace", "pyapacheatlas://pbi", guid=guid_tracker.get_guid())
# dataset.addRelationship(workspace=workspace)
# results = client.upload_entities(batch = [workspace, dataset, tab_schema, column1])

# Upload the dataset, tabular_schema, and column(s).
results = client.upload_entities(batch=[dataset, tab_schema, column1])
print(json.dumps(results, indent=2))
Let's say you want to instantiate your own Azure Data Factory pipelines and activities. We can create our own entities with the built-in types! We just need to follow the qualified name pattern and relationship attributes.
First, let's understand the built-in types and their relationship attributes.
- `azure_data_factory` represents a Data Factory that you've scanned.
  - The `pipelines` relationship attribute is an array of `adf_pipeline`.
- `adf_pipeline` represents an Azure Data Factory pipeline inside an Azure Data Factory.
  - The `dataFactory` relationship attribute points to one `azure_data_factory`.
  - The `subProcesses` relationship attribute is an array of `adf_copy_activity` (or other ADF supported types).
- `adf_copy_activity` represents a Copy Activity within a pipeline.
  - The `parent` relationship attribute points to one `adf_pipeline`.
  - The `subProcesses` relationship attribute is an array of `adf_copy_operation` (or other ADF supported types).
  - The `runInstances` relationship attribute is an array of `adf_copy_activity_run` (or other ADF supported types).
- `adf_copy_operation` represents the actual process that will contain inputs and outputs.
  - The `parent` relationship attribute points to one `adf_copy_activity`.
  - This is the entity that will have the inputs and outputs.
Another important consideration is the qualified name pattern at each of these levels.
- `azure_data_factory` pattern: `/subscriptions/{SUBSCRIPTION_ID}/resourceGroups/{RG_NAME}/providers/Microsoft.DataFactory/factories/{DATA_FACTORY_NAME}`
- `adf_pipeline` pattern: `{azure_data_factory_pattern}/{PIPELINE_NAME}`
- `adf_copy_activity` pattern: `{azure_data_factory_pattern}/{PIPELINE_NAME}/activities/{ACTIVITY_NAME}`
- `adf_copy_operation` pattern: `{adf_copy_activity_pattern}#{output_qualified_name}#{output_type_name}`
  - This output_qualified_name and output_type_name appear to be just conventions used to ensure a unique qualified name.
  - However, there is nothing stopping you from using some other pattern as it does not appear to affect the UI.
In the scenario below, we will create a pipeline, copy activity, and a copy operation for an existing data factory entity.
First, there's some set up. We need to fill out the existing data factory's guid, the resource path (for the qualified name patterns) should be updated, as well as the pipeline, activity, copy operation name, and the output's qualified name and type.
# Guid of the azure_data_factory entity that ALREADY exists in Purview
# (the new pipeline will be attached to it via the dataFactory relationship).
DATA_FACTORY_GUID = "xxxx-yyy-zzz-123-456"
# NOTE(review): this path ends in "/pipelines" while the pattern described
# above is {factory_path}/{PIPELINE_NAME} -- confirm against an entity
# scanned in your own account before relying on it.
RESOURCE_PATH = "/subscriptions/XXX-YYY-ZZZ/resourceGroups/RG_NAME/providers/Microsoft.DataFactory/factories/ADF_NAME/pipelines"
PIPELINE_NAME = "MY_CUSTOM_PIPELINE"
ACTIVITY_NAME = "MY_CUSTOM_ACTIVITY"
COPY_ACTIVITY_NAME = "COPY_OP"
# Used only to build a unique qualified name for the copy operation.
OP_SINK_QUALIFIED_NAME = "somedatasource"
OP_OUTPUT_TYPE = "some_type"
# Hands out sequential temporary (negative) guids for the batch.
gt = GuidTracker()
# Create the Pipeline
# Positional args: name, typeName, qualified_name, guid.
adf_pipeline = AtlasEntity(
PIPELINE_NAME,
"adf_pipeline",
f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}",
str(gt.get_guid())
)
# Need to associate the pipeline with the existing data factory
adf_pipeline.addRelationship(dataFactory = {"guid":DATA_FACTORY_GUID})
# Create the Activity with a relationship to the pipeline
# inputs/outputs are empty here: actual lineage lives on the copy
# operation below, not on the activity.
adf_activity = AtlasProcess(
ACTIVITY_NAME,
"adf_copy_activity",
f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}/activities/{ACTIVITY_NAME.lower()}",
guid = str(gt.get_guid()),
inputs=[],
outputs=[]
)
adf_activity.addRelationship(parent=adf_pipeline)
# Create the copy operation with a relationship to the activity
# Qualified name follows {activity_qn}#{output_qn}#{output_type} per the
# convention noted above.
adf_copy_op = AtlasProcess(
COPY_ACTIVITY_NAME,
"adf_copy_operation",
f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}/activities/{ACTIVITY_NAME.lower()}#{OP_SINK_QUALIFIED_NAME}#{OP_OUTPUT_TYPE}",
guid = str(gt.get_guid()),
# I'm hard coding in this case but you could do other references to entities you're creating
# or using the qualified name and type
inputs=[{"guid":"127e990f-0ed5-4f6c-b37d-6bd6846954da"}],
outputs=[{"guid":"249ba40f-ca61-4470-869b-f8f6f6f60000"}]
)
adf_copy_op.addRelationship(parent=adf_activity)
# Perform the upload
results = client.upload_entities([adf_pipeline, adf_activity, adf_copy_op])
print(json.dumps(results, indent=2))