
Azure Purview Tips


This is a collection of "tips" for working with Azure Purview types.

Resource Set and Tabular Schema

The Schema of a Resource Set is actually a tabular_schema entity that contains zero or more column entities; the diagram and sketch below show how the pieces reference each other.

  • Resource Set has a "tabular_schema" relationship attribute that must be a tabular_schema entity.
  • Column has a "composeSchema" relationship attribute that is REQUIRED and must be a "tabular_schema" entity.
+----------+                     +---------+                    +--------+
| Resource |  "tabular_schema"   | Tabular |  "composeSchema"   | Column |
|   Set    +-------------------->+ Schema  +------------------->+        |
+----------+                     +---------+                    +--------+

Create your own Resource Set

import json
import os

from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import AtlasEntity
from pyapacheatlas.core.client import PurviewClient

if __name__ == "__main__":
    oauth = ServicePrincipalAuthentication() # See samples for instantiation
    atlas_client = PurviewClient() # See samples for instantiation
    
    # Create a Tabular Schema entity
    ts = AtlasEntity(
        name="demotabschema",
        typeName="tabular_schema",
        qualified_name="pyapache://demotabschema",
        guid = -2
    )
    # Create a Column entity that references your tabular schema
    col01 = AtlasEntity(
        name="democolumn",
        typeName="column",
        qualified_name="pyapche://mycolumn",
        guid=-3,
        attributes={
            "type":"Blah",
            "description": "blabh blah"
        },
        relationshipAttributes = {
            "composeSchema": ts.to_json(minimum=True)
        }
    )

    # Create a resource set that references the tabular schema
    rs = AtlasEntity(
        name="demoresourceset",
        typeName="azure_datalake_gen2_resource_set",
        qualified_name="pyapache://demors",
        guid = -1,
        relationshipAttributes = {
            "tabular_schema": ts.to_json(minimum=True)
        }
    )

    # Upload entities
    results = atlas_client.upload_entities(
        [rs.to_json(), ts.to_json(), col01.to_json()]
    )
    # Print out results to see the guid assignments
    print(json.dumps(results, indent=2))
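
If you want to confirm the upload worked, one option is to read the resource set back by qualified name and type. A rough sketch, reusing the demo names and atlas_client from above:

# A rough sketch: fetch the resource set created above to confirm the upload.
# Assumes atlas_client and json from the sample above.
rs_lookup = atlas_client.get_entity(
    qualifiedName="pyapache://demors",
    typeName="azure_datalake_gen2_resource_set"
)
print(json.dumps(rs_lookup["entities"][0]["relationshipAttributes"], indent=2))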

Update an Existing Column

    # Omitting imports and client setup as shown in the previous sample
    ts = AtlasEntity(
        name="demotabschema",
        typeName="tabular_schema",
        qualified_name="pyapache://demotabschema",
        guid = -1
    )

    updated_col01 = AtlasEntity(
        name="democolumn",
        typeName="column",
        qualified_name="pyapche://mycolumn",
        guid=-2,
        attributes={
            "type":"Blah2",
            "description": "blabh2 blah23"
        },
        relationshipAttributes = {
            "composeSchema":  ts.to_json(minimum=True)
            # Alternatively, if you know the guid, your reference is simpler
            # And you don't need to upload / instantiate a "dummy" of your existing tabular_schema.
            # You can also get an entity by name using atlas_client.get_entity method
            # ts_guid = atlas_client.get_entity(qualifiedName="pyapache://demotabschema", typeName="tabular_schema")
            #"composeSchema": {"guid": "cdd919a0-your-guid-a896-18f6f6f60000"}
        }
    )

    # When updating the column, you need the tabular schema entity ONLY if you are using the "header"
    # method (AtlasEntity.to_json(minimum=True)). If you are using the guid in the column's composeSchema
    # you don't need to upload the tabular schema object.
    results = atlas_client.upload_entities([updated_col01.to_json(), ts.to_json()])

    print(json.dumps(results, indent=2))
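
As the comments above hint, you can skip uploading the "dummy" tabular_schema entirely if you look up its guid first. A rough sketch of that variant, reusing the demo names and the updated_col01 entity from above:

# A rough sketch of the guid-based reference, reusing updated_col01 from above.
# Look up the existing tabular_schema to get its guid...
ts_lookup = atlas_client.get_entity(
    qualifiedName="pyapache://demotabschema",
    typeName="tabular_schema"
)
ts_guid = ts_lookup["entities"][0]["guid"]

# ...then reference it by guid so only the column needs to be uploaded.
updated_col01.addRelationship(composeSchema={"guid": ts_guid})
results = atlas_client.upload_entities([updated_col01.to_json()])
print(json.dumps(results, indent=2))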

Update Power BI Lineage

At the time of writing, Power BI lineage does not always seem to include the source table; instead it points to a generic database / table entity.

You can "hook up" the lineage to an existing entity with the script below. However, an "Incremental Scan" will overwrite the changes.

import json

from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import AtlasEntity, AtlasProcess
from pyapacheatlas.core.client import PurviewClient

oauth = ServicePrincipalAuthentication() # See samples for instantiation
client = PurviewClient() # See samples for instantiation

LINEAGE_TO_CHANGE_QUALIFIED_NAME = ""
LINEAGE_TO_CHANGE_NAME = ""
LINEAGE_TO_CHANGE_TYPE = "powerbi_dataset_process"

# Update these to be the existing source entity that should
# Link to your Power BI dataset as above.
SOURCE_QUALIFIED_NAME = ""
SOURCE_NAME = ""
SOURCE_TYPE = ""
# A second scenario would have us appending to an existing process
# To do that, we need to query for the existing entity
dummy_existing_process = AtlasProcess(
    name=LINEAGE_TO_CHANGE_NAME,
    typeName=LINEAGE_TO_CHANGE_TYPE,
    qualified_name=LINEAGE_TO_CHANGE_QUALIFIED_NAME,
    inputs=None,  # We will update this with .inputs below
    outputs=None,  # Set to None so no update will occur
    guid=-104
)
real_existing_process = client.get_entity(
    typeName=LINEAGE_TO_CHANGE_TYPE,
    qualifiedName=LINEAGE_TO_CHANGE_QUALIFIED_NAME
)["entities"][0]

# Create one more input to be added.
one_more_input = AtlasEntity(
    name=SOURCE_NAME,
    typeName=SOURCE_TYPE,
    qualified_name=SOURCE_QUALIFIED_NAME,
    guid=-103
)
# Append the new input to the process's existing inputs
existing_inputs = real_existing_process["attributes"]["inputs"]
dummy_existing_process.inputs = existing_inputs + [one_more_input.to_json(minimum=True)]

complex_results = client.upload_entities( batch=[dummy_existing_process, one_more_input] )
print(json.dumps(complex_results, indent=2))
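
To double check that the new source shows up on the process, a rough sketch of reading it back after the upload (same constants and client as above):

# A rough sketch: fetch the process again and inspect its inputs.
check = client.get_entity(
    typeName=LINEAGE_TO_CHANGE_TYPE,
    qualifiedName=LINEAGE_TO_CHANGE_QUALIFIED_NAME
)["entities"][0]
print(json.dumps(check["attributes"]["inputs"], indent=2))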

Update Power BI Dataset Schema with tabular_schema Relationship Attribute

At this time, Purview does not provide column level information for Power BI datasets. If you'd like to add that column level detail, you can do so by using the tabular_schema relationship attribute and associating a tabular_schema entity with the powerbi_dataset and one or more column entities.

The sample below creates a "dummy" Power BI dataset (which I assume already exists), a new tabular schema, and one column for that schema.

import json
import os

from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core.client import PurviewClient
from pyapacheatlas.core import AtlasEntity
from pyapacheatlas.core.util import GuidTracker

oauth = ServicePrincipalAuthentication() # See samples for instantiation
client = PurviewClient() # See samples for instantiation

gt = GuidTracker()
# Set table to match your powerbi_dataset
table = AtlasEntity("Stocks2", "powerbi_dataset", "pyapacheatlas://pbi/demo_dataset_with_schema", guid=gt.get_guid())
# Create a tabular_schema
ts = AtlasEntity("mytabschema", "tabular_schema", "pyapacheatlas://pbi/demo_dataset_with_schema/ts", guid=gt.get_guid())
# Create columns that you want to add (requires a type attribute)
c1 = AtlasEntity("c1", "column", "pyapacheatlas://pbi/demo_dataset_with_schema#c1", guid=gt.get_guid())
c1.attributes.update({"type":"int"})

# Add the relationships: the schema has columns and the table has a schema
ts.addRelationship(columns=[c1.to_json(minimum=True)])
table.addRelationship(tabular_schema=ts)


# If you want to create a dataset from scratch, you also need a workspace
# you'll add a workspace relationship and then upload everything together
# workspace = AtlasEntity("MyPBIWorkspace", "powerbi_workspace", "pyapacheatlas://pbi", guid=gt.get_guid())
# table.addRelationship(workspace=workspace)
# results = client.upload_entities(batch = [workspace, table, ts, c1])

# Upload the dataset, tabular_schema, and column(s)
results = client.upload_entities(batch = [table, ts, c1])

print(json.dumps(results, indent=2))
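
If you need more than one column, the same pattern extends; a rough sketch, reusing the client, gt, table, ts, and c1 from above (the extra column name and type are made up):

# A rough sketch: additional (made up) columns just get added to the columns list.
c2 = AtlasEntity("c2", "column", "pyapacheatlas://pbi/demo_dataset_with_schema#c2", guid=gt.get_guid())
c2.attributes.update({"type": "string"})

ts.addRelationship(columns=[c1.to_json(minimum=True), c2.to_json(minimum=True)])
results = client.upload_entities(batch=[table, ts, c1, c2])

print(json.dumps(results, indent=2))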

Working with Azure Data Factory Types

Let's say you want to instantiate your own Azure Data Factory pipelines and activities. We can create our own entities with the built-in types! We just need to follow the qualified name pattern and relationship attributes.

ADF Types and Relationships

First, let's understand the built-in types and their relationship attributes.

  • azure_data_factory represents a Data Factory that you've scanned.
    • The pipelines relationship attribute is an array of adf_pipeline.
  • adf_pipeline represents an Azure Data Factory pipeline inside an Azure Data Factory.
    • The dataFactory relationship attribute points to one azure_data_factory.
    • The subProcesses relationship attribute is an array of adf_copy_activity (or other ADF supported types).
  • adf_copy_activity represents a Copy Activity within a pipeline.
    • The parent relationship attribute points to one adf_pipeline.
    • The subProcesses relationship attribute is an array of adf_copy_operation (or other ADF supported types).
    • The runInstances relationship attribute is an array of adf_copy_activity_run (or other ADF supported types).
  • adf_copy_operation represents the actual process that will contain inputs and outputs.
    • The parent relationship attribute points to one adf_copy_activity.
    • This is the entity that will have the inputs and outputs.

ADF Qualified Names

Another important consideration is the qualified name pattern at each of these levels; a short sketch of building these qualified names follows the list.

  • azure_data_factory pattern: /subscriptions/{SUBSCRIPTION_ID}/resourceGroups/{RG_NAME}/providers/Microsoft.DataFactory/factories/{DATA_FACTORY_NAME}
  • adf_pipeline pattern: {azure_data_factory_pattern}/{PIPELINE_NAME}
  • adf_copy_activity pattern: {azure_data_factory_pattern}/{PIPELINE_NAME}/activities/{ACTIVITY_NAME}
  • adf_copy_operation pattern: {adf_copy_activity_pattern}#{output_qualified_name}#{output_type_name}
    • The output_qualified_name and output_type_name appear to be conventions used simply to ensure a unique qualified name.
    • However, there is nothing stopping you from using some other pattern as it does not appear to affect the UI.
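
Composing these qualified names is just string formatting. A rough sketch, where every value is a placeholder rather than a real resource:

# A rough sketch of building the ADF qualified name patterns above.
# Every value here is a placeholder, not a real resource.
SUBSCRIPTION_ID = "00000000-0000-0000-0000-000000000000"
RG_NAME = "my-resource-group"
DATA_FACTORY_NAME = "my-data-factory"
PIPELINE_NAME = "my_pipeline"
ACTIVITY_NAME = "my_copy_activity"
OUTPUT_QUALIFIED_NAME = "https://myaccount.dfs.core.windows.net/container/output"
OUTPUT_TYPE_NAME = "azure_datalake_gen2_path"

factory_qn = (
    f"/subscriptions/{SUBSCRIPTION_ID}/resourceGroups/{RG_NAME}"
    f"/providers/Microsoft.DataFactory/factories/{DATA_FACTORY_NAME}"
)
pipeline_qn = f"{factory_qn}/{PIPELINE_NAME}"
activity_qn = f"{pipeline_qn}/activities/{ACTIVITY_NAME}"
copy_operation_qn = f"{activity_qn}#{OUTPUT_QUALIFIED_NAME}#{OUTPUT_TYPE_NAME}"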

Creating Entities with ADF Types

In the scenario below, we will create a pipeline, copy activity, and a copy operation for an existing data factory entity.

First, there's some setup. We need to fill in the existing data factory's guid, update the resource path (used in the qualified name patterns), and set the pipeline, activity, and copy operation names, as well as the output's qualified name and type.

import json

from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import AtlasEntity, AtlasProcess
from pyapacheatlas.core.client import PurviewClient
from pyapacheatlas.core.util import GuidTracker

oauth = ServicePrincipalAuthentication() # See samples for instantiation
client = PurviewClient() # See samples for instantiation

DATA_FACTORY_GUID = "xxxx-yyy-zzz-123-456"
RESOURCE_PATH = "/subscriptions/XXX-YYY-ZZZ/resourceGroups/RG_NAME/providers/Microsoft.DataFactory/factories/ADF_NAME/pipelines"
PIPELINE_NAME = "MY_CUSTOM_PIPELINE"
ACTIVITY_NAME = "MY_CUSTOM_ACTIVITY"
COPY_OPERATION_NAME = "COPY_OP"
OP_SINK_QUALIFIED_NAME = "somedatasource"
OP_OUTPUT_TYPE = "some_type"

gt = GuidTracker()

# Create the Pipeline
adf_pipeline = AtlasEntity(
    PIPELINE_NAME,
    "adf_pipeline",
    f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}",
    str(gt.get_guid())
)
# Need to associate the pipeline with the existing data factory
adf_pipeline.addRelationship(dataFactory = {"guid":DATA_FACTORY_GUID})

# Create the Activity with a relationship to the pipeline
adf_activity = AtlasProcess(
    ACTIVITY_NAME,
    "adf_copy_activity",
    f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}/activities/{ACTIVITY_NAME.lower()}",
    guid = str(gt.get_guid()),
    inputs=[],
    outputs=[]
)
adf_activity.addRelationship(parent=adf_pipeline)

# Create the copy operation with a relationship to the activity
adf_copy_op = AtlasProcess(
    COPY_OPERATION_NAME,
    "adf_copy_operation",
    f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}/activities/{ACTIVITY_NAME.lower()}#{OP_SINK_QUALIFIED_NAME}#{OP_OUTPUT_TYPE}",
    guid = str(gt.get_guid()),
    # The guids are hard coded in this case, but you could instead reference entities
    # you're creating in the same batch, or use a qualified name and type reference
    inputs=[{"guid":"127e990f-0ed5-4f6c-b37d-6bd6846954da"}],
    outputs=[{"guid":"249ba40f-ca61-4470-869b-f8f6f6f60000"}]
)
adf_copy_op.addRelationship(parent=adf_activity)

# Perform the upload
results = client.upload_entities([adf_pipeline, adf_activity, adf_copy_op])
print(json.dumps(results, indent=2))