diff --git a/README.md b/README.md index 2a69653..f2616d5 100644 --- a/README.md +++ b/README.md @@ -187,4 +187,4 @@ prefect config set PREFECT_API_KEY=this_must_be_a_different_api_key_as_they_are_ - simple dbt flows are **parametrized** to make it easy to execute any dbt command for the relevant project - all ingestion flows are parametrized to make **backfills** a breeze - just change the start and end date, and the interval in your parameter values when triggering the run, and you can trigger a backfill from the same flow as usual -- see the flow parameters e.g. in the flow [flows/ingestion/ingest_jaffle_shop.py](flows/ingestion/ingest_jaffle_shop.py) - optionally, you can track each dbt model or test as a separate task by using the ``dbt_run_from_manifest.py`` logic -- [dataplatform/blocks/dbt.py](dataplatform/blocks/dbt.py) -- it's fun to use! if you want to change how your dbt tasks look like for Halloween or Christmas, change the emoji on the block 🤗 \ No newline at end of file +- it's fun to use! if you want to change how your dbt tasks look like for Halloween or Christmas, change the emoji on the block 🤗 diff --git a/flows/syncs_fivetran/block.py b/flows/syncs_fivetran/block.py new file mode 100644 index 0000000..9beb415 --- /dev/null +++ b/flows/syncs_fivetran/block.py @@ -0,0 +1,11 @@ +from dotenv import load_dotenv +import os +from prefect_fivetran import FivetranCredentials + +load_dotenv() + +fivetran_credentials = FivetranCredentials( + api_key=os.environ.get("FIVETRAN_API_KEY"), + api_secret=os.environ.get("FIVETRAN_API_SECRET_KEY"), +) +fivetran_credentials.save("default") diff --git a/flows/syncs_fivetran/fivetran.py b/flows/syncs_fivetran/fivetran.py new file mode 100644 index 0000000..95dad98 --- /dev/null +++ b/flows/syncs_fivetran/fivetran.py @@ -0,0 +1,16 @@ +from prefect import flow, Task +from prefect_fivetran import FivetranCredentials +from prefect_fivetran.connectors import fivetran_sync_flow + + +@flow +def my_flow(): + fivetran_result = await fivetran_sync_flow( + fivetran_credentials=FivetranCredentials.load("default"), + connector_id="my_connector_id", + schedule_type="my_schedule_type", + poll_status_every_n_seconds=30, + ) + + +my_flow() diff --git a/flows/syncs_fivetran/fivetran_wait.py b/flows/syncs_fivetran/fivetran_wait.py new file mode 100644 index 0000000..f8c91a7 --- /dev/null +++ b/flows/syncs_fivetran/fivetran_wait.py @@ -0,0 +1,31 @@ +""" +pip install prefect_fivetran +prefect block register -m prefect_fivetran +""" +from prefect import flow +from prefect_fivetran import FivetranCredentials +from prefect_fivetran.connectors import ( + wait_for_fivetran_connector_sync, + start_fivetran_connector_sync, +) + + +@flow +def example_flow(connector_id: str): + fivetran_credentials = FivetranCredentials.load("default") + + last_sync = start_fivetran_connector_sync( + connector_id=connector_id, + fivetran_credentials=fivetran_credentials, + ) + + return wait_for_fivetran_connector_sync( + connector_id=connector_id, + fivetran_credentials=fivetran_credentials, + previous_completed_at=last_sync, + poll_status_every_n_seconds=60, + ) + + +if __name__ == "__main__": + example_flow("bereft_indices") diff --git a/flows/syncs_fivetran/requirements.txt b/flows/syncs_fivetran/requirements.txt new file mode 100644 index 0000000..7baf1e1 --- /dev/null +++ b/flows/syncs_fivetran/requirements.txt @@ -0,0 +1,2 @@ +prefect +prefect_fivetran \ No newline at end of file diff --git a/utilities/create_blocks.py b/utilities/create_blocks.py index 9e0780d..1fc5c85 100644 --- a/utilities/create_blocks.py +++ b/utilities/create_blocks.py @@ -15,7 +15,6 @@ from dataplatform.deploy_utils import save_block, DEFAULT_BLOCK from dataplatform.environment import get_env - load_dotenv() @@ -26,7 +25,7 @@ workspace = Workspace( name=get_env(), block_name=DEFAULT_BLOCK, - settings=dict(workspace_owner="Data Engineering", environment="Development"), + metadata=dict(workspace_owner="Data Engineering", environment="Development"), ) save_block(workspace) diff --git a/utilities/dummy_sa.json b/utilities/dummy_sa.json index 92c5605..920872a 100644 --- a/utilities/dummy_sa.json +++ b/utilities/dummy_sa.json @@ -1,12 +1,12 @@ { "type": "service_account", - "project_id": "xxx", - "private_key_id": "xxx", - "private_key": "xxx", - "client_email": "generic@xxx.iam.gserviceaccount.com", - "client_id": "xxx", + "project_id": "prefect-community", + "private_key_id": "uuid", + "private_key": "-----BEGIN PRIVATE KEY-----\n a looooooong string", + "client_email": "prefect@prefect-community.iam.gserviceaccount.com", + "client_id": "numbers", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/generic%40prefect-community.iam.gserviceaccount.com" + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/sa_name_and_project_name.iam.gserviceaccount.com" }