From 45833de41adc7246d2d61d9bde32de463d6bb5cb Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Fri, 2 Dec 2022 17:31:07 +0530 Subject: [PATCH] Implement rollup resolution decider for incoming queries. Signed-off-by: Harkishen-Singh --- pkg/dataset/config.go | 6 +- pkg/pgclient/client.go | 15 +- pkg/pgclient/config.go | 3 + pkg/pgmodel/querier/querier.go | 21 ++- pkg/pgmodel/querier/querier_sql_test.go | 7 +- pkg/pgmodel/querier/query_remote_read.go | 2 +- pkg/pgmodel/querier/query_sample.go | 27 ++- pkg/rollup/config.go | 1 + pkg/rollup/decider.go | 177 ++++++++++++++++++ pkg/rollup/decider_test.go | 113 +++++++++++ pkg/runner/client.go | 16 +- pkg/tests/constants.go | 5 +- pkg/tests/end_to_end_tests/alerts_test.go | 2 +- .../end_to_end_tests/continuous_agg_test.go | 3 +- pkg/tests/end_to_end_tests/exemplar_test.go | 5 +- .../end_to_end_tests/multi_tenancy_test.go | 28 +-- pkg/tests/end_to_end_tests/nan_test.go | 4 +- pkg/tests/end_to_end_tests/null_chars_test.go | 3 +- .../promql_endpoint_integration_test.go | 2 +- .../query_integration_test.go | 17 +- .../rollup_query_helper_test.go | 102 ++++++++++ pkg/tests/end_to_end_tests/rules_test.go | 2 +- 22 files changed, 511 insertions(+), 50 deletions(-) create mode 100644 pkg/rollup/decider.go create mode 100644 pkg/rollup/decider_test.go create mode 100644 pkg/tests/end_to_end_tests/rollup_query_helper_test.go diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go index a97b61966b..469413491e 100644 --- a/pkg/dataset/config.go +++ b/pkg/dataset/config.go @@ -50,7 +50,7 @@ type Metrics struct { HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"` HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"` RetentionPeriod day.Duration `yaml:"default_retention_period"` - Rollups *rollup.Config `yaml:"rollups,omitempty"` + Rollup *rollup.Config `yaml:"rollup,omitempty"` } // Traces contains dataset configuration options for traces data. @@ -68,8 +68,8 @@ func NewConfig(contents string) (cfg Config, err error) { func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error { c.applyDefaults() - if c.Metrics.Rollups != nil { - if err := c.Metrics.Rollups.Apply(ctx, conn); err != nil { + if c.Metrics.Rollup != nil { + if err := c.Metrics.Rollup.Apply(ctx, conn); err != nil { return fmt.Errorf("error applying configuration for downsampling: %w", err) } } diff --git a/pkg/pgclient/client.go b/pkg/pgclient/client.go index e6747082c0..b53cfa4bfc 100644 --- a/pkg/pgclient/client.go +++ b/pkg/pgclient/client.go @@ -61,7 +61,7 @@ type Client struct { } // NewClient creates a new PostgreSQL client -func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) { +func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly, useRollups bool) (*Client, error) { var ( err error dbMaxConns int @@ -137,7 +137,7 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche if err != nil { return nil, fmt.Errorf("err creating reader connection pool: %w", err) } - client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly) + client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly, useRollups) if err != nil { return client, err } @@ -197,7 +197,7 @@ func getRedactedConnStr(s string) string { } // NewClientWithPool creates a new PostgreSQL client with an existing connection pool. -func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) { +func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly, useRollups bool) (*Client, error) { sigClose := make(chan struct{}) metricsCache := cache.NewMetricCache(cfg.CacheConfig) labelsCache := cache.NewLabelsCache(cfg.CacheConfig) @@ -223,7 +223,11 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig) labelsReader := lreader.NewLabelsReader(readerConn, labelsCache, mt.ReadAuthorizer()) - dbQuerier := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer()) + dbQuerier, err := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer(), cfg.MetricsScrapeInterval, useRollups) + if err != nil { + return nil, fmt.Errorf("error starting querier: %w", err) + } + queryable := query.NewQueryable(dbQuerier, labelsReader) dbIngestor := ingestor.DBInserter(ingestor.ReadOnlyIngestor{}) @@ -232,8 +236,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri writerConn = pgxconn.NewPgxConn(writerPool) dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, &c) if err != nil { - log.Error("msg", "err starting the ingestor", "err", err) - return nil, err + return nil, fmt.Errorf("error starting ingestor: %w", err) } } if maintPool != nil { diff --git a/pkg/pgclient/config.go b/pkg/pgclient/config.go index 8d03f5be4a..4a8be24b93 100644 --- a/pkg/pgclient/config.go +++ b/pkg/pgclient/config.go @@ -18,6 +18,7 @@ import ( "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" "github.com/timescale/promscale/pkg/pgmodel/ingestor/trace" + "github.com/timescale/promscale/pkg/rollup" "github.com/timescale/promscale/pkg/version" ) @@ -34,6 +35,7 @@ type Config struct { DbConnectionTimeout time.Duration IgnoreCompressedChunks bool MetricsAsyncAcks bool + MetricsScrapeInterval time.Duration TracesAsyncAcks bool WriteConnections int WriterPoolSize int @@ -107,6 +109,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config { fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB") fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created") fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.") + fs.DurationVar(&cfg.MetricsScrapeInterval, "metrics.rollup.scrape-interval", rollup.DefaultScrapeInterval, "Default scrape interval in Prometheus. This is used to estimate samples while choosing rollup for querying.") return cfg } diff --git a/pkg/pgmodel/querier/querier.go b/pkg/pgmodel/querier/querier.go index d6951d08bf..12f19f62f0 100644 --- a/pkg/pgmodel/querier/querier.go +++ b/pkg/pgmodel/querier/querier.go @@ -6,17 +6,21 @@ package querier import ( "context" + "fmt" + "time" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/timescale/promscale/pkg/pgmodel/cache" "github.com/timescale/promscale/pkg/pgmodel/lreader" "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/rollup" "github.com/timescale/promscale/pkg/tenancy" ) type pgxQuerier struct { - tools *queryTools + tools *queryTools + schema *rollup.Decider } var _ Querier = (*pgxQuerier)(nil) @@ -29,7 +33,9 @@ func NewQuerier( labelsReader lreader.LabelsReader, exemplarCache cache.PositionCache, rAuth tenancy.ReadAuthorizer, -) Querier { + scrapeInterval time.Duration, + useRollups bool, +) (Querier, error) { querier := &pgxQuerier{ tools: &queryTools{ conn: conn, @@ -39,7 +45,14 @@ func NewQuerier( rAuth: rAuth, }, } - return querier + if useRollups { + decider, err := rollup.NewDecider(context.Background(), conn, scrapeInterval) + if err != nil { + return nil, fmt.Errorf("error creating rollups schema decider: %w", err) + } + querier.schema = decider + } + return querier, nil } func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier { @@ -47,7 +60,7 @@ func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier { } func (q *pgxQuerier) SamplesQuerier(ctx context.Context) SamplesQuerier { - return newQuerySamples(ctx, q) + return newQuerySamples(ctx, q, q.schema) } func (q *pgxQuerier) ExemplarsQuerier(ctx context.Context) ExemplarQuerier { diff --git a/pkg/pgmodel/querier/querier_sql_test.go b/pkg/pgmodel/querier/querier_sql_test.go index ce43cddc1f..a8ee0c582f 100644 --- a/pkg/pgmodel/querier/querier_sql_test.go +++ b/pkg/pgmodel/querier/querier_sql_test.go @@ -722,7 +722,12 @@ func TestPGXQuerierQuery(t *testing.T) { if err != nil { t.Fatalf("error setting up mock cache: %s", err.Error()) } - querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}} + querier := pgxQuerier{ + &queryTools{ + conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer()), + }, + nil, + } result, err := querier.RemoteReadQuerier(context.Background()).Query(c.query) diff --git a/pkg/pgmodel/querier/query_remote_read.go b/pkg/pgmodel/querier/query_remote_read.go index 4644dbecc5..8ac59fffe6 100644 --- a/pkg/pgmodel/querier/query_remote_read.go +++ b/pkg/pgmodel/querier/query_remote_read.go @@ -28,7 +28,7 @@ func (q *queryRemoteRead) Query(query *prompb.Query) ([]*prompb.TimeSeries, erro return nil, err } - qrySamples := newQuerySamples(q.ctx, q.pgxQuerier) + qrySamples := newQuerySamples(q.ctx, q.pgxQuerier, nil) sampleRows, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers) if err != nil { return nil, err diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index afd4ce0bf0..3127127b33 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -9,17 +9,20 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/common/errors" "github.com/timescale/promscale/pkg/pgmodel/common/schema" + "github.com/timescale/promscale/pkg/rollup" ) type querySamples struct { *pgxQuerier - ctx context.Context + ctx context.Context + schema *rollup.Decider } -func newQuerySamples(ctx context.Context, qr *pgxQuerier) *querySamples { - return &querySamples{qr, ctx} +func newQuerySamples(ctx context.Context, qr *pgxQuerier, schema *rollup.Decider) *querySamples { + return &querySamples{qr, ctx, schema} } // Select implements the SamplesQuerier interface. It is the entry point for our @@ -39,6 +42,24 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH return nil, nil, fmt.Errorf("get evaluation metadata: %w", err) } + var useRollups bool + if q.schema != nil { + // Querying via rollups is available. + supported := q.schema.SupportsRollup(metadata.metric) // Ensure that rollups for the given metric is supported. + if !supported { + // Rollups for the given metric wasn't supported. Let's refresh and check again. + if err := q.schema.Refresh(); err != nil { + log.Error("msg", "error refreshing schema decider", "error", err.Error()) + } + supported = q.schema.SupportsRollup(metadata.metric) // If supported is still false, then rollups really don't exist for 'metadata.metric'. + } + if supported { + schemaName := q.schema.Decide(mint, maxt) + useRollups = schemaName != rollup.DefaultSchema + } + } + _ = useRollups // To avoid unused error. This will be used in the following PRs for querying rollups. + filter := metadata.timeFilter if metadata.isSingleMetric { // Single vector selector case. diff --git a/pkg/rollup/config.go b/pkg/rollup/config.go index d1794bf3b5..45a054bee4 100644 --- a/pkg/rollup/config.go +++ b/pkg/rollup/config.go @@ -17,6 +17,7 @@ import ( ) const ( + DefaultScrapeInterval = time.Second * 30 setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)" // short and long represent system resolutions. diff --git a/pkg/rollup/decider.go b/pkg/rollup/decider.go new file mode 100644 index 0000000000..bb66420913 --- /dev/null +++ b/pkg/rollup/decider.go @@ -0,0 +1,177 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/pgxconn" +) + +const ( + DefaultSchema = "prom_data" + upperLimit = 5000 // Maximum samples allowed + refreshRollupInterval = time.Minute * 30 +) + +type rollupInfo struct { + schemaName string + resolution time.Duration +} + +type Decider struct { + conn pgxconn.PgxConn + ctx context.Context + refreshMtx sync.RWMutex + + scrapeInterval time.Duration + downsamplingEnabled bool + + supportedMetrics map[string]struct{} + rollups []rollupInfo // {schemaName, resolution} in ascending order of interval. Lesser the interval, more the granularity. +} + +func NewDecider(ctx context.Context, conn pgxconn.PgxConn, scrapeInterval time.Duration) (*Decider, error) { + helper := &Decider{ + ctx: ctx, + conn: conn, + scrapeInterval: scrapeInterval, + } + if err := helper.runRefreshRoutine(refreshRollupInterval); err != nil { + return nil, fmt.Errorf("refresh: %w", err) + } + return helper, nil +} + +// Decide returns the schema name of the rollups that should be used for querying. +// The returned schema represents a downsampled resolution that should be an optimal +// resolution for querying. +// +// If no rollups exists or if downsampling is disabled, DefaultSchema (i.e., "prom_data") is returned. +func (h *Decider) Decide(minTs, maxTs int64) string { + h.refreshMtx.RLock() + defer h.refreshMtx.RUnlock() + + if !h.downsamplingEnabled || len(h.rollups) == 0 { + return DefaultSchema + } + estimateSamples := func(resolution time.Duration) int64 { + return int64(float64(maxTs-minTs) / resolution.Seconds()) + } + + numRawSamples := estimateSamples(h.scrapeInterval) + if numRawSamples < upperLimit { + return DefaultSchema + } + + for _, info := range h.rollups { + samples := estimateSamples(info.resolution) + if samples < upperLimit { + // h.rollups is sorted by interval. So, the first rollup that is below upper limit is our answer. + // This is because it gives the maximum granularity while being in acceptable limits. + return info.schemaName + } + } + // All rollups are above upper limit. Hence, send the schema of the highest interval so the granularity + // is minimum and we do not affect the performance of PromQL engine. + highestInterval := h.rollups[len(h.rollups)-1] + return highestInterval.schemaName +} + +func (h *Decider) SupportsRollup(metricName string) bool { + _, rollupExists := h.supportedMetrics[metricName] + return rollupExists +} + +func (h *Decider) Refresh() error { + h.refreshMtx.Lock() + defer h.refreshMtx.Unlock() + + if err := h.refreshDownsamplingState(); err != nil { + return fmt.Errorf("downsampling state: %w", err) + } + if err := h.refreshSupportedMetrics(); err != nil { + return fmt.Errorf("metric-type: %w", err) + } + if err := h.refreshRollup(); err != nil { + return fmt.Errorf("rollup resolutions: %w", err) + } + return nil +} + +func (h *Decider) runRefreshRoutine(refreshInterval time.Duration) error { + if err := h.Refresh(); err != nil { + return fmt.Errorf("refreshing rollup resolution: %w", err) + } + go func() { + t := time.NewTicker(refreshInterval) + defer t.Stop() + for { + select { + case <-h.ctx.Done(): + return + case <-t.C: + } + if err := h.Refresh(); err != nil { + log.Error("msg", "error refreshing rollup resolution", "error", err.Error()) + } + } + }() + return nil +} + +func (h *Decider) refreshDownsamplingState() error { + var state bool + if err := h.conn.QueryRow(h.ctx, "SELECT prom_api.get_automatic_downsample()::BOOLEAN").Scan(&state); err != nil { + return fmt.Errorf("fetching automatic downsampling state: %w", err) + } + h.downsamplingEnabled = state + return nil +} + +const supportedMetricsQuery = `SELECT m.metric_name AS supported_metrics FROM _prom_catalog.metric_rollup mr INNER JOIN _prom_catalog.metric m ON mr.metric_id = m.id GROUP BY supported_metrics;` + +func (h *Decider) refreshSupportedMetrics() error { + rows, err := h.conn.Query(h.ctx, supportedMetricsQuery) + if err != nil { + return fmt.Errorf("fetching supported metrics for rollups: %w", err) + } + defer rows.Close() + + h.supportedMetrics = make(map[string]struct{}) // metric_name: metric_type + for rows.Next() { + var supportedMetric string + err = rows.Scan(&supportedMetric) + if err != nil { + return fmt.Errorf("error scanning the fetched supported metric: %w", err) + } + h.supportedMetrics[supportedMetric] = struct{}{} + } + return nil +} + +func (h *Decider) refreshRollup() error { + rows, err := h.conn.Query(h.ctx, "SELECT schema_name, resolution FROM _prom_catalog.rollup ORDER BY resolution ASC") + if err != nil { + return fmt.Errorf("fetching rollup resolutions: %w", err) + } + defer rows.Close() + h.rollups = []rollupInfo{} + for rows.Next() { + var ( + schemaName string + resolution time.Duration + ) + if err = rows.Scan(&schemaName, &resolution); err != nil { + return fmt.Errorf("error scanning rows: %w", err) + } + h.rollups = append(h.rollups, rollupInfo{schemaName: schemaName, resolution: resolution}) + } + return nil +} diff --git a/pkg/rollup/decider_test.go b/pkg/rollup/decider_test.go new file mode 100644 index 0000000000..3c5b52830c --- /dev/null +++ b/pkg/rollup/decider_test.go @@ -0,0 +1,113 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestDecider(t *testing.T) { + r := &Decider{ + scrapeInterval: DefaultScrapeInterval, + downsamplingEnabled: true, + rollups: []rollupInfo{ + {"5_minute", 5 * time.Minute}, + {"15_minute", 15 * time.Minute}, + {"1_hour", time.Hour}, + {"1_week", 7 * 24 * time.Hour}, + }, + } + tcs := []struct { + name string + min time.Duration + max time.Duration + expectedSchemaName string + }{ + { + name: "1 sec", + min: 0, + max: time.Second, + expectedSchemaName: DefaultSchema, + }, { + name: "5 min", + min: 0, + max: 5 * time.Minute, + expectedSchemaName: DefaultSchema, + }, { + name: "30 mins", + min: 0, + max: 30 * time.Minute, + expectedSchemaName: DefaultSchema, + }, { + name: "1 hour", + min: 0, + max: time.Hour, + expectedSchemaName: DefaultSchema, + }, { + // DRY RUN + // ------- + // + // Assumed default scrape interval being 30 secs + // raw -> 2,880 <-- Falls in the acceptable range. + name: "1 day", + min: 0, + max: 24 * time.Hour, + expectedSchemaName: DefaultSchema, + }, + { + // DRY RUN on 500 - 5000 logic + // -------- + // + // Assumed default scrape interval being 30 secs + // raw -> 20,160 + // + // And, when using following rollup resolutions, num samples: + // 5 mins -> 2,016 <-- Falls in the acceptable range. + // 15 mins -> 672 + // 1 hour -> 168 + // 1 week -> 1 + name: "7 days", + min: 0, + max: 7 * 24 * time.Hour, + expectedSchemaName: "5_minute", + }, + { + name: "30 days", + min: 0, + max: 30 * 24 * time.Hour, + expectedSchemaName: "15_minute", + }, { + // DRY RUN on 500 - 5000 logic + // -------- + // + // Assumed default scrape interval being 30 secs + // raw -> 20,160 + // + // And, when using following rollup resolutions, num samples: + // 5 mins -> 1,051,200 <-- Falls in the acceptable range. + // 15 mins -> 35,040 + // 1 hour -> 8,760 + // 1 week -> 52 + name: "1 year", + min: 0, + max: 12 * 30 * 24 * time.Hour, + expectedSchemaName: "1_week", + }, { + name: "100 years", + min: 0, + max: 100 * 12 * 30 * 24 * time.Hour, + expectedSchemaName: "1_week", + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + schemaName := r.Decide(int64(tc.min.Seconds()), int64(tc.max.Seconds())) + require.Equal(t, tc.expectedSchemaName, schemaName, tc.name) + }) + } +} diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 2c3a141f35..af8164c21e 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -165,16 +165,18 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error cfg.APICfg.MultiTenancy = multiTenancy } + var useRollups bool if cfg.DatasetConfig != "" { - err = ApplyDatasetConfig(context.Background(), conn, cfg.DatasetConfig) + cfg, err := ApplyDatasetConfig(context.Background(), conn, cfg.DatasetConfig) if err != nil { return nil, fmt.Errorf("error applying dataset configuration: %w", err) } + useRollups = *cfg.Metrics.Rollup.Enabled } // client has to be initiated after migrate since migrate // can change database GUC settings - client, err := pgclient.NewClient(r, &cfg.PgmodelCfg, multiTenancy, leasingFunction, cfg.APICfg.ReadOnly) + client, err := pgclient.NewClient(r, &cfg.PgmodelCfg, multiTenancy, leasingFunction, cfg.APICfg.ReadOnly, useRollups) if err != nil { return nil, fmt.Errorf("client creation error: %w", err) } @@ -226,13 +228,15 @@ func isBGWLessThanDBs(conn *pgx.Conn) (bool, error) { return false, nil } -func ApplyDatasetConfig(ctx context.Context, conn *pgx.Conn, cfgFilename string) error { +func ApplyDatasetConfig(ctx context.Context, conn *pgx.Conn, cfgFilename string) (*dataset.Config, error) { cfg, err := dataset.NewConfig(cfgFilename) if err != nil { - return err + return nil, err } - - return cfg.Apply(ctx, conn) + if err = cfg.Apply(ctx, conn); err != nil { + return nil, fmt.Errorf("error applying dataset config: %w", err) + } + return &cfg, nil } func compileAnchoredRegexString(s string) (*regexp.Regexp, error) { diff --git a/pkg/tests/constants.go b/pkg/tests/constants.go index e88e8aa959..463f757289 100644 --- a/pkg/tests/constants.go +++ b/pkg/tests/constants.go @@ -10,6 +10,8 @@ var ( PromscaleExtensionContainer string ) +const rollupsDBImage = "ghcr.io/timescale/dev_promscale_extension:rollups-development-ts2.8-pg14" + func init() { content, err := os.ReadFile("../../../EXTENSION_VERSION") if err != nil { @@ -17,5 +19,6 @@ func init() { } PromscaleExtensionVersion = strings.TrimSpace(string(content)) - PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14" + //PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14" + PromscaleExtensionContainer = rollupsDBImage // This will be removed once we plan to merge with master. } diff --git a/pkg/tests/end_to_end_tests/alerts_test.go b/pkg/tests/end_to_end_tests/alerts_test.go index 9f7902c670..4fbecab42c 100644 --- a/pkg/tests/end_to_end_tests/alerts_test.go +++ b/pkg/tests/end_to_end_tests/alerts_test.go @@ -47,7 +47,7 @@ func TestAlerts(t *testing.T) { MaxConnections: -1, } - pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false) + pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false, false) require.NoError(t, err) defer pgClient.Close() err = pgClient.InitPromQLEngine(&query.Config{ diff --git a/pkg/tests/end_to_end_tests/continuous_agg_test.go b/pkg/tests/end_to_end_tests/continuous_agg_test.go index 56883b6d29..3dbec38a17 100644 --- a/pkg/tests/end_to_end_tests/continuous_agg_test.go +++ b/pkg/tests/end_to_end_tests/continuous_agg_test.go @@ -209,7 +209,8 @@ WITH (timescaledb.continuous) AS lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil) if err != nil { diff --git a/pkg/tests/end_to_end_tests/exemplar_test.go b/pkg/tests/end_to_end_tests/exemplar_test.go index 2cedc067d8..3e9382a51d 100644 --- a/pkg/tests/end_to_end_tests/exemplar_test.go +++ b/pkg/tests/end_to_end_tests/exemplar_test.go @@ -205,11 +205,12 @@ func TestExemplarQueryingAPI(t *testing.T) { // since the return will be 0, as they have already been ingested by TestExemplarIngestion. labelsReader := lreader.NewLabelsReader(pgxconn.NewPgxConn(db), cache.NewLabelsCache(cache.DefaultConfig), tenancy.NewNoopAuthorizer().ReadAuthorizer()) - r := querier.NewQuerier( + r, err := querier.NewQuerier( pgxconn.NewPgxConn(db), cache.NewMetricCache(cache.DefaultConfig), labelsReader, - cache.NewExemplarLabelsPosCache(cache.DefaultConfig), nil) + cache.NewExemplarLabelsPosCache(cache.DefaultConfig), nil, 0, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) // Query all exemplars corresponding to metric_2 histogram. diff --git a/pkg/tests/end_to_end_tests/multi_tenancy_test.go b/pkg/tests/end_to_end_tests/multi_tenancy_test.go index 956bcb497e..999e359385 100644 --- a/pkg/tests/end_to_end_tests/multi_tenancy_test.go +++ b/pkg/tests/end_to_end_tests/multi_tenancy_test.go @@ -36,7 +36,7 @@ func TestMultiTenancyWithoutValidTenants(t *testing.T) { require.NoError(t, err) // Ingestion. - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -56,7 +56,8 @@ func TestMultiTenancyWithoutValidTenants(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a single tenant (tenant-a) ----- expectedResult := []prompb.TimeSeries{ @@ -224,7 +225,7 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { require.NoError(t, err) // Ingestion. - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -256,7 +257,8 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a valid tenant (tenant-a) ----- expectedResult := []prompb.TimeSeries{ @@ -382,7 +384,8 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { require.NoError(t, err) labelsReader = lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) expectedResult = []prompb.TimeSeries{} @@ -413,7 +416,7 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { require.NoError(t, err) // Ingestion. - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -461,7 +464,8 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a non-tenant ----- expectedResult := []prompb.TimeSeries{ @@ -550,7 +554,8 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { require.NoError(t, err) labelsReader = lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) expectedResult = []prompb.TimeSeries{ { @@ -627,7 +632,7 @@ func TestMultiTenancyWithValidTenantsAsLabels(t *testing.T) { require.NoError(t, err) // Ingestion. - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -659,7 +664,8 @@ func TestMultiTenancyWithValidTenantsAsLabels(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a single tenant (tenant-b) ----- expectedResult := []prompb.TimeSeries{ @@ -759,7 +765,7 @@ func TestMultiTenancyLabelNamesValues(t *testing.T) { ts, _ := generateSmallMultiTenantTimeseries() withDB(t, *testDatabase, func(db *pgxpool.Pool, tb testing.TB) { getClient := func(auth tenancy.Authorizer) *pgclient.Client { - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, auth, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, auth, false, false) require.NoError(t, err) return client } diff --git a/pkg/tests/end_to_end_tests/nan_test.go b/pkg/tests/end_to_end_tests/nan_test.go index 578923bf4e..95b0d2b9a3 100644 --- a/pkg/tests/end_to_end_tests/nan_test.go +++ b/pkg/tests/end_to_end_tests/nan_test.go @@ -14,6 +14,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" _ "github.com/jackc/pgx/v4/stdlib" "github.com/prometheus/prometheus/model/value" + "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/clockcache" "github.com/timescale/promscale/pkg/internal/testhelpers" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -129,7 +130,8 @@ func TestSQLStaleNaN(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) resp, err := r.RemoteReadQuerier(ctx).Query(c.query) if err != nil { t.Fatalf("unexpected error while ingesting test dataset: %s", err) diff --git a/pkg/tests/end_to_end_tests/null_chars_test.go b/pkg/tests/end_to_end_tests/null_chars_test.go index 073a7f9a7f..acf6b50e2f 100644 --- a/pkg/tests/end_to_end_tests/null_chars_test.go +++ b/pkg/tests/end_to_end_tests/null_chars_test.go @@ -67,7 +67,8 @@ func TestOperationWithNullChars(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) resp, err := r.RemoteReadQuerier(ctx).Query(&prompb.Query{ Matchers: []*prompb.LabelMatcher{ { diff --git a/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go b/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go index 363db64b0b..a57dab1031 100644 --- a/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go +++ b/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go @@ -311,7 +311,7 @@ func buildRouterWithAPIConfig(pool *pgxpool.Pool, cfg *api.Config, authWrapper m MaxConnections: -1, } - pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, pool, pool, nil, tenancy.NewNoopAuthorizer(), cfg.ReadOnly) + pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, pool, pool, nil, tenancy.NewNoopAuthorizer(), cfg.ReadOnly, false) if err != nil { return nil, pgClient, fmt.Errorf("cannot run test, cannot instantiate pgClient: %w", err) } diff --git a/pkg/tests/end_to_end_tests/query_integration_test.go b/pkg/tests/end_to_end_tests/query_integration_test.go index 0f75d24b40..9fda0862b0 100644 --- a/pkg/tests/end_to_end_tests/query_integration_test.go +++ b/pkg/tests/end_to_end_tests/query_integration_test.go @@ -114,8 +114,9 @@ func TestDroppedViewQuery(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) - _, err := r.RemoteReadQuerier(ctx).Query(&prompb.Query{ + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) + _, err = r.RemoteReadQuerier(ctx).Query(&prompb.Query{ Matchers: []*prompb.LabelMatcher{ { Type: prompb.LabelMatcher_EQ, @@ -692,7 +693,8 @@ func TestSQLQuery(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) for _, c := range testCases { tester.Run(c.name, func(t *testing.T) { resp, err := r.RemoteReadQuerier(context.Background()).Query(c.query) @@ -1099,7 +1101,8 @@ func TestPromQL(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) for _, c := range testCases { tester.Run(c.name, func(t *testing.T) { connResp, connErr := r.RemoteReadQuerier(context.Background()).Query(c.query) @@ -1300,7 +1303,8 @@ func TestPushdownDelta(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil) if err != nil { @@ -1375,7 +1379,8 @@ func TestPushdownVecSel(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil) if err != nil { diff --git a/pkg/tests/end_to_end_tests/rollup_query_helper_test.go b/pkg/tests/end_to_end_tests/rollup_query_helper_test.go new file mode 100644 index 0000000000..9c183ac02c --- /dev/null +++ b/pkg/tests/end_to_end_tests/rollup_query_helper_test.go @@ -0,0 +1,102 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package end_to_end_tests + +import ( + "context" + "testing" + "time" + + "github.com/jackc/pgx/v4/pgxpool" + "github.com/stretchr/testify/require" + "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/rollup" +) + +func TestRollupQueryHelper(t *testing.T) { + withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { + _, err := db.Exec(context.Background(), "SELECT prom_api.set_automatic_downsample($1)", true) + require.NoError(t, err) + _, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('short', interval '5 minutes', interval '30 days')") + require.NoError(t, err) + _, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('medium', interval '15 minutes', interval '30 days')") + require.NoError(t, err) + _, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('long', interval '1 hour', interval '30 days')") + require.NoError(t, err) + _, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('very_long', interval '1 week', interval '30 days')") + require.NoError(t, err) + + var numResolutions int + err = db.QueryRow(context.Background(), "SELECT count(*) FROM _prom_catalog.rollup").Scan(&numResolutions) + require.NoError(t, err) + require.Equal(t, 4, numResolutions) + + helper, err := rollup.NewDecider(context.Background(), pgxconn.NewPgxConn(db), rollup.DefaultScrapeInterval) + require.NoError(t, err) + require.NotNil(t, helper) + + const originalSchema = "prom_data" + + tcs := []struct { + name string + min time.Duration + max time.Duration + expectedSchemaName string + }{ + { + name: "1 sec", + min: 0, + max: time.Second, + expectedSchemaName: originalSchema, + }, { + name: "5 min", + min: 0, + max: 5 * time.Minute, + expectedSchemaName: originalSchema, + }, { + name: "30 mins", + min: 0, + max: 30 * time.Minute, + expectedSchemaName: originalSchema, + }, { + name: "1 hour", + min: 0, + max: time.Hour, + expectedSchemaName: originalSchema, + }, { + name: "1 day", + min: 0, + max: 24 * time.Hour, + expectedSchemaName: originalSchema, + }, + { + name: "7 days", + min: 0, + max: 7 * 24 * time.Hour, + expectedSchemaName: "ps_short", + }, + { + name: "30 days", + min: 0, + max: 30 * 24 * time.Hour, + expectedSchemaName: "ps_medium", + }, { + name: "1 year", + min: 0, + max: 12 * 30 * 24 * time.Hour, + expectedSchemaName: "ps_very_long", + }, { + name: "100 years", + min: 0, + max: 100 * 12 * 30 * 24 * time.Hour, + expectedSchemaName: "ps_very_long", + }, + } + for _, tc := range tcs { + recommendedSchema := helper.Decide(int64(tc.min.Seconds()), int64(tc.max.Seconds())) + require.Equal(t, tc.expectedSchemaName, recommendedSchema, tc.name) + } + }) +} diff --git a/pkg/tests/end_to_end_tests/rules_test.go b/pkg/tests/end_to_end_tests/rules_test.go index 56b2be39df..5b558bfbe0 100644 --- a/pkg/tests/end_to_end_tests/rules_test.go +++ b/pkg/tests/end_to_end_tests/rules_test.go @@ -35,7 +35,7 @@ func TestRecordingRulesEval(t *testing.T) { MaxConnections: -1, } - pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false) + pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false, false) require.NoError(t, err) defer pgClient.Close() err = pgClient.InitPromQLEngine(&query.Config{