Skip to content

Latest commit

 

History

History
76 lines (63 loc) · 2.21 KB

File metadata and controls

76 lines (63 loc) · 2.21 KB

Flytekit Great Expectations Plugin

Great Expectations helps enforce data quality. The plugin supports using Great Expectations both as a Flyte task and as a Flyte type.

To install the plugin, run the following command:

pip install flytekitplugins-great-expectations

Task Example

import os

import pandas as pd
from flytekit import Resources, kwtypes, task, workflow
from flytekitplugins.great_expectations import BatchRequestConfig, GreatExpectationsTask

# Great Expectations task object used by the example below; it declares a
# single string input named `dataset`.
simple_task_object = GreatExpectationsTask(
    name="great_expectations_task_simple",
    inputs=kwtypes(dataset=str),
    context_root_dir="great_expectations",
    datasource_name="data",
    data_connector_name="data_example_data_connector",
    expectation_suite_name="test.demo",
)

@task(limits=Resources(mem="500Mi"))
def simple_task(csv_file: str) -> int:
    """Run the Great Expectations task on ``csv_file`` and return its row count.

    Args:
        csv_file: Name of the CSV file, passed to ``simple_task_object`` as
            its ``dataset`` input and then read from the local data directory.

    Returns:
        The number of rows in the CSV file.
    """
    # Invoked for its validation effect only; the returned result is not
    # needed to compute the row count, so it is not bound to a name.
    simple_task_object(dataset=csv_file)
    # NOTE(review): the data directory is spelled "greatexpectations" while
    # context_root_dir above is "great_expectations" -- confirm both are intended.
    df = pd.read_csv(os.path.join("greatexpectations", "data", csv_file))
    return df.shape[0]

@workflow
def simple_wf(dataset: str = "yellow_tripdata_sample_2019-01.csv") -> int:
    """Run ``simple_task`` on ``dataset`` and return its result."""
    row_count = simple_task(csv_file=dataset)
    return row_count

Type Example

from flytekit import Resources, task, workflow
from flytekitplugins.great_expectations import (
    BatchRequestConfig,
    GreatExpectationsFlyteConfig,
    GreatExpectationsType,
)

@task(limits=Resources(mem="500Mi"))
def simple_task(
    directory: GreatExpectationsType[
        str,
        GreatExpectationsFlyteConfig(
            datasource_name="data",
            expectation_suite_name="test.demo",
            data_connector_name="my_data_connector",
            batch_request_config=BatchRequestConfig(
                data_connector_query={
                    "batch_filter_parameters": {
                        "year": "2019",
                        "month": "01",
                    },
                    "limit": 10,
                },
            ),
            context_root_dir="great_expectations",
        ),
    ]
) -> str:
    """Return a confirmation message for ``directory``.

    The function must be a Flyte task: it is called from a ``@workflow``, and
    per the plugin documentation the ``GreatExpectationsType`` annotation is
    what makes Flyte validate the incoming value against the configured
    expectation suite before the body runs.

    Args:
        directory: The dataset reference, annotated with the Great
            Expectations configuration to validate it with.

    Returns:
        A message stating that validation worked for ``directory``.
    """
    return f"Validation works for {directory}!"


@workflow
def simple_wf(directory: str = "my_assets") -> str:
    """Pass ``directory`` to ``simple_task`` and return the message it produces."""
    message = simple_task(directory=directory)
    return message

More examples can be found in the documentation.