Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions internal/cmdopts/cmdconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,26 @@ type ConfigUpgradeCommand struct {
}

// Execute upgrades the configuration schema.
// Supports upgrading metrics/sources database, sink database, or both independently.
// When using YAML files for sources/metrics, only the sink can be upgraded.
func (cmd *ConfigUpgradeCommand) Execute([]string) (err error) {
opts := cmd.owner
if err = opts.ValidateConfig(); err != nil {
return
}
ctx := context.Background()
// Upgrade metrics/sources configuration if it's postgres
if opts.IsPgConnStr(opts.Metrics.Metrics) && opts.IsPgConnStr(opts.Sources.Sources) {

hasMetricsPg := opts.IsPgConnStr(opts.Metrics.Metrics)
hasSourcesPg := opts.IsPgConnStr(opts.Sources.Sources)
hasSinks := len(opts.Sinks.Sinks) > 0

// Require at least one upgradable target
if !hasMetricsPg && !hasSourcesPg && !hasSinks {
opts.CompleteCommand(ExitCodeConfigError)
return errors.New("no upgradable configuration specified: provide --sink or postgres connection strings for --sources/--metrics")
}

var upgraded bool

// Upgrade metrics if postgres
if hasMetricsPg {
err = opts.InitMetricReader(ctx)
if err != nil {
opts.CompleteCommand(ExitCodeConfigError)
Expand All @@ -102,13 +114,29 @@ func (cmd *ConfigUpgradeCommand) Execute([]string) (err error) {
opts.CompleteCommand(ExitCodeConfigError)
return
}
upgraded = true
}
}

// Upgrade sources configuration if postgres
if hasSourcesPg {
err = opts.InitSourceReader(ctx)
if err != nil {
opts.CompleteCommand(ExitCodeConfigError)
return
}
if m, ok := opts.SourcesReaderWriter.(metrics.Migrator); ok {
err = m.Migrate()
if err != nil {
opts.CompleteCommand(ExitCodeConfigError)
return
}
upgraded = true
}
} else {
opts.CompleteCommand(ExitCodeConfigError)
return errors.New("configuration storage does not support upgrade")
}
// Upgrade sinks configuration if it's postgres
if len(opts.Sinks.Sinks) > 0 {

// Upgrade sinks configuration if specified
if hasSinks {
err = opts.InitSinkWriter(ctx)
if err != nil {
opts.CompleteCommand(ExitCodeConfigError)
Expand All @@ -120,11 +148,18 @@ func (cmd *ConfigUpgradeCommand) Execute([]string) (err error) {
opts.CompleteCommand(ExitCodeConfigError)
return
}
upgraded = true
} else {
opts.CompleteCommand(ExitCodeConfigError)
return errors.New("sink storage does not support upgrade")
}
}

if !upgraded {
opts.CompleteCommand(ExitCodeConfigError)
return errors.New("no configuration was upgraded: ensure postgres connection strings are provided")
}

opts.CompleteCommand(ExitCodeOK)
return
}
17 changes: 15 additions & 2 deletions internal/cmdopts/cmdconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func TestConfigInitCommand_InitSinks(t *testing.T) {
func TestConfigUpgradeCommand_Errors(t *testing.T) {
a := assert.New(t)

t.Run("non-postgres configuration not supported", func(*testing.T) {
t.Run("no upgradable configuration specified", func(*testing.T) {
opts := &Options{
Metrics: metrics.CmdOpts{Metrics: "/tmp/metrics.yaml"},
Sources: sources.CmdOpts{Sources: "/tmp/sources.yaml", Refresh: 120, MaxParallelConnectionsPerDb: 1},
Expand All @@ -310,7 +310,7 @@ func TestConfigUpgradeCommand_Errors(t *testing.T) {
cmd := ConfigUpgradeCommand{owner: opts}
err := cmd.Execute(nil)
a.Error(err)
a.ErrorContains(err, "does not support upgrade")
a.ErrorContains(err, "no upgradable configuration specified")
})

t.Run("init metrics reader fails", func(*testing.T) {
Expand All @@ -323,4 +323,17 @@ func TestConfigUpgradeCommand_Errors(t *testing.T) {
err := cmd.Execute(nil)
a.Error(err)
})

t.Run("sink-only upgrade with yaml sources and metrics", func(*testing.T) {
opts := &Options{
Metrics: metrics.CmdOpts{Metrics: "/tmp/metrics.yaml"},
Sources: sources.CmdOpts{Sources: "/tmp/sources.yaml", Refresh: 120, MaxParallelConnectionsPerDb: 1},
Sinks: sinks.CmdOpts{Sinks: []string{"postgresql://invalid@host/db"}, BatchingDelay: time.Second},
}
cmd := ConfigUpgradeCommand{owner: opts}
err := cmd.Execute(nil)
// Should fail to connect, but should NOT fail with "no upgradable configuration"
a.Error(err)
a.NotContains(err.Error(), "no upgradable configuration")
})
}
Loading