Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions hooli_data_eng/databricks/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from dagster_databricks import PipesDatabricksClient
from databricks.sdk.service import jobs
from hooli_data_eng.databricks.resources import launch_and_poll_databricks_job


# This asset uses the forecasted orders to flag any days that
Expand Down Expand Up @@ -71,3 +72,12 @@ def databricks_asset(
context=context,
extras=extras,
).get_materialize_result()


# Asset that launches Databricks job 733330858351118 through the shared
# launch-and-poll helper, using the "databricks" resource from the context.
# Declared as depending on the `databricks_asset` asset key.
@dg.asset(
    deps=[dg.AssetKey(["databricks_asset"])],
    required_resource_keys={"databricks"},
)
def databricks_workflow_asset(context: dg.AssetExecutionContext) -> None:
    launch_and_poll_databricks_job(
        context,
        context.resources.databricks,
        733330858351118,
    )
36 changes: 36 additions & 0 deletions hooli_data_eng/databricks/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,39 @@
import dagster as dg
from dagster_databricks import PipesDatabricksClient, databricks_pyspark_step_launcher
from hooli_data_eng.utils import get_env
from dagster_databricks import databricks_client


# Databricks client resource: each config field is mapped to an
# {"env": <NAME>} entry naming the environment variable that supplies it.
databricks_client_instance = databricks_client.configured(
    {
        field: {"env": env_var}
        for field, env_var in (
            ("host", "DATABRICKS_HOST"),
            ("token", "DATABRICKS_TOKEN"),
        )
    }
)


def launch_and_poll_databricks_job(
    context,
    client,
    job_id,
    *,
    poll_interval_sec=5,
    max_wait_time_sec=600,
) -> None:
    """Trigger a run of an existing Databricks job and wait for it to finish.

    Args:
        context: Execution context; only ``context.log`` is used here.
        client: Databricks client exposing ``workspace_client.jobs`` (with
            ``run_now`` and ``get_run``) and ``wait_for_run_to_complete``.
        job_id: Identifier of the Databricks job to run.
        poll_interval_sec: Forwarded to ``client.wait_for_run_to_complete``
            as ``poll_interval_sec``. Defaults to 5.
        max_wait_time_sec: Forwarded to ``client.wait_for_run_to_complete``
            as ``max_wait_time_sec``. Defaults to 600.

    Any exception raised by the underlying client calls propagates unchanged.
    """
    jobs_service = client.workspace_client.jobs

    # ``run_now`` hands back a waiter object; the id of the run it started is
    # read from its bound values.
    run = jobs_service.run_now(job_id=job_id)
    run_id = run.bind()["run_id"]

    # Fetch the run's details so the log line can include its name and URL.
    get_run_response = jobs_service.get_run(run_id=run_id)

    context.log.info(
        f"Launched databricks job run for '{get_run_response.run_name}' (`{run_id}`). URL:"
        f" {get_run_response.run_page_url}. Waiting to run to complete."
    )

    client.wait_for_run_to_complete(
        logger=context.log,
        databricks_run_id=run_id,
        poll_interval_sec=poll_interval_sec,
        max_wait_time_sec=max_wait_time_sec,
    )


# The production deployment on Dagster Cloud uses production Snowflake
Expand Down Expand Up @@ -109,13 +142,16 @@
"LOCAL": {
"step_launcher": dg.ResourceDefinition.none_resource(),
"pipes_databricks_client": dg.ResourceDefinition.none_resource(),
"databricks": databricks_client_instance,
},
"BRANCH": {
"step_launcher": db_step_launcher,
"pipes_databricks_client": dg.ResourceDefinition.none_resource(),
"databricks": databricks_client_instance,
},
"PROD": {
"step_launcher": db_step_launcher,
"pipes_databricks_client": PipesDatabricksClient(client),
"databricks": databricks_client_instance,
},
}
Loading
Loading