diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go index 3d4e210035..a97b61966b 100644 --- a/pkg/dataset/config.go +++ b/pkg/dataset/config.go @@ -14,6 +14,7 @@ import ( "github.com/timescale/promscale/pkg/internal/day" "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/rollup" ) const ( @@ -44,12 +45,12 @@ type Config struct { // Metrics contains dataset configuration options for metrics data. type Metrics struct { - ChunkInterval day.Duration `yaml:"default_chunk_interval"` - Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set. - HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"` - HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"` - RetentionPeriod day.Duration `yaml:"default_retention_period"` - Downsample *Downsample `yaml:"downsample,omitempty"` + ChunkInterval day.Duration `yaml:"default_chunk_interval"` + Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set. + HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"` + HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"` + RetentionPeriod day.Duration `yaml:"default_retention_period"` + Rollups *rollup.Config `yaml:"rollups,omitempty"` } // Traces contains dataset configuration options for traces data. @@ -64,11 +65,11 @@ func NewConfig(contents string) (cfg Config, err error) { } // Apply applies the configuration to the database via the supplied DB connection. -func (c *Config) Apply(conn *pgx.Conn) error { +func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error { c.applyDefaults() - if c.Metrics.Downsample != nil { - if err := c.Metrics.Downsample.Apply(conn); err != nil { + if c.Metrics.Rollups != nil { + if err := c.Metrics.Rollups.Apply(ctx, conn); err != nil { return fmt.Errorf("error applying configuration for downsampling: %w", err) } } diff --git a/pkg/dataset/downsample.go b/pkg/dataset/downsample.go deleted file mode 100644 index f65b0cff84..0000000000 --- a/pkg/dataset/downsample.go +++ /dev/null @@ -1,68 +0,0 @@ -// This file and its contents are licensed under the Apache License 2.0. -// Please see the included NOTICE for copyright information and -// LICENSE for a copy of the license. 
- -package dataset - -import ( - "context" - "fmt" - "time" - - "github.com/jackc/pgx/v4" - - "github.com/timescale/promscale/pkg/internal/day" - "github.com/timescale/promscale/pkg/log" - "github.com/timescale/promscale/pkg/rollup" -) - -const defaultDownsampleState = true - -var ( - setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)" - - defaultDownsampleStateVar = defaultDownsampleState - defaultDownsampleResolution = []rollup.DownsampleResolution{ - { - Label: "short", - Resolution: day.Duration(5 * time.Minute), - Retention: day.Duration(90 * 24 * time.Hour), - }, - { - Label: "long", - Resolution: day.Duration(time.Hour), - Retention: day.Duration(395 * 24 * time.Hour), - }, - } -) - -type Downsample struct { - Automatic *bool `yaml:"automatic,omitempty"` - Resolution []rollup.DownsampleResolution `yaml:"resolutions,omitempty"` -} - -func (d *Downsample) Apply(conn *pgx.Conn) error { - d.applyDefaults() - - log.Info("msg", fmt.Sprintf("Setting metric automatic downsample to %t", *d.Automatic)) - if _, err := conn.Exec(context.Background(), setDefaultDownsampleStateSQL, d.Automatic); err != nil { - return err - } - - if *d.Automatic { - if err := rollup.EnsureRollupWith(conn, d.Resolution); err != nil { - return fmt.Errorf("ensure rollup with: %w", err) - } - } - return nil -} - -func (d *Downsample) applyDefaults() { - if d.Automatic == nil { - // In default case, we plan to downsample data. - d.Automatic = &defaultDownsampleStateVar - } - if d.Resolution == nil { - d.Resolution = defaultDownsampleResolution - } -} diff --git a/pkg/rollup/config.go b/pkg/rollup/config.go new file mode 100644 index 0000000000..d1794bf3b5 --- /dev/null +++ b/pkg/rollup/config.go @@ -0,0 +1,97 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/jackc/pgx/v4" + + "github.com/timescale/promscale/pkg/internal/day" + "github.com/timescale/promscale/pkg/log" +) + +const ( + setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)" + + // short and long represent system resolutions. + short = "short" + long = "long" +) + +var ( + defaultDownsampleState = false + useDefaultResolution = false + systemResolution = map[string]Definition{ + short: { + Resolution: day.Duration(5 * time.Minute), + Retention: day.Duration(90 * 24 * time.Hour), + }, + long: { + Resolution: day.Duration(time.Hour), + Retention: day.Duration(395 * 24 * time.Hour), + }, + } +) + +type Config struct { + Enabled *bool `yaml:"enabled,omitempty"` + UseDefaultResolution *bool `yaml:"use_default_resolution"` + Resolutions `yaml:"resolutions,omitempty"` +} + +type Definition struct { + Resolution day.Duration `yaml:"resolution"` + Retention day.Duration `yaml:"retention"` + Delete bool `yaml:"delete"` +} + +type Resolutions map[string]Definition + +func (d *Config) Apply(ctx context.Context, conn *pgx.Conn) error { + d.applyDefaults() + + if containsSystemResolutions(d.Resolutions) { + return fmt.Errorf("'short' and 'long' are system resolutions. 
These cannot be applied as rollup labels") + } + + log.Info("msg", fmt.Sprintf("Setting automatic metric downsample to %t", *d.Enabled)) + if _, err := conn.Exec(context.Background(), setDefaultDownsampleStateSQL, d.Enabled); err != nil { + return err + } + + if *d.Enabled { + if *d.UseDefaultResolution { + d.Resolutions["short"] = systemResolution["short"] + d.Resolutions["long"] = systemResolution["long"] + } + if err := Sync(ctx, conn, d.Resolutions); err != nil { + return fmt.Errorf("ensure rollup with: %w", err) + } + } + return nil +} + +func (d *Config) applyDefaults() { + if d.Enabled == nil { + d.Enabled = &defaultDownsampleState + } + if d.UseDefaultResolution == nil { + d.UseDefaultResolution = &useDefaultResolution + } +} + +func containsSystemResolutions(r Resolutions) bool { + for k := range r { + k = strings.ToLower(k) + if k == short || k == long { + return true + } + } + return false +} diff --git a/pkg/rollup/rollup.go b/pkg/rollup/rollup.go index 663659d4e9..fe6a9134aa 100644 --- a/pkg/rollup/rollup.go +++ b/pkg/rollup/rollup.go @@ -14,93 +14,109 @@ import ( "github.com/timescale/promscale/pkg/internal/day" ) -type DownsampleResolution struct { - Label string `yaml:"label"` - Resolution day.Duration `yaml:"resolution"` - Retention day.Duration `yaml:"retention"` -} - -// EnsureRollupWith ensures "strictly" that the given new resolutions are applied in the database. -// -// Note: It follows a "strict" behaviour, meaning any existing resolutions of downsampling in -// the database will be removed, so that the all downsampling data in the database strictly -// matches the provided newResolutions. -// -// Example: If the DB already contains metric rollups for `short` and `long`, and in dataset-config, -// connector sees `very_short` and `long`, then EnsureRollupWith will remove the `short` downsampled data -// and create `very_short`, while not touching `long`. -func EnsureRollupWith(conn *pgx.Conn, newResolutions []DownsampleResolution) error { +// Sync updates the rollups in the DB in accordance with the given resolutions. It handles: +// 1. Creating of new rollups +// 2. Deletion of rollups that have `delete: true` +// 3. Update retention duration of rollups that have same label name but different retention duration. If resolution of +// existing rollups are updated, an error is returned +func Sync(ctx context.Context, conn *pgx.Conn, r Resolutions) error { rows, err := conn.Query(context.Background(), "SELECT name, resolution, retention FROM _prom_catalog.rollup") if err != nil { return fmt.Errorf("querying existing resolutions: %w", err) } defer rows.Close() - var existingResolutions []DownsampleResolution + existingResolutions := make(Resolutions) for rows.Next() { var lName string var resolution, retention time.Duration if err := rows.Scan(&lName, &resolution, &retention); err != nil { return fmt.Errorf("error scanning output rows for existing resolutions: %w", err) } - existingResolutions = append(existingResolutions, DownsampleResolution{Label: lName, Resolution: day.Duration(resolution), Retention: day.Duration(retention)}) + existingResolutions[lName] = Definition{Resolution: day.Duration(resolution), Retention: day.Duration(retention)} } - // Determine which resolutions need to be created and deleted from the DB. 
- pendingCreation := diff(newResolutions, existingResolutions) - pendingDeletion := diff(existingResolutions, newResolutions) + if err := errOnResolutionMismatch(existingResolutions, r); err != nil { + return fmt.Errorf("error on existing resolution mismatch: %w", err) + } + + if err := updateExistingRollups(ctx, conn, existingResolutions, r); err != nil { + return fmt.Errorf("update existing rollups: %w", err) + } // Delete rollups that are no longer required. - if err = deleteRollups(conn, pendingDeletion); err != nil { + if err = deleteRollups(ctx, conn, existingResolutions, r); err != nil { return fmt.Errorf("delete rollups: %w", err) } // Create new rollups. - if err = createRollups(conn, pendingCreation); err != nil { + if err = createRollups(ctx, conn, existingResolutions, r); err != nil { return fmt.Errorf("create rollups: %w", err) } return nil } -func createRollups(conn *pgx.Conn, res []DownsampleResolution) error { - for _, r := range res { - _, err := conn.Exec(context.Background(), "CALL _prom_catalog.create_rollup($1, $2, $3)", r.Label, time.Duration(r.Resolution), time.Duration(r.Retention)) - if err != nil { - return fmt.Errorf("error creating rollup for %s: %w", r.Label, err) +// errOnResolutionMismatch returns an error if a given resolution exists in the DB with a different resolution duration. +func errOnResolutionMismatch(existing, r Resolutions) error { + for labelName, res := range r { + if oldRes, exists := existing[labelName]; exists { + if oldRes.Resolution != res.Resolution { + return fmt.Errorf("existing rollup resolutions cannot be updated. Either keep the resolution of existing rollup labels same or remove them") + } } } return nil } -func deleteRollups(conn *pgx.Conn, res []DownsampleResolution) error { - for _, r := range res { - _, err := conn.Exec(context.Background(), "CALL _prom_catalog.delete_rollup($1)", r.Label) - if err != nil { - return fmt.Errorf("error deleting rollup for %s: %w", r.Label, err) +// updateExistingRollups updates the existing rollups retention if the new resolutions with a same name has +// different retention duration. +func updateExistingRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error { + var batch pgx.Batch + for labelName, res := range r { + if oldRes, exists := existingRes[labelName]; exists && oldRes.Retention != res.Retention { + batch.Queue("UPDATE _prom_catalog.rollup SET retention = $1 WHERE name = $2", time.Duration(res.Retention), labelName) + } + } + if batch.Len() > 0 { + results := conn.SendBatch(ctx, &batch) + if err := results.Close(); err != nil { + return fmt.Errorf("error closing batch: %w", err) } } return nil } -// diff returns the elements of a that are not in b. -// -// We need this since we want to support a "strict" behaviour in downsampling. This basically means, to have the exact -// downsampling data in the DB based on what's mentioned in the dataset-config. -// -// See the comment for EnsureRollupWith for example. 
-func diff(a, b []DownsampleResolution) []DownsampleResolution { - var difference []DownsampleResolution - for i := range a { - found := false - for j := range b { - if a[i].Label == b[j].Label { - found = true - break - } +func createRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error { + var batch pgx.Batch + for lName, res := range r { + _, exists := existingRes[lName] + if !exists && !res.Delete { + batch.Queue("CALL _prom_catalog.create_rollup($1, $2, $3)", lName, time.Duration(res.Resolution), time.Duration(res.Retention)) } - if !found { - difference = append(difference, a[i]) + } + if batch.Len() > 0 { + results := conn.SendBatch(ctx, &batch) + if err := results.Close(); err != nil { + return fmt.Errorf("error creating new rollups: %w", err) } } - return difference + return nil +} + +func deleteRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error { + var batch pgx.Batch + for lName, res := range r { + _, exists := existingRes[lName] + if exists && res.Delete { + // Delete the rollup only if it exists in the DB. + batch.Queue("CALL _prom_catalog.delete_rollup($1)", lName) + } + } + if batch.Len() > 0 { + results := conn.SendBatch(ctx, &batch) + if err := results.Close(); err != nil { + return fmt.Errorf("error deleting new rollups: %w", err) + } + } + return nil } diff --git a/pkg/rollup/rollup_test.go b/pkg/rollup/rollup_test.go deleted file mode 100644 index e9822541c8..0000000000 --- a/pkg/rollup/rollup_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// This file and its contents are licensed under the Apache License 2.0. -// Please see the included NOTICE for copyright information and -// LICENSE for a copy of the license. - -package rollup - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestDiff(t *testing.T) { - tcs := []struct { - name string - a, b, expected []DownsampleResolution - }{ - { - name: "some inclusive elements", - a: []DownsampleResolution{{Label: "a"}, {Label: "b"}, {Label: "c"}, {Label: "d"}}, - b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, - expected: []DownsampleResolution{{Label: "a"}, {Label: "b"}}, - }, - { - name: "b superset of a", - a: []DownsampleResolution{{Label: "c"}, {Label: "d"}}, - b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, - expected: []DownsampleResolution(nil), - }, - { - name: "a empty", - a: []DownsampleResolution{}, - b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, - expected: []DownsampleResolution(nil), - }, - { - name: "all elements exclusive", - a: []DownsampleResolution{{Label: "a"}}, - b: []DownsampleResolution{{Label: "c"}, {Label: "d"}, {Label: "e"}}, - expected: []DownsampleResolution{{Label: "a"}}, - }, - { - name: "same", - a: []DownsampleResolution{{Label: "a"}, {Label: "b"}, {Label: "c"}, {Label: "d"}}, - b: []DownsampleResolution{{Label: "a"}, {Label: "b"}, {Label: "c"}, {Label: "d"}}, - expected: []DownsampleResolution(nil), - }, - { - name: "empty", - a: []DownsampleResolution{}, - b: []DownsampleResolution{}, - expected: []DownsampleResolution(nil), - }, - } - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expected, diff(tc.a, tc.b), tc.name) - }) - } -} diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 63dbd49f4c..2c3a141f35 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -166,7 +166,7 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error } if cfg.DatasetConfig != "" { - err = 
ApplyDatasetConfig(conn, cfg.DatasetConfig) + err = ApplyDatasetConfig(context.Background(), conn, cfg.DatasetConfig) if err != nil { return nil, fmt.Errorf("error applying dataset configuration: %w", err) } @@ -226,13 +226,13 @@ func isBGWLessThanDBs(conn *pgx.Conn) (bool, error) { return false, nil } -func ApplyDatasetConfig(conn *pgx.Conn, cfgFilename string) error { +func ApplyDatasetConfig(ctx context.Context, conn *pgx.Conn, cfgFilename string) error { cfg, err := dataset.NewConfig(cfgFilename) if err != nil { return err } - return cfg.Apply(conn) + return cfg.Apply(ctx, conn) } func compileAnchoredRegexString(s string) (*regexp.Regexp, error) { diff --git a/pkg/tests/end_to_end_tests/config_dataset_test.go b/pkg/tests/end_to_end_tests/config_dataset_test.go index f9b1f95541..12b186b5ca 100644 --- a/pkg/tests/end_to_end_tests/config_dataset_test.go +++ b/pkg/tests/end_to_end_tests/config_dataset_test.go @@ -40,7 +40,7 @@ func TestDatasetConfigApply(t *testing.T) { }, } - err = cfg.Apply(pgxConn) + err = cfg.Apply(context.Background(), pgxConn) require.NoError(t, err) require.Equal(t, 4*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) @@ -54,7 +54,7 @@ func TestDatasetConfigApply(t *testing.T) { cfg, err = dataset.NewConfig("") require.NoError(t, err) - err = cfg.Apply(pgxConn) + err = cfg.Apply(context.Background(), pgxConn) require.NoError(t, err) require.Equal(t, 8*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) diff --git a/pkg/tests/end_to_end_tests/rollup_test.go b/pkg/tests/end_to_end_tests/rollup_test.go index ce36dc51e9..7f38a9f32c 100644 --- a/pkg/tests/end_to_end_tests/rollup_test.go +++ b/pkg/tests/end_to_end_tests/rollup_test.go @@ -1,3 +1,7 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + package end_to_end_tests import ( @@ -13,11 +17,10 @@ import ( "github.com/timescale/promscale/pkg/rollup" ) -func TestRollupCreationDeletion(t *testing.T) { +func TestRollupSync(t *testing.T) { withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { - rollupResolutions := []rollup.DownsampleResolution{ - { - Label: "short", + rollupResolutions := rollup.Resolutions{ + "short": { Resolution: day.Duration(5 * time.Minute), Retention: day.Duration(30 * 24 * time.Hour), }, @@ -27,30 +30,67 @@ func TestRollupCreationDeletion(t *testing.T) { require.NoError(t, err) defer pgCon.Release() - err = rollup.EnsureRollupWith(pgCon.Conn(), rollupResolutions) + // Test 1: Check if 'short' rollup is created. + err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) require.NoError(t, err) - verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[0].Label, time.Duration(rollupResolutions[0].Resolution), time.Duration(rollupResolutions[0].Retention), false) + verifyRollupExistence(t, pgCon.Conn(), "short", + time.Duration(rollupResolutions["short"].Resolution), time.Duration(rollupResolutions["short"].Retention), false) - rollupResolutions = append(rollupResolutions, rollup.DownsampleResolution{ - Label: "long", + rollupResolutions["long"] = rollup.Definition{ Resolution: day.Duration(time.Hour), Retention: day.Duration(395 * 24 * time.Hour), - }) + } - err = rollup.EnsureRollupWith(pgCon.Conn(), rollupResolutions) + // Test 2: Check if 'long' rollup is created. 
+ err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) require.NoError(t, err) - verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[1].Label, time.Duration(rollupResolutions[1].Resolution), time.Duration(rollupResolutions[1].Retention), false) + verifyRollupExistence(t, pgCon.Conn(), "long", + time.Duration(rollupResolutions["long"].Resolution), time.Duration(rollupResolutions["long"].Retention), false) + + // Test 3: Update the resolution and check if error is returned. + rollupResolutions["short"] = rollup.Definition{ + Resolution: day.Duration(4 * time.Minute), + Retention: day.Duration(30 * 24 * time.Hour), + } + err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) + require.Equal(t, + "error on existing resolution mismatch: existing rollup resolutions cannot be updated. Either keep the resolution of existing rollup labels same or remove them", + err.Error()) + // Reset back to original resolution. + rollupResolutions["short"] = rollup.Definition{ + Resolution: day.Duration(5 * time.Minute), + Retention: day.Duration(30 * 24 * time.Hour), + } - // Remove the first entry and see if the entry is removed or not. - newRes := rollupResolutions[1:] - err = rollup.EnsureRollupWith(pgCon.Conn(), newRes) + // Test 4: Remove the first entry and see if the entry is removed or not. + rollupResolutions["short"] = rollup.Definition{ + Resolution: day.Duration(5 * time.Minute), + Retention: day.Duration(30 * 24 * time.Hour), + Delete: true, + } + err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) require.NoError(t, err) // Check if long exists. - verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[1].Label, time.Duration(rollupResolutions[1].Resolution), time.Duration(rollupResolutions[1].Retention), false) + verifyRollupExistence(t, pgCon.Conn(), "long", + time.Duration(rollupResolutions["long"].Resolution), time.Duration(rollupResolutions["long"].Retention), false) // Check if short does not exist. - verifyRollupExistence(t, pgCon.Conn(), rollupResolutions[0].Label, time.Duration(rollupResolutions[0].Resolution), time.Duration(rollupResolutions[0].Retention), true) + verifyRollupExistence(t, pgCon.Conn(), "short", + time.Duration(rollupResolutions["short"].Resolution), time.Duration(rollupResolutions["short"].Retention), true) + + // Test 5: Update retention of long and check if the same is reflected in the DB. + rollupResolutions["long"] = rollup.Definition{ + Resolution: day.Duration(time.Hour), + Retention: day.Duration(500 * 24 * time.Hour), // Updated retention duration. + } + err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) + require.NoError(t, err) + verifyRollupExistence(t, pgCon.Conn(), "long", + time.Duration(rollupResolutions["long"].Resolution), time.Duration(rollupResolutions["long"].Retention), false) + // Short should still not exists. + verifyRollupExistence(t, pgCon.Conn(), "short", + time.Duration(rollupResolutions["short"].Resolution), time.Duration(rollupResolutions["short"].Retention), true) }) }
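
Usage sketch (illustrative only, not part of the change above): a minimal example of wiring the new rollups dataset configuration through the context-aware Config.Apply. The keys rollups, enabled, use_default_resolution, resolutions, and the per-resolution resolution/retention/delete fields mirror the struct tags introduced in pkg/rollup/config.go; the metrics top-level key, the connection string, and the resolution labels ("ten_minute", "stale") are assumptions made for illustration. Note that "short" and "long" name the system resolutions and are rejected if supplied explicitly.

package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v4"

	"github.com/timescale/promscale/pkg/dataset"
)

func main() {
	ctx := context.Background()

	// Hypothetical connection string; Apply assumes the Promscale schema
	// (prom_api, _prom_catalog) is already installed in the target database.
	conn, err := pgx.Connect(ctx, "postgres://postgres:password@localhost:5432/timescale")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	// Dataset configuration using the new rollups block. The resolution
	// labels below are arbitrary; "short" and "long" may not be used here.
	contents := `
metrics:
  rollups:
    enabled: true
    use_default_resolution: true
    resolutions:
      ten_minute:
        resolution: 10m
        retention: 180d
      stale:
        resolution: 30m
        retention: 30d
        delete: true
`
	cfg, err := dataset.NewConfig(contents)
	if err != nil {
		log.Fatal(err)
	}

	// Apply enables automatic downsampling and calls rollup.Sync, which
	// creates "ten_minute", ensures the system "short"/"long" resolutions
	// (because use_default_resolution is true), and drops "stale" if it
	// already exists in the database.
	if err := cfg.Apply(ctx, conn); err != nil {
		log.Fatal(err)
	}
}

On repeated runs, Sync only updates the retention of rollups whose configured retention changed, and it returns an error if an existing rollup is re-declared with a different resolution, so changing a resolution requires deleting the old rollup first.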