From 30510f3ecf5c0c899626bd9d299749ee05990792 Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Fri, 2 Dec 2022 17:31:07 +0530 Subject: [PATCH] Implement rollup resolution decider for incoming queries. Signed-off-by: Harkishen-Singh --- pkg/pgclient/client.go | 15 +- pkg/pgmodel/querier/querier.go | 17 +- pkg/pgmodel/querier/querier_sql_test.go | 7 +- pkg/pgmodel/querier/query_remote_read.go | 2 +- pkg/pgmodel/querier/query_sample.go | 6 +- pkg/rollup/resolution.go | 173 ++++++++++++++++++ pkg/rollup/resolution_test.go | 112 ++++++++++++ pkg/runner/client.go | 16 +- pkg/tests/constants.go | 3 +- .../end_to_end_tests/continuous_agg_test.go | 3 +- pkg/tests/end_to_end_tests/exemplar_test.go | 5 +- .../end_to_end_tests/multi_tenancy_test.go | 18 +- pkg/tests/end_to_end_tests/nan_test.go | 4 +- pkg/tests/end_to_end_tests/null_chars_test.go | 3 +- .../query_integration_test.go | 17 +- .../rollup_query_helper_test.go | 102 +++++++++++ 16 files changed, 467 insertions(+), 36 deletions(-) create mode 100644 pkg/rollup/resolution.go create mode 100644 pkg/rollup/resolution_test.go create mode 100644 pkg/tests/end_to_end_tests/rollup_query_helper_test.go diff --git a/pkg/pgclient/client.go b/pkg/pgclient/client.go index e6747082c0..8bcdf2738b 100644 --- a/pkg/pgclient/client.go +++ b/pkg/pgclient/client.go @@ -61,7 +61,7 @@ type Client struct { } // NewClient creates a new PostgreSQL client -func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) { +func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly, useRollups bool) (*Client, error) { var ( err error dbMaxConns int @@ -137,7 +137,7 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche if err != nil { return nil, fmt.Errorf("err creating reader connection pool: %w", err) } - client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly) + client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly, useRollups) if err != nil { return client, err } @@ -197,7 +197,7 @@ func getRedactedConnStr(s string) string { } // NewClientWithPool creates a new PostgreSQL client with an existing connection pool. -func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) { +func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly, useRollups bool) (*Client, error) { sigClose := make(chan struct{}) metricsCache := cache.NewMetricCache(cfg.CacheConfig) labelsCache := cache.NewLabelsCache(cfg.CacheConfig) @@ -223,7 +223,12 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig) labelsReader := lreader.NewLabelsReader(readerConn, labelsCache, mt.ReadAuthorizer()) - dbQuerier := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer()) + dbQuerier, err := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer(), useRollups) + if err != nil { + log.Error("msg", "err starting querier", "error", err.Error()) + return nil, err + } + queryable := query.NewQueryable(dbQuerier, labelsReader) dbIngestor := ingestor.DBInserter(ingestor.ReadOnlyIngestor{}) @@ -232,7 +237,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri writerConn = pgxconn.NewPgxConn(writerPool) dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, &c) if err != nil { - log.Error("msg", "err starting the ingestor", "err", err) + log.Error("msg", "err starting ingestor", "error", err.Error()) return nil, err } } diff --git a/pkg/pgmodel/querier/querier.go b/pkg/pgmodel/querier/querier.go index d6951d08bf..bd1b174873 100644 --- a/pkg/pgmodel/querier/querier.go +++ b/pkg/pgmodel/querier/querier.go @@ -6,17 +6,20 @@ package querier import ( "context" + "fmt" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/timescale/promscale/pkg/pgmodel/cache" "github.com/timescale/promscale/pkg/pgmodel/lreader" "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/rollup" "github.com/timescale/promscale/pkg/tenancy" ) type pgxQuerier struct { tools *queryTools + rollups *rollup.QueryHelper } var _ Querier = (*pgxQuerier)(nil) @@ -29,7 +32,8 @@ func NewQuerier( labelsReader lreader.LabelsReader, exemplarCache cache.PositionCache, rAuth tenancy.ReadAuthorizer, -) Querier { + useRollups bool, +) (Querier, error) { querier := &pgxQuerier{ tools: &queryTools{ conn: conn, @@ -39,7 +43,14 @@ func NewQuerier( rAuth: rAuth, }, } - return querier + if useRollups { + qh, err := rollup.NewQueryHelper(context.Background(), conn) + if err != nil { + return nil, fmt.Errorf("creating rollups query helper: %w", err) + } + querier.rollups = qh + } + return querier, nil } func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier { @@ -47,7 +58,7 @@ func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier { } func (q *pgxQuerier) SamplesQuerier(ctx context.Context) SamplesQuerier { - return newQuerySamples(ctx, q) + return newQuerySamples(ctx, q, q.rollups) } func (q *pgxQuerier) ExemplarsQuerier(ctx context.Context) ExemplarQuerier { diff --git a/pkg/pgmodel/querier/querier_sql_test.go b/pkg/pgmodel/querier/querier_sql_test.go index ce43cddc1f..a8ee0c582f 100644 --- a/pkg/pgmodel/querier/querier_sql_test.go +++ b/pkg/pgmodel/querier/querier_sql_test.go @@ -722,7 +722,12 @@ func TestPGXQuerierQuery(t *testing.T) { if err != nil { t.Fatalf("error setting up mock cache: %s", err.Error()) } - querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}} + querier := pgxQuerier{ + &queryTools{ + conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer()), + }, + nil, + } result, err := querier.RemoteReadQuerier(context.Background()).Query(c.query) diff --git a/pkg/pgmodel/querier/query_remote_read.go b/pkg/pgmodel/querier/query_remote_read.go index 4644dbecc5..8ac59fffe6 100644 --- a/pkg/pgmodel/querier/query_remote_read.go +++ b/pkg/pgmodel/querier/query_remote_read.go @@ -28,7 +28,7 @@ func (q *queryRemoteRead) Query(query *prompb.Query) ([]*prompb.TimeSeries, erro return nil, err } - qrySamples := newQuerySamples(q.ctx, q.pgxQuerier) + qrySamples := newQuerySamples(q.ctx, q.pgxQuerier, nil) sampleRows, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers) if err != nil { return nil, err diff --git a/pkg/pgmodel/querier/query_sample.go b/pkg/pgmodel/querier/query_sample.go index afd4ce0bf0..1384f7977b 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -11,15 +11,17 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/timescale/promscale/pkg/pgmodel/common/errors" "github.com/timescale/promscale/pkg/pgmodel/common/schema" + "github.com/timescale/promscale/pkg/rollup" ) type querySamples struct { *pgxQuerier ctx context.Context + rollups *rollup.QueryHelper } -func newQuerySamples(ctx context.Context, qr *pgxQuerier) *querySamples { - return &querySamples{qr, ctx} +func newQuerySamples(ctx context.Context, qr *pgxQuerier, rollups *rollup.QueryHelper) *querySamples { + return &querySamples{qr, ctx, rollups} } // Select implements the SamplesQuerier interface. It is the entry point for our diff --git a/pkg/rollup/resolution.go b/pkg/rollup/resolution.go new file mode 100644 index 0000000000..15d3b19824 --- /dev/null +++ b/pkg/rollup/resolution.go @@ -0,0 +1,173 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/pgxconn" +) + +const ( + originalSchema = "prom_data" + upperLimit = 5000 // Maximum samples allowed + assumedScrapeInterval = time.Second * 30 + refreshRollupResolution = time.Minute * 30 +) + +type rollupInfo struct { + schemaName string + resolution time.Duration +} + +type QueryHelper struct { + conn pgxconn.PgxConn + refreshMtx sync.RWMutex + metricType map[string]string // metric_name: metric_type + resolutionsASC []rollupInfo // {schemaName, resolution} in ascending order of resolution. + downsamplingEnabled bool +} + +func NewQueryHelper(ctx context.Context, conn pgxconn.PgxConn) (*QueryHelper, error) { + helper := &QueryHelper{conn: conn} + if err := helper.runRefreshRoutine(ctx, refreshRollupResolution); err != nil { + return nil, fmt.Errorf("refresh: %w", err) + } + return helper, nil +} + +// DecideSchema returns the schema name of the rollups that should be used for querying. +// The returned schema represents a downsampled resolution that should be an optimal +// resolution for querying. +// +// If no rollups exists or if downsampling is disabled, "prom_data" is returned. +func (h *QueryHelper) DecideSchema(min, max int64) string { + h.refreshMtx.RLock() + defer h.refreshMtx.RUnlock() + + if !h.downsamplingEnabled || len(h.resolutionsASC) == 0 { + return originalSchema + } + estimateSamples := func(resolution time.Duration) int64 { + return int64(float64(max-min) / resolution.Seconds()) + } + + numRawSamples := estimateSamples(assumedScrapeInterval) + if numRawSamples < upperLimit { + return originalSchema + } + + for _, info := range h.resolutionsASC { + samples := estimateSamples(info.resolution) + if samples < upperLimit { + // The first highest resolution that is below upper limit is our answer, + // since it provides the highest granularity at the expected samples. + return info.schemaName + } + } + // All rollups are above upper limit. Hence, send the schema of the lowest resolution + // as this is the best we can do. + lowestRollup := h.resolutionsASC[len(h.resolutionsASC)-1] + return lowestRollup.schemaName +} + +func (h *QueryHelper) ContainsMetricType(metricName string) bool { + _, present := h.metricType[metricName] + return present +} + +func (h *QueryHelper) Refresh() error { + h.refreshMtx.Lock() + defer h.refreshMtx.Unlock() + + if err := h.refreshDownsamplingState(); err != nil { + return fmt.Errorf("downsampling state: %w", err) + } + if err := h.refreshMetricTypes(); err != nil { + return fmt.Errorf("metric-type: %w", err) + } + if err := h.refreshRollupResolutions(); err != nil { + return fmt.Errorf("rollup resolutions: %w", err) + } + return nil +} + +func (h *QueryHelper) runRefreshRoutine(ctx context.Context, refreshInterval time.Duration) error { + if err := h.Refresh(); err != nil { + return fmt.Errorf("refreshing rollup resolution: %w", err) + } + go func() { + t := time.NewTicker(refreshInterval) + defer t.Stop() + for range t.C { + if err := h.Refresh(); err != nil { + log.Error("msg", "error refreshing rollup resolution", "error", err.Error()) + } + } + }() + return nil +} + +func (h *QueryHelper) refreshDownsamplingState() error { + var state bool + if err := h.conn.QueryRow(context.Background(), "SELECT prom_api.get_automatic_downsample()::BOOLEAN").Scan(&state); err != nil { + return fmt.Errorf("fetching automatic downsampling state: %w", err) + } + h.downsamplingEnabled = state + return nil +} + +func (h *QueryHelper) refreshMetricTypes() error { + var metricName, metricType []string + err := h.conn.QueryRow(context.Background(), + "select array_agg(metric_family), array_agg(type) from _prom_catalog.metadata").Scan(&metricName, &metricType) + if err != nil { + return fmt.Errorf("fetching metric metadata: %w", err) + } + h.metricType = make(map[string]string) // metric_name: metric_type + for i := range metricName { + h.metricType[metricName[i]] = metricType[i] + } + return nil +} + +func (h *QueryHelper) refreshRollupResolutions() error { + rows, err := h.conn.Query(context.Background(), "SELECT schema_name, resolution FROM _prom_catalog.rollup") + if err != nil { + return fmt.Errorf("fetching rollup resolutions: %w", err) + } + h.resolutionsASC = []rollupInfo{} + for rows.Next() { + var ( + schemaName string + resolution time.Duration + ) + if err = rows.Scan(&schemaName, &resolution); err != nil { + return fmt.Errorf("error scanning rows: %w", err) + } + h.resolutionsASC = append(h.resolutionsASC, rollupInfo{schemaName: schemaName, resolution: resolution}) + } + sort.Sort(sortRollupInfo(h.resolutionsASC)) + return nil +} + +type sortRollupInfo []rollupInfo + +func (s sortRollupInfo) Len() int { + return len(s) +} + +func (s sortRollupInfo) Less(i, j int) bool { + return s[i].resolution.Seconds() < s[j].resolution.Seconds() +} + +func (s sortRollupInfo) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/pkg/rollup/resolution_test.go b/pkg/rollup/resolution_test.go new file mode 100644 index 0000000000..fc39cc5871 --- /dev/null +++ b/pkg/rollup/resolution_test.go @@ -0,0 +1,112 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestDecideRollup(t *testing.T) { + r := &QueryHelper{ + downsamplingEnabled: true, + resolutionsASC: []rollupInfo{ + {"5_minute", 5 * time.Minute}, + {"15_minute", 15 * time.Minute}, + {"1_hour", time.Hour}, + {"1_week", 7 * 24 * time.Hour}, + }, + } + tcs := []struct { + name string + min time.Duration + max time.Duration + expectedSchemaName string + }{ + { + name: "1 sec", + min: 0, + max: time.Second, + expectedSchemaName: originalSchema, + }, { + name: "5 min", + min: 0, + max: 5 * time.Minute, + expectedSchemaName: originalSchema, + }, { + name: "30 mins", + min: 0, + max: 30 * time.Minute, + expectedSchemaName: originalSchema, + }, { + name: "1 hour", + min: 0, + max: time.Hour, + expectedSchemaName: originalSchema, + }, { + // DRY RUN + // ------- + // + // Assumed default scrape interval being 30 secs + // raw -> 2,880 <-- Falls in the acceptable range. + name: "1 day", + min: 0, + max: 24 * time.Hour, + expectedSchemaName: originalSchema, + }, + { + // DRY RUN on 500 - 5000 logic + // -------- + // + // Assumed default scrape interval being 30 secs + // raw -> 20,160 + // + // And, when using following rollup resolutions, num samples: + // 5 mins -> 2,016 <-- Falls in the acceptable range. + // 15 mins -> 672 + // 1 hour -> 168 + // 1 week -> 1 + name: "7 days", + min: 0, + max: 7 * 24 * time.Hour, + expectedSchemaName: "5_minute", + }, + { + name: "30 days", + min: 0, + max: 30 * 24 * time.Hour, + expectedSchemaName: "15_minute", + }, { + // DRY RUN on 500 - 5000 logic + // -------- + // + // Assumed default scrape interval being 30 secs + // raw -> 20,160 + // + // And, when using following rollup resolutions, num samples: + // 5 mins -> 1,051,200 <-- Falls in the acceptable range. + // 15 mins -> 35,040 + // 1 hour -> 8,760 + // 1 week -> 52 + name: "1 year", + min: 0, + max: 12 * 30 * 24 * time.Hour, + expectedSchemaName: "1_week", + }, { + name: "100 years", + min: 0, + max: 100 * 12 * 30 * 24 * time.Hour, + expectedSchemaName: "1_week", + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + schemaName := r.DecideSchema(int64(tc.min.Seconds()), int64(tc.max.Seconds())) + require.Equal(t, tc.expectedSchemaName, schemaName, tc.name) + }) + } +} diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 2c3a141f35..b448070e6c 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -165,16 +165,18 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error cfg.APICfg.MultiTenancy = multiTenancy } + var useRollups bool if cfg.DatasetConfig != "" { - err = ApplyDatasetConfig(context.Background(), conn, cfg.DatasetConfig) + cfg, err := ApplyDatasetConfig(context.Background(), conn, cfg.DatasetConfig) if err != nil { return nil, fmt.Errorf("error applying dataset configuration: %w", err) } + useRollups = *cfg.Metrics.Rollups.Enabled } // client has to be initiated after migrate since migrate // can change database GUC settings - client, err := pgclient.NewClient(r, &cfg.PgmodelCfg, multiTenancy, leasingFunction, cfg.APICfg.ReadOnly) + client, err := pgclient.NewClient(r, &cfg.PgmodelCfg, multiTenancy, leasingFunction, cfg.APICfg.ReadOnly, useRollups) if err != nil { return nil, fmt.Errorf("client creation error: %w", err) } @@ -226,13 +228,15 @@ func isBGWLessThanDBs(conn *pgx.Conn) (bool, error) { return false, nil } -func ApplyDatasetConfig(ctx context.Context, conn *pgx.Conn, cfgFilename string) error { +func ApplyDatasetConfig(ctx context.Context, conn *pgx.Conn, cfgFilename string) (*dataset.Config, error) { cfg, err := dataset.NewConfig(cfgFilename) if err != nil { - return err + return nil, err } - - return cfg.Apply(ctx, conn) + if err = cfg.Apply(ctx, conn); err != nil { + return nil, fmt.Errorf("error applying dataset config: %w", err) + } + return &cfg, nil } func compileAnchoredRegexString(s string) (*regexp.Regexp, error) { diff --git a/pkg/tests/constants.go b/pkg/tests/constants.go index e88e8aa959..80b6ee637f 100644 --- a/pkg/tests/constants.go +++ b/pkg/tests/constants.go @@ -17,5 +17,6 @@ func init() { } PromscaleExtensionVersion = strings.TrimSpace(string(content)) - PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14" + //PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14" + PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:benchmark_rollups-ts2.8.1-pg14" } diff --git a/pkg/tests/end_to_end_tests/continuous_agg_test.go b/pkg/tests/end_to_end_tests/continuous_agg_test.go index 56883b6d29..55f77500f8 100644 --- a/pkg/tests/end_to_end_tests/continuous_agg_test.go +++ b/pkg/tests/end_to_end_tests/continuous_agg_test.go @@ -209,7 +209,8 @@ WITH (timescaledb.continuous) AS lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil) if err != nil { diff --git a/pkg/tests/end_to_end_tests/exemplar_test.go b/pkg/tests/end_to_end_tests/exemplar_test.go index 2cedc067d8..8b6d6ba529 100644 --- a/pkg/tests/end_to_end_tests/exemplar_test.go +++ b/pkg/tests/end_to_end_tests/exemplar_test.go @@ -205,11 +205,12 @@ func TestExemplarQueryingAPI(t *testing.T) { // since the return will be 0, as they have already been ingested by TestExemplarIngestion. labelsReader := lreader.NewLabelsReader(pgxconn.NewPgxConn(db), cache.NewLabelsCache(cache.DefaultConfig), tenancy.NewNoopAuthorizer().ReadAuthorizer()) - r := querier.NewQuerier( + r, err := querier.NewQuerier( pgxconn.NewPgxConn(db), cache.NewMetricCache(cache.DefaultConfig), labelsReader, - cache.NewExemplarLabelsPosCache(cache.DefaultConfig), nil) + cache.NewExemplarLabelsPosCache(cache.DefaultConfig), nil, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) // Query all exemplars corresponding to metric_2 histogram. diff --git a/pkg/tests/end_to_end_tests/multi_tenancy_test.go b/pkg/tests/end_to_end_tests/multi_tenancy_test.go index 956bcb497e..ed9956e6cf 100644 --- a/pkg/tests/end_to_end_tests/multi_tenancy_test.go +++ b/pkg/tests/end_to_end_tests/multi_tenancy_test.go @@ -56,7 +56,8 @@ func TestMultiTenancyWithoutValidTenants(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), false) + require.NoError(t, err) // ----- query-test: querying a single tenant (tenant-a) ----- expectedResult := []prompb.TimeSeries{ @@ -256,7 +257,8 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), false) + require.NoError(t, err) // ----- query-test: querying a valid tenant (tenant-a) ----- expectedResult := []prompb.TimeSeries{ @@ -382,7 +384,8 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { require.NoError(t, err) labelsReader = lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), false) + require.NoError(t, err) expectedResult = []prompb.TimeSeries{} @@ -461,7 +464,8 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), false) + require.NoError(t, err) // ----- query-test: querying a non-tenant ----- expectedResult := []prompb.TimeSeries{ @@ -550,7 +554,8 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { require.NoError(t, err) labelsReader = lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), false) + require.NoError(t, err) expectedResult = []prompb.TimeSeries{ { @@ -659,7 +664,8 @@ func TestMultiTenancyWithValidTenantsAsLabels(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), false) + require.NoError(t, err) // ----- query-test: querying a single tenant (tenant-b) ----- expectedResult := []prompb.TimeSeries{ diff --git a/pkg/tests/end_to_end_tests/nan_test.go b/pkg/tests/end_to_end_tests/nan_test.go index 578923bf4e..2144a26be7 100644 --- a/pkg/tests/end_to_end_tests/nan_test.go +++ b/pkg/tests/end_to_end_tests/nan_test.go @@ -14,6 +14,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" _ "github.com/jackc/pgx/v4/stdlib" "github.com/prometheus/prometheus/model/value" + "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/clockcache" "github.com/timescale/promscale/pkg/internal/testhelpers" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -129,7 +130,8 @@ func TestSQLStaleNaN(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, false) + require.NoError(t, err) resp, err := r.RemoteReadQuerier(ctx).Query(c.query) if err != nil { t.Fatalf("unexpected error while ingesting test dataset: %s", err) diff --git a/pkg/tests/end_to_end_tests/null_chars_test.go b/pkg/tests/end_to_end_tests/null_chars_test.go index 073a7f9a7f..0404214e18 100644 --- a/pkg/tests/end_to_end_tests/null_chars_test.go +++ b/pkg/tests/end_to_end_tests/null_chars_test.go @@ -67,7 +67,8 @@ func TestOperationWithNullChars(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, false) + require.NoError(t, err) resp, err := r.RemoteReadQuerier(ctx).Query(&prompb.Query{ Matchers: []*prompb.LabelMatcher{ { diff --git a/pkg/tests/end_to_end_tests/query_integration_test.go b/pkg/tests/end_to_end_tests/query_integration_test.go index 0f75d24b40..19fed2eff8 100644 --- a/pkg/tests/end_to_end_tests/query_integration_test.go +++ b/pkg/tests/end_to_end_tests/query_integration_test.go @@ -114,8 +114,9 @@ func TestDroppedViewQuery(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) - _, err := r.RemoteReadQuerier(ctx).Query(&prompb.Query{ + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, false) + require.NoError(t, err) + _, err = r.RemoteReadQuerier(ctx).Query(&prompb.Query{ Matchers: []*prompb.LabelMatcher{ { Type: prompb.LabelMatcher_EQ, @@ -692,7 +693,8 @@ func TestSQLQuery(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, false) + require.NoError(t, err) for _, c := range testCases { tester.Run(c.name, func(t *testing.T) { resp, err := r.RemoteReadQuerier(context.Background()).Query(c.query) @@ -1099,7 +1101,8 @@ func TestPromQL(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, false) + require.NoError(t, err) for _, c := range testCases { tester.Run(c.name, func(t *testing.T) { connResp, connErr := r.RemoteReadQuerier(context.Background()).Query(c.query) @@ -1300,7 +1303,8 @@ func TestPushdownDelta(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil) if err != nil { @@ -1375,7 +1379,8 @@ func TestPushdownVecSel(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil) if err != nil { diff --git a/pkg/tests/end_to_end_tests/rollup_query_helper_test.go b/pkg/tests/end_to_end_tests/rollup_query_helper_test.go new file mode 100644 index 0000000000..73928fc803 --- /dev/null +++ b/pkg/tests/end_to_end_tests/rollup_query_helper_test.go @@ -0,0 +1,102 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package end_to_end_tests + +import ( + "context" + "testing" + "time" + + "github.com/jackc/pgx/v4/pgxpool" + "github.com/stretchr/testify/require" + "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/rollup" +) + +func TestRollupQueryHelper(t *testing.T) { + withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { + _, err := db.Exec(context.Background(), "SELECT prom_api.set_automatic_downsample($1)", true) + require.NoError(t, err) + _, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('short', interval '5 minutes', interval '30 days')") + require.NoError(t, err) + _, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('medium', interval '15 minutes', interval '30 days')") + require.NoError(t, err) + _, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('long', interval '1 hour', interval '30 days')") + require.NoError(t, err) + _, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('very_long', interval '1 week', interval '30 days')") + require.NoError(t, err) + + var numResolutions int + err = db.QueryRow(context.Background(), "SELECT count(*) FROM _prom_catalog.rollup").Scan(&numResolutions) + require.NoError(t, err) + require.Equal(t, 4, numResolutions) + + helper, err := rollup.NewQueryHelper(context.Background(), pgxconn.NewPgxConn(db)) + require.NoError(t, err) + require.NotNil(t, helper) + + const originalSchema = "prom_data" + + tcs := []struct { + name string + min time.Duration + max time.Duration + expectedSchemaName string + }{ + { + name: "1 sec", + min: 0, + max: time.Second, + expectedSchemaName: originalSchema, + }, { + name: "5 min", + min: 0, + max: 5 * time.Minute, + expectedSchemaName: originalSchema, + }, { + name: "30 mins", + min: 0, + max: 30 * time.Minute, + expectedSchemaName: originalSchema, + }, { + name: "1 hour", + min: 0, + max: time.Hour, + expectedSchemaName: originalSchema, + }, { + name: "1 day", + min: 0, + max: 24 * time.Hour, + expectedSchemaName: originalSchema, + }, + { + name: "7 days", + min: 0, + max: 7 * 24 * time.Hour, + expectedSchemaName: "ps_short", + }, + { + name: "30 days", + min: 0, + max: 30 * 24 * time.Hour, + expectedSchemaName: "ps_medium", + }, { + name: "1 year", + min: 0, + max: 12 * 30 * 24 * time.Hour, + expectedSchemaName: "ps_very_long", + }, { + name: "100 years", + min: 0, + max: 100 * 12 * 30 * 24 * time.Hour, + expectedSchemaName: "ps_very_long", + }, + } + for _, tc := range tcs { + recommendedSchema := helper.DecideSchema(int64(tc.min.Seconds()), int64(tc.max.Seconds())) + require.Equal(t, tc.expectedSchemaName, recommendedSchema, tc.name) + } + }) +}