From 9e121c1c3e11d3a538af9ffe90fa71e0b5e9a670 Mon Sep 17 00:00:00 2001
From: Harkishen-Singh
Date: Mon, 9 Jan 2023 15:57:04 +0530
Subject: [PATCH] Support for apply_downsample_config()

Sync downsampling configurations through the new
_prom_catalog.apply_downsample_config(jsonb) database function instead of
computing create/update/disable steps on the client side. As part of this,
day.Duration becomes a plain alias of time.Duration, since the original
duration text no longer needs to be retained after parsing.

Signed-off-by: Harkishen-Singh
---
 pkg/dataset/config.go                       |  40 ++---
 pkg/dataset/config_test.go                  |  36 ++---
 pkg/downsample/downsample.go                | 139 ++++--------
 pkg/internal/day/duration.go                |  59 ++++----
 pkg/internal/day/duration_test.go           |  51 +++++++
 pkg/pgmodel/querier/querier_sql_test.go     |   3 +-
 pkg/pgmodel/querier/query_sample.go         |   2 +-
 pkg/runner/flags_test.go                    |  10 +-
 pkg/tests/constants.go                      |   2 +-
 .../end_to_end_tests/config_dataset_test.go |  11 +-
 .../metric_downsampling_test.go             |  36 ++---
 11 files changed, 179 insertions(+), 210 deletions(-)
 create mode 100644 pkg/internal/day/duration_test.go

diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go
index 4ec1b4f054..354693f1ca 100644
--- a/pkg/dataset/config.go
+++ b/pkg/dataset/config.go
@@ -85,20 +85,20 @@ func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
 		log.Info("msg", "Metric downsampling configurations synced", "configuration", fmt.Sprint(*c.Metrics.Downsampling))
 	}
 
-	log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval.Duration()))
+	log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval))
 	log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression))
-	log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh.Duration()))
-	log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease timeout to %s", c.Metrics.HALeaseTimeout.Duration()))
-	log.Info("msg", fmt.Sprintf("Setting metric dataset default retention period to %s", c.Metrics.RetentionPeriod.Duration()))
-	log.Info("msg", fmt.Sprintf("Setting trace dataset default retention period to %s", c.Traces.RetentionPeriod.Duration()))
+	log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh))
+	log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease timeout to %s", c.Metrics.HALeaseTimeout))
+	log.Info("msg", fmt.Sprintf("Setting metric dataset default retention period to %s", c.Metrics.RetentionPeriod))
+	log.Info("msg", fmt.Sprintf("Setting trace dataset default retention period to %s", c.Traces.RetentionPeriod))
 
 	queries := map[string]interface{}{
-		setDefaultMetricChunkIntervalSQL:    c.Metrics.ChunkInterval.Duration(),
+		setDefaultMetricChunkIntervalSQL:    time.Duration(c.Metrics.ChunkInterval),
 		setDefaultMetricCompressionSQL:      c.Metrics.Compression,
-		setDefaultMetricHAReleaseRefreshSQL: c.Metrics.HALeaseRefresh.Duration(),
-		setDefaultMetricHAReleaseTimeoutSQL: c.Metrics.HALeaseTimeout.Duration(),
-		setDefaultMetricRetentionPeriodSQL:  c.Metrics.RetentionPeriod.Duration(),
-		setDefaultTraceRetentionPeriodSQL:   c.Traces.RetentionPeriod.Duration(),
+		setDefaultMetricHAReleaseRefreshSQL: time.Duration(c.Metrics.HALeaseRefresh),
+		setDefaultMetricHAReleaseTimeoutSQL: time.Duration(c.Metrics.HALeaseTimeout),
+		setDefaultMetricRetentionPeriodSQL:  time.Duration(c.Metrics.RetentionPeriod),
+		setDefaultTraceRetentionPeriodSQL:   time.Duration(c.Traces.RetentionPeriod),
 	}
 
 	for sql, param := range queries {
@@ -111,22 +111,22 @@ func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
 }
 
 func (c *Config) applyDefaults() {
-	if c.Metrics.ChunkInterval.Duration() <= 0 {
-		c.Metrics.ChunkInterval.SetDuration(defaultMetricChunkInterval)
+	if c.Metrics.ChunkInterval <= 0 {
+		c.Metrics.ChunkInterval = day.Duration(defaultMetricChunkInterval)
 	}
 	if c.Metrics.Compression == nil {
 		c.Metrics.Compression = &defaultMetricCompressionVar
 	}
-	if c.Metrics.HALeaseRefresh.Duration() <= 0 {
-		c.Metrics.HALeaseRefresh.SetDuration(defaultMetricHALeaseRefresh)
+	if c.Metrics.HALeaseRefresh <= 0 {
+		c.Metrics.HALeaseRefresh = day.Duration(defaultMetricHALeaseRefresh)
 	}
-	if c.Metrics.HALeaseTimeout.Duration() <= 0 {
-		c.Metrics.HALeaseTimeout.SetDuration(defaultMetricHALeaseTimeout)
+	if c.Metrics.HALeaseTimeout <= 0 {
+		c.Metrics.HALeaseTimeout = day.Duration(defaultMetricHALeaseTimeout)
 	}
-	if c.Metrics.RetentionPeriod.Duration() <= 0 {
-		c.Metrics.RetentionPeriod.SetDuration(defaultMetricRetentionPeriod)
+	if c.Metrics.RetentionPeriod <= 0 {
+		c.Metrics.RetentionPeriod = day.Duration(defaultMetricRetentionPeriod)
 	}
-	if c.Traces.RetentionPeriod.Duration() <= 0 {
-		c.Traces.RetentionPeriod.SetDuration(defaultTraceRetentionPeriod)
+	if c.Traces.RetentionPeriod <= 0 {
+		c.Traces.RetentionPeriod = day.Duration(defaultTraceRetentionPeriod)
 	}
 }
diff --git a/pkg/dataset/config_test.go b/pkg/dataset/config_test.go
index c32c853a60..9a0d0aefd5 100644
--- a/pkg/dataset/config_test.go
+++ b/pkg/dataset/config_test.go
@@ -49,7 +49,7 @@ func TestNewConfig(t *testing.T) {
 default_retention_period: 3d2h`,
 			cfg: Config{
 				Metrics: Metrics{
-					RetentionPeriod: dayDuration(((3*24)+2)*time.Hour, "3d2h"),
+					RetentionPeriod: day.Duration(((3 * 24) + 2) * time.Hour),
 				},
 			},
 		},
@@ -65,14 +65,14 @@ traces:
   default_retention_period: 15d`,
 			cfg: Config{
 				Metrics: Metrics{
-					ChunkInterval:   dayDuration(3*time.Hour, "3h"),
+					ChunkInterval:   day.Duration(3 * time.Hour),
 					Compression:     &testCompressionSetting,
-					HALeaseRefresh:  dayDuration(2*time.Minute, "2m"),
-					HALeaseTimeout:  dayDuration(5*time.Second, "5s"),
-					RetentionPeriod: dayDuration(30*24*time.Hour, "30d"),
+					HALeaseRefresh:  day.Duration(2 * time.Minute),
+					HALeaseTimeout:  day.Duration(5 * time.Second),
+					RetentionPeriod: day.Duration(30 * 24 * time.Hour),
 				},
 				Traces: Traces{
-					RetentionPeriod: dayDuration(15*24*time.Hour, "15d"),
+					RetentionPeriod: day.Duration(15 * 24 * time.Hour),
 				},
 			},
 		},
@@ -101,14 +101,14 @@ func TestApplyDefaults(t *testing.T) {
 		t,
 		Config{
 			Metrics: Metrics{
-				ChunkInterval:   dayDuration(defaultMetricChunkInterval, ""),
+				ChunkInterval:   day.Duration(defaultMetricChunkInterval),
 				Compression:     &defaultMetricCompressionVar,
-				HALeaseRefresh:  dayDuration(defaultMetricHALeaseRefresh, ""),
-				HALeaseTimeout:  dayDuration(defaultMetricHALeaseTimeout, ""),
-				RetentionPeriod: dayDuration(defaultMetricRetentionPeriod, ""),
+				HALeaseRefresh:  day.Duration(defaultMetricHALeaseRefresh),
+				HALeaseTimeout:  day.Duration(defaultMetricHALeaseTimeout),
+				RetentionPeriod: day.Duration(defaultMetricRetentionPeriod),
 			},
 			Traces: Traces{
-				RetentionPeriod: dayDuration(defaultTraceRetentionPeriod, ""),
+				RetentionPeriod: day.Duration(defaultTraceRetentionPeriod),
 			},
 		},
 		c,
@@ -116,14 +116,14 @@ func TestApplyDefaults(t *testing.T) {
 	untouched := Config{
 		Metrics: Metrics{
-			ChunkInterval:   dayDuration(3*time.Hour, "3h"),
+			ChunkInterval:   day.Duration(3 * time.Hour),
 			Compression:     &testCompressionSetting,
-			HALeaseRefresh:  dayDuration(2*time.Minute, "2m"),
-			HALeaseTimeout:  dayDuration(5*time.Second, "5s"),
-			RetentionPeriod: dayDuration(30*24*time.Hour, "30d"),
+			HALeaseRefresh:  day.Duration(2 * time.Minute),
+			HALeaseTimeout:  day.Duration(5 * time.Second),
+			RetentionPeriod: day.Duration(30 * 24 * time.Hour),
 		},
 		Traces: Traces{
-			RetentionPeriod: dayDuration(15*24*time.Hour, "15d"),
+			RetentionPeriod: day.Duration(15 * 24 * time.Hour),
 		},
 	}
 
@@ -132,7 +132,3 @@ func TestApplyDefaults(t *testing.T) {
 
 	require.Equal(t, untouched, copyConfig)
 }
-
-func dayDuration(d time.Duration, txt string) day.Duration {
-	return day.Duration{T: d, Txt: txt}
-}
diff --git a/pkg/downsample/downsample.go b/pkg/downsample/downsample.go
index cdbbdb9dff..3ea640e72e 100644
--- a/pkg/downsample/downsample.go
+++ b/pkg/downsample/downsample.go
@@ -6,9 +6,8 @@ package downsample
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
-	"time"
-
 	"github.com/jackc/pgx/v4"
 
 	"github.com/timescale/promscale/pkg/internal/day"
@@ -17,36 +16,36 @@ import (
 )
 
 const (
-	setDownsamplingStateSQL       = "SELECT prom_api.set_downsampling_state($1)"
-	createOrUpdateDownsamplingSQL = "CALL _prom_catalog.create_or_update_downsampling($1, $2, $3)"
-	updateDownsamplingStateForSQL = "SELECT _prom_catalog.update_downsampling_state($1, $2)"
-	downsamplePrefix              = "ds_"           // Stands of downsample_
-	lockID                        = 55851985173278  // Choosen randomly
+	setGlobalDownsamplingStateSQL = "SELECT prom_api.set_global_downsampling_state($1)"
+	applyDownsampleConfigSQL      = "SELECT _prom_catalog.apply_downsample_config($1::jsonb)"
+	downsamplePrefix              = "ds_"           // Stands for downsample_
+	lockID                        = 55985173312278  // Chosen randomly
 )
 
 type Config struct {
-	Interval      day.Duration `yaml:"interval"`
-	Retention     day.Duration `yaml:"retention"`
-	shouldRefresh bool
+	Interval  day.Duration `yaml:"interval"`
+	Retention day.Duration `yaml:"retention"`
 }
 
 func (c Config) Name() string {
-	return downsamplePrefix + c.Interval.Text()
+	return downsamplePrefix + day.String(c.Interval)
 }
 
 func SetState(ctx context.Context, conn *pgx.Conn, state bool) error {
-	_, err := conn.Exec(ctx, setDownsamplingStateSQL, state)
+	_, err := conn.Exec(ctx, setGlobalDownsamplingStateSQL, state)
 	if err != nil {
 		return fmt.Errorf("error setting downsampling state: %w", err)
 	}
 	return nil
 }
 
-// Sync updates the downsampling cfgs in the DB in accordance with the given new cfgs. It:
-// 1. Creates of new downsampling cfgs that are not in the database
-// 2. Updates retention duration of downsampling cfgs that are present in the database but with a different retention duration
-// 3. Enables refreshing of downsampling cfgs in the database that are in the new cfgs but were previously disabled
-// 4. Disables refreshing of downsampling cfgs in the database that were not found in the new cfgs
+type cfgWithName struct {
+	Name      string `json:"schema_name"`
+	Interval  string `json:"ds_interval"`
+	Retention string `json:"retention"`
+}
+
+// Sync the given downsampling configurations with the database.
 func Sync(ctx context.Context, conn *pgx.Conn, cfgs []Config) error {
 	pgLock, err := util.NewPgAdvisoryLock(lockID, conn.Config().ConnString())
 	if err != nil {
@@ -67,102 +66,19 @@ func Sync(ctx context.Context, conn *pgx.Conn, cfgs []Config) error {
 			log.Error("msg", "error unlocking downsampling.Sync advisory_lock", "err", err.Error())
 		}
 	}()
-
-	newCfgs := make(map[string]Config) // These are the new downsampling cfgs that the user provided. Relation => schema_name: definition{}
-	for _, c := range cfgs {
-		newCfgs[c.Name()] = Config{Interval: c.Interval, Retention: c.Retention}
-	}
-
-	existingCfgs, err := getExistingCfgs(ctx, conn)
-	if err != nil {
-		return fmt.Errorf("error fetching existing downsampling cfgs: %w", err)
-	}
-
-	createOrUpdate := make(map[string]Config)
-	for newLabel, newCfg := range newCfgs {
-		if existingCfg, found := existingCfgs[newLabel]; found {
-			if !existingCfg.shouldRefresh || existingCfg.Retention.Duration() != newCfg.Retention.Duration() {
-				createOrUpdate[newLabel] = newCfg
-			}
-			if existingCfg.Interval.Duration() != newCfg.Interval.Duration() {
-				// This should never be the case since newlabel is schema specific. But we still check for safety purpose.
-				return fmt.Errorf("interval mismatch: existing interval %v, new interval %v", existingCfg.Interval.Duration(), newCfg.Interval.Duration())
-			}
-		} else {
-			createOrUpdate[newLabel] = newCfg
+	var applyCfgs []cfgWithName
+	for i := range cfgs {
+		c := cfgs[i]
+		applyCfgs = append(applyCfgs, cfgWithName{Name: c.Name(), Interval: day.String(c.Interval), Retention: day.String(c.Retention)})
+	}
+	if len(applyCfgs) > 0 {
+		str, err := json.Marshal(applyCfgs)
+		if err != nil {
+			return fmt.Errorf("error marshalling configs: %w", err)
 		}
-	}
-
-	if len(createOrUpdate) > 0 {
-		if err = createOrUpdateDownsampling(ctx, conn, createOrUpdate); err != nil {
-			return fmt.Errorf("error creating or updating given downsampling configs: %w", err)
+		if _, err = conn.Exec(ctx, applyDownsampleConfigSQL, str); err != nil {
+			return fmt.Errorf("error applying downsample config: %w", err)
 		}
 	}
-
-	if err = disable(ctx, conn, newCfgs, existingCfgs); err != nil {
-		return fmt.Errorf("error disabling downsampling configs: %w", err)
-	}
 	return nil
 }
-
-func getExistingCfgs(ctx context.Context, conn *pgx.Conn) (map[string]Config, error) {
-	rows, err := conn.Query(ctx, "SELECT schema_name, resolution, retention, should_refresh FROM _prom_catalog.downsample")
-	if err != nil {
-		return nil, fmt.Errorf("querying existing resolutions: %w", err)
-	}
-	defer rows.Close()
-
-	existingCfgs := make(map[string]Config) // These are the existing downsampling cfgs in the database.
-	for rows.Next() {
-		var (
-			schemaName          string
-			shouldRefresh       bool
-			interval, retention time.Duration
-		)
-		if err := rows.Scan(&schemaName, &interval, &retention, &shouldRefresh); err != nil {
-			return nil, fmt.Errorf("error scanning output rows for existing resolutions: %w", err)
-		}
-		existingCfgs[schemaName] = Config{Interval: day.Duration{T: interval}, Retention: day.Duration{T: retention}, shouldRefresh: shouldRefresh}
-	}
-	return existingCfgs, nil
-}
-
-// createOrUpdateDownsampling does 3 things:
-// 1. It creates new downsampling configurations that are given in 'cfgs'
-// 2. It updates the retention of a downsampling configuration if it is present in the database with the same lName
-// 3. It enables a downsampling if it was previously disabled
-// Refer to _prom_catalog.create_or_update_downsampling($1, $2, $3) to learn more.
-func createOrUpdateDownsampling(ctx context.Context, conn *pgx.Conn, cfgs map[string]Config) error {
-	var batch pgx.Batch
-	for lName, def := range cfgs {
-		batch.Queue(createOrUpdateDownsamplingSQL, lName, def.Interval.Duration(), def.Retention.Duration())
-	}
-	results := conn.SendBatch(ctx, &batch)
-	if err := results.Close(); err != nil {
-		return fmt.Errorf("error closing batch: %w", err)
-	}
-	return nil
-}
-
-// disable downsampling cfgs.
-func disable(ctx context.Context, conn *pgx.Conn, newCfgs, existingCfgs map[string]Config) error {
-	disable := []string{}
-	for existingName := range existingCfgs {
-		if _, found := newCfgs[existingName]; !found {
-			disable = append(disable, existingName)
-		}
-	}
-	if len(disable) == 0 {
-		return nil
-	}
-
-	var batch pgx.Batch
-	for _, n := range disable {
-		batch.Queue(updateDownsamplingStateForSQL, n, false)
-	}
-	results := conn.SendBatch(ctx, &batch)
-	if err := results.Close(); err != nil {
-		return fmt.Errorf("error closing batch: %w", err)
-	}
-	return nil
-}
diff --git a/pkg/internal/day/duration.go b/pkg/internal/day/duration.go
index 6b7c6b18e1..0611d18939 100644
--- a/pkg/internal/day/duration.go
+++ b/pkg/internal/day/duration.go
@@ -20,13 +20,7 @@ const (
 
 // Duration acts like a time.Duration with support for "d" unit
 // which is used for specifying number of days in duration.
-// It stores the text of duration while parsing, which can be retrieved via Text().
-// This can be useful when we need to know the num of days user wanted, since
-// this information is lost after parsing.
-type Duration struct {
-	Txt string // Holds the original duration text.
-	T   time.Duration
-}
+type Duration time.Duration
 
 // UnmarshalText unmarshals strings into DayDuration values while
 // handling the day unit. It leans heavily into time.ParseDuration.
@@ -43,8 +37,7 @@ func (d *Duration) UnmarshalText(s []byte) error {
 			return err
 		}
 	}
-	d.T = val
-	d.Txt = string(s)
+	*d = Duration(val)
 	return nil
 }
 
@@ -75,23 +68,8 @@ func handleDays(s []byte) (time.Duration, error) {
 }
 
 // String returns a string value of DayDuration.
-func (d *Duration) String() string {
-	return d.T.String()
-}
-
-// Text returns the original text received while parsing.
-func (d *Duration) Text() string {
-	return d.Txt
-}
-
-// Duration returns the parsed duration.
-func (d *Duration) Duration() time.Duration {
-	return d.T
-}
-
-// SetDuration returns the parsed duration.
-func (d *Duration) SetDuration(t time.Duration) {
-	d.T = t
+func (d Duration) String() string {
+	return time.Duration(d).String()
 }
 
 // StringToDayDurationHookFunc returns a mapstructure.DecodeHookFunc that
@@ -118,3 +96,32 @@ func StringToDayDurationHookFunc() mapstructure.DecodeHookFunc {
 		return d, nil
 	}
 }
+
+// String returns the duration formatted as <days>d<hours>h<mins>m<secs>s, omitting zero-valued components.
+func String(d Duration) string {
+	const day = int64(time.Hour * 24)
+
+	remainder := int64(d)
+	days := remainder / day
+	remainder = remainder % day
+	hours := remainder / int64(time.Hour)
+	remainder = remainder % int64(time.Hour)
+	mins := remainder / int64(time.Minute)
+	remainder = remainder % int64(time.Minute)
+	secs := remainder / int64(time.Second)
+
+	display := ""
+	if days != 0 {
+		display = fmt.Sprintf("%dd", days)
+	}
+	if hours != 0 {
+		display = fmt.Sprintf("%s%dh", display, hours)
+	}
+	if mins != 0 {
+		display = fmt.Sprintf("%s%dm", display, mins)
+	}
+	if secs != 0 {
+		display = fmt.Sprintf("%s%ds", display, secs)
+	}
+	return display
+}
diff --git a/pkg/internal/day/duration_test.go b/pkg/internal/day/duration_test.go
new file mode 100644
index 0000000000..b307fc39c1
--- /dev/null
+++ b/pkg/internal/day/duration_test.go
@@ -0,0 +1,51 @@
+// This file and its contents are licensed under the Apache License 2.0.
+// Please see the included NOTICE for copyright information and
+// LICENSE for a copy of the license.
+
+package day
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestString(t *testing.T) {
+	tcs := []struct {
+		in  string
+		out string
+	}{
+		{
+			in:  "5m",
+			out: "5m",
+		}, {
+			in:  "2.5m",
+			out: "2m30s",
+		}, {
+			in:  "1.1d1s",
+			out: "1d2h24m1s",
+		}, {
+			in:  "24d",
+			out: "24d",
+		}, {
+			in:  "1.5d",
+			out: "1d12h",
+		}, {
+			in:  "4d1h5m",
+			out: "4d1h5m",
+		}, {
+			in:  "1000h",
+			out: "41d16h",
+		}, {
+			in:  "1000h1m",
+			out: "41d16h1m",
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.in, func(t *testing.T) {
+			var d Duration
+			require.NoError(t, d.UnmarshalText([]byte(tc.in)))
+			require.Equal(t, tc.out, String(d))
+		})
+	}
+}
diff --git a/pkg/pgmodel/querier/querier_sql_test.go b/pkg/pgmodel/querier/querier_sql_test.go
index ce43cddc1f..cfaa072bab 100644
--- a/pkg/pgmodel/querier/querier_sql_test.go
+++ b/pkg/pgmodel/querier/querier_sql_test.go
@@ -370,7 +370,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 			},
 		},
 		{
-			name: "Simple query, metric name matcher, custom column",
+			name: "Should error for simple query, metric name matcher, custom column but no schema name",
 			query: &prompb.Query{
 				StartTimestampMs: 1000,
 				EndTimestampMs:   2000,
@@ -379,6 +379,7 @@ func TestPGXQuerierQuery(t *testing.T) {
 					{Type: prompb.LabelMatcher_EQ, Name: model.MetricNameLabelName, Value: "bar"},
 				},
 			},
+			err: fmt.Errorf("get evaluation metadata: build subQueries: '__schema__' label not found"),
 			result: []*prompb.TimeSeries{
 				{
 					Labels: []prompb.Label{
diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go
index 59d59f2ba7..9aff1d0fe3 100644
--- a/pkg/pgmodel/querier/query_sample.go
+++ b/pkg/pgmodel/querier/query_sample.go
@@ -126,7 +126,7 @@ func fetchMultipleMetricsSamples(ctx context.Context, tools *queryTools, metadat
 	// NOTE: this needs to be updated once we add support for storing
 	// non-view metrics into multiple schemas. This also applies to
 	// fetching downsampling data too.
-	if metadata.timeFilter.schema != schema.PromData {
+	if metadata.timeFilter.schema != schema.PromData && metadata.timeFilter.schema != "" {
 		return nil, fmt.Errorf("__schema__ not allowed when fetching multiple metrics in single PromQL expression")
 	}
 
diff --git a/pkg/runner/flags_test.go b/pkg/runner/flags_test.go
index 19317ea070..f6eee4ba49 100644
--- a/pkg/runner/flags_test.go
+++ b/pkg/runner/flags_test.go
@@ -281,12 +281,12 @@ startup:
 				c.ListenAddr = "localhost:9201"
 				c.AuthConfig.BasicAuthUsername = "promscale"
 				c.AuthConfig.BasicAuthPassword = "my-password"
-				c.DatasetCfg.Metrics.ChunkInterval = day.Duration{T: 24 * time.Hour, Txt: "1d"}
+				c.DatasetCfg.Metrics.ChunkInterval = day.Duration(24 * time.Hour)
 				c.DatasetCfg.Metrics.Compression = func(b bool) *bool { return &b }(false)
-				c.DatasetCfg.Metrics.HALeaseRefresh = day.Duration{T: 24 * time.Hour * 2, Txt: "2d"}
-				c.DatasetCfg.Metrics.HALeaseTimeout = day.Duration{T: 24 * time.Hour * 3, Txt: "3d"}
-				c.DatasetCfg.Metrics.RetentionPeriod = day.Duration{T: 24 * time.Hour * 4, Txt: "4d"}
-				c.DatasetCfg.Traces.RetentionPeriod = day.Duration{T: 24 * time.Hour * 5, Txt: "5d"}
+				c.DatasetCfg.Metrics.HALeaseRefresh = day.Duration(24 * time.Hour * 2)
+				c.DatasetCfg.Metrics.HALeaseTimeout = day.Duration(24 * time.Hour * 3)
+				c.DatasetCfg.Metrics.RetentionPeriod = day.Duration(24 * time.Hour * 4)
+				c.DatasetCfg.Traces.RetentionPeriod = day.Duration(24 * time.Hour * 5)
 				c.DatasetConfig = "metrics:\n default_chunk_interval: 1h\n"
 				return c
 			},
diff --git a/pkg/tests/constants.go b/pkg/tests/constants.go
index b8e5e49819..2de9dfdc02 100644
--- a/pkg/tests/constants.go
+++ b/pkg/tests/constants.go
@@ -18,5 +18,5 @@ func init() {
 	PromscaleExtensionVersion = strings.TrimSpace(string(content))
 
 	PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14"
-	//PromscaleExtensionContainer = "local/dev_promscale_extension:head-ts2-pg14" // This will be removed once the PR against master is made.
+	PromscaleExtensionContainer = "local/dev_promscale_extension:head-ts2-pg14" // This will be removed once the PR against master is made.
 }
diff --git a/pkg/tests/end_to_end_tests/config_dataset_test.go b/pkg/tests/end_to_end_tests/config_dataset_test.go
index 112659ec20..86473e57b3 100644
--- a/pkg/tests/end_to_end_tests/config_dataset_test.go
+++ b/pkg/tests/end_to_end_tests/config_dataset_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/jackc/pgx/v4/pgxpool"
 	"github.com/stretchr/testify/require"
 	"github.com/timescale/promscale/pkg/dataset"
+	"github.com/timescale/promscale/pkg/internal/day"
 )
 
 func TestDatasetConfigApply(t *testing.T) {
@@ -28,14 +29,14 @@ func TestDatasetConfigApply(t *testing.T) {
 	cfg := dataset.Config{
 		Metrics: dataset.Metrics{
-			ChunkInterval:   dayDuration(4*time.Hour, "4h"),
+			ChunkInterval:   day.Duration(4 * time.Hour),
 			Compression:     &disableCompression,
-			HALeaseRefresh:  dayDuration(15*time.Second, "15s"),
-			HALeaseTimeout:  dayDuration(2*time.Minute, "2m"),
-			RetentionPeriod: dayDuration(15*24*time.Hour, "15d"),
+			HALeaseRefresh:  day.Duration(15 * time.Second),
+			HALeaseTimeout:  day.Duration(2 * time.Minute),
+			RetentionPeriod: day.Duration(15 * 24 * time.Hour),
 		},
 		Traces: dataset.Traces{
-			RetentionPeriod: dayDuration(10*24*time.Hour, "10d"),
+			RetentionPeriod: day.Duration(10 * 24 * time.Hour),
 		},
 	}
 
diff --git a/pkg/tests/end_to_end_tests/metric_downsampling_test.go b/pkg/tests/end_to_end_tests/metric_downsampling_test.go
index b5e69b4f45..46acd91145 100644
--- a/pkg/tests/end_to_end_tests/metric_downsampling_test.go
+++ b/pkg/tests/end_to_end_tests/metric_downsampling_test.go
@@ -20,7 +20,7 @@ import (
 
 func TestMetricDownsampleSync(t *testing.T) {
 	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
 		downsamplingCfgs := []downsample.Config{
-			{Interval: dayDuration(time.Minute*5, "5m"), Retention: dayDuration(time.Hour*24*30, "30d")},
+			{Interval: day.Duration(time.Minute * 5), Retention: day.Duration(time.Hour * 24 * 30)},
 		}
 
 		pgCon, err := db.Acquire(context.Background())
@@ -35,15 +35,15 @@ func TestMetricDownsampleSync(t *testing.T) {
 		require.NoError(t, err)
 		verifyDownsamplingExistence(t, pc, "ds_5m",
-			downsamplingCfgs[0].Interval.Duration(), downsamplingCfgs[0].Retention.Duration(), false)
+			downsamplingCfgs[0].Interval, downsamplingCfgs[0].Retention, false)
 
-		downsamplingCfgs = append(downsamplingCfgs, downsample.Config{Interval: dayDuration(time.Hour, "1h"), Retention: dayDuration(time.Hour*24*365, "365d")})
+		downsamplingCfgs = append(downsamplingCfgs, downsample.Config{Interval: day.Duration(time.Hour), Retention: day.Duration(time.Hour * 24 * 365)})
 
 		// Test 2: Check if 'ds_1h' downsampling is created.
 		err = downsample.Sync(ctx, pc, downsamplingCfgs)
 		require.NoError(t, err)
 		verifyDownsamplingExistence(t, pc, "ds_1h",
-			downsamplingCfgs[1].Interval.Duration(), downsamplingCfgs[1].Retention.Duration(), false)
+			downsamplingCfgs[1].Interval, downsamplingCfgs[1].Retention, false)
 
 		// Test 3: Remove the first entry and see if the entry is disabled or not.
 		downsamplingCfgs = downsamplingCfgs[1:]
@@ -51,50 +51,46 @@ func TestMetricDownsampleSync(t *testing.T) {
 		require.NoError(t, err)
 		// Check if ds_1h exists.
 		verifyDownsamplingExistence(t, pc, "ds_1h",
-			downsamplingCfgs[0].Interval.Duration(), downsamplingCfgs[0].Retention.Duration(), false)
+			downsamplingCfgs[0].Interval, downsamplingCfgs[0].Retention, false)
 		// Check if ds_5m is disabled.
 		verifyDownsamplingExistence(t, pc, "ds_5m",
-			time.Minute*5, time.Hour*24*30, true)
+			day.Duration(time.Minute*5), day.Duration(time.Hour*24*30), true)
 
 		// Test 4: Update retention of ds_1h and check if the same is reflected in the DB.
-		downsamplingCfgs[0].Retention = dayDuration(time.Hour*24*500, "500d")
+		downsamplingCfgs[0].Retention = day.Duration(time.Hour * 24 * 500)
 		err = downsample.Sync(ctx, pc, downsamplingCfgs)
 		require.NoError(t, err)
 		verifyDownsamplingExistence(t, pc, "ds_1h",
-			downsamplingCfgs[0].Interval.Duration(), downsamplingCfgs[0].Retention.Duration(), false)
+			downsamplingCfgs[0].Interval, downsamplingCfgs[0].Retention, false)
 		// ds_5m should still be disabled.
 		verifyDownsamplingExistence(t, pc, "ds_5m",
-			time.Minute*5, time.Hour*24*30, true)
+			day.Duration(time.Minute*5), day.Duration(time.Hour*24*30), true)
 
 		// Test 5: Enable the ds_5m downsampling that was already in the database.
-		downsamplingCfgs = append(downsamplingCfgs, downsample.Config{Interval: dayDuration(time.Minute*5, "5m"), Retention: dayDuration(time.Hour*24*30, "30d")})
+		downsamplingCfgs = append(downsamplingCfgs, downsample.Config{Interval: day.Duration(time.Minute * 5), Retention: day.Duration(time.Hour * 24 * 30)})
 		err = downsample.Sync(ctx, pc, downsamplingCfgs)
 		require.NoError(t, err)
 		verifyDownsamplingExistence(t, pc, "ds_5m",
-			downsamplingCfgs[1].Interval.Duration(), downsamplingCfgs[1].Retention.Duration(), false)
+			downsamplingCfgs[1].Interval, downsamplingCfgs[1].Retention, false)
 
 		// Test 6: Add a resolution similar to 5m, but with different unit. This should error.
-		downsamplingCfgs = append(downsamplingCfgs, downsample.Config{Interval: dayDuration(time.Second*300, "300s"), Retention: dayDuration(time.Hour*24*30, "30d")})
+		downsamplingCfgs = append(downsamplingCfgs, downsample.Config{Interval: day.Duration(time.Second * 300), Retention: day.Duration(time.Hour * 24 * 30)})
 		err = downsample.Sync(ctx, pc, downsamplingCfgs)
 		require.Error(t, err)
 	})
 }
 
-func dayDuration(d time.Duration, text string) day.Duration {
-	return day.Duration{T: d, Txt: text}
-}
-
-func verifyDownsamplingExistence(t testing.TB, pgCon *pgx.Conn, schemaName string, interval, retention time.Duration, shouldBeDisabled bool) {
+func verifyDownsamplingExistence(t testing.TB, pgCon *pgx.Conn, schemaName string, interval, retention day.Duration, shouldBeDisabled bool) {
 	var (
 		dSchemaName    string
 		dInterval      time.Duration
 		dRetention     time.Duration
 		dShouldRefresh bool
 	)
-	err := pgCon.QueryRow(context.Background(), "SELECT schema_name, resolution, retention, should_refresh FROM _prom_catalog.downsample WHERE schema_name = $1", schemaName).Scan(&dSchemaName, &dInterval, &dRetention, &dShouldRefresh)
+	err := pgCon.QueryRow(context.Background(), "SELECT schema_name, ds_interval, retention, should_refresh FROM _prom_catalog.downsample WHERE schema_name = $1", schemaName).Scan(&dSchemaName, &dInterval, &dRetention, &dShouldRefresh)
 	require.NoError(t, err)
 	require.Equal(t, schemaName, dSchemaName)
-	require.Equal(t, interval, dInterval)
-	require.Equal(t, retention, dRetention)
+	require.Equal(t, time.Duration(interval), dInterval)
+	require.Equal(t, time.Duration(retention), dRetention)
 	require.Equal(t, shouldBeDisabled, !dShouldRefresh)
 }
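
A note for reviewers, with a hedged sketch of the payload this patch introduces: Sync() now serializes every downsampling configuration into a single JSON array and hands it to _prom_catalog.apply_downsample_config($1::jsonb), leaving the create/update/disable decisions to the database. The program below is a self-contained illustration and not code from this patch: dayString reimplements the formatting of the internal day.String helper (pkg/internal/day cannot be imported from outside the module), and the two configurations are invented for the example.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// dayString mirrors day.String from this patch: it renders a duration as
// <days>d<hours>h<mins>m<secs>s, omitting zero-valued components.
func dayString(d time.Duration) string {
	const day = 24 * time.Hour
	rem := d
	days := int64(rem / day)
	rem %= day
	hours := int64(rem / time.Hour)
	rem %= time.Hour
	mins := int64(rem / time.Minute)
	rem %= time.Minute
	secs := int64(rem / time.Second)

	out := ""
	if days != 0 {
		out += fmt.Sprintf("%dd", days)
	}
	if hours != 0 {
		out += fmt.Sprintf("%dh", hours)
	}
	if mins != 0 {
		out += fmt.Sprintf("%dm", mins)
	}
	if secs != 0 {
		out += fmt.Sprintf("%ds", secs)
	}
	return out
}

// cfgWithName matches the JSON shape that apply_downsample_config consumes.
type cfgWithName struct {
	Name      string `json:"schema_name"`
	Interval  string `json:"ds_interval"`
	Retention string `json:"retention"`
}

func main() {
	// Two hypothetical resolutions; the schema name is the "ds_" prefix plus
	// the formatted interval, as in Config.Name().
	cfgs := []cfgWithName{
		{Name: "ds_" + dayString(5*time.Minute), Interval: dayString(5 * time.Minute), Retention: dayString(30 * 24 * time.Hour)},
		{Name: "ds_" + dayString(time.Hour), Interval: dayString(time.Hour), Retention: dayString(365 * 24 * time.Hour)},
	}
	payload, err := json.Marshal(cfgs)
	if err != nil {
		panic(err)
	}
	// Prints the document that would be bound to $1 of
	// "SELECT _prom_catalog.apply_downsample_config($1::jsonb)".
	fmt.Println(string(payload))
}

Running it prints the JSON array that would reach the database:

[{"schema_name":"ds_5m","ds_interval":"5m","retention":"30d"},{"schema_name":"ds_1h","ds_interval":"1h","retention":"365d"}]

which, if I read the patch correctly, is equivalent to issuing something like the following from psql (values again illustrative):

SELECT _prom_catalog.apply_downsample_config('[{"schema_name":"ds_5m","ds_interval":"5m","retention":"30d"}]'::jsonb);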