
Commit cb40b60

Add and populate UCX workflow_runs table (#2754)
## Changes

Add and populate workflow runs table

### Linked issues

Resolves #2600

### Functionality

- [x] added relevant user documentation
- [x] modified existing workflow: `migration-progress-experimental`

### Tests

- [ ] manually tested
- [x] added unit tests
- [x] added integration tests

### TODO

- [ ] Handle concurrent writes, [see](#2754 (comment))
- [x] Decide on getting workflow run status from `parse_log_task` --> only add it to the migration progress workflow for now
1 parent ab14010 commit cb40b60

15 files changed: +265 −8 lines changed


README.md
Lines changed: 21 additions & 5 deletions

@@ -80,6 +80,7 @@ See [contributing instructions](CONTRIBUTING.md) to help improve this project.
 * [`table-migrated-to-uc`](#table-migrated-to-uc)
 * [`to-json-in-shared-clusters`](#to-json-in-shared-clusters)
 * [`unsupported-magic-line`](#unsupported-magic-line)
+* [[EXPERIMENTAL] Migration Progress Workflow](#experimental-migration-progress-workflow)
 * [Utility commands](#utility-commands)
 * [`logs` command](#logs-command)
 * [`ensure-assessment-run` command](#ensure-assessment-run-command)
@@ -126,7 +127,7 @@ See [contributing instructions](CONTRIBUTING.md) to help improve this project.
 * [`revert-cluster-remap` command](#revert-cluster-remap-command)
 * [`upload` command](#upload-command)
 * [`download` command](#download-command)
-* [`join-collection` command](#join-collection command)
+* [`join-collection` command](#join-collection-command)
 * [collection eligible command](#collection-eligible-command)
 * [Common Challenges and the Solutions](#common-challenges-and-the-solutions)
 * [Network Connectivity Issues](#network-connectivity-issues)
@@ -994,6 +995,19 @@ This message indicates the code that could not be analysed by UCX. User must che
 
 [[back to top](#databricks-labs-ucx)]
 
+## [EXPERIMENTAL] Migration Progress Workflow
+
+The `migration-progress-experimental` workflow updates a subset of the inventory tables to track migration status of
+workspace resources that need to be migrated. Besides updating the inventory tables, this workflow tracks the migration
+progress by updating the following [UCX catalog](#create-ucx-catalog-command) tables:
+
+- `workflow_runs`: Tracks the status of the workflow runs.
+
+_Note: A subset of the inventory is updated, *not* the complete inventory that is initially gathered by
+the [assessment workflow](#assessment-workflow)._
+
+[[back to top](#databricks-labs-ucx)]
+
 # Utility commands
 
 ## `logs` command
@@ -1029,11 +1043,13 @@ listed with the [`workflows` command](#workflows-command).
 databricks labs ucx update-migration-progress
 ```
 
-This command updates a subset of the inventory tables that are used to track workspace resources that need to be migrated. It does this by triggering the `migration-process-experimental` workflow to run on a workspace and waiting for it to complete. This can be used to ensure that dashboards and associated reporting are updated to reflect the current state of the workspace.
-
-_Note: Only a subset of the inventory is updated, *not* the complete inventory that is initially gathered by the [assessment workflow](#assessment-workflow)._
+This command runs the [(experimental) migration progress workflow](#experimental-migration-progress-workflow) to update
+the migration status of workspace resources that need to be migrated. It does this by triggering
+the `migration-progress-experimental` workflow to run on a workspace and waiting for
+it to complete.
 
-Workflows and their status can be listed with the [`workflows` command](#workflows-commandr), while failed workflows can be fixed with the [`repair-run` command](#repair-run-command).
+Workflows and their status can be listed with the [`workflows` command](#workflows-commandr), while failed workflows can
+be fixed with the [`repair-run` command](#repair-run-command).
 
 [[back to top](#databricks-labs-ucx)]
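The new `workflow_runs` table lives in the `multiworkspace` schema of the UCX catalog. As a rough illustration only — the catalog name and the column selection below are assumptions, not part of this commit — a dashboard query against it could be assembled like this:

```python
# Hypothetical catalog name; the real one is chosen when running `create-ucx-catalog`.
ucx_catalog = "ucx"

# Column names follow the WorkflowRun dataclass introduced by this commit.
query = (
    "SELECT workflow_name, workflow_run_id, started_at, finished_at "
    f"FROM {ucx_catalog}.multiworkspace.workflow_runs "
    "ORDER BY started_at DESC"
)
print(query)
```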

src/databricks/labs/ucx/cli.py
Lines changed: 1 addition & 0 deletions

@@ -611,6 +611,7 @@ def create_ucx_catalog(w: WorkspaceClient, prompts: Prompts, ctx: WorkspaceConte
     """
     workspace_context = ctx or WorkspaceContext(w)
     workspace_context.catalog_schema.create_ucx_catalog(prompts)
+    workspace_context.progress_tracking_installation.run()
 
 
 @ucx.command

src/databricks/labs/ucx/contexts/workflow_task.py
Lines changed: 14 additions & 0 deletions

@@ -16,6 +16,7 @@
 from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
 from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
 from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
+from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder
 
 
 class RuntimeContext(GlobalContext):
@@ -109,3 +110,16 @@ def task_run_warning_recorder(self):
             self.inventory_database,
             int(self.named_parameters.get("attempt", "0")),
         )
+
+    @cached_property
+    def workflow_run_recorder(self) -> WorkflowRunRecorder:
+        return WorkflowRunRecorder(
+            self.sql_backend,
+            self.config.ucx_catalog,
+            workspace_id=self.workspace_client.get_workspace_id(),
+            workflow_name=self.named_parameters["workflow"],
+            workflow_id=int(self.named_parameters["job_id"]),
+            workflow_run_id=int(self.named_parameters["parent_run_id"]),
+            workflow_run_attempt=int(self.named_parameters.get("attempt", 0)),
+            workflow_start_time=self.named_parameters["start_time"],
+        )

src/databricks/labs/ucx/contexts/workspace_cli.py
Lines changed: 5 additions & 0 deletions

@@ -18,6 +18,7 @@
 from databricks.labs.ucx.azure.resources import AzureAPIClient, AzureResources
 from databricks.labs.ucx.contexts.application import CliContext
 from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
+from databricks.labs.ucx.progress.install import ProgressTrackingInstallation
 from databricks.labs.ucx.source_code.base import CurrentSessionState
 from databricks.labs.ucx.source_code.linters.context import LinterContext
 from databricks.labs.ucx.source_code.linters.files import LocalFileMigrator, LocalCodeLinter
@@ -179,6 +180,10 @@ def iam_role_creation(self):
     def notebook_loader(self) -> NotebookLoader:
         return NotebookLoader()
 
+    @cached_property
+    def progress_tracking_installation(self) -> ProgressTrackingInstallation:
+        return ProgressTrackingInstallation(self.sql_backend, self.config.ucx_catalog)
+
 
 class LocalCheckoutContext(WorkspaceContext):
     """Local context extends Workspace context to provide extra properties

src/databricks/labs/ucx/installer/workflows.py
Lines changed: 3 additions & 0 deletions

@@ -55,9 +55,11 @@
 
 TEST_RESOURCE_PURGE_TIMEOUT = timedelta(hours=1)
 TEST_NIGHTLY_CI_RESOURCES_PURGE_TIMEOUT = timedelta(hours=3)  # Buffer for debugging nightly integration test runs
+# See https://docs.databricks.com/en/jobs/parameter-value-references.html#supported-value-references
 EXTRA_TASK_PARAMS = {
     "job_id": "{{job_id}}",
     "run_id": "{{run_id}}",
+    "start_time": "{{job.start_time.iso_datetime}}",
     "attempt": "{{job.repair_count}}",
     "parent_run_id": "{{parent_run_id}}",
 }
@@ -108,6 +110,7 @@
         f'--task=' + dbutils.widgets.get('task'),
         f'--job_id=' + dbutils.widgets.get('job_id'),
         f'--run_id=' + dbutils.widgets.get('run_id'),
+        f'--start_time=' + dbutils.widgets.get('start_time'),
         f'--attempt=' + dbutils.widgets.get('attempt'),
         f'--parent_run_id=' + dbutils.widgets.get('parent_run_id'))
 """
src/databricks/labs/ucx/progress/install.py
Lines changed: 22 additions & 0 deletions

@@ -0,0 +1,22 @@
+import logging
+
+from databricks.labs.lsql.backends import SqlBackend
+from databricks.labs.lsql.deployment import SchemaDeployer
+from databricks.labs.ucx.progress.workflow_runs import WorkflowRun
+
+logger = logging.getLogger(__name__)
+
+
+class ProgressTrackingInstallation:
+    """Install resources for UCX's progress tracking."""
+
+    _SCHEMA = "multiworkspace"
+
+    def __init__(self, sql_backend: SqlBackend, ucx_catalog: str) -> None:
+        # `mod` is a required parameter, though it is not used in this context without views.
+        self._schema_deployer = SchemaDeployer(sql_backend, self._SCHEMA, mod=None, catalog=ucx_catalog)
+
+    def __init__ continues above; run() deploys the schema and table:
+    def run(self) -> None:
+        self._schema_deployer.deploy_schema()
+        self._schema_deployer.deploy_table("workflow_runs", WorkflowRun)
+        logger.info("Installation completed successfully!")
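`SchemaDeployer` from `databricks-labs-lsql` derives the actual DDL from the `WorkflowRun` dataclass. As a rough sketch only — this is not the lsql implementation, and the column list is deliberately elided — the idempotent statements the installer amounts to look like:

```python
def progress_tracking_ddl(ucx_catalog: str) -> list[str]:
    """Sketch of the statements ProgressTrackingInstallation.run() results in."""
    schema = f"{ucx_catalog}.multiworkspace"
    return [
        # deploy_schema(): create the schema if it does not exist yet.
        f"CREATE SCHEMA IF NOT EXISTS {schema}",
        # deploy_table(): column definitions are generated from the WorkflowRun dataclass.
        f"CREATE TABLE IF NOT EXISTS {schema}.workflow_runs (/* ... */)",
    ]

print(progress_tracking_ddl("ucx"))
```

Both statements use `IF NOT EXISTS`, which is why `run()` is safe to invoke repeatedly, as the unit tests below rely on.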
src/databricks/labs/ucx/progress/workflow_runs.py
Lines changed: 79 additions & 0 deletions

@@ -0,0 +1,79 @@
+import datetime as dt
+import logging
+from dataclasses import dataclass
+
+from databricks.labs.lsql.backends import SqlBackend
+from databricks.sdk.errors import NotFound
+
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass(frozen=True, kw_only=True)
+class WorkflowRun:
+    started_at: dt.datetime
+    """The timestamp of the workflow run start."""
+
+    finished_at: dt.datetime
+    """The timestamp of the workflow run end."""
+
+    workspace_id: int
+    """The workspace id in which the workflow ran."""
+
+    workflow_name: str
+    """The workflow name that ran."""
+
+    workflow_id: int
+    """The workflow id of the workflow that ran."""
+
+    workflow_run_id: int
+    """The workflow run id."""
+
+    workflow_run_attempt: int
+    """The workflow run attempt."""
+
+
+class WorkflowRunRecorder:
+    """Record workflow runs in a database."""
+
+    def __init__(
+        self,
+        sql_backend: SqlBackend,
+        ucx_catalog: str,
+        *,
+        workspace_id: int,
+        workflow_name: str,
+        workflow_id: int,
+        workflow_run_id: int,
+        workflow_run_attempt: int,
+        workflow_start_time: str,
+    ):
+        self._sql_backend = sql_backend
+        self._full_table_name = f"{ucx_catalog}.multiworkspace.workflow_runs"
+        self._workspace_id = workspace_id
+        self._workflow_name = workflow_name
+        self._workflow_start_time = workflow_start_time
+        self._workflow_id = workflow_id
+        self._workflow_run_id = workflow_run_id
+        self._workflow_run_attempt = workflow_run_attempt
+
+    def record(self) -> None:
+        """Record a workflow run."""
+        workflow_run = WorkflowRun(
+            started_at=dt.datetime.fromisoformat(self._workflow_start_time),
+            finished_at=dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0),
+            workspace_id=self._workspace_id,
+            workflow_name=self._workflow_name,
+            workflow_id=self._workflow_id,
+            workflow_run_id=self._workflow_run_id,
+            workflow_run_attempt=self._workflow_run_attempt,
+        )
+        try:
+            self._sql_backend.save_table(
+                self._full_table_name,
+                [workflow_run],
+                WorkflowRun,
+                mode="append",
+            )
+        except NotFound as e:
+            logger.error(f"Workflow run table not found: {self._full_table_name}", exc_info=e)
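`record()` parses the ISO string it received as the `start_time` task parameter back into a timezone-aware datetime, and truncates `finished_at` to whole seconds. A self-contained sketch of that timestamp handling — the start-time value is a made-up example of what the parameter resolves to:

```python
import datetime as dt

# Made-up value, in the shape {{job.start_time.iso_datetime}} resolves to.
start_time_param = "2024-01-15T08:00:00+00:00"

# What record() stores as started_at: the parsed, timezone-aware datetime.
started_at = dt.datetime.fromisoformat(start_time_param)

# What record() stores as finished_at: "now", truncated to whole seconds.
finished_at = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0)

assert started_at.tzinfo is not None  # timezone-aware
assert finished_at.microsecond == 0   # truncation keeps comparisons in the tests stable
```

Truncating microseconds on both sides is what lets the tests below assert `rows[0].started_at == start_time` without flaky sub-second mismatches.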

src/databricks/labs/ucx/progress/workflows.py
Lines changed: 14 additions & 0 deletions

@@ -111,3 +111,17 @@ def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
         The results of the scan are stored in the `$inventory.migration_status` inventory table.
         """
         ctx.migration_status_refresher.snapshot(force_refresh=True)
+
+    @job_task(
+        depends_on=[
+            crawl_grants,
+            assess_jobs,
+            assess_clusters,
+            assess_pipelines,
+            crawl_cluster_policies,
+            refresh_table_migration_status,
+        ]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()
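Because `record_workflow_run` lists the other tasks of the workflow as dependencies, it only becomes eligible once they have all finished, so the recorded run reflects a completed pass. A toy sketch of that constraint — the task names are copied from the decorator above, but the scheduling itself is done by the UCX task framework, not by this function:

```python
# Dependencies of record_workflow_run, as declared in its @job_task decorator.
depends_on = [
    "crawl_grants",
    "assess_jobs",
    "assess_clusters",
    "assess_pipelines",
    "crawl_cluster_policies",
    "refresh_table_migration_status",
]

def may_record(finished: set[str]) -> bool:
    """The recorder task is eligible only after all its dependencies finished."""
    return all(task in finished for task in depends_on)

print(may_record(set()))            # False
print(may_record(set(depends_on)))  # True
```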

tests/integration/conftest.py
Lines changed: 8 additions & 2 deletions

@@ -53,7 +53,7 @@
 from databricks.labs.ucx.hive_metastore.tables import Table
 from databricks.labs.ucx.install import WorkspaceInstallation, WorkspaceInstaller, AccountInstaller
 from databricks.labs.ucx.installer.workflows import WorkflowsDeployment
-
+from databricks.labs.ucx.progress.install import ProgressTrackingInstallation
 from databricks.labs.ucx.runtime import Workflows
 from databricks.labs.ucx.workspace_access.groups import MigratedGroup, GroupManager
 
@@ -445,7 +445,7 @@ def inventory_database(self) -> str:
 
     @cached_property
     def ucx_catalog(self) -> str:
-        return self._make_catalog(name=f"ucx-{self._make_random()}").name
+        return self._make_catalog(name=f"ucx_{self._make_random()}").name
 
     @cached_property
     def workspace_client(self) -> WorkspaceClient:
@@ -748,6 +748,7 @@ def config(self) -> WorkspaceConfig:
         return WorkspaceConfig(
             warehouse_id=self._env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"),
             inventory_database=self.inventory_database,
+            ucx_catalog=self.ucx_catalog,
             connect=self.workspace_client.config,
             renamed_group_prefix=f'tmp-{self.inventory_database}-',
         )
@@ -948,6 +949,7 @@ def config(self) -> WorkspaceConfig:
             include_databases=self.created_databases,
             include_object_permissions=self.include_object_permissions,
             warehouse_id=self._env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"),
+            ucx_catalog=self.ucx_catalog,
         )
         workspace_config = self.config_transform(workspace_config)
         self.installation.save(workspace_config)
@@ -986,6 +988,10 @@ def workspace_installation(self):
             self.product_info,
         )
 
+    @cached_property
+    def progress_tracking_installation(self) -> ProgressTrackingInstallation:
+        return ProgressTrackingInstallation(self.sql_backend, self.ucx_catalog)
+
     @cached_property
     def extend_prompts(self):
         return {}
Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+def test_progress_tracking_installer_creates_workflow_runs_table(az_cli_ctx) -> None:
+    az_cli_ctx.progress_tracking_installation.run()
+    query = (
+        f"SELECT 1 FROM tables WHERE table_catalog = '{az_cli_ctx.config.ucx_catalog}' "
+        "AND table_schema = 'multiworkspace' AND table_name = 'workflow_runs'"
+    )
+    assert any(az_cli_ctx.sql_backend.fetch(query, catalog="system", schema="information_schema"))
Lines changed: 29 additions & 0 deletions

@@ -0,0 +1,29 @@
+import datetime as dt
+
+
+def test_workflow_run_recorder_records_workflow_run(installation_ctx) -> None:
+    """Ensure that the workflow run recorder records a workflow run"""
+    start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0)
+    named_parameters = {
+        "workflow": "test",
+        "job_id": "123",
+        "parent_run_id": "456",
+        "start_time": start_time.isoformat(),
+    }
+    ctx = installation_ctx.replace(named_parameters=named_parameters)
+    ctx.progress_tracking_installation.run()
+    select_workflow_runs_query = f"SELECT * FROM {ctx.ucx_catalog}.multiworkspace.workflow_runs"
+    # Be confident that we are not selecting any workflow runs before the to-be-tested code
+    assert not any(ctx.sql_backend.fetch(select_workflow_runs_query))
+
+    ctx.workflow_run_recorder.record()
+
+    rows = list(ctx.sql_backend.fetch(select_workflow_runs_query))
+    assert len(rows) == 1
+    assert rows[0].started_at == start_time
+    assert start_time <= rows[0].finished_at <= dt.datetime.now(tz=dt.timezone.utc)
+    assert rows[0].workspace_id == installation_ctx.workspace_client.get_workspace_id()
+    assert rows[0].workflow_name == "test"
+    assert rows[0].workflow_id == 123
+    assert rows[0].workflow_run_id == 456
+    assert rows[0].workflow_run_attempt == 0

tests/integration/progress/test_workflows.py
Lines changed: 7 additions & 0 deletions

@@ -16,6 +16,13 @@ def test_running_real_migration_progress_job(installation_ctx: MockInstallationC
     installation_ctx.deployed_workflows.run_workflow("assessment")
     assert installation_ctx.deployed_workflows.validate_step("assessment")
 
+    # After the assessment, a user (maybe) installs the progress tracking
+    installation_ctx.progress_tracking_installation.run()
+
     # Run the migration-progress workflow until completion.
     installation_ctx.deployed_workflows.run_workflow("migration-progress-experimental")
     assert installation_ctx.deployed_workflows.validate_step("migration-progress-experimental")
+
+    # Ensure that the migration-progress workflow populated the `workflow_runs` table.
+    query = f"SELECT 1 FROM {installation_ctx.ucx_catalog}.multiworkspace.workflow_runs"
+    assert any(installation_ctx.sql_backend.fetch(query)), f"No workflow run captured: {query}"

tests/unit/progress/test_install.py
Lines changed: 14 additions & 0 deletions

@@ -0,0 +1,14 @@
+from databricks.labs.ucx.progress.install import ProgressTrackingInstallation
+
+
+def test_progress_tracking_installation_run_creates_progress_tracking_schema(mock_backend) -> None:
+    installation = ProgressTrackingInstallation(mock_backend, "ucx")
+    installation.run()
+    assert "CREATE SCHEMA IF NOT EXISTS ucx.multiworkspace" in mock_backend.queries[0]
+
+
+def test_progress_tracking_installation_run_creates_workflow_runs_table(mock_backend) -> None:
+    installation = ProgressTrackingInstallation(mock_backend, "ucx")
+    installation.run()
+    # Dataclass to schema conversion is tested within the lsql package
+    assert any("CREATE TABLE IF NOT EXISTS" in query for query in mock_backend.queries)
Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+import datetime as dt
+
+from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder
+
+
+def test_workflow_run_record_records_workflow_run(mock_backend) -> None:
+    """Ensure that the workflow run recorder records a workflow run"""
+    start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0)
+    workflow_run_recorder = WorkflowRunRecorder(
+        mock_backend,
+        ucx_catalog="ucx",
+        workspace_id=123456789,
+        workflow_name="workflow",
+        workflow_id=123,
+        workflow_run_id=456,
+        workflow_run_attempt=0,
+        workflow_start_time=start_time.isoformat(),
+    )
+
+    workflow_run_recorder.record()
+
+    rows = mock_backend.rows_written_for("ucx.multiworkspace.workflow_runs", "append")
+    assert len(rows) == 1
+    assert rows[0].started_at == start_time
+    assert start_time <= rows[0].finished_at <= dt.datetime.now(tz=dt.timezone.utc)
+    assert rows[0].workspace_id == 123456789
+    assert rows[0].workflow_name == "workflow"
+    assert rows[0].workflow_id == 123
+    assert rows[0].workflow_run_id == 456
+    assert rows[0].workflow_run_attempt == 0
