diff --git a/EXTENSION_VERSION b/EXTENSION_VERSION index 1f7391f92b..318b82ecbb 100644 --- a/EXTENSION_VERSION +++ b/EXTENSION_VERSION @@ -1 +1 @@ -master +feature_metric_rollup \ No newline at end of file diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go index f61c7f8b06..354693f1ca 100644 --- a/pkg/dataset/config.go +++ b/pkg/dataset/config.go @@ -2,6 +2,7 @@ // Please see the included NOTICE for copyright information and // LICENSE for a copy of the license. // +k8s:deepcopy-gen=package + package dataset import ( @@ -10,8 +11,11 @@ import ( "time" "github.com/jackc/pgx/v4" - "github.com/timescale/promscale/pkg/log" "gopkg.in/yaml.v2" + + "github.com/timescale/promscale/pkg/downsample" + "github.com/timescale/promscale/pkg/internal/day" + "github.com/timescale/promscale/pkg/log" ) const ( @@ -42,16 +46,17 @@ type Config struct { // Metrics contains dataset configuration options for metrics data. type Metrics struct { - ChunkInterval DayDuration `mapstructure:"default_chunk_interval" yaml:"default_chunk_interval"` - Compression *bool `mapstructure:"compress_data" yaml:"compress_data"` // Using pointer to check if the the value was set. - HALeaseRefresh DayDuration `mapstructure:"ha_lease_refresh" yaml:"ha_lease_refresh"` - HALeaseTimeout DayDuration `mapstructure:"ha_lease_timeout" yaml:"ha_lease_timeout"` - RetentionPeriod DayDuration `mapstructure:"default_retention_period" yaml:"default_retention_period"` + ChunkInterval day.Duration `mapstructure:"default_chunk_interval" yaml:"default_chunk_interval"` + Compression *bool `mapstructure:"compress_data" yaml:"compress_data"` // Using pointer to check if the the value was set. 
+ HALeaseRefresh day.Duration `mapstructure:"ha_lease_refresh" yaml:"ha_lease_refresh"` + HALeaseTimeout day.Duration `mapstructure:"ha_lease_timeout" yaml:"ha_lease_timeout"` + RetentionPeriod day.Duration `mapstructure:"default_retention_period" yaml:"default_retention_period"` + Downsampling *[]downsample.Config `mapstructure:"downsampling" yaml:"downsampling,omitempty"` } // Traces contains dataset configuration options for traces data. type Traces struct { - RetentionPeriod DayDuration `mapstructure:"default_retention_period" yaml:"default_retention_period"` + RetentionPeriod day.Duration `mapstructure:"default_retention_period" yaml:"default_retention_period"` } // NewConfig creates a new dataset config based on the configuration YAML contents. @@ -61,9 +66,25 @@ func NewConfig(contents string) (cfg Config, err error) { } // Apply applies the configuration to the database via the supplied DB connection. -func (c *Config) Apply(conn *pgx.Conn) error { +func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error { c.applyDefaults() + if c.Metrics.Downsampling == nil { + if err := downsample.SetState(ctx, conn, false); err != nil { + return fmt.Errorf("error setting state for automatic-downsampling: %w", err) + } + log.Info("msg", "Metric downsampling is disabled") + } else { + if err := downsample.SetState(ctx, conn, true); err != nil { + return fmt.Errorf("error setting state for automatic-downsampling: %w", err) + } + log.Info("msg", "Metric downsampling is enabled") + if err := downsample.Sync(ctx, conn, *c.Metrics.Downsampling); err != nil { + return fmt.Errorf("error syncing downsampling configurations: %w", err) + } + log.Info("msg", "Metric downsampling configurations synced", "configuration", fmt.Sprint(*c.Metrics.Downsampling)) + } + log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval)) log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression)) 
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh)) @@ -91,21 +112,21 @@ func (c *Config) Apply(conn *pgx.Conn) error { func (c *Config) applyDefaults() { if c.Metrics.ChunkInterval <= 0 { - c.Metrics.ChunkInterval = DayDuration(defaultMetricChunkInterval) + c.Metrics.ChunkInterval = day.Duration(defaultMetricChunkInterval) } if c.Metrics.Compression == nil { c.Metrics.Compression = &defaultMetricCompressionVar } if c.Metrics.HALeaseRefresh <= 0 { - c.Metrics.HALeaseRefresh = DayDuration(defaultMetricHALeaseRefresh) + c.Metrics.HALeaseRefresh = day.Duration(defaultMetricHALeaseRefresh) } if c.Metrics.HALeaseTimeout <= 0 { - c.Metrics.HALeaseTimeout = DayDuration(defaultMetricHALeaseTimeout) + c.Metrics.HALeaseTimeout = day.Duration(defaultMetricHALeaseTimeout) } if c.Metrics.RetentionPeriod <= 0 { - c.Metrics.RetentionPeriod = DayDuration(defaultMetricRetentionPeriod) + c.Metrics.RetentionPeriod = day.Duration(defaultMetricRetentionPeriod) } if c.Traces.RetentionPeriod <= 0 { - c.Traces.RetentionPeriod = DayDuration(defaultTraceRetentionPeriod) + c.Traces.RetentionPeriod = day.Duration(defaultTraceRetentionPeriod) } } diff --git a/pkg/dataset/config_test.go b/pkg/dataset/config_test.go index eafa102231..9a0d0aefd5 100644 --- a/pkg/dataset/config_test.go +++ b/pkg/dataset/config_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/timescale/promscale/pkg/internal/day" ) var testCompressionSetting = true @@ -48,7 +49,7 @@ func TestNewConfig(t *testing.T) { default_retention_period: 3d2h`, cfg: Config{ Metrics: Metrics{ - RetentionPeriod: DayDuration(((3 * 24) + 2) * time.Hour), + RetentionPeriod: day.Duration(((3 * 24) + 2) * time.Hour), }, }, }, @@ -64,14 +65,14 @@ traces: default_retention_period: 15d`, cfg: Config{ Metrics: Metrics{ - ChunkInterval: DayDuration(3 * time.Hour), + ChunkInterval: day.Duration(3 * time.Hour), Compression: 
&testCompressionSetting, - HALeaseRefresh: DayDuration(2 * time.Minute), - HALeaseTimeout: DayDuration(5 * time.Second), - RetentionPeriod: DayDuration(30 * 24 * time.Hour), + HALeaseRefresh: day.Duration(2 * time.Minute), + HALeaseTimeout: day.Duration(5 * time.Second), + RetentionPeriod: day.Duration(30 * 24 * time.Hour), }, Traces: Traces{ - RetentionPeriod: DayDuration(15 * 24 * time.Hour), + RetentionPeriod: day.Duration(15 * 24 * time.Hour), }, }, }, @@ -100,14 +101,14 @@ func TestApplyDefaults(t *testing.T) { t, Config{ Metrics: Metrics{ - ChunkInterval: DayDuration(defaultMetricChunkInterval), + ChunkInterval: day.Duration(defaultMetricChunkInterval), Compression: &defaultMetricCompressionVar, - HALeaseRefresh: DayDuration(defaultMetricHALeaseRefresh), - HALeaseTimeout: DayDuration(defaultMetricHALeaseTimeout), - RetentionPeriod: DayDuration(defaultMetricRetentionPeriod), + HALeaseRefresh: day.Duration(defaultMetricHALeaseRefresh), + HALeaseTimeout: day.Duration(defaultMetricHALeaseTimeout), + RetentionPeriod: day.Duration(defaultMetricRetentionPeriod), }, Traces: Traces{ - RetentionPeriod: DayDuration(defaultTraceRetentionPeriod), + RetentionPeriod: day.Duration(defaultTraceRetentionPeriod), }, }, c, @@ -115,14 +116,14 @@ func TestApplyDefaults(t *testing.T) { untouched := Config{ Metrics: Metrics{ - ChunkInterval: DayDuration(3 * time.Hour), + ChunkInterval: day.Duration(3 * time.Hour), Compression: &testCompressionSetting, - HALeaseRefresh: DayDuration(2 * time.Minute), - HALeaseTimeout: DayDuration(5 * time.Second), - RetentionPeriod: DayDuration(30 * 24 * time.Hour), + HALeaseRefresh: day.Duration(2 * time.Minute), + HALeaseTimeout: day.Duration(5 * time.Second), + RetentionPeriod: day.Duration(30 * 24 * time.Hour), }, Traces: Traces{ - RetentionPeriod: DayDuration(15 * 24 * time.Hour), + RetentionPeriod: day.Duration(15 * 24 * time.Hour), }, } diff --git a/pkg/dataset/deepcopy_generated.go b/pkg/dataset/deepcopy_generated.go index 
d018fcd394..c9041f6857 100644 --- a/pkg/dataset/deepcopy_generated.go +++ b/pkg/dataset/deepcopy_generated.go @@ -9,6 +9,10 @@ package dataset +import ( + downsample "github.com/timescale/promscale/pkg/downsample" +) + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Config) DeepCopyInto(out *Config) { *out = *in @@ -35,6 +39,15 @@ func (in *Metrics) DeepCopyInto(out *Metrics) { *out = new(bool) **out = **in } + if in.Downsampling != nil { + in, out := &in.Downsampling, &out.Downsampling + *out = new([]downsample.Config) + if **in != nil { + in, out := *in, *out + *out = make([]downsample.Config, len(*in)) + copy(*out, *in) + } + } return } diff --git a/pkg/downsample/downsample.go b/pkg/downsample/downsample.go new file mode 100644 index 0000000000..cf213e2063 --- /dev/null +++ b/pkg/downsample/downsample.go @@ -0,0 +1,91 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. 
+ +package downsample + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/jackc/pgx/v4" + "github.com/pkg/errors" + + "github.com/timescale/promscale/pkg/internal/day" + "github.com/timescale/promscale/pkg/util" +) + +const ( + setGlobalDownsamplingStateSQL = "SELECT prom_api.set_global_downsampling_state($1)" + applyDownsampleConfigSQL = "SELECT _prom_catalog.apply_downsample_config($1::jsonb)" + downsamplePrefix = "ds_" // Stands for downsample_ + lockID = 55985173312278 // Chosen randomly +) + +type Config struct { + Interval day.Duration `yaml:"interval"` + Retention day.Duration `yaml:"retention"` +} + +func (c Config) Name() string { + return downsamplePrefix + c.Interval.String() +} + +func SetState(ctx context.Context, conn *pgx.Conn, state bool) error { + _, err := conn.Exec(ctx, setGlobalDownsamplingStateSQL, state) + return errors.WithMessage(err, "error setting downsampling state") +} + +type cfgWithName struct { + Name string `json:"schema_name"` + Interval string `json:"ds_interval"` + Retention string `json:"retention"` +} + +// Sync the given downsampling configurations with the database. +func Sync(ctx context.Context, conn *pgx.Conn, cfgs []Config) error { + pgLock, err := util.NewPgAdvisoryLock(lockID, conn.Config().ConnString()) + if err != nil { + return fmt.Errorf("error getting lock for syncing downsampling config") + } + defer pgLock.Close() + + try := func() (bool, error) { + got, err := pgLock.GetAdvisoryLock() // To prevent failure when multiple Promscale start at the same time. + return got, errors.WithMessage(err, "error trying pg advisory_lock") + } + + got, err := try() + if err != nil { + return err + } + if !got { + // Wait for some time and try again. If we still did not get the lock, throw an error. 
+ time.Sleep(time.Second * 5) + got, err = try() + if err != nil { + return err + } + if !got { + return fmt.Errorf("timeout: unable to take the advisory lock for syncing downsampling state") + } + } + + var applyCfgs []cfgWithName + for i := range cfgs { + c := cfgs[i] + applyCfgs = append(applyCfgs, cfgWithName{Name: c.Name(), Interval: c.Interval.String(), Retention: c.Retention.String()}) + } + if len(applyCfgs) > 0 { + str, err := json.Marshal(applyCfgs) + if err != nil { + return fmt.Errorf("error marshalling configs: %w", err) + } + if _, err = conn.Exec(ctx, applyDownsampleConfigSQL, str); err != nil { + return fmt.Errorf("error applying downsample config: %w", err) + } + } + return nil +} diff --git a/pkg/dataset/duration.go b/pkg/internal/day/duration.go similarity index 69% rename from pkg/dataset/duration.go rename to pkg/internal/day/duration.go index 128e41e4fb..88d043da29 100644 --- a/pkg/dataset/duration.go +++ b/pkg/internal/day/duration.go @@ -1,7 +1,8 @@ // This file and its contents are licensed under the Apache License 2.0. // Please see the included NOTICE for copyright information and // LICENSE for a copy of the license. -package dataset + +package day import ( "fmt" @@ -15,15 +16,16 @@ import ( const ( dayUnit = 'd' unknownUnitDErrorPrefix = `time: unknown unit "d"` + day = int64(time.Hour * 24) ) -// DayDuration acts like a time.Duration with support for "d" unit +// Duration acts like a time.Duration with support for "d" unit // which is used for specifying number of days in duration. -type DayDuration time.Duration +type Duration time.Duration // UnmarshalText unmarshals strings into DayDuration values while // handling the day unit. It leans heavily into time.ParseDuration. -func (d *DayDuration) UnmarshalText(s []byte) error { +func (d *Duration) UnmarshalText(s []byte) error { val, err := time.ParseDuration(string(s)) if err != nil { // Check for specific error indicating we are using days unit. 
@@ -36,7 +38,7 @@ func (d *DayDuration) UnmarshalText(s []byte) error { return err } } - *d = DayDuration(val) + *d = Duration(val) return nil } @@ -67,8 +69,30 @@ func handleDays(s []byte) (time.Duration, error) { } // String returns a string value of DayDuration. -func (d DayDuration) String() string { - return time.Duration(d).String() +func (d Duration) String() string { + remainder := int64(d) + days := remainder / day + remainder = remainder % day + hours := remainder / int64(time.Hour) + remainder = remainder % int64(time.Hour) + mins := remainder / int64(time.Minute) + remainder = remainder % int64(time.Minute) + secs := remainder / int64(time.Second) + + display := "" + if days != 0 { + display = fmt.Sprintf("%dd", days) + } + if hours != 0 { + display = fmt.Sprintf("%s%dh", display, hours) + } + if mins != 0 { + display = fmt.Sprintf("%s%dm", display, mins) + } + if secs != 0 { + display = fmt.Sprintf("%s%ds", display, secs) + } + return display } // StringToDayDurationHookFunc returns a mapstructure.DecodeHookFunc that @@ -82,7 +106,7 @@ func StringToDayDurationHookFunc() mapstructure.DecodeHookFunc { return data, nil } - var d DayDuration + var d Duration if t != reflect.TypeOf(d) { return data, nil @@ -92,6 +116,6 @@ func StringToDayDurationHookFunc() mapstructure.DecodeHookFunc { if err != nil { return nil, err } - return DayDuration(d), nil + return d, nil } } diff --git a/pkg/internal/day/duration_test.go b/pkg/internal/day/duration_test.go new file mode 100644 index 0000000000..c19b95602b --- /dev/null +++ b/pkg/internal/day/duration_test.go @@ -0,0 +1,51 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. 
+ +package day + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestString(t *testing.T) { + tcs := []struct { + in string + out string + }{ + { + in: "5m", + out: "5m", + }, { + in: "2.5m", + out: "2m30s", + }, { + in: "1.1d1s", + out: "1d2h24m1s", + }, { + in: "24d", + out: "24d", + }, { + in: "1.5d", + out: "1d12h", + }, { + in: "4d1h5m", + out: "4d1h5m", + }, { + in: "1000h", + out: "41d16h", + }, { + in: "1000h1m", + out: "41d16h1m", + }, + } + for _, tc := range tcs { + t.Run(tc.in, func(t *testing.T) { + var d Duration + require.NoError(t, d.UnmarshalText([]byte(tc.in))) + require.Equal(t, tc.out, d.String()) + }) + } +} diff --git a/pkg/pgmodel/metrics/database/database.go b/pkg/pgmodel/metrics/database/database.go index 9241a32294..8e877c2837 100644 --- a/pkg/pgmodel/metrics/database/database.go +++ b/pkg/pgmodel/metrics/database/database.go @@ -19,10 +19,11 @@ type Engine interface { } type metricsEngineImpl struct { - conn pgxconn.PgxConn - ctx context.Context - isRunning atomic.Value - metrics []metricQueryWrap + conn pgxconn.PgxConn + ctx context.Context + isRunning atomic.Value + metrics []metricQueryWrap + metricSeries []metricsWithSeries } // NewEngine creates an engine that performs database metrics evaluation every evalInterval. @@ -33,9 +34,10 @@ type metricsEngineImpl struct { // will cause evaluation errors. func NewEngine(ctx context.Context, conn pgxconn.PgxConn) *metricsEngineImpl { engine := &metricsEngineImpl{ - conn: conn, - ctx: ctx, - metrics: metrics, + conn: conn, + ctx: ctx, + metrics: metrics, + metricSeries: metricSeries, } engine.isRunning.Store(false) return engine @@ -142,6 +144,13 @@ func (e *metricsEngineImpl) Update() error { return err } handleResults(results, batchMetrics) + + for _, ms := range e.metricSeries { + if err = ms.update(e.conn); err != nil { + log.Warn("msg", "error evaluating metrics with series", "err", err.Error()) + continue // We shouldn't stop everything if this fails. 
+ } + } return results.Close() } diff --git a/pkg/pgmodel/metrics/database/metric_series.go b/pkg/pgmodel/metrics/database/metric_series.go new file mode 100644 index 0000000000..bc23b5e812 --- /dev/null +++ b/pkg/pgmodel/metrics/database/metric_series.go @@ -0,0 +1,69 @@ +package database + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/timescale/promscale/pkg/internal/day" + "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/util" +) + +var ( + caggsRefreshTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_refresh_total", + Help: "Total number of caggs policy executed.", + }, []string{"refresh_interval"}) + caggsRefreshSuccess = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_refresh_success", + Help: "Total number of caggs policy executed successfully.", + }, []string{"refresh_interval"}) +) + +func init() { + prometheus.MustRegister(caggsRefreshSuccess, caggsRefreshTotal) +} + +type metricsWithSeries struct { + update func(conn pgxconn.PgxConn) error +} + +var metricSeries = []metricsWithSeries{ + { + update: func(conn pgxconn.PgxConn) error { + rows, err := conn.Query(context.Background(), ` +SELECT + total_successes, + total_runs, + (config ->> 'refresh_interval')::INTERVAL +FROM timescaledb_information.jobs j +INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.proc_name = 'execute_caggs_refresh_policy') + `) + if err != nil { + return fmt.Errorf("error running instrumentation for execute_caggs_refresh_policy: %w", err) + } + defer rows.Close() + for rows.Next() { + var ( + success, total int64 + refreshInterval time.Duration + ) + err = rows.Scan(&success, &total, &refreshInterval) + if err != nil { + return fmt.Errorf("error scanning values for execute_caggs_refresh_policy: %w", err) + } 
+ tmp := day.Duration(refreshInterval) // This allows label values to have 25h -> 1d1h, which is easier to understand and matches more to the user's original input. + caggsRefreshSuccess.With(prometheus.Labels{"refresh_interval": tmp.String()}).Set(float64(success)) + caggsRefreshTotal.With(prometheus.Labels{"refresh_interval": tmp.String()}).Set(float64(total)) + } + return nil + }, + }, +} diff --git a/pkg/pgmodel/metrics/database/metrics.go b/pkg/pgmodel/metrics/database/metrics.go index d02728776e..28a66556b7 100644 --- a/pkg/pgmodel/metrics/database/metrics.go +++ b/pkg/pgmodel/metrics/database/metrics.go @@ -586,6 +586,48 @@ var metrics = []metricQueryWrap{ }, ), query: `select count(*)::bigint from _prom_catalog.metric`, + }, { + metrics: gauges( + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_compression_policy_total", + Help: "Total number of caggs compression policy executed.", + }, + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_compression_policy_success", + Help: "Total number of caggs compression policy executed successfully.", + }, + ), + query: ` +SELECT + total_runs, + total_successes +FROM timescaledb_information.jobs j + INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.proc_name = 'execute_caggs_compression_policy')`, + }, { + metrics: gauges( + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_retention_policy_total", + Help: "Total number of caggs retention policy executed.", + }, + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_retention_policy_success", + Help: "Total number of caggs retention policy executed successfully.", + }, + ), + query: ` +SELECT + total_runs, + total_successes +FROM timescaledb_information.jobs j + INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND 
j.proc_name = 'execute_caggs_retention_policy')`, }, { metrics: gauges( diff --git a/pkg/pgmodel/querier/clauses.go b/pkg/pgmodel/querier/clauses.go index 02fe6cdb0b..9470b0ae8c 100644 --- a/pkg/pgmodel/querier/clauses.go +++ b/pkg/pgmodel/querier/clauses.go @@ -33,12 +33,13 @@ func setParameterNumbers(clause string, existingArgs []interface{}, newArgs ...i } type clauseBuilder struct { - schemaName string - metricName string - columnName string - contradiction bool - clauses []string - args []interface{} + schemaName string + metricName string + columnName string + contradiction bool + downsamplingView bool + clauses []string + args []interface{} } func (c *clauseBuilder) SetMetricName(name string) { @@ -92,6 +93,18 @@ func (c *clauseBuilder) GetColumnName() string { return c.columnName } +// UseDefaultDownsamplingView is set to true when the user applies __schema__ only (and not __column__). In this, we +// query from q_ views since it contains the 'value' column that the connector's SQL query needs. +// Raw downsampled data does not contain a 'value' column, hence we create these default downsampling views in the database +// for querying. 
+func (c *clauseBuilder) UseDefaultDownsamplingView(b bool) { + c.downsamplingView = b +} + +func (c *clauseBuilder) DefaultDownsamplingView() bool { + return c.downsamplingView +} + func (c *clauseBuilder) addClause(clause string, args ...interface{}) error { if len(args) > 0 { switch args[0] { diff --git a/pkg/pgmodel/querier/metadata.go b/pkg/pgmodel/querier/metadata.go index e98b1daa3a..a63faa3e88 100644 --- a/pkg/pgmodel/querier/metadata.go +++ b/pkg/pgmodel/querier/metadata.go @@ -24,12 +24,13 @@ func GetPromQLMetadata(matchers []*labels.Matcher, hints *storage.SelectHints, q } type timeFilter struct { - metric string - schema string - column string - seriesTable string - start string - end string + metric string + schema string + column string + seriesTable string + start string + end string + useDownsamplingViews bool } type evalMetadata struct { @@ -60,11 +61,12 @@ func getEvaluationMetadata(tools *queryTools, start, end int64, promMetadata *pr metric := builder.GetMetricName() timeFilter := timeFilter{ - metric: metric, - schema: builder.GetSchemaName(), - column: builder.GetColumnName(), - start: toRFC3339Nano(start), - end: toRFC3339Nano(end), + metric: metric, + schema: builder.GetSchemaName(), + column: builder.GetColumnName(), + start: toRFC3339Nano(start), + end: toRFC3339Nano(end), + useDownsamplingViews: builder.DefaultDownsamplingView(), } // If all metric matchers match on a single metric (common case), diff --git a/pkg/pgmodel/querier/querier_sql_test.go b/pkg/pgmodel/querier/querier_sql_test.go index ce43cddc1f..cfaa072bab 100644 --- a/pkg/pgmodel/querier/querier_sql_test.go +++ b/pkg/pgmodel/querier/querier_sql_test.go @@ -370,7 +370,7 @@ func TestPGXQuerierQuery(t *testing.T) { }, }, { - name: "Simple query, metric name matcher, custom column", + name: "Should error for simple query, metric name matcher, custom column but no schema name", query: &prompb.Query{ StartTimestampMs: 1000, EndTimestampMs: 2000, @@ -379,6 +379,7 @@ func 
TestPGXQuerierQuery(t *testing.T) { {Type: prompb.LabelMatcher_EQ, Name: model.MetricNameLabelName, Value: "bar"}, }, }, + err: fmt.Errorf("get evaluation metadata: build subQueries: '__schema__' label not found"), result: []*prompb.TimeSeries{ { Labels: []prompb.Label{ diff --git a/pkg/pgmodel/querier/query_builder.go b/pkg/pgmodel/querier/query_builder.go index cedf1f9376..cf163f6d4c 100644 --- a/pkg/pgmodel/querier/query_builder.go +++ b/pkg/pgmodel/querier/query_builder.go @@ -55,6 +55,7 @@ var ( func BuildSubQueries(matchers []*labels.Matcher) (*clauseBuilder, error) { var err error cb := &clauseBuilder{} + var hasColumnLabel, hasSchemaLabel bool for _, m := range matchers { // From the PromQL docs: "Label matchers that match @@ -68,8 +69,10 @@ func BuildSubQueries(matchers []*labels.Matcher) (*clauseBuilder, error) { case pgmodel.MetricNameLabelName: cb.SetMetricName(m.Value) case pgmodel.SchemaNameLabelName: + hasSchemaLabel = true cb.SetSchemaName(m.Value) case pgmodel.ColumnNameLabelName: + hasColumnLabel = true cb.SetColumnName(m.Value) default: sq := subQueryEQ @@ -118,6 +121,15 @@ func BuildSubQueries(matchers []*labels.Matcher) (*clauseBuilder, error) { return nil, err } } + if hasSchemaLabel && !hasColumnLabel { + // There is no __column__ label in the given PromQL query. Hence, we need to use a default column to respond. + // We create 'q_' views in the database in the downsampling schema. The aim of these views is to provide 'value' column + // that the connector needs while querying. + cb.UseDefaultDownsamplingView(true) + } else if hasColumnLabel && !hasSchemaLabel { + // We cannot decide which schema to query from if the __schema__ is not provided. Hence, we should error here. + return nil, fmt.Errorf("'__schema__' label not found") + } return cb, err } @@ -247,13 +259,16 @@ type aggregators struct { // getAggregators returns the aggregator which should be used to fetch data for // a single metric. It may apply pushdowns to functions. 
-func getAggregators(metadata *promqlMetadata) (*aggregators, parser.Node) { - - agg, node, err := tryPushDown(metadata) - if err != nil { - log.Info("msg", "error while trying to push down, will skip pushdown optimization", "error", err) - } else if agg != nil { - return agg, node +func getAggregators(metadata *evalMetadata) (*aggregators, parser.Node) { + if !metadata.timeFilter.useDownsamplingViews { + // Pushdown functions do not behave properly with downsampling views. + // Hence, try a pushdown only if we do not aim to use a downsampling view. + agg, node, err := tryPushDown(metadata.promqlMetadata) + if err != nil { + log.Info("msg", "error while trying to push down, will skip pushdown optimization", "error", err) + } else if agg != nil { + return agg, node + } + } + defaultAggregators := &aggregators{ diff --git a/pkg/pgmodel/querier/query_builder_samples.go b/pkg/pgmodel/querier/query_builder_samples.go index 8250cd78d6..e11f811cd2 100644 --- a/pkg/pgmodel/querier/query_builder_samples.go +++ b/pkg/pgmodel/querier/query_builder_samples.go @@ -127,7 +127,7 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ // When pushdowns are available, the is a pushdown // function which the promscale extension provides. 
- qf, node := getAggregators(metadata.promqlMetadata) + qf, node := getAggregators(metadata) var selectors, selectorClauses []string values := metadata.values @@ -213,6 +213,10 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{ start, end = metadata.timeFilter.start, metadata.timeFilter.end } + if filter.useDownsamplingViews { + filter.metric = "q_" + filter.metric + } + finalSQL := fmt.Sprintf(template, pgx.Identifier{filter.schema, filter.metric}.Sanitize(), pgx.Identifier{schema.PromDataSeries, filter.seriesTable}.Sanitize(), diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index afd4ce0bf0..9aff1d0fe3 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -122,6 +122,14 @@ func fetchMultipleMetricsSamples(ctx context.Context, tools *queryTools, metadat return nil, err } + // We only support default data schema for multi-metric queries + // NOTE: this needs to be updated once we add support for storing + // non-view metrics into multiple schemas. This also applies to + // fetching downsampling data too. + if metadata.timeFilter.schema != schema.PromData && metadata.timeFilter.schema != "" { + return nil, fmt.Errorf("__schema__ not allowed when fetching multiple metrics in single PromQL expression") + } + // TODO this assume on average on row per-metric. Is this right? 
results := make([]sampleRow, 0, len(metrics)) numQueries := 0 @@ -139,13 +147,6 @@ func fetchMultipleMetricsSamples(ctx context.Context, tools *queryTools, metadat return nil, err } - // We only support default data schema for multi-metric queries - // NOTE: this needs to be updated once we add support for storing - // non-view metrics into multiple schemas - if metricInfo.TableSchema != schema.PromData { - return nil, fmt.Errorf("found unsupported metric schema in multi-metric matching query") - } - filter := timeFilter{ metric: metricInfo.TableName, schema: metricInfo.TableSchema, diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 3e43fc5d66..c265cc0caa 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -165,13 +165,13 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error cfg.APICfg.MultiTenancy = multiTenancy } - if (cfg.DatasetCfg != dataset.Config{}) { + if cfg.DatasetCfg != (dataset.Config{}) { if cfg.DatasetConfig != "" { log.Warn("msg", "Ignoring `startup.dataset.config` in favor of the newer `startup.dataset` config option since both were set.") } - err = cfg.DatasetCfg.Apply(conn) + err = cfg.DatasetCfg.Apply(context.Background(), conn) } else if cfg.DatasetConfig != "" { - err = applyDatasetConfig(conn, cfg.DatasetConfig) + err = applyDatasetConfig(context.Background(), conn, cfg.DatasetConfig) } if err != nil { return nil, fmt.Errorf("error applying dataset configuration: %w", err) @@ -231,13 +231,13 @@ func isBGWLessThanDBs(conn *pgx.Conn) (bool, error) { return false, nil } -func applyDatasetConfig(conn *pgx.Conn, cfgFilename string) error { +func applyDatasetConfig(ctx context.Context, conn *pgx.Conn, cfgFilename string) error { cfg, err := dataset.NewConfig(cfgFilename) if err != nil { return err } - return cfg.Apply(conn) + return cfg.Apply(ctx, conn) } func compileAnchoredRegexString(s string) (*regexp.Regexp, error) { diff --git a/pkg/runner/config_parser.go b/pkg/runner/config_parser.go 
index 6e94ef4fbe..65e894fd28 100644 --- a/pkg/runner/config_parser.go +++ b/pkg/runner/config_parser.go @@ -12,7 +12,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/spf13/viper" - "github.com/timescale/promscale/pkg/dataset" + "github.com/timescale/promscale/pkg/internal/day" ) // unmarshalRule defines that the subtree located on the `key` of the Viper @@ -260,7 +260,7 @@ func applyUnmarshalRules(v *viper.Viper, unmarshalRules []unmarshalRule) error { rule.target, viper.DecodeHook( mapstructure.ComposeDecodeHookFunc( - dataset.StringToDayDurationHookFunc(), + day.StringToDayDurationHookFunc(), mapstructure.StringToTimeDurationHookFunc(), mapstructure.StringToSliceHookFunc(","), ), diff --git a/pkg/runner/flags_test.go b/pkg/runner/flags_test.go index 74d484bf99..f6eee4ba49 100644 --- a/pkg/runner/flags_test.go +++ b/pkg/runner/flags_test.go @@ -13,7 +13,7 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/timescale/promscale/pkg/dataset" + "github.com/timescale/promscale/pkg/internal/day" ) func TestParseFlags(t *testing.T) { @@ -281,12 +281,12 @@ startup: c.ListenAddr = "localhost:9201" c.AuthConfig.BasicAuthUsername = "promscale" c.AuthConfig.BasicAuthPassword = "my-password" - c.DatasetCfg.Metrics.ChunkInterval = dataset.DayDuration(24 * time.Hour) + c.DatasetCfg.Metrics.ChunkInterval = day.Duration(24 * time.Hour) c.DatasetCfg.Metrics.Compression = func(b bool) *bool { return &b }(false) - c.DatasetCfg.Metrics.HALeaseRefresh = dataset.DayDuration(24 * time.Hour * 2) - c.DatasetCfg.Metrics.HALeaseTimeout = dataset.DayDuration(24 * time.Hour * 3) - c.DatasetCfg.Metrics.RetentionPeriod = dataset.DayDuration(24 * time.Hour * 4) - c.DatasetCfg.Traces.RetentionPeriod = dataset.DayDuration(24 * time.Hour * 5) + c.DatasetCfg.Metrics.HALeaseRefresh = day.Duration(24 * time.Hour * 2) + c.DatasetCfg.Metrics.HALeaseTimeout = day.Duration(24 * time.Hour * 3) + c.DatasetCfg.Metrics.RetentionPeriod = day.Duration(24 * time.Hour * 4) + 
c.DatasetCfg.Traces.RetentionPeriod = day.Duration(24 * time.Hour * 5) c.DatasetConfig = "metrics:\n default_chunk_interval: 1h\n" return c }, diff --git a/pkg/tests/constants.go b/pkg/tests/constants.go index e88e8aa959..b8e5e49819 100644 --- a/pkg/tests/constants.go +++ b/pkg/tests/constants.go @@ -18,4 +18,5 @@ func init() { PromscaleExtensionVersion = strings.TrimSpace(string(content)) PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14" + //PromscaleExtensionContainer = "local/dev_promscale_extension:head-ts2-pg14" // This will be removed once the PR against master is made. } diff --git a/pkg/tests/end_to_end_tests/config_dataset_test.go b/pkg/tests/end_to_end_tests/config_dataset_test.go index 02592ef79c..86473e57b3 100644 --- a/pkg/tests/end_to_end_tests/config_dataset_test.go +++ b/pkg/tests/end_to_end_tests/config_dataset_test.go @@ -9,6 +9,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/dataset" + "github.com/timescale/promscale/pkg/internal/day" ) func TestDatasetConfigApply(t *testing.T) { @@ -28,18 +29,18 @@ func TestDatasetConfigApply(t *testing.T) { cfg := dataset.Config{ Metrics: dataset.Metrics{ - ChunkInterval: dataset.DayDuration(4 * time.Hour), + ChunkInterval: day.Duration(4 * time.Hour), Compression: &disableCompression, - HALeaseRefresh: dataset.DayDuration(15 * time.Second), - HALeaseTimeout: dataset.DayDuration(2 * time.Minute), - RetentionPeriod: dataset.DayDuration(15 * 24 * time.Hour), + HALeaseRefresh: day.Duration(15 * time.Second), + HALeaseTimeout: day.Duration(2 * time.Minute), + RetentionPeriod: day.Duration(15 * 24 * time.Hour), }, Traces: dataset.Traces{ - RetentionPeriod: dataset.DayDuration(10 * 24 * time.Hour), + RetentionPeriod: day.Duration(10 * 24 * time.Hour), }, } - err = cfg.Apply(pgxConn) + err = cfg.Apply(context.Background(), pgxConn) require.NoError(t, err) require.Equal(t, 
4*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) @@ -52,7 +53,7 @@ func TestDatasetConfigApply(t *testing.T) { // Set to default if chunk interval is not specified. cfg = dataset.Config{} - err = cfg.Apply(pgxConn) + err = cfg.Apply(context.Background(), pgxConn) require.NoError(t, err) require.Equal(t, 8*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) diff --git a/pkg/tests/end_to_end_tests/continuous_agg_test.go b/pkg/tests/end_to_end_tests/continuous_agg_test.go index 850140e590..e82204f69c 100644 --- a/pkg/tests/end_to_end_tests/continuous_agg_test.go +++ b/pkg/tests/end_to_end_tests/continuous_agg_test.go @@ -43,7 +43,7 @@ func TestContinuousAggDownsampling(t *testing.T) { }{ { name: "Query non-existant column, empty result", - query: `cagg{__column__="nonexistant"}`, + query: `cagg{__schema__="cagg_schema",__column__="nonexistant"}`, startMs: startTime, endMs: endTime, stepMs: 360 * 1000, @@ -78,7 +78,7 @@ func TestContinuousAggDownsampling(t *testing.T) { }, { name: "Query max column", - query: `cagg{__column__="max",instance="1"}`, + query: `cagg{__schema__="cagg_schema",__column__="max",instance="1"}`, startMs: startTime, endMs: startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value stepMs: 3600 * 1000, @@ -104,7 +104,7 @@ func TestContinuousAggDownsampling(t *testing.T) { }, { name: "Query min column", - query: `cagg{__column__="min",instance="1"}`, + query: `cagg{__schema__="cagg_schema",__column__="min",instance="1"}`, startMs: startTime, endMs: startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value stepMs: 3600 * 1000, @@ -130,7 +130,7 @@ func TestContinuousAggDownsampling(t *testing.T) { }, { name: "Query avg column", - query: `cagg{__column__="avg",instance="1"}`, + query: `cagg{__schema__="cagg_schema",__column__="avg",instance="1"}`, startMs: startTime, endMs: startTime + 4*3600*1000 - 1, // -1ms to exclude fifth value stepMs: 3600 * 1000, @@ -190,7 +190,7 @@ WITH (timescaledb.continuous) AS t.Fatalf("unexpected error while 
creating metric view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg')"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg', NULL)"); err != nil { t.Fatalf("unexpected error while registering metric view: %s", err) } @@ -372,7 +372,7 @@ WITH (timescaledb.continuous) AS FROM prom_data.test GROUP BY public.time_bucket('1hour', time), series_id`) require.NoError(t, err) - _, err = db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg')") + _, err = db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg', NULL)") require.NoError(t, err) _, err = db.Exec(context.Background(), "SELECT prom_api.set_metric_retention_period('cagg_schema', 'cagg', INTERVAL '180 days')") @@ -450,7 +450,7 @@ WITH (timescaledb.continuous) AS t.Fatalf("unexpected error while creating metric view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('public', 'tw_1hour')"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('public', 'tw_1hour', NULL)"); err != nil { t.Fatalf("unexpected error while registering metric view: %s", err) } diff --git a/pkg/tests/end_to_end_tests/create_test.go b/pkg/tests/end_to_end_tests/create_test.go index 978da08863..0687949a0b 100644 --- a/pkg/tests/end_to_end_tests/create_test.go +++ b/pkg/tests/end_to_end_tests/create_test.go @@ -1351,7 +1351,7 @@ func TestRegisterMetricView(t *testing.T) { withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { // Cannot register non-existant schema. 
- if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('nonexistant', 'missing')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('nonexistant', 'missing', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view from a non-existant schema") } @@ -1360,7 +1360,7 @@ func TestRegisterMetricView(t *testing.T) { } // Cannot register non-existant view. - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'missing')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'missing', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view from a non-existant metric view") } @@ -1394,7 +1394,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_in_data_schema AS SELECT * FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view in data schema: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_data', 'metric_view_in_data_schema')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_data', 'metric_view_in_data_schema', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view in data schema") } @@ -1402,7 +1402,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_bad_columns AS SELECT time, series_id, true as bad_column FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_columns')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 
'metric_view_bad_columns', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view with different columns than raw metric") } @@ -1410,7 +1410,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_bad_column_types AS SELECT time, series_id, true as value FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_column_types')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_column_types', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view with column types different than raw metric") } @@ -1418,7 +1418,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_not_based AS SELECT time, series_id, 1.0 as value FROM prom_view."metric_view_bad_columns"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_not_based')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_not_based', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view with column types different than raw metric") } @@ -1426,7 +1426,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view AS SELECT * FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view')"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT 
prom_api.register_metric_view('prom_view', 'metric_view', NULL)"); err != nil { t.Fatalf("Error creating valid metric view: %v", err) } @@ -1448,12 +1448,12 @@ func TestRegisterMetricView(t *testing.T) { } // Cannot register the same view twice. - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view', NULL)"); err == nil { t.Fatal("Should not be able to register the same view twice") } // Should succeed if we register same view twice but also use `if_not_exists` - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view', true)"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view', NULL, true)"); err != nil { t.Fatalf("Should be able to register the same view twice when using `if_not_exists`: %v", err) } diff --git a/pkg/tests/end_to_end_tests/metric_downsampling_test.go b/pkg/tests/end_to_end_tests/metric_downsampling_test.go new file mode 100644 index 0000000000..46acd91145 --- /dev/null +++ b/pkg/tests/end_to_end_tests/metric_downsampling_test.go @@ -0,0 +1,96 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. 
+ +package end_to_end_tests + +import ( + "context" + "testing" + "time" + + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/stretchr/testify/require" + + "github.com/timescale/promscale/pkg/downsample" + "github.com/timescale/promscale/pkg/internal/day" +) + +func TestMetricDownsampleSync(t *testing.T) { + withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { + downsamplingCfgs := []downsample.Config{ + {Interval: day.Duration(time.Minute * 5), Retention: day.Duration(time.Hour * 24 * 30)}, + } + + pgCon, err := db.Acquire(context.Background()) + require.NoError(t, err) + defer pgCon.Release() + + ctx := context.Background() + pc := pgCon.Conn() + + // Test 1: Check if 'ds_5m' downsampling is created. + err = downsample.Sync(ctx, pc, downsamplingCfgs) + require.NoError(t, err) + + verifyDownsamplingExistence(t, pc, "ds_5m", + downsamplingCfgs[0].Interval, downsamplingCfgs[0].Retention, false) + + downsamplingCfgs = append(downsamplingCfgs, downsample.Config{Interval: day.Duration(time.Hour), Retention: day.Duration(time.Hour * 24 * 365)}) + // Test 2: Check if 'ds_1h' downsampling is created. + err = downsample.Sync(ctx, pc, downsamplingCfgs) + require.NoError(t, err) + + verifyDownsamplingExistence(t, pc, "ds_1h", + downsamplingCfgs[1].Interval, downsamplingCfgs[1].Retention, false) + + // Test 3: Remove the first entry and see if the entry is disabled or not. + downsamplingCfgs = downsamplingCfgs[1:] + err = downsample.Sync(ctx, pc, downsamplingCfgs) + require.NoError(t, err) + // Check if ds_1h exists. + verifyDownsamplingExistence(t, pc, "ds_1h", + downsamplingCfgs[0].Interval, downsamplingCfgs[0].Retention, false) + // Check if ds_5m is disabled. + verifyDownsamplingExistence(t, pc, "ds_5m", + day.Duration(time.Minute*5), day.Duration(time.Hour*24*30), true) + + // Test 4: Update retention of ds_1h and check if the same is reflected in the DB. 
+ downsamplingCfgs[0].Retention = day.Duration(time.Hour * 24 * 500) + err = downsample.Sync(ctx, pc, downsamplingCfgs) + require.NoError(t, err) + verifyDownsamplingExistence(t, pc, "ds_1h", + downsamplingCfgs[0].Interval, downsamplingCfgs[0].Retention, false) + // ds_5m should still be disabled. + verifyDownsamplingExistence(t, pc, "ds_5m", + day.Duration(time.Minute*5), day.Duration(time.Hour*24*30), true) + + // Test 5: Enable the ds_5m downsampling that was already in the database. + downsamplingCfgs = append(downsamplingCfgs, downsample.Config{Interval: day.Duration(time.Minute * 5), Retention: day.Duration(time.Hour * 24 * 30)}) + err = downsample.Sync(ctx, pc, downsamplingCfgs) + require.NoError(t, err) + verifyDownsamplingExistence(t, pc, "ds_5m", + downsamplingCfgs[1].Interval, downsamplingCfgs[1].Retention, false) + + // Test 6: Add a resolution similar to 5m, but with different unit. This should error. + downsamplingCfgs = append(downsamplingCfgs, downsample.Config{Interval: day.Duration(time.Second * 300), Retention: day.Duration(time.Hour * 24 * 30)}) + err = downsample.Sync(ctx, pc, downsamplingCfgs) + require.Error(t, err) + }) +} + +func verifyDownsamplingExistence(t testing.TB, pgCon *pgx.Conn, schemaName string, interval, retention day.Duration, shouldBeDisabled bool) { + var ( + dSchemaName string + dInterval time.Duration + dRetention time.Duration + dShouldRefresh bool + ) + err := pgCon.QueryRow(context.Background(), "SELECT schema_name, ds_interval, retention, should_refresh FROM _prom_catalog.downsample WHERE schema_name = $1", schemaName).Scan(&dSchemaName, &dInterval, &dRetention, &dShouldRefresh) + require.NoError(t, err) + require.Equal(t, schemaName, dSchemaName) + require.Equal(t, time.Duration(interval), dInterval) + require.Equal(t, time.Duration(retention), dRetention) + require.Equal(t, shouldBeDisabled, !dShouldRefresh) +} diff --git a/pkg/tests/end_to_end_tests/query_integration_test.go 
b/pkg/tests/end_to_end_tests/query_integration_test.go index 2b25312305..0f75d24b40 100644 --- a/pkg/tests/end_to_end_tests/query_integration_test.go +++ b/pkg/tests/end_to_end_tests/query_integration_test.go @@ -766,7 +766,7 @@ func createMetricView(db *pgxpool.Pool, t testing.TB, schemaName, viewName, metr if _, err := db.Exec(context.Background(), fmt.Sprintf(`CREATE VIEW "%s"."%s" AS SELECT * FROM prom_data."%s"`, schemaName, viewName, metricName)); err != nil { t.Fatalf("unexpected error while creating metric view: %s", err) } - if _, err := db.Exec(context.Background(), fmt.Sprintf("SELECT prom_api.register_metric_view('%s', '%s')", schemaName, viewName)); err != nil { + if _, err := db.Exec(context.Background(), fmt.Sprintf("SELECT prom_api.register_metric_view('%s', '%s', NULL)", schemaName, viewName)); err != nil { t.Fatalf("unexpected error while registering metric view: %s", err) } } diff --git a/pkg/tests/end_to_end_tests/telemetry_test.go b/pkg/tests/end_to_end_tests/telemetry_test.go index f4c531b01a..141a88d28e 100644 --- a/pkg/tests/end_to_end_tests/telemetry_test.go +++ b/pkg/tests/end_to_end_tests/telemetry_test.go @@ -288,8 +288,8 @@ func TestTelemetrySQLStats(t *testing.T) { require.NoError(t, engine.Sync()) err = conn.QueryRow(context.Background(), "SELECT value FROM _timescaledb_catalog.metadata WHERE key = 'promscale_metrics_total' AND value IS NOT NULL").Scan(&metrics) - require.NoError(t, err) - require.Equal(t, "0", metrics) // Without promscale_extension, this will give error saying "no rows in result set". + require.NoError(t, err) // Without promscale_extension, this will give error saying "no rows in result set". + require.Equal(t, "0", metrics) // Add dummy metric. _, err = conn.Exec(context.Background(), "SELECT _prom_catalog.create_metric_table('test_metric')")