diff --git a/bundle/config/mutator/apply_presets.go b/bundle/config/mutator/apply_presets.go index 3817037565..587ae1c5f0 100644 --- a/bundle/config/mutator/apply_presets.go +++ b/bundle/config/mutator/apply_presets.go @@ -22,6 +22,8 @@ type applyPresets struct{} // Apply all presets, e.g. the prefix presets that // adds a prefix to all names of all resources. +// +// Note that the catalog/schema presets are applied in ApplyPresetsCatalogSchema. func ApplyPresets() *applyPresets { return &applyPresets{} } @@ -84,6 +86,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos } // Pipelines presets: Prefix, PipelinesDevelopment + // Not supported: Tags (not in API as of 2024-12) for key, p := range r.Pipelines { if p.PipelineSpec == nil { diags = diags.Extend(diag.Errorf("pipeline %s is not defined", key)) @@ -96,7 +99,6 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos if t.TriggerPauseStatus == config.Paused { p.Continuous = false } - // As of 2024-06, pipelines don't yet support tags } // Models presets: Prefix, Tags @@ -146,37 +148,36 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos } // Model serving endpoint presets: Prefix + // Not supported: Tags (not in API as of 2024-12) for key, e := range r.ModelServingEndpoints { if e.CreateServingEndpoint == nil { diags = diags.Extend(diag.Errorf("model serving endpoint %s is not defined", key)) continue } e.Name = normalizePrefix(prefix) + e.Name - - // As of 2024-06, model serving endpoints don't yet support tags } // Registered models presets: Prefix + // Not supported: Tags (not in API as of 2024-12) for key, m := range r.RegisteredModels { if m.CreateRegisteredModelRequest == nil { diags = diags.Extend(diag.Errorf("registered model %s is not defined", key)) continue } m.Name = normalizePrefix(prefix) + m.Name - - // As of 2024-06, registered models don't yet support tags } // Quality monitors presets: Schedule - if t.TriggerPauseStatus == config.Paused { - for key, q := range r.QualityMonitors { - if q.CreateMonitor == nil { - diags = diags.Extend(diag.Errorf("quality monitor %s is not defined", key)) - continue - } - // Remove all schedules from monitors, since they don't support pausing/unpausing. - // Quality monitors might support the "pause" property in the future, so at the - // CLI level we do respect that property if it is set to "unpaused." + // Not supported: Tags (not in API as of 2024-12) + for key, q := range r.QualityMonitors { + if q.CreateMonitor == nil { + diags = diags.Extend(diag.Errorf("quality monitor %s is not defined", key)) + continue + } + // Remove all schedules from monitors, since they don't support pausing/unpausing. + // Quality monitors might support the "pause" property in the future, so at the + // CLI level we do respect that property if it is set to "unpaused." + if t.TriggerPauseStatus == config.Paused { if q.Schedule != nil && q.Schedule.PauseStatus != catalog.MonitorCronSchedulePauseStatusUnpaused { q.Schedule = nil } @@ -184,14 +185,13 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos } // Schemas: Prefix + // Not supported: Tags (only supported in Databricks UI / SQL API as of 2024-12) for key, s := range r.Schemas { if s.CreateSchema == nil { diags = diags.Extend(diag.Errorf("schema %s is not defined", key)) continue } s.Name = normalizePrefix(prefix) + s.Name - // HTTP API for schemas doesn't yet support tags. 
It's only supported in - // the Databricks UI and via the SQL API. } // Clusters: Prefix, Tags @@ -205,10 +205,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos c.CustomTags = make(map[string]string) } for _, tag := range tags { - normalisedKey := b.Tagging.NormalizeKey(tag.Key) - normalisedValue := b.Tagging.NormalizeValue(tag.Value) - if _, ok := c.CustomTags[normalisedKey]; !ok { - c.CustomTags[normalisedKey] = normalisedValue + normalizedKey := b.Tagging.NormalizeKey(tag.Key) + normalizedValue := b.Tagging.NormalizeValue(tag.Value) + if _, ok := c.CustomTags[normalizedKey]; !ok { + c.CustomTags[normalizedKey] = normalizedValue } } } @@ -246,6 +246,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos return diags } +// validatePauseStatus checks the user-provided pause status is valid. func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics { p := b.Config.Presets.TriggerPauseStatus if p == "" || p == config.Paused || p == config.Unpaused { @@ -259,7 +260,7 @@ func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics { } // toTagArray converts a map of tags to an array of tags. -// We sort tags so ensure stable ordering. +// We sort tags to ensure stable ordering. func toTagArray(tags map[string]string) []Tag { var tagArray []Tag if tags == nil { diff --git a/bundle/config/mutator/apply_presets_catalog_schema.go b/bundle/config/mutator/apply_presets_catalog_schema.go new file mode 100644 index 0000000000..227a25843b --- /dev/null +++ b/bundle/config/mutator/apply_presets_catalog_schema.go @@ -0,0 +1,384 @@ +package mutator + +import ( + "context" + "fmt" + "os" + "regexp" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +type applyPresetsCatalogSchema struct{} + +// ApplyPresetsCatalogSchema applies catalog and schema presets to bundle resources. 
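+// It fills in presets.catalog and presets.schema as defaults for empty catalog/schema
+// fields on jobs (dbt task fields and job parameters), pipelines, model serving
+// endpoints, registered models, quality monitors, schemas, and volumes; values that
+// are already set explicitly are left unchanged.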
+func ApplyPresetsCatalogSchema() *applyPresetsCatalogSchema { + return &applyPresetsCatalogSchema{} +} + +func (m *applyPresetsCatalogSchema) Name() string { + return "ApplyPresetsCatalogSchema" +} + +func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + diags := diag.Diagnostics{} + p := b.Config.Presets + r := b.Config.Resources + + if p.Catalog == "" && p.Schema == "" { + return diags + } + if (p.Schema == "" && p.Catalog != "") || (p.Catalog == "" && p.Schema != "") { + return diag.Diagnostics{{ + Summary: "presets.catalog and presets.schema must always be set together", + Severity: diag.Error, + Locations: []dyn.Location{b.Config.GetLocation("presets")}, + }} + } + + // Jobs + for key, j := range r.Jobs { + if j.JobSettings == nil { + continue + } + + for _, task := range j.Tasks { + if task.DbtTask != nil { + if task.DbtTask.Catalog == "" { + task.DbtTask.Catalog = p.Catalog + } + if task.DbtTask.Schema == "" { + task.DbtTask.Schema = p.Schema + } + } + } + + diags = diags.Extend(addCatalogSchemaParameters(b, key, j, p)) + diags = diags.Extend(recommendCatalogSchemaUsage(b, ctx, key, j)) + } + + // Pipelines + allSameCatalog := allPipelinesSameCatalog(&r) + for key, pl := range r.Pipelines { + if pl.PipelineSpec == nil { + continue + } + if pl.Catalog == "" { + pl.Catalog = p.Catalog + } + if pl.Schema == "" && pl.Target == "" { + // As of 2024-12, the Schema field isn't broadly supported yet in the pipelines API. + // Until it is, we set the Target field. + pl.Target = p.Schema + } + if allSameCatalog && pl.Catalog == p.Catalog { + // Just for the common case where all pipelines have the same catalog, + // we show a recommendation to leave it out and rely on presets. + // This can happen when using the original default template. + diags = diags.Extend(diag.Diagnostics{{ + Summary: "Omit the catalog field since it will be automatically populated from presets.catalog", + Severity: diag.Recommendation, + Locations: b.Config.GetLocations("resources.pipelines." 
+ key + ".catalog"), + }}) + } + if pl.GatewayDefinition != nil { + if pl.GatewayDefinition.GatewayStorageCatalog == "" { + pl.GatewayDefinition.GatewayStorageCatalog = p.Catalog + } + if pl.GatewayDefinition.GatewayStorageSchema == "" { + pl.GatewayDefinition.GatewayStorageSchema = p.Schema + } + } + if pl.IngestionDefinition != nil { + for _, obj := range pl.IngestionDefinition.Objects { + if obj.Report != nil { + if obj.Report.DestinationCatalog == "" { + obj.Report.DestinationCatalog = p.Catalog + } + if obj.Report.DestinationSchema == "" { + obj.Report.DestinationSchema = p.Schema + } + } + if obj.Schema != nil { + if obj.Schema.SourceCatalog == "" { + obj.Schema.SourceCatalog = p.Catalog + } + if obj.Schema.SourceSchema == "" { + obj.Schema.SourceSchema = p.Schema + } + if obj.Schema.DestinationCatalog == "" { + obj.Schema.DestinationCatalog = p.Catalog + } + if obj.Schema.DestinationSchema == "" { + obj.Schema.DestinationSchema = p.Schema + } + } + if obj.Table != nil { + if obj.Table.SourceCatalog == "" { + obj.Table.SourceCatalog = p.Catalog + } + if obj.Table.SourceSchema == "" { + obj.Table.SourceSchema = p.Schema + } + if obj.Table.DestinationCatalog == "" { + obj.Table.DestinationCatalog = p.Catalog + } + if obj.Table.DestinationSchema == "" { + obj.Table.DestinationSchema = p.Schema + } + } + } + } + } + + // Model serving endpoints + for _, e := range r.ModelServingEndpoints { + if e.CreateServingEndpoint == nil { + continue + } + + if e.CreateServingEndpoint.AiGateway != nil && e.CreateServingEndpoint.AiGateway.InferenceTableConfig != nil { + if e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName == "" { + e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName = p.Catalog + } + if e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName == "" { + e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName = p.Schema + } + } + + if e.CreateServingEndpoint.Config.AutoCaptureConfig != nil { + if e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName == "" { + e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName = p.Catalog + } + if e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName == "" { + e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName = p.Schema + } + } + + for i := range e.CreateServingEndpoint.Config.ServedEntities { + e.CreateServingEndpoint.Config.ServedEntities[i].EntityName = fullyQualifyName( + e.CreateServingEndpoint.Config.ServedEntities[i].EntityName, p, + ) + } + for i := range e.CreateServingEndpoint.Config.ServedModels { + e.CreateServingEndpoint.Config.ServedModels[i].ModelName = fullyQualifyName( + e.CreateServingEndpoint.Config.ServedModels[i].ModelName, p, + ) + } + } + + // Registered models + for _, m := range r.RegisteredModels { + if m.CreateRegisteredModelRequest == nil { + continue + } + if m.CatalogName == "" { + m.CatalogName = p.Catalog + } + if m.SchemaName == "" { + m.SchemaName = p.Schema + } + } + + // Quality monitors + for _, q := range r.QualityMonitors { + if q.CreateMonitor == nil { + continue + } + q.TableName = fullyQualifyName(q.TableName, p) + if q.OutputSchemaName == "" { + q.OutputSchemaName = p.Catalog + "." 
+ p.Schema + } + } + + // Schemas + for _, s := range r.Schemas { + if s.CreateSchema == nil { + continue + } + if s.CatalogName == "" { + s.CatalogName = p.Catalog + } + if s.Name == "" { + s.Name = p.Schema + } + } + + // Volumes + for _, v := range r.Volumes { + if v.CreateVolumeRequestContent == nil { + continue + } + if v.CatalogName == "" { + v.CatalogName = p.Catalog + } + if v.SchemaName == "" { + v.SchemaName = p.Schema + } + } + + return diags +} + +// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist. +// Returns any warning diagnostics for existing parameters. +func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, p config.Presets) diag.Diagnostics { + var diags diag.Diagnostics + + // Check for existing catalog/schema parameters + hasCatalog := false + hasSchema := false + if job.Parameters != nil { + for _, param := range job.Parameters { + if param.Name == "catalog" { + hasCatalog = true + diags = diags.Extend(diag.Diagnostics{{ + Summary: fmt.Sprintf("job '%s' already has 'catalog' parameter defined; ignoring preset value", key), + Severity: diag.Warning, + Locations: b.Config.GetLocations("resources.jobs." + key + ".parameters"), + }}) + } + if param.Name == "schema" { + hasSchema = true + diags = diags.Extend(diag.Diagnostics{{ + Summary: fmt.Sprintf("job '%s' already has 'schema' parameter defined; ignoring preset value", key), + Severity: diag.Warning, + Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key + ".parameters")}, + }}) + } + } + } + + // Add catalog/schema parameters + if !hasCatalog { + job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{ + Name: "catalog", + Default: p.Catalog, + }) + } + if !hasSchema { + job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{ + Name: "schema", + Default: p.Schema, + }) + } + + return diags +} + +func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key string, job *resources.Job) diag.Diagnostics { + var diags diag.Diagnostics + for _, t := range job.Tasks { + var relPath string + var expected string + var fix string + if t.NotebookTask != nil { + relPath = t.NotebookTask.NotebookPath + expected = `dbutils.widgets.text\(['"]schema|` + + `USE[^)]+schema` + fix = " dbutils.widgets.text('catalog')\n" + + " dbutils.widgets.text('schema')\n" + + " catalog = dbutils.widgets.get('catalog')\n" + + " schema = dbutils.widgets.get('schema')\n" + + " spark.sql(f'USE {catalog}.{schema}')\n" + } else if t.SparkPythonTask != nil { + relPath = t.SparkPythonTask.PythonFile + expected = `add_argument\(['"]--catalog'|` + + `USE[^)]+catalog` + fix = " def main():\n" + + " parser = argparse.ArgumentParser()\n" + + " parser.add_argument('--catalog', required=True)\n" + + " parser.add_argument('--schema', '-s', required=True)\n" + + " args, unknown = parser.parse_known_args()\n" + + " spark.sql(f\"USE {args.catalog}.{args.schema}\")\n" + } else if t.SqlTask != nil && t.SqlTask.File != nil { + relPath = t.SqlTask.File.Path + expected = `:schema|\{\{schema\}\}` + fix = " USE CATALOG {{catalog}};\n" + + " USE IDENTIFIER({schema});\n" + } else { + continue + } + + sourceDir, err := b.Config.GetLocation("resources.jobs." 
+ key).Directory() + if err != nil { + continue + } + + localPath, _, err := GetLocalPath(ctx, b, sourceDir, relPath) + if err != nil || localPath == "" { + // We ignore errors (they're reported by another mutator) + // and ignore empty local paths (which means we'd have to download the file) + continue + } + + if !fileIncludesPattern(ctx, localPath, expected) { + diags = diags.Extend(diag.Diagnostics{{ + Summary: "Use the 'catalog' and 'schema' parameters provided via 'presets.catalog' and 'presets.schema' using\n\n" + fix, + Severity: diag.Recommendation, + Locations: []dyn.Location{{ + File: localPath, + Line: 1, + Column: 1, + }}, + }}) + } + } + + return diags +} + +// fullyQualifyName checks if the given name is already qualified with a catalog and schema. +// If not, and both catalog and schema are available, it prefixes the name with catalog.schema. +// If name is empty, returns name as-is. +func fullyQualifyName(name string, p config.Presets) string { + if name == "" || p.Catalog == "" || p.Schema == "" { + return name + } + // If it's already qualified (contains at least two '.'), we assume it's fully qualified. + parts := strings.Split(name, ".") + if len(parts) >= 3 { + // Already fully qualified + return name + } + // Otherwise, fully qualify it + return fmt.Sprintf("%s.%s.%s", p.Catalog, p.Schema, name) +} + +func fileIncludesPattern(ctx context.Context, filePath, expected string) bool { + content, err := os.ReadFile(filePath) + if err != nil { + log.Warnf(ctx, "failed to check file %s: %v", filePath, err) + return true + } + + matched, err := regexp.MatchString(expected, string(content)) + if err != nil { + log.Warnf(ctx, "failed to check pattern in %s: %v", filePath, err) + return true + } + return matched +} + +func allPipelinesSameCatalog(r *config.Resources) bool { + var firstCatalog string + + for _, pl := range r.Pipelines { + if pl.PipelineSpec == nil || pl.PipelineSpec.Catalog == "" { + return false + } + if firstCatalog == "" { + firstCatalog = pl.PipelineSpec.Catalog + } else if pl.PipelineSpec.Catalog != firstCatalog { + return false + } + } + return firstCatalog != "" +} diff --git a/bundle/config/mutator/apply_presets_catalog_schema_test.go b/bundle/config/mutator/apply_presets_catalog_schema_test.go new file mode 100644 index 0000000000..ffcbb1526a --- /dev/null +++ b/bundle/config/mutator/apply_presets_catalog_schema_test.go @@ -0,0 +1,382 @@ +package mutator_test + +import ( + "context" + "reflect" + "regexp" + "strings" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/databricks/databricks-sdk-go/service/serving" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type recordedField struct { + Path dyn.Path + PathString string + Placeholder string + Expected string +} + +// mockPresetsCatalogSchema returns a mock bundle with all known resources +// that have catalog/schema fields, with those fields filled in as placeholders. 
+func mockPresetsCatalogSchema() *bundle.Bundle {
+	return &bundle.Bundle{
+		Config: config.Root{
+			Resources: config.Resources{
+				Jobs: map[string]*resources.Job{
+					"key": {
+						JobSettings: &jobs.JobSettings{
+							Name: "job",
+							Parameters: []jobs.JobParameterDefinition{
+								{Name: "catalog", Default: "<catalog>"},
+								{Name: "schema", Default: "<schema>"},
+							},
+							Tasks: []jobs.Task{
+								{
+									DbtTask: &jobs.DbtTask{
+										Catalog: "<catalog>",
+										Schema:  "<schema>",
+									},
+								},
+								{
+									SparkPythonTask: &jobs.SparkPythonTask{
+										PythonFile: "/file",
+									},
+								},
+								{
+									NotebookTask: &jobs.NotebookTask{
+										NotebookPath: "/notebook",
+									},
+								},
+							},
+						},
+					},
+				},
+				Pipelines: map[string]*resources.Pipeline{
+					"key": {
+						PipelineSpec: &pipelines.PipelineSpec{
+							Name:    "pipeline",
+							Catalog: "<catalog>",
+							Target:  "<schema>",
+							GatewayDefinition: &pipelines.IngestionGatewayPipelineDefinition{
+								GatewayStorageCatalog: "<catalog>",
+								GatewayStorageSchema:  "<schema>",
+							},
+							IngestionDefinition: &pipelines.IngestionPipelineDefinition{
+								Objects: []pipelines.IngestionConfig{
+									{
+										Report: &pipelines.ReportSpec{
+											DestinationCatalog: "<catalog>",
+											DestinationSchema:  "<schema>",
+										},
+										Schema: &pipelines.SchemaSpec{
+											SourceCatalog:      "<catalog>",
+											SourceSchema:       "<schema>",
+											DestinationCatalog: "<catalog>",
+											DestinationSchema:  "<schema>",
+										},
+										Table: &pipelines.TableSpec{
+											SourceCatalog:      "<catalog>",
+											SourceSchema:       "<schema>",
+											DestinationCatalog: "<catalog>",
+											DestinationSchema:  "<schema>",
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+				ModelServingEndpoints: map[string]*resources.ModelServingEndpoint{
+					"key": {
+						CreateServingEndpoint: &serving.CreateServingEndpoint{
+							Name: "serving",
+							AiGateway: &serving.AiGatewayConfig{
+								InferenceTableConfig: &serving.AiGatewayInferenceTableConfig{
+									CatalogName: "<catalog>",
+									SchemaName:  "<schema>",
+								},
+							},
+							Config: serving.EndpointCoreConfigInput{
+								AutoCaptureConfig: &serving.AutoCaptureConfigInput{
+									CatalogName: "<catalog>",
+									SchemaName:  "<schema>",
+								},
+								ServedEntities: []serving.ServedEntityInput{
+									{EntityName: "<catalog>.<schema>.entity"},
+								},
+								ServedModels: []serving.ServedModelInput{
+									{ModelName: "<catalog>.<schema>.model"},
+								},
+							},
+						},
+					},
+				},
+				RegisteredModels: map[string]*resources.RegisteredModel{
+					"key": {
+						CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{
+							Name:        "registered_model",
+							CatalogName: "<catalog>",
+							SchemaName:  "<schema>",
+						},
+					},
+				},
+				QualityMonitors: map[string]*resources.QualityMonitor{
+					"key": {
+						TableName: "<catalog>.<schema>.table",
+						CreateMonitor: &catalog.CreateMonitor{
+							OutputSchemaName: "<catalog>.<schema>",
+						},
+					},
+				},
+				Schemas: map[string]*resources.Schema{
+					"key": {
+						CreateSchema: &catalog.CreateSchema{
+							Name:        "<schema>",
+							CatalogName: "<catalog>",
+						},
+					},
+				},
+				Volumes: map[string]*resources.Volume{
+					"key": {
+						CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
+							CatalogName: "<catalog>",
+							SchemaName:  "<schema>",
+						},
+					},
+				},
+			},
+			Presets: config.Presets{
+				Catalog: "my_catalog",
+				Schema:  "my_schema",
+			},
+		},
+	}
+}
+
+// ignoredFields are all paths to fields in resources where we don't want to
+// apply the catalog/schema presets.
+var ignoredFields = map[string]string{
+	"resources.pipelines.key.schema": "schema is still in private preview",
+	"resources.jobs.key.tasks[0].notebook_task.base_parameters": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].python_wheel_task.named_parameters": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].python_wheel_task.parameters": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].run_job_task.job_parameters": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].spark_jar_task.parameters": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].spark_python_task.parameters": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].spark_submit_task.parameters": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].sql_task.parameters": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].run_job_task.jar_params": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].run_job_task.notebook_params": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].run_job_task.pipeline_params": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].run_job_task.python_named_params": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].run_job_task.python_params": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].run_job_task.spark_submit_params": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].run_job_task.sql_params": "catalog/schema are passed via job parameters",
+	"resources.jobs.key.tasks[0].clean_rooms_notebook_task.notebook_base_parameters": "catalog/schema are properties inside this struct",
+	"resources.pipelines.key.ingestion_definition.objects[0].schema": "schema name is under schema.source_schema/destination_schema",
+	"resources.schemas": "schema name of schemas is under resources.schemas.key.Name",
+}
+
+func TestApplyPresetsCatalogSchemaWhenAlreadySet(t *testing.T) {
+	b := mockPresetsCatalogSchema()
+	recordedFields := recordPlaceholderFields(t, b)
+
+	diags := bundle.Apply(context.Background(), b, mutator.ApplyPresetsCatalogSchema())
+	require.NoError(t, diags.Error())
+
+	for _, f := range recordedFields {
+		val, err := dyn.GetByPath(b.Config.Value(), f.Path)
+		require.NoError(t, err, "failed to get path %s", f.Path)
+		require.Equal(t, f.Placeholder, val.MustString(),
+			"expected placeholder '%s' at %s to remain unchanged", f.Placeholder, f.Path)
+	}
+}
+
+func TestApplyPresetsCatalogSchemaRecommendRemovingCatalog(t *testing.T) {
+	b := mockPresetsCatalogSchema()
+	b.Config.Resources.Jobs["key"].Parameters = nil // avoid warnings about the job parameters
+	b.Config.Resources.Pipelines["key"].Catalog = "my_catalog"
+
+	diags := bundle.Apply(context.Background(), b, mutator.ApplyPresetsCatalogSchema())
+	require.Equal(t, 1, len(diags))
+	require.Equal(t, "Omit the catalog field since it will be automatically populated from presets.catalog", diags[0].Summary)
+}
+
+func TestApplyPresetsCatalogSchemaWhenNotSet(t *testing.T) {
+	b := mockPresetsCatalogSchema()
+	recordedFields := recordPlaceholderFields(t, b)
+
+	// Set all catalog/schema fields to empty strings / nil
+	require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
+		for _, f := range recordedFields {
+			value, err := dyn.GetByPath(root, f.Path)
+			require.NoError(t, err)
+
+			val := value.MustString()
+			cleanedVal := removePlaceholders(val)
+			root, err = dyn.SetByPath(root, f.Path, dyn.V(cleanedVal))
+			require.NoError(t, err)
+		}
+		return dyn.Set(root, "resources.jobs.key.parameters", dyn.NilValue)
+	}))
+
+	// Apply catalog/schema presets
+	diags := bundle.Apply(context.Background(), b, mutator.ApplyPresetsCatalogSchema())
+	require.NoError(t, diags.Error())
+
+	// Verify that all catalog/schema fields have been set to the presets
+	for _, f := range recordedFields {
+		val, err := dyn.GetByPath(b.Config.Value(), f.Path)
+		require.NoError(t, err, "could not find expected field(s) at %s", f.Path)
+		assert.Equal(t, f.Expected, val.MustString(), "preset value expected for %s based on placeholder %s", f.Path, f.Placeholder)
+	}
+}
+
+func TestApplyPresetsCatalogSchemaCompleteness(t *testing.T) {
+	b := mockPresetsCatalogSchema()
+	recordedFields := recordPlaceholderFields(t, b)
+
+	// Convert the recordedFields to a set for easier lookup
+	recordedPaths := make(map[string]struct{})
+	arrayIndexPattern := regexp.MustCompile(`\[\d+\]`)
+	for _, field := range recordedFields {
+		recordedPaths[field.PathString] = struct{}{}
+
+		// Add base paths for any array indices in the path.
+		// For example, for "resources.jobs.key.parameters[0].default" we add "resources.jobs.key.parameters".
+		path := field.PathString
+		path = arrayIndexPattern.ReplaceAllString(path, "[0]")
+		for {
+			i := strings.Index(path, "[")
+			if i < 0 {
+				break
+			}
+			recordedPaths[path[:i]] = struct{}{}
+			path = path[i+1:]
+		}
+	}
+
+	// Find all catalog/schema fields that we think should be covered based
+	// on all properties in config.Resources.
+	expectedFields := findAllPossibleCatalogSchemaFields()
+	assert.GreaterOrEqual(t, len(expectedFields), 42, "expected at least 42 catalog/schema fields, but got %d", len(expectedFields))
+
+	// Verify that all expected fields are there
+	for _, field := range expectedFields {
+		if _, recorded := recordedPaths[field]; !recorded {
+			if _, ignored := ignoredFields[field]; !ignored {
+				t.Errorf("Field %s was not included in the catalog/schema presets test. If this is a new field, please add it to mockPresetsCatalogSchema or ignoredFields and add support for it as appropriate.", field)
+			}
+		}
+	}
+}
+
+// recordPlaceholderFields scans the config and records all fields containing catalog/schema placeholders.
+func recordPlaceholderFields(t *testing.T, b *bundle.Bundle) []recordedField {
+	t.Helper()
+
+	var recordedFields []recordedField
+	err := b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
+		_, err := dyn.Walk(b.Config.Value(), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
+			if v.Kind() == dyn.KindString {
+				val := v.MustString()
+				if strings.Contains(val, "<catalog>") || strings.Contains(val, "<schema>") {
+					pathCopy := make(dyn.Path, len(p))
+					copy(pathCopy, p)
+					recordedFields = append(recordedFields, recordedField{
+						Path:        pathCopy,
+						PathString:  pathCopy.String(),
+						Placeholder: val,
+						Expected:    replacePlaceholders(val, "my_catalog", "my_schema"),
+					})
+				}
+			}
+			return v, nil
+		})
+		return root, err
+	})
+	require.NoError(t, err)
+	return recordedFields
+}
+
+// findAllPossibleCatalogSchemaFields finds all fields in config.Resources that might refer
+// to a catalog or schema. Returns a slice of field paths like
+// "resources.pipelines.key.catalog".
+func findAllPossibleCatalogSchemaFields() []string {
+	visited := make(map[reflect.Type]struct{})
+	var results []string
+
+	// walkTypeFields is a recursive function that walks the fields of a given type
+	var walkTypeFields func(rt reflect.Type, path string)
+	walkTypeFields = func(rt reflect.Type, path string) {
+		if _, seen := visited[rt]; seen {
+			return
+		}
+		visited[rt] = struct{}{}
+
+		switch rt.Kind() {
+		case reflect.Slice, reflect.Array:
+			walkTypeFields(rt.Elem(), path+"[0]")
+		case reflect.Map:
+			walkTypeFields(rt.Elem(), path+".key")
+		case reflect.Ptr:
+			walkTypeFields(rt.Elem(), path)
+		case reflect.Struct:
+			for i := 0; i < rt.NumField(); i++ {
+				ft := rt.Field(i)
+				jsonTag := ft.Tag.Get("json")
+				if jsonTag == "" || jsonTag == "-" {
+					// Ignore field names when there's no JSON tag, e.g. for Jobs.JobSettings
+					walkTypeFields(ft.Type, path)
+					continue
+				}
+
+				fieldName := strings.Split(jsonTag, ",")[0]
+				fieldPath := path + "." + fieldName
+
+				if isCatalogOrSchemaField(fieldName) {
+					results = append(results, fieldPath)
+				}
+
+				walkTypeFields(ft.Type, fieldPath)
+			}
+		}
+	}
+
+	var r config.Resources
+	walkTypeFields(reflect.TypeOf(r), "resources")
+	return results
+}
+
+// isCatalogOrSchemaField returns true for field names in config.Resources that we suspect could contain a catalog or schema name
+func isCatalogOrSchemaField(name string) bool {
+	return strings.Contains(name, "catalog") ||
+		strings.Contains(name, "schema") ||
+		strings.Contains(name, "parameters") ||
+		strings.Contains(name, "params")
+}
+
+func removePlaceholders(value string) string {
+	value = strings.ReplaceAll(value, "<catalog>.", "")
+	value = strings.ReplaceAll(value, "<schema>.", "")
+	value = strings.ReplaceAll(value, "<catalog>", "")
+	value = strings.ReplaceAll(value, "<schema>", "")
+	return value
+}
+
+func replacePlaceholders(placeholder, catalog, schema string) string {
+	expected := strings.ReplaceAll(placeholder, "<catalog>", catalog)
+	expected = strings.ReplaceAll(expected, "<schema>", schema)
+	return expected
+}
diff --git a/bundle/config/mutator/translate_paths.go b/bundle/config/mutator/translate_paths.go
index af0f941201..be07c77e08 100644
--- a/bundle/config/mutator/translate_paths.go
+++ b/bundle/config/mutator/translate_paths.go
@@ -14,6 +14,7 @@ import (
 	"github.com/databricks/cli/bundle/config"
 	"github.com/databricks/cli/libs/diag"
 	"github.com/databricks/cli/libs/dyn"
+	"github.com/databricks/cli/libs/log"
 	"github.com/databricks/cli/libs/notebook"
 )
@@ -58,6 +59,47 @@ type translateContext struct {
 	seen map[string]string
 }
+// GetLocalPath returns the local file system paths for a path referenced from a resource.
+// If it's an absolute path, we treat it as a workspace path and return "".
+//
+// Arguments:
+//
+//	sourceDir - the source directory for the resource definition.
+//	p - the path referenced from the resource definition.
+//
+// Returns:
+//
+//	localPath - the full local file system path.
+//	localRelPath - the relative path from the base directory.
+func GetLocalPath(ctx context.Context, b *bundle.Bundle, sourceDir, p string) (string, string, error) {
+	if p == "" {
+		return "", "", fmt.Errorf("path cannot be empty")
+	}
+	if path.IsAbs(p) {
+		return "", "", nil
+	}
+
+	url, err := url.Parse(p)
+	if err != nil {
+		// Apparently this path is not a URL; this can happen for paths that
+		// have non-URL characters like './profit-%.csv'.
+ log.Warnf(ctx, "Failed to parse path as a URL '%s': %v", p, err) + } else if url.Scheme != "" { + return "", "", nil + } + + localPath := filepath.Join(sourceDir, filepath.FromSlash(p)) + localRelPath, err := filepath.Rel(b.SyncRootPath, localPath) + if err != nil { + return "", "", err + } + if strings.HasPrefix(localRelPath, "..") { + return "", "", fmt.Errorf("path '%s' is not contained in sync root path", p) + } + + return localPath, localRelPath, nil +} + // rewritePath converts a given relative path from the loaded config to a new path based on the passed rewriting function // // It takes these arguments: @@ -68,42 +110,25 @@ type translateContext struct { // // The function returns an error if it is impossible to rewrite the given relative path. func (t *translateContext) rewritePath( + ctx context.Context, dir string, p *string, fn rewriteFunc, ) error { - // We assume absolute paths point to a location in the workspace - if path.IsAbs(*p) { - return nil - } - - url, err := url.Parse(*p) + localPath, localRelPath, err := GetLocalPath(ctx, t.b, dir, *p) if err != nil { return err } - - // If the file path has scheme, it's a full path and we don't need to transform it - if url.Scheme != "" { + if localPath == "" { + // Skip absolute paths return nil } - // Local path is relative to the directory the resource was defined in. - localPath := filepath.Join(dir, filepath.FromSlash(*p)) if interp, ok := t.seen[localPath]; ok { *p = interp return nil } - // Local path must be contained in the sync root. - // If it isn't, it won't be synchronized into the workspace. - localRelPath, err := filepath.Rel(t.b.SyncRootPath, localPath) - if err != nil { - return err - } - if strings.HasPrefix(localRelPath, "..") { - return fmt.Errorf("path %s is not contained in sync root path", localPath) - } - var workspacePath string if config.IsExplicitlyEnabled(t.b.Config.Presets.SourceLinkedDeployment) { workspacePath = t.b.SyncRootPath @@ -215,9 +240,9 @@ func (t *translateContext) translateNoOpWithPrefix(literal, localFullPath, local return localRelPath, nil } -func (t *translateContext) rewriteValue(p dyn.Path, v dyn.Value, fn rewriteFunc, dir string) (dyn.Value, error) { +func (t *translateContext) rewriteValue(ctx context.Context, p dyn.Path, v dyn.Value, fn rewriteFunc, dir string) (dyn.Value, error) { out := v.MustString() - err := t.rewritePath(dir, &out, fn) + err := t.rewritePath(ctx, dir, &out, fn) if err != nil { if target := (&ErrIsNotebook{}); errors.As(err, target) { return dyn.InvalidValue, fmt.Errorf(`expected a file for "%s" but got a notebook: %w`, p, target) @@ -231,15 +256,15 @@ func (t *translateContext) rewriteValue(p dyn.Path, v dyn.Value, fn rewriteFunc, return dyn.NewValue(out, v.Locations()), nil } -func (t *translateContext) rewriteRelativeTo(p dyn.Path, v dyn.Value, fn rewriteFunc, dir, fallback string) (dyn.Value, error) { - nv, err := t.rewriteValue(p, v, fn, dir) +func (t *translateContext) rewriteRelativeTo(ctx context.Context, p dyn.Path, v dyn.Value, fn rewriteFunc, dir, fallback string) (dyn.Value, error) { + nv, err := t.rewriteValue(ctx, p, v, fn, dir) if err == nil { return nv, nil } // If we failed to rewrite the path, try to rewrite it relative to the fallback directory. if fallback != "" { - nv, nerr := t.rewriteValue(p, v, fn, fallback) + nv, nerr := t.rewriteValue(ctx, p, v, fn, fallback) if nerr == nil { // TODO: Emit a warning that this path should be rewritten. 
return nv, nil @@ -249,7 +274,7 @@ func (t *translateContext) rewriteRelativeTo(p dyn.Path, v dyn.Value, fn rewrite return dyn.InvalidValue, err } -func (m *translatePaths) Apply(_ context.Context, b *bundle.Bundle) diag.Diagnostics { +func (m *translatePaths) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { t := &translateContext{ b: b, seen: make(map[string]string), @@ -257,13 +282,13 @@ func (m *translatePaths) Apply(_ context.Context, b *bundle.Bundle) diag.Diagnos err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) { var err error - for _, fn := range []func(dyn.Value) (dyn.Value, error){ + for _, fn := range []func(ctx context.Context, v dyn.Value) (dyn.Value, error){ t.applyJobTranslations, t.applyPipelineTranslations, t.applyArtifactTranslations, t.applyDashboardTranslations, } { - v, err = fn(v) + v, err = fn(ctx, v) if err != nil { return dyn.InvalidValue, err } diff --git a/bundle/config/mutator/translate_paths_artifacts.go b/bundle/config/mutator/translate_paths_artifacts.go index 921c00c734..44e034204e 100644 --- a/bundle/config/mutator/translate_paths_artifacts.go +++ b/bundle/config/mutator/translate_paths_artifacts.go @@ -1,6 +1,7 @@ package mutator import ( + "context" "fmt" "github.com/databricks/cli/libs/dyn" @@ -27,7 +28,7 @@ func (t *translateContext) artifactRewritePatterns() []artifactRewritePattern { } } -func (t *translateContext) applyArtifactTranslations(v dyn.Value) (dyn.Value, error) { +func (t *translateContext) applyArtifactTranslations(ctx context.Context, v dyn.Value) (dyn.Value, error) { var err error for _, rewritePattern := range t.artifactRewritePatterns() { @@ -38,7 +39,7 @@ func (t *translateContext) applyArtifactTranslations(v dyn.Value) (dyn.Value, er return dyn.InvalidValue, fmt.Errorf("unable to determine directory for artifact %s: %w", key, err) } - return t.rewriteRelativeTo(p, v, rewritePattern.fn, dir, "") + return t.rewriteRelativeTo(ctx, p, v, rewritePattern.fn, dir, "") }) if err != nil { return dyn.InvalidValue, err diff --git a/bundle/config/mutator/translate_paths_dashboards.go b/bundle/config/mutator/translate_paths_dashboards.go index 93822a5991..a6aac82e95 100644 --- a/bundle/config/mutator/translate_paths_dashboards.go +++ b/bundle/config/mutator/translate_paths_dashboards.go @@ -1,12 +1,13 @@ package mutator import ( + "context" "fmt" "github.com/databricks/cli/libs/dyn" ) -func (t *translateContext) applyDashboardTranslations(v dyn.Value) (dyn.Value, error) { +func (t *translateContext) applyDashboardTranslations(ctx context.Context, v dyn.Value) (dyn.Value, error) { // Convert the `file_path` field to a local absolute path. // We load the file at this path and use its contents for the dashboard contents. 
pattern := dyn.NewPattern( @@ -23,6 +24,6 @@ func (t *translateContext) applyDashboardTranslations(v dyn.Value) (dyn.Value, e return dyn.InvalidValue, fmt.Errorf("unable to determine directory for dashboard %s: %w", key, err) } - return t.rewriteRelativeTo(p, v, t.retainLocalAbsoluteFilePath, dir, "") + return t.rewriteRelativeTo(ctx, p, v, t.retainLocalAbsoluteFilePath, dir, "") }) } diff --git a/bundle/config/mutator/translate_paths_jobs.go b/bundle/config/mutator/translate_paths_jobs.go index c29ff0ea95..4165c0c070 100644 --- a/bundle/config/mutator/translate_paths_jobs.go +++ b/bundle/config/mutator/translate_paths_jobs.go @@ -1,6 +1,7 @@ package mutator import ( + "context" "fmt" "slices" @@ -9,7 +10,7 @@ import ( "github.com/databricks/cli/libs/dyn" ) -func (t *translateContext) applyJobTranslations(v dyn.Value) (dyn.Value, error) { +func (t *translateContext) applyJobTranslations(ctx context.Context, v dyn.Value) (dyn.Value, error) { var err error fallback, err := gatherFallbackPaths(v, "jobs") @@ -43,7 +44,7 @@ func (t *translateContext) applyJobTranslations(v dyn.Value) (dyn.Value, error) return dyn.InvalidValue, err } - return t.rewriteRelativeTo(p, v, rewritePatternFn, dir, fallback[key]) + return t.rewriteRelativeTo(ctx, p, v, rewritePatternFn, dir, fallback[key]) }) } diff --git a/bundle/config/mutator/translate_paths_pipelines.go b/bundle/config/mutator/translate_paths_pipelines.go index 71a65e8462..584da825a4 100644 --- a/bundle/config/mutator/translate_paths_pipelines.go +++ b/bundle/config/mutator/translate_paths_pipelines.go @@ -1,6 +1,7 @@ package mutator import ( + "context" "fmt" "github.com/databricks/cli/libs/dyn" @@ -34,7 +35,7 @@ func (t *translateContext) pipelineRewritePatterns() []pipelineRewritePattern { } } -func (t *translateContext) applyPipelineTranslations(v dyn.Value) (dyn.Value, error) { +func (t *translateContext) applyPipelineTranslations(ctx context.Context, v dyn.Value) (dyn.Value, error) { var err error fallback, err := gatherFallbackPaths(v, "pipelines") @@ -50,7 +51,7 @@ func (t *translateContext) applyPipelineTranslations(v dyn.Value) (dyn.Value, er return dyn.InvalidValue, fmt.Errorf("unable to determine directory for pipeline %s: %w", key, err) } - return t.rewriteRelativeTo(p, v, rewritePattern.fn, dir, fallback[key]) + return t.rewriteRelativeTo(ctx, p, v, rewritePattern.fn, dir, fallback[key]) }) if err != nil { return dyn.InvalidValue, err diff --git a/bundle/config/mutator/translate_paths_test.go b/bundle/config/mutator/translate_paths_test.go index 493abb8c5f..559e235fb9 100644 --- a/bundle/config/mutator/translate_paths_test.go +++ b/bundle/config/mutator/translate_paths_test.go @@ -1003,3 +1003,93 @@ func TestTranslatePathsWithSourceLinkedDeployment(t *testing.T) { b.Config.Resources.Pipelines["pipeline"].Libraries[1].Notebook.Path, ) } + +// TestGetLocalPath contains test cases for the GetLocalPath function. 
+func TestGetLocalPath(t *testing.T) { + testCases := []struct { + name string + input string + expected string + errMsg string + }{ + { + name: "EmptyPath", + input: "", + expected: "", + errMsg: "path cannot be empty", + }, + { + name: "AbsolutePathUnix", + input: "/usr/local/bin", + expected: "", + errMsg: "", + }, + { + name: "AbsolutePathWindows", + input: `C:\Program Files\`, + expected: ``, + errMsg: "", + }, + { + name: "RelativePath", + input: "./local/path", + expected: "root/src/local/path", + errMsg: "", + }, + { + name: "NestedRelativePath", + input: "../relative/path", + expected: "root/relative/path", + errMsg: "", + }, + { + name: "PathWithSpaces", + input: "path/with spaces and slash/", + expected: "root/src/path/with spaces and slash", + errMsg: "", + }, + { + name: "PathWithSpecialChars", + input: "path/with/@#$%^&*()!", + expected: "root/src/path/with/@#$%^&*()!", + errMsg: "", + }, + { + name: "DBFS path", + input: "dbfs:/some/path", + expected: "", + errMsg: "", + }, + { + name: "PathTraversal", + input: "path/with/../../../traversal", + expected: "root/traversal", + errMsg: "", + }, + { + name: "RelativeOutOfBundle", + input: "../../outside", + expected: "", + errMsg: "path '../../outside' is not contained in sync root path", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + SyncRootPath: "root", + } + ctx := context.Background() + localPath, _, err := mutator.GetLocalPath(ctx, b, "root/src", tc.input) + if tc.errMsg != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.errMsg) + } else { + require.NoError(t, err) + // On Windows, filepath.Join may return backslashes. Normalize to forward slashes for comparison. + normalizedResult := filepath.ToSlash(localPath) + assert.Equal(t, tc.expected, normalizedResult, "For test case: %s", tc.name) + } + }) + } +} diff --git a/bundle/config/presets.go b/bundle/config/presets.go index 252c5b5f74..31e6e7b0f5 100644 --- a/bundle/config/presets.go +++ b/bundle/config/presets.go @@ -26,6 +26,12 @@ type Presets struct { // Tags to add to all resources. Tags map[string]string `json:"tags,omitempty"` + + // Catalog is the default catalog for all resources. + Catalog string `json:"catalog,omitempty"` + + // Schema is the default schema for all resources. + Schema string `json:"schema,omitempty"` } // IsExplicitlyEnabled tests whether this feature is explicitly enabled. diff --git a/bundle/internal/schema/annotations.yml b/bundle/internal/schema/annotations.yml index e52189daa1..556f9729bd 100644 --- a/bundle/internal/schema/annotations.yml +++ b/bundle/internal/schema/annotations.yml @@ -97,6 +97,12 @@ github.com/databricks/cli/bundle/config.Lock: "description": |- Whether to force this lock if it is enabled. github.com/databricks/cli/bundle/config.Presets: + "catalog": + "description": |- + The default catalog to use for all resources. + "schema": + "description": |- + The default schema to use for all resources. "jobs_max_concurrent_runs": "description": |- The maximum concurrent runs for a job. 
diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go index 6fa0e5fede..e6e7230e0f 100644 --- a/bundle/phases/initialize.go +++ b/bundle/phases/initialize.go @@ -62,6 +62,7 @@ func Initialize() bundle.Mutator { "bundle", "workspace", "variables", + "presets", ), // Provide permission config errors & warnings after initializing all variables permissions.PermissionDiagnostics(), @@ -71,6 +72,7 @@ func Initialize() bundle.Mutator { mutator.ConfigureVolumeDefaults(), mutator.ProcessTargetMode(), mutator.ApplyPresets(), + mutator.ApplyPresetsCatalogSchema(), mutator.DefaultQueueing(), mutator.ExpandPipelineGlobPaths(), diff --git a/bundle/schema/jsonschema.json b/bundle/schema/jsonschema.json index 8e8efa7fc1..93dc179f03 100644 --- a/bundle/schema/jsonschema.json +++ b/bundle/schema/jsonschema.json @@ -1175,6 +1175,10 @@ { "type": "object", "properties": { + "catalog": { + "description": "The default catalog to use for all resources.", + "$ref": "#/$defs/string" + }, "jobs_max_concurrent_runs": { "description": "The maximum concurrent runs for a job.", "$ref": "#/$defs/int" @@ -1187,6 +1191,10 @@ "description": "Whether pipeline deployments should be locked in development mode.", "$ref": "#/$defs/bool" }, + "schema": { + "description": "The default schema to use for all resources.", + "$ref": "#/$defs/string" + }, "source_linked_deployment": { "description": "Whether to link the deployment to the bundle source.", "$ref": "#/$defs/bool"
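For reference, a minimal sketch of how the new presets might be configured in a bundle's databricks.yml; the target name and the catalog/schema values below are illustrative, and only the presets.catalog and presets.schema keys come from this change:

    targets:
      dev:
        presets:
          catalog: my_catalog  # default catalog applied by ApplyPresetsCatalogSchema
          schema: my_schema    # must be set together with catalog; setting only one is reported as an error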