Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] First-class catalog and schema setting #1979

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6a7d31f
Add proof of concept for catalog/schema presets
lennartkats-db Oct 20, 2024
7fb646a
WIP
lennartkats-db Oct 22, 2024
480d308
Merge remote-tracking branch 'databricks/main' into presets-catalog-s…
lennartkats-db Nov 2, 2024
c1f5b3c
Use a helper notebook to set catalog/schema
lennartkats-db Nov 2, 2024
3ca2e67
Merge remote-tracking branch 'databricks/main' into presets-catalog-s…
lennartkats-db Dec 1, 2024
487884b
WIP
lennartkats-db Dec 2, 2024
963f5e6
Add translate_paths.GetLocalPath
lennartkats-db Dec 6, 2024
2addd0c
Extend catalog/schema preset support
lennartkats-db Dec 6, 2024
2ab497d
Tweak templates
lennartkats-db Dec 8, 2024
b6fcc1d
Cleanup
lennartkats-db Dec 9, 2024
d4977c6
WIP test
lennartkats-db Dec 10, 2024
a08b59d
Add comments
lennartkats-db Dec 10, 2024
7d1f8ad
Add TODO
lennartkats-db Dec 11, 2024
6b5948c
Add serving end point support
lennartkats-db Dec 14, 2024
35bed3f
Add experimental test
lennartkats-db Dec 14, 2024
3e1e169
Add reflection/dyn-based tests
lennartkats-db Dec 16, 2024
70f54dc
Add missing fields
lennartkats-db Dec 16, 2024
6d63b31
Refine test
lennartkats-db Dec 19, 2024
feb23dd
Move catalog/schema preset logic to a separate module
lennartkats-db Dec 19, 2024
65f10d1
Cleanup
lennartkats-db Dec 20, 2024
e0b6fad
Add a recommendation for pipeline catalogs
lennartkats-db Dec 20, 2024
8eb96cc
Cleanup
lennartkats-db Dec 20, 2024
86cf7e0
Restore Target logic
lennartkats-db Dec 20, 2024
8c2eaab
Merge remote-tracking branch 'origin/main' into presets-catalog-schem…
lennartkats-db Dec 20, 2024
be08585
presets-catalog-schema-as-params # Your branch is ahead of 'origin/pr…
lennartkats-db Dec 20, 2024
fbd4760
Fix test
lennartkats-db Dec 20, 2024
f3923da
Cleanup
lennartkats-db Dec 20, 2024
6d5cb1d
Add volumes and clean room notebooks
lennartkats-db Dec 20, 2024
0acb222
Fix silly lint error
lennartkats-db Dec 20, 2024
204d3b0
Cleanup
lennartkats-db Dec 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 228 additions & 18 deletions bundle/config/mutator/apply_presets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ package mutator

import (
"context"
"fmt"
"os"
"path"
"regexp"
"slices"
"sort"
"strings"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/dbr"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/textutil"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/jobs"
Expand All @@ -38,6 +43,9 @@ func (m *applyPresets) Name() string {
func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
var diags diag.Diagnostics

if d := validateCatalogAndSchema(b); d != nil {
return d // fast fail since code below would fail
}
if d := validatePauseStatus(b); d != nil {
diags = diags.Extend(d)
}
Expand All @@ -47,7 +55,8 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
prefix := t.NamePrefix
tags := toTagArray(t.Tags)

// Jobs presets: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus
// Jobs presets.
// Supported: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus, Catalog, Schema
for key, j := range r.Jobs {
if j.JobSettings == nil {
diags = diags.Extend(diag.Errorf("job %s is not defined", key))
Expand Down Expand Up @@ -81,9 +90,26 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
j.Trigger.PauseStatus = paused
}
}
if t.Catalog != "" || t.Schema != "" {
for _, task := range j.Tasks {
if task.DbtTask != nil {
if task.DbtTask.Catalog == "" {
task.DbtTask.Catalog = t.Catalog
}
if task.DbtTask.Schema == "" {
task.DbtTask.Schema = t.Schema
}
}
}

diags = diags.Extend(addCatalogSchemaParameters(b, key, j, t))
diags = diags.Extend(recommendCatalogSchemaUsage(b, ctx, key, j))
}
}

// Pipelines presets: Prefix, PipelinesDevelopment
// Pipelines presets.
// Supported: Prefix, PipelinesDevelopment, Catalog, Schema
// Not supported: Tags (as of 2024-10 not in pipelines API)
for key, p := range r.Pipelines {
if p.PipelineSpec == nil {
diags = diags.Extend(diag.Errorf("pipeline %s is not defined", key))
Expand All @@ -96,10 +122,16 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
if t.TriggerPauseStatus == config.Paused {
p.Continuous = false
}
// As of 2024-06, pipelines don't yet support tags
if t.Catalog != "" && p.Catalog == "" && p.Catalog != "hive_metastore" {
p.Catalog = t.Catalog
}
if t.Schema != "" && p.Target == "" {
p.Target = t.Schema
}
}

// Models presets: Prefix, Tags
// Models presets
// Supported: Prefix, Tags
for key, m := range r.Models {
if m.Model == nil {
diags = diags.Extend(diag.Errorf("model %s is not defined", key))
Expand All @@ -117,7 +149,8 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
}
}

// Experiments presets: Prefix, Tags
// Experiments presets
// Supported: Prefix, Tags
for key, e := range r.Experiments {
if e.Experiment == nil {
diags = diags.Extend(diag.Errorf("experiment %s is not defined", key))
Expand Down Expand Up @@ -145,29 +178,49 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
}
}

// Model serving endpoint presets: Prefix
// Model serving endpoint presets
// Supported: Prefix, Catalog, Schema
// Not supported: Tags (not in API as of 2024-10)
for key, e := range r.ModelServingEndpoints {
if e.CreateServingEndpoint == nil {
diags = diags.Extend(diag.Errorf("model serving endpoint %s is not defined", key))
continue
}
e.Name = normalizePrefix(prefix) + e.Name

// As of 2024-06, model serving endpoints don't yet support tags
if t.Catalog != "" || t.Schema != "" {
// TODO:
// - e.AiGateway.InferenceTableConfig.CatalogName
// - e.AiGateway.InferenceTableConfig.SchemaName
// - e.Config.AutoCaptureConfig.SchemaName
// - e.Config.AutoCaptureConfig.CatalogName
// - e.Config.ServedEntities[0].EntityName (__catalog_name__.__schema_name__.__model_name__.)
// - e.Config.ServedModels[0].ModelName (__catalog_name__.__schema_name__.__model_name__.)
diags = diags.Extend(diag.Errorf("model serving endpoints are not supported with catalog/schema presets"))
}

}

// Registered models presets: Prefix
// Registered models presets
// Supported: Prefix, Catalog, Schema
// Not supported: Tags (not in API as of 2024-10)
for key, m := range r.RegisteredModels {
if m.CreateRegisteredModelRequest == nil {
diags = diags.Extend(diag.Errorf("registered model %s is not defined", key))
continue
}
m.Name = normalizePrefix(prefix) + m.Name

// As of 2024-06, registered models don't yet support tags
if t.Catalog != "" && m.CatalogName == "" {
m.CatalogName = t.Catalog
}
if t.Schema != "" && m.SchemaName == "" {
m.SchemaName = t.Schema
}
}

// Quality monitors presets: Schedule
// Quality monitors presets
// Supported: Schedule, Catalog, Schema
// Not supported: Tags (not in API as of 2024-10)
if t.TriggerPauseStatus == config.Paused {
for key, q := range r.QualityMonitors {
if q.CreateMonitor == nil {
Expand All @@ -180,21 +233,35 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
if q.Schedule != nil && q.Schedule.PauseStatus != catalog.MonitorCronSchedulePauseStatusUnpaused {
q.Schedule = nil
}
if t.Catalog != "" && t.Schema != "" {
parts := strings.Split(q.TableName, ".")
if len(parts) != 3 {
q.TableName = fmt.Sprintf("%s.%s.%s", t.Catalog, t.Schema, q.TableName)
}
}
}
}

// Schemas: Prefix
// Schemas: Prefix, Catalog, Schema
// Not supported: Tags (as of 2024-10, only supported in Databricks UI / SQL API)
for key, s := range r.Schemas {
if s.CreateSchema == nil {
diags = diags.Extend(diag.Errorf("schema %s is not defined", key))
continue
}
s.Name = normalizePrefix(prefix) + s.Name
// HTTP API for schemas doesn't yet support tags. It's only supported in
// the Databricks UI and via the SQL API.
if t.Catalog != "" && s.CatalogName == "" {
s.CatalogName = t.Catalog
}
if t.Schema != "" && s.Name == "" {
// If there is a schema preset such as 'dev', we directly
// use that name and don't add any prefix (which might result in dev_dev).
s.Name = t.Schema
}
}

// Clusters: Prefix, Tags
// Not supported: Catalog / Schema (not applicable)
for key, c := range r.Clusters {
if c.ClusterSpec == nil {
diags = diags.Extend(diag.Errorf("cluster %s is not defined", key))
Expand All @@ -205,10 +272,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
c.CustomTags = make(map[string]string)
}
for _, tag := range tags {
normalisedKey := b.Tagging.NormalizeKey(tag.Key)
normalisedValue := b.Tagging.NormalizeValue(tag.Value)
if _, ok := c.CustomTags[normalisedKey]; !ok {
c.CustomTags[normalisedKey] = normalisedValue
normalizedKey := b.Tagging.NormalizeKey(tag.Key)
normalizedValue := b.Tagging.NormalizeValue(tag.Value)
if _, ok := c.CustomTags[normalizedKey]; !ok {
c.CustomTags[normalizedKey] = normalizedValue
}
}
}
Expand Down Expand Up @@ -258,6 +325,18 @@ func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics {
}}
}

func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics {
p := b.Config.Presets
if (p.Catalog != "" && p.Schema == "") || (p.Catalog == "" && p.Schema != "") {
return diag.Diagnostics{{
Summary: "presets.catalog and presets.schema must always be set together",
Severity: diag.Error,
Locations: []dyn.Location{b.Config.GetLocation("presets")},
}}
}
return nil
}

// toTagArray converts a map of tags to an array of tags.
// We sort tags so ensure stable ordering.
func toTagArray(tags map[string]string) []Tag {
Expand Down Expand Up @@ -289,3 +368,134 @@ func normalizePrefix(prefix string) string {

return textutil.NormalizeString(prefix) + suffix
}

// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist.
// Returns any warning diagnostics for existing parameters.
func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, t config.Presets) diag.Diagnostics {
var diags diag.Diagnostics

// Check for existing catalog/schema parameters
hasCatalog := false
hasSchema := false
if job.Parameters != nil {
for _, param := range job.Parameters {
if param.Name == "catalog" {
hasCatalog = true
diags = diags.Extend(diag.Diagnostics{{
Summary: fmt.Sprintf("job %s already has 'catalog' parameter defined; ignoring preset value", key),
Severity: diag.Warning,
Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
}})
}
if param.Name == "schema" {
hasSchema = true
diags = diags.Extend(diag.Diagnostics{{
Summary: fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key),
Severity: diag.Warning,
Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
}})
}
}
}

// Initialize parameters if nil
if job.Parameters == nil {
job.Parameters = []jobs.JobParameterDefinition{}
}

// Add catalog parameter if not already present
if !hasCatalog && t.Catalog != "" {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "catalog",
Default: t.Catalog,
})
}

// Add schema parameter if not already present
if !hasSchema && t.Schema != "" {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "schema",
Default: t.Schema,
})
}

return diags
}

func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key string, job *resources.Job) diag.Diagnostics {
var diags diag.Diagnostics
for _, t := range job.Tasks {
var relPath string
var expected string
var fix string
if t.NotebookTask != nil {
relPath = t.NotebookTask.NotebookPath
expected = `dbutils.widgets.text\(['"]schema|` +
`USE[^)]+schema`
fix = " dbutils.widgets.text('catalog')\n" +
" dbutils.widgets.text('schema')\n" +
" catalog = dbutils.widgets.get('catalog')\n" +
" schema = dbutils.widgets.get('schema')\n" +
" spark.sql(f'USE {catalog}.{schema}')\n"
} else if t.SparkPythonTask != nil {
relPath = t.SparkPythonTask.PythonFile
expected = `add_argument\(['"]--catalog'|` +
`USE[^)]+catalog`
fix = " def main():\n" +
" parser = argparse.ArgumentParser()\n" +
" parser.add_argument('--catalog', required=True)\n" +
" parser.add_argument('--schema', '-s', required=True)\n" +
" args, unknown = parser.parse_known_args()\n" +
" spark.sql(f\"USE {args.catalog}.{args.schema}\")\n"
} else if t.SqlTask != nil && t.SqlTask.File != nil {
relPath = t.SqlTask.File.Path
expected = `:schema|\{\{schema\}\}`
fix = " USE CATALOG {{catalog}};\n" +
" USE IDENTIFIER({schema});\n"
} else {
continue
}

sourceDir, err := b.Config.GetLocation("resources.jobs." + key).Directory()
if err != nil {
continue
}

localPath, _, err := GetLocalPath(ctx, b, sourceDir, relPath)
if err != nil || localPath == "" {
// We ignore errors (they're reported by another mutator)
// and ignore empty local paths (which means we'd have to download the file)
continue
}

if !fileIncludesPattern(ctx, localPath, expected) {
diags = diags.Extend(diag.Diagnostics{{
Summary: fmt.Sprintf("Use the 'catalog' and 'schema' parameters provided via 'presets.catalog' and 'presets.schema' using\n\n" +
fix),
Severity: diag.Recommendation,
Locations: []dyn.Location{{
File: localPath,
Line: 1,
Column: 1,
}},
}})
}
}

return diags
}

func fileIncludesPattern(ctx context.Context, filePath string, expected string) bool {
content, err := os.ReadFile(filePath)
if err != nil {
log.Warnf(ctx, "failed to check file %s: %v", filePath, err)
return true
}

matched, err := regexp.MatchString(expected, string(content))
if err != nil {
log.Warnf(ctx, "failed to check pattern in %s: %v", filePath, err)
return true
}
return matched
}
Loading
Loading