Lakehouse monitoring integration (#156)
* define lakehouse monitoring resource

* implement retraining workflow based on monitored metric violation check

* update monitoring table name

* update readme with monitoring specific information

* add explanation to the metric violation check sql query

* convert is_metric_violated flag to bool

* fix monitoring resource definition

* remove disallowed string

* incorporate review comments

- Minor readme changes
- Use default assets_dir path for monitoring

* incorporate review comments

- Accept inference table name from CLI
- Merge monitoring related resources into a single file
- Parametrize the metric and validation threshold

* accept fully qualified inference table name

* updates

* nit

* apply comments

* Update CLI version

* Fix tests and add regex

* Fix test

* try to fix tests again

* try to fix tests again

* Fix query

* upgrade gh versions

* add assets-dir

---------

Co-authored-by: Arpit Jasapara <[email protected]>
s-udhaya and arpitjasa-db authored Jun 5, 2024
1 parent 9632f3d commit 4306c6b
Showing 23 changed files with 320 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run-checks.yaml
@@ -6,7 +6,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v2
- uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Install act
2 changes: 1 addition & 1 deletion README.md
@@ -53,7 +53,7 @@ https://github.com/databricks/mlops-stacks/assets/87999496/0d220d55-465e-4a69-bd

### Prerequisites
- Python 3.8+
- [Databricks CLI](https://docs.databricks.com/en/dev-tools/cli/databricks-cli.html) >= v0.212.2
- [Databricks CLI](https://docs.databricks.com/en/dev-tools/cli/databricks-cli.html) >= v0.221.0

[Databricks CLI](https://docs.databricks.com/en/dev-tools/cli/databricks-cli.html) contains [Databricks asset bundle templates](https://docs.databricks.com/en/dev-tools/bundles/templates.html) for the purpose of project creation.

21 changes: 18 additions & 3 deletions databricks_template_schema.json
@@ -1,6 +1,6 @@
{
"welcome_message": "Welcome to MLOps Stacks. For detailed information on project generation, see the README at https://github.com/databricks/mlops-stacks/blob/main/README.md.",
"min_databricks_cli_version": "v0.212.2",
"min_databricks_cli_version": "v0.221.0",
"properties": {
"input_setup_cicd_and_project": {
"order": 1,
@@ -256,9 +256,24 @@
]
}
},
"input_include_feature_store": {
"input_inference_table_name": {
"order": 17,
"type": "string",
"description": "\nFully qualified name of inference table to attach monitoring to.\nThis table must already exist and service principals must have access.",
"default": "dev.{{ .input_project_name }}.predictions",
"pattern": "^[^ .\\-\\/]+(\\.[^ .\\-\\/]+){2}$",
"pattern_match_failure_message": "Fully qualified Unity Catalog table names must have catalog, schema, and table separated by \".\" and each cannot contain any of the following characters: \" \", \".\", \"-\", \"\\\", \"/\"",
"skip_prompt_if": {
"properties": {
"input_setup_cicd_and_project": {
"const": "CICD_Only"
}
}
}
},
"input_include_feature_store": {
"order": 18,
"type": "string",
"description": "\nWhether to include Feature Store",
"default": "no",
"enum": ["no", "yes"],
@@ -271,7 +286,7 @@
}
},
"input_include_mlflow_recipes": {
"order": 18,
"order": 19,
"type": "string",
"description": "\nWhether to include MLflow Recipes",
"default": "no",
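The `pattern` for `input_inference_table_name` above enforces a fully qualified three-level Unity Catalog name. As a quick sanity check, here is a small sketch of how that regex behaves against a few candidate names (the example names are made up; the pattern itself is copied from `databricks_template_schema.json` above):

```python
import re

# Same pattern as in databricks_template_schema.json: three dot-separated parts,
# none of which may contain a space, ".", "-", or "/".
pattern = re.compile(r"^[^ .\-\/]+(\.[^ .\-\/]+){2}$")

candidates = [
    "dev.my_mlops_project.predictions",  # matches: catalog.schema.table
    "dev.my-project.predictions",        # rejected: "-" in the schema name
    "predictions",                       # rejected: not fully qualified
    "dev.ml.predictions.extra",          # rejected: more than three parts
]

for name in candidates:
    print(f"{name}: {'match' if pattern.match(name) else 'no match'}")
```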
2 changes: 1 addition & 1 deletion library/template_variables.tmpl
@@ -57,7 +57,7 @@
{{- end }}

{{ define `cli_version` -}}
v0.212.2
v0.221.0
{{- end }}

{{ define `stacks_version` -}}
@@ -25,13 +25,13 @@ jobs:
unit_tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- uses: actions/checkout@v3
- uses: actions/setup-python@v5
with:
python-version: 3.8
{{- if (eq .input_include_feature_store `yes`) }}
# Feature store tests bring up a local Spark session, so Java is required.
- uses: actions/setup-java@v2
- uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '11'
6 changes: 3 additions & 3 deletions template/{{.input_root_dir}}/README.md.tmpl
@@ -58,7 +58,7 @@ contained in the following files:
│ │
│ ├── ml-artifacts-resource.yml <- ML resource config definition for model and experiment
│ │
│ ├── monitoring-workflow-resource.yml <- ML resource config definition for data monitoring workflow
│ ├── monitoring-resource.yml <- ML resource config definition for quality monitoring workflow
{{- else if (eq .input_include_feature_store `yes`) }}
│ ├── training <- Training folder contains Notebook that trains and registers the model with feature store support.
│ │
@@ -89,7 +89,7 @@ contained in the following files:
│ │
│ ├── ml-artifacts-resource.yml <- ML resource config definition for model and experiment
│ │
│ ├── monitoring-workflow-resource.yml <- ML resource config definition for data monitoring workflow
│ ├── monitoring-resource.yml <- ML resource config definition for quality monitoring workflow
{{- else }}
│ ├── training <- Folder for model development via MLflow recipes.
│ │ │
@@ -127,7 +127,7 @@ contained in the following files:
│ │
│ ├── ml-artifacts-resource.yml <- ML resource config definition for model and experiment
│ │
│ ├── monitoring-workflow-resource.yml <- ML resource config definition for data monitoring workflow
│ ├── monitoring-resource.yml <- ML resource config definition for quality monitoring workflow
{{- end }}
{{- end }}
1 change: 1 addition & 0 deletions template/{{.input_root_dir}}/_params_testing_only.txt.tmpl
@@ -12,6 +12,7 @@ input_include_mlflow_recipes={{.input_include_mlflow_recipes}}
input_include_models_in_unity_catalog={{.input_include_models_in_unity_catalog}}
input_schema_name={{.input_schema_name}}
input_unity_catalog_read_user_group={{.input_unity_catalog_read_user_group}}
input_inference_table_name={{.input_inference_table_name}}

databricks_prod_workspace_host={{ template `databricks_prod_workspace_host` . }}
databricks_staging_workspace_host={{ template `databricks_staging_workspace_host` . }}
@@ -61,7 +61,7 @@ contained in the following files:
│ │
│ ├── ml-artifacts-resource.yml <- ML resource config definition for model and experiment
│ │
│ ├── monitoring-workflow-resource.yml <- ML resource config definition for data monitoring workflow
│ ├── monitoring-resource.yml <- ML resource config definition for quality monitoring workflow
{{- else if (eq .input_include_feature_store `yes`) }}
│ ├── training <- Training folder contains Notebook that trains and registers the model with feature store support.
│ │
@@ -92,7 +92,7 @@ contained in the following files:
│ │
│ ├── ml-artifacts-resource.yml <- ML resource config definition for model and experiment
│ │
│ ├── monitoring-workflow-resource.yml <- ML resource config definition for data monitoring workflow
│ ├── monitoring-resource.yml <- ML resource config definition for quality monitoring workflow
{{- else }}
│ ├── training <- Folder for model development via MLflow recipes.
│ │ │
@@ -130,7 +130,7 @@ contained in the following files:
│ │
│ ├── ml-artifacts-resource.yml <- ML resource config definition for model and experiment
│ │
│ ├── monitoring-workflow-resource.yml <- ML resource config definition for data monitoring workflow
│ ├── monitoring-resource.yml <- ML resource config definition for quality monitoring workflow
{{- end }}
```

@@ -14,7 +14,7 @@ variables:
include:
# Resources folder contains ML artifact resources for the ML project that defines model and experiment
# And workflows resources for the ML project including model training -> validation -> deployment,
# {{- if (eq .input_include_feature_store `yes`) }} feature engineering, {{ end }} batch inference, data monitoring, metric refresh, alerts and triggering retraining
# {{- if (eq .input_include_feature_store `yes`) }} feature engineering, {{ end }} batch inference, quality monitoring, metric refresh, alerts and triggering retraining
- ./resources/*.yml

# Deployment Target specific values for workspace
@@ -1,5 +1,5 @@
# Monitoring

Databricks Data Monitoring is currently in Private Preview.

Please contact a Databricks representative for more information.
To enable monitoring as part of a scheduled Databricks workflow, please update all the TODOs in the [monitoring resource file](../resources/monitoring-resource.yml), and refer to
[{{template `project_name_alphanumeric_underscore` .}}/resources/README.md](../resources/README.md). The implementation supports monitoring of batch inference tables directly.
For real-time inference tables, unpacking is required before monitoring can be attached (see the sketch below).
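As a rough illustration of that unpacking step, the sketch below flattens a hypothetical real-time serving payload table into a table that monitoring can be attached to. The table names, payload schemas, and the `request`/`response`/`timestamp_ms` columns are assumptions about the serving payload format and must be adapted to the actual endpoint:

```python
from pyspark.sql import functions as F

# Runs in a Databricks notebook where `spark` is available.
# Hypothetical table names -- replace with your own.
raw_payload_table = "dev.my_mlops_project.serving_payloads"
unpacked_table = "dev.my_mlops_project.predictions"

raw = spark.table(raw_payload_table)

# Assumed payload shapes; adjust to match how the endpoint is actually queried.
request_schema = "STRUCT<dataframe_records: ARRAY<STRUCT<feature_a: DOUBLE, feature_b: DOUBLE>>>"
response_schema = "STRUCT<predictions: ARRAY<DOUBLE>>"

unpacked = (
    raw
    .withColumn("req", F.from_json("request", request_schema))
    .withColumn("resp", F.from_json("response", response_schema))
    # Assume one record per request for simplicity; explode the arrays if requests are batched.
    .select(
        F.col("req.dataframe_records")[0].alias("features"),
        F.col("resp.predictions")[0].alias("prediction"),
        (F.col("timestamp_ms") / 1000).cast("timestamp").alias("timestamp"),
    )
    .select("features.*", "prediction", "timestamp")
)

unpacked.write.mode("append").saveAsTable(unpacked_table)
```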
@@ -0,0 +1,84 @@
# This file defines the main SQL query that checks the metric over the last {num_evaluation_windows} windows and whether at least {num_violation_windows} of those windows violate the threshold.

import sys
import pathlib

sys.path.append(str(pathlib.Path(__file__).parent.parent.parent.resolve()))

"""The SQL query is divided into three main parts. The first part selects the top {num_evaluation_windows}
values of the metric to be monitored, ordered by the time window, and saves as recent_metrics.
```sql
WITH recent_metrics AS (
SELECT
{metric_to_monitor},
window
FROM
{table_name_under_monitor}_profile_metrics
WHERE
column_name = ":table"
AND slice_key IS NULL
AND model_id != "*"
AND log_type = "INPUT"
ORDER BY
window DESC
LIMIT
{num_evaluation_windows}
)
```
The `column_name = ":table"` and `slice_key IS NULL` conditions ensure that the metric
is selected for the entire table within the given granularity. The `log_type = "INPUT"`
condition ensures that the primary table metrics are considered, but not the baseline
table metrics. The `model_id != "*"` condition ensures that the metric aggregated across
all model IDs is not selected.

The second part of the query determines whether the metric is in violation, using two conditions.
The first condition checks whether the metric value is greater than the threshold for at least {num_violation_windows} windows:
```sql
(SELECT COUNT(*) FROM recent_metrics WHERE {metric_to_monitor} > {metric_violation_threshold}) >= {num_violation_windows}
```
The second condition checks whether the most recent metric value is greater than the threshold. This ensures retraining is only triggered
when the most recent window is in violation, avoiding unnecessary retraining when the violation occurred in the past and the metric is now back within the threshold:
```sql
(SELECT {metric_to_monitor} FROM recent_metrics ORDER BY window DESC LIMIT 1) > {metric_violation_threshold}
```

The final part of the query sets the `query_result` to 1 if both of the above conditions are met, and 0 otherwise:
```sql
SELECT
CASE
WHEN
-- Check if the metric value is greater than the threshold for at least {num_violation_windows} windows
AND
-- Check if the most recent metric value is greater than the threshold
THEN 1
ELSE 0
END AS query_result
```
"""

sql_query = """WITH recent_metrics AS (
SELECT
{metric_to_monitor},
window
FROM
{table_name_under_monitor}_profile_metrics
WHERE
column_name = ":table"
AND slice_key IS NULL
AND model_id != "*"
AND log_type = "INPUT"
ORDER BY
window DESC
LIMIT
{num_evaluation_windows}
)
SELECT
CASE
WHEN
(SELECT COUNT(*) FROM recent_metrics WHERE {metric_to_monitor} > {metric_violation_threshold}) >= {num_violation_windows}
AND
(SELECT {metric_to_monitor} FROM recent_metrics ORDER BY window DESC LIMIT 1) > {metric_violation_threshold}
THEN 1
ELSE 0
END AS query_result
"""
@@ -0,0 +1,68 @@
# Databricks notebook source
##################################################################################
# This notebook runs a SQL query and sets the result as a job task value
#
# This notebook has the following parameters:
#
# * table_name_under_monitor (required) - The name of a table that is currently being monitored
# * metric_to_monitor (required) - Metric to be monitored for threshold violation
# * metric_violation_threshold (required) - Threshold value for metric violation
# * num_evaluation_windows (required) - Number of windows to check for violation
# * num_violation_windows (required) - Number of windows that need to violate the threshold
##################################################################################

# List of input args needed to run the notebook as a job.
# Provide them via DB widgets or notebook arguments.
#
# Name of the table that is currently being monitored
dbutils.widgets.text(
"table_name_under_monitor", "{{ .input_inference_table_name }}", label="Full (three-Level) table name"
)
# Metric to be used for threshold violation check
dbutils.widgets.text(
"metric_to_monitor", "root_mean_squared_error", label="Metric to be monitored for threshold violation"
)

# Threshold value to be checked
dbutils.widgets.text(
"metric_violation_threshold", "100", label="Threshold value for metric violation"
)

# Number of windows to check for violation
dbutils.widgets.text(
"num_evaluation_windows", "5", label="Number of windows to check for violation"
)

# Number of windows that need to violate the threshold
dbutils.widgets.text(
"num_violation_windows", "2", label="Number of windows that need to violate the threshold"
)

# COMMAND ----------

import os
import sys
notebook_path = '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path
%cd ..
sys.path.append("../..")

# COMMAND ----------

from metric_violation_check_query import sql_query

table_name_under_monitor = dbutils.widgets.get("table_name_under_monitor")
metric_to_monitor = dbutils.widgets.get("metric_to_monitor")
metric_violation_threshold = dbutils.widgets.get("metric_violation_threshold")
# Read the window parameters as well; they are referenced in the query below.
num_evaluation_windows = dbutils.widgets.get("num_evaluation_windows")
num_violation_windows = dbutils.widgets.get("num_violation_windows")

formatted_sql_query = sql_query.format(
    table_name_under_monitor=table_name_under_monitor,
    metric_to_monitor=metric_to_monitor,
    metric_violation_threshold=metric_violation_threshold,
    num_evaluation_windows=num_evaluation_windows,
    num_violation_windows=num_violation_windows)
is_metric_violated = bool(spark.sql(formatted_sql_query).toPandas()["query_result"][0])

dbutils.jobs.taskValues.set("is_metric_violated", is_metric_violated)
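Downstream of this notebook, the retraining decision keys off the `is_metric_violated` task value. A rough, hypothetical sketch of a downstream notebook task reading it (the task key shown is made up; use the task key defined in monitoring-resource.yml):

```python
# Hypothetical downstream task in the same Databricks job.
is_metric_violated = dbutils.jobs.taskValues.get(
    taskKey="monitored_metric_violation_check",  # assumed task key -- match monitoring-resource.yml
    key="is_metric_violated",
    default=False,
    debugValue=False,
)

if is_metric_violated:
    print("Monitored metric violated the threshold -- triggering model retraining.")
else:
    print("Monitored metric within the threshold -- skipping retraining.")
```

In the actual workflow the branching may instead be expressed declaratively in the resource YAML; the snippet above only illustrates how the task value can be consumed.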


@@ -23,7 +23,7 @@ During databricks CLI bundles deployment, the root config file will be loaded, v
ML Resource Configurations in this directory:
- model workflow (`{{template `project_name_alphanumeric_underscore` .}}/resources/model-workflow-resource.yml`)
- batch inference workflow (`{{template `project_name_alphanumeric_underscore` .}}/resources/batch-inference-workflow-resource.yml`)
- monitoring workflow (`{{template `project_name_alphanumeric_underscore` .}}/resources/monitoring-workflow-resource.yml`)
- monitoring resource and workflow (`{{template `project_name_alphanumeric_underscore` .}}/resources/monitoring-resource.yml`)
- feature engineering workflow (`{{template `project_name_alphanumeric_underscore` .}}/resources/feature-engineering-workflow-resource.yml`)
- model definition and experiment definition (`{{template `project_name_alphanumeric_underscore` .}}/resources/ml-artifacts-resource.yml`)

@@ -143,6 +143,41 @@ Model validation contains three components:
To set up and enable model validation, update [validation.py](../validation/validation.py) to return desired custom metrics and validation thresholds, then
resolve the `TODOs` in the ModelValidation task of [model-workflow-resource.yml](./model-workflow-resource.yml).


### Setting up monitoring
The monitoring workflow provides a plug-and-play stack component for monitoring feature and model drift and for retraining the model when the monitored
metric violates the defined threshold, given ground truth labels.

Its central purpose is to track production model performance and feature distributions, and to compare different model versions.

Monitoring contains three components:
* [metric_violation_check_query.py](../monitoring/metric_violation_check_query.py) defines a query that checks for violation of the monitored metric.
* [notebooks/MonitoredMetricViolationCheck](../monitoring/notebooks/MonitoredMetricViolationCheck.py) acts as an entry point, executing the violation check query against the monitored inference table.
It emits a boolean value based on the query result.
* [monitoring-resource.yml](./monitoring-resource.yml) contains the resource config and input parameters for monitoring, and orchestrates model retraining based on monitoring. It first runs the [notebooks/MonitoredMetricViolationCheck](../monitoring/notebooks/MonitoredMetricViolationCheck.py)
entry point and then decides whether to execute the model retraining workflow.

To set up and enable monitoring:
* If not already done, generate the inference table, join it with ground truth labels, and update the table name in [monitoring-resource.yml](./monitoring-resource.yml).
* Resolve the `TODOs` in [monitoring-resource.yml](./monitoring-resource.yml).
* OPTIONAL: Update the query in [metric_violation_check_query.py](../monitoring/metric_violation_check_query.py) to customize when the metric is considered to be in violation.

NOTE: If ground truth labels are not available, you can still set up monitoring but should disable the retraining workflow.

Retraining constraints:
The retraining job has the following constraints for optimal functioning:
* Labels must be provided by the user, joined correctly with the inference data to build the retraining history, and available in time for the retraining frequency.
* Retraining frequency is tightly coupled with the granularity of the monitor. Ensure that the retraining frequency is equal to, or close to, the granularity of the monitor:
  * If the granularity of the monitor is 1 day and the retraining frequency is 1 hour, the job will preemptively stop, as there is no new data to evaluate the retraining criteria.
  * If the granularity of the monitor is 1 day and the retraining frequency is 1 week, retraining will lag behind the monitor and be less effective.
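When tuning the metric, threshold, and window parameters, it can help to inspect the monitor output directly. Below is a small sketch mirroring the violation check query (the table name is illustrative, and the metric column must exist in your monitor's profile metrics table):

```python
# Inspect the most recent table-level metric windows produced by the monitor.
# Runs in a Databricks notebook where `spark` is available.
profile_metrics_table = "dev.my_mlops_project.predictions_profile_metrics"  # illustrative name

recent = spark.sql(f"""
    SELECT window, root_mean_squared_error
    FROM {profile_metrics_table}
    WHERE column_name = ':table'
      AND slice_key IS NULL
      AND model_id != '*'
      AND log_type = 'INPUT'
    ORDER BY window DESC
    LIMIT 5
""")
recent.show(truncate=False)
```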

Permissions:
Permissions for monitoring are inherited from the original table's permissions.
* Users who own the monitored table or its parent catalog/schema can create, update, and view monitors.
* Users with read permissions on the monitored table can view its monitor.

Therefore, ensure that service principals are the owners or have the necessary permissions to manage the monitored table.

## Develop and test config changes

### databricks CLI bundles schema overview
@@ -18,20 +18,14 @@ resources:
name: ${var.model_name}
description: MLflow registered model for the "{{ .input_project_name }}" ML Project for ${bundle.target} deployment target.
<<: *permissions
depends_on:
- resources.jobs.model_training_job.id
- resources.jobs.batch_inference_job.id
{{- else -}}
registered_models:
model:
name: ${var.model_name}
catalog_name: ${bundle.target}
schema_name: {{ .input_schema_name }}
comment: Registered model in Unity Catalog for the "{{ .input_project_name }}" ML Project for ${bundle.target} deployment target.
<<: *grants
depends_on:
- resources.jobs.model_training_job.id
- resources.jobs.batch_inference_job.id{{end}}
<<: *grants{{end}}

experiments:
experiment:
