Skip to content

Commit

Permalink
fix metadata, add fivetran
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-geller committed Feb 8, 2023
1 parent 1259974 commit a378d51
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 9 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,4 @@ prefect config set PREFECT_API_KEY=this_must_be_a_different_api_key_as_they_are_
- simple dbt flows are **parametrized** to make it easy to execute any dbt command for the relevant project
- all ingestion flows are parametrized to make **backfills** a breeze - just change the start and end date, and the interval in your parameter values when triggering the run, and you can trigger a backfill from the same flow as usual -- see the flow parameters e.g. in the flow [flows/ingestion/ingest_jaffle_shop.py](flows/ingestion/ingest_jaffle_shop.py)
- optionally, you can track each dbt model or test as a separate task by using the ``dbt_run_from_manifest.py`` logic -- [dataplatform/blocks/dbt.py](dataplatform/blocks/dbt.py)
- it's fun to use! if you want to change how your dbt tasks look like for Halloween or Christmas, change the emoji on the block 🤗
- it's fun to use! if you want to change how your dbt tasks look like for Halloween or Christmas, change the emoji on the block 🤗
11 changes: 11 additions & 0 deletions flows/syncs_fivetran/block.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dotenv import load_dotenv
import os
from prefect_fivetran import FivetranCredentials

load_dotenv()

fivetran_credentials = FivetranCredentials(
api_key=os.environ.get("FIVETRAN_API_KEY"),
api_secret=os.environ.get("FIVETRAN_API_SECRET_KEY"),
)
fivetran_credentials.save("default")
16 changes: 16 additions & 0 deletions flows/syncs_fivetran/fivetran.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from prefect import flow, Task
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import fivetran_sync_flow


@flow
def my_flow():
fivetran_result = await fivetran_sync_flow(
fivetran_credentials=FivetranCredentials.load("default"),
connector_id="my_connector_id",
schedule_type="my_schedule_type",
poll_status_every_n_seconds=30,
)


my_flow()
31 changes: 31 additions & 0 deletions flows/syncs_fivetran/fivetran_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
pip install prefect_fivetran
prefect block register -m prefect_fivetran
"""
from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import (
wait_for_fivetran_connector_sync,
start_fivetran_connector_sync,
)


@flow
def example_flow(connector_id: str):
fivetran_credentials = FivetranCredentials.load("default")

last_sync = start_fivetran_connector_sync(
connector_id=connector_id,
fivetran_credentials=fivetran_credentials,
)

return wait_for_fivetran_connector_sync(
connector_id=connector_id,
fivetran_credentials=fivetran_credentials,
previous_completed_at=last_sync,
poll_status_every_n_seconds=60,
)


if __name__ == "__main__":
example_flow("bereft_indices")
2 changes: 2 additions & 0 deletions flows/syncs_fivetran/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
prefect
prefect_fivetran
3 changes: 1 addition & 2 deletions utilities/create_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from dataplatform.deploy_utils import save_block, DEFAULT_BLOCK
from dataplatform.environment import get_env


load_dotenv()


Expand All @@ -26,7 +25,7 @@
workspace = Workspace(
name=get_env(),
block_name=DEFAULT_BLOCK,
settings=dict(workspace_owner="Data Engineering", environment="Development"),
metadata=dict(workspace_owner="Data Engineering", environment="Development"),
)
save_block(workspace)

Expand Down
12 changes: 6 additions & 6 deletions utilities/dummy_sa.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{
"type": "service_account",
"project_id": "xxx",
"private_key_id": "xxx",
"private_key": "xxx",
"client_email": "generic@xxx.iam.gserviceaccount.com",
"client_id": "xxx",
"project_id": "prefect-community",
"private_key_id": "uuid",
"private_key": "-----BEGIN PRIVATE KEY-----\n a looooooong string",
"client_email": "prefect@prefect-community.iam.gserviceaccount.com",
"client_id": "numbers",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/generic%40prefect-community.iam.gserviceaccount.com"
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/sa_name_and_project_name.iam.gserviceaccount.com"
}

0 comments on commit a378d51

Please sign in to comment.