Implement rollup resolution decider for incoming queries.
Signed-off-by: Harkishen-Singh <[email protected]>
Harkishen-Singh committed Dec 9, 2022
1 parent ec7c1d5 commit b5d699b
Showing 22 changed files with 511 additions and 50 deletions.
6 changes: 3 additions & 3 deletions pkg/dataset/config.go
@@ -50,7 +50,7 @@ type Metrics struct {
     HALeaseRefresh  day.Duration   `yaml:"ha_lease_refresh"`
     HALeaseTimeout  day.Duration   `yaml:"ha_lease_timeout"`
     RetentionPeriod day.Duration   `yaml:"default_retention_period"`
-    Rollups         *rollup.Config `yaml:"rollups,omitempty"`
+    Rollup          *rollup.Config `yaml:"rollup,omitempty"`
 }

 // Traces contains dataset configuration options for traces data.
@@ -68,8 +68,8 @@ func NewConfig(contents string) (cfg Config, err error) {
 func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
     c.applyDefaults()

-    if c.Metrics.Rollups != nil {
-        if err := c.Metrics.Rollups.Apply(ctx, conn); err != nil {
+    if c.Metrics.Rollup != nil {
+        if err := c.Metrics.Rollup.Apply(ctx, conn); err != nil {
             return fmt.Errorf("error applying configuration for downsampling: %w", err)
         }
     }
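For reference, a minimal sketch of how the renamed key would sit in a dataset configuration. Only the key rename (rollups to rollup) and the default_retention_period field come from this diff; the keys inside the rollup block are assumptions, since rollup.Config's fields are not shown here.

    metrics:
      default_retention_period: 90d
      rollup:             # renamed from "rollups" by this commit
        enabled: true     # assumed field, for illustration only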
15 changes: 9 additions & 6 deletions pkg/pgclient/client.go
@@ -61,7 +61,7 @@ type Client struct {
 }

 // NewClient creates a new PostgreSQL client
-func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) {
+func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly, useRollups bool) (*Client, error) {
     var (
         err        error
         dbMaxConns int
@@ -137,7 +137,7 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche
     if err != nil {
         return nil, fmt.Errorf("err creating reader connection pool: %w", err)
     }
-    client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly)
+    client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly, useRollups)
     if err != nil {
         return client, err
     }
@@ -197,7 +197,7 @@ func getRedactedConnStr(s string) string {
 }

 // NewClientWithPool creates a new PostgreSQL client with an existing connection pool.
-func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) {
+func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly, useRollups bool) (*Client, error) {
     sigClose := make(chan struct{})
     metricsCache := cache.NewMetricCache(cfg.CacheConfig)
     labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
@@ -223,7 +223,11 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
     exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig)

     labelsReader := lreader.NewLabelsReader(readerConn, labelsCache, mt.ReadAuthorizer())
-    dbQuerier := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer())
+    dbQuerier, err := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer(), cfg.MetricsScrapeInterval, useRollups)
+    if err != nil {
+        return nil, fmt.Errorf("error starting querier: %w", err)
+    }
+
     queryable := query.NewQueryable(dbQuerier, labelsReader)

     dbIngestor := ingestor.DBInserter(ingestor.ReadOnlyIngestor{})
@@ -232,8 +236,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
         writerConn = pgxconn.NewPgxConn(writerPool)
         dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, &c)
         if err != nil {
-            log.Error("msg", "err starting the ingestor", "err", err)
-            return nil, err
+            return nil, fmt.Errorf("error starting ingestor: %w", err)
         }
     }
     if maintPool != nil {
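A minimal sketch of how a caller would thread the new argument through NewClient; the surrounding values (cfg, authorizer, schemaLocker) are placeholders, not code from this commit.

    // useRollups would typically be derived from a CLI flag or the dataset config.
    useRollups := true
    client, err := pgclient.NewClient(prometheus.DefaultRegisterer, cfg, authorizer, schemaLocker, false /* readOnly */, useRollups)
    if err != nil {
        return fmt.Errorf("create pgclient: %w", err)
    }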
3 changes: 3 additions & 0 deletions pkg/pgclient/config.go
@@ -18,6 +18,7 @@ import (
     "github.com/timescale/promscale/pkg/log"
     "github.com/timescale/promscale/pkg/pgmodel/cache"
     "github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
+    "github.com/timescale/promscale/pkg/rollup"
     "github.com/timescale/promscale/pkg/version"
 )

@@ -34,6 +35,7 @@ type Config struct {
     DbConnectionTimeout    time.Duration
     IgnoreCompressedChunks bool
     MetricsAsyncAcks       bool
+    MetricsScrapeInterval  time.Duration
     TracesAsyncAcks        bool
     WriteConnections       int
     WriterPoolSize         int
@@ -107,6 +109,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
     fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB")
     fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created")
     fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.")
+    fs.DurationVar(&cfg.MetricsScrapeInterval, "metrics.rollup.scrape-interval", rollup.DefaultScrapeInterval.Abs(), "Default scrape interval in Prometheus. This is used to estimate samples while choosing a rollup for querying.")
     return cfg
 }

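A hypothetical sketch showing the new flag end to end; only the flag name, the Config field, and the rollup.DefaultScrapeInterval default come from this diff.

    package main

    import (
        "flag"
        "fmt"

        "github.com/timescale/promscale/pkg/pgclient"
    )

    func main() {
        fs := flag.NewFlagSet("promscale", flag.ContinueOnError)
        cfg := pgclient.ParseFlags(fs, &pgclient.Config{})
        // Left unset, -metrics.rollup.scrape-interval defaults to rollup.DefaultScrapeInterval (30s).
        if err := fs.Parse([]string{"-metrics.rollup.scrape-interval", "1m"}); err != nil {
            panic(err)
        }
        fmt.Println(cfg.MetricsScrapeInterval) // 1m0s
    }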
21 changes: 17 additions & 4 deletions pkg/pgmodel/querier/querier.go
@@ -6,17 +6,21 @@ package querier

 import (
     "context"
+    "fmt"
+    "time"

     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/storage"
     "github.com/timescale/promscale/pkg/pgmodel/cache"
     "github.com/timescale/promscale/pkg/pgmodel/lreader"
     "github.com/timescale/promscale/pkg/pgxconn"
+    "github.com/timescale/promscale/pkg/rollup"
     "github.com/timescale/promscale/pkg/tenancy"
 )

 type pgxQuerier struct {
-    tools *queryTools
+    tools  *queryTools
+    schema *rollup.Decider
 }

 var _ Querier = (*pgxQuerier)(nil)
@@ -29,7 +33,9 @@ func NewQuerier(
     labelsReader lreader.LabelsReader,
     exemplarCache cache.PositionCache,
     rAuth tenancy.ReadAuthorizer,
-) Querier {
+    scrapeInterval time.Duration,
+    useRollups bool,
+) (Querier, error) {
     querier := &pgxQuerier{
         tools: &queryTools{
             conn: conn,
@@ -39,15 +45,22 @@ func NewQuerier(
             rAuth: rAuth,
         },
     }
-    return querier
+    if useRollups {
+        decider, err := rollup.NewDecider(context.Background(), conn, scrapeInterval)
+        if err != nil {
+            return nil, fmt.Errorf("error creating rollups schema decider: %w", err)
+        }
+        querier.schema = decider
+    }
+    return querier, nil
 }

 func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier {
     return newQueryRemoteRead(ctx, q)
 }

 func (q *pgxQuerier) SamplesQuerier(ctx context.Context) SamplesQuerier {
-    return newQuerySamples(ctx, q)
+    return newQuerySamples(ctx, q, q.schema)
 }

 func (q *pgxQuerier) ExemplarsQuerier(ctx context.Context) ExemplarQuerier {
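Since NewQuerier now returns an error (the decider talks to the database when it is constructed), callers must handle it. A sketch with placeholder dependencies:

    q, err := querier.NewQuerier(conn, metricsCache, labelsReader, exemplarCache,
        rAuth, 30*time.Second /* scrapeInterval */, true /* useRollups */)
    if err != nil {
        return fmt.Errorf("error starting querier: %w", err)
    }
    samples := q.SamplesQuerier(ctx) // sample queries now carry the rollup decider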
7 changes: 6 additions & 1 deletion pkg/pgmodel/querier/querier_sql_test.go
@@ -722,7 +722,12 @@ func TestPGXQuerierQuery(t *testing.T) {
             if err != nil {
                 t.Fatalf("error setting up mock cache: %s", err.Error())
             }
-            querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}}
+            querier := pgxQuerier{
+                &queryTools{
+                    conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer()),
+                },
+                nil,
+            }

             result, err := querier.RemoteReadQuerier(context.Background()).Query(c.query)

Expand Down
2 changes: 1 addition & 1 deletion pkg/pgmodel/querier/query_remote_read.go
@@ -28,7 +28,7 @@ func (q *queryRemoteRead) Query(query *prompb.Query) ([]*prompb.TimeSeries, erro
         return nil, err
     }

-    qrySamples := newQuerySamples(q.ctx, q.pgxQuerier)
+    qrySamples := newQuerySamples(q.ctx, q.pgxQuerier, nil)
     sampleRows, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers)
     if err != nil {
         return nil, err
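Note that the remote-read path passes a nil decider to newQuerySamples, so remote-read queries always resolve against the default (raw) schema; only the PromQL sample-query path consults the rollup decider (see pkg/pgmodel/querier/query_sample.go below).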
27 changes: 24 additions & 3 deletions pkg/pgmodel/querier/query_sample.go
@@ -9,17 +9,20 @@ import (
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/promql/parser"
     "github.com/prometheus/prometheus/storage"
+    "github.com/timescale/promscale/pkg/log"
     "github.com/timescale/promscale/pkg/pgmodel/common/errors"
     "github.com/timescale/promscale/pkg/pgmodel/common/schema"
+    "github.com/timescale/promscale/pkg/rollup"
 )

 type querySamples struct {
     *pgxQuerier
-    ctx context.Context
+    ctx    context.Context
+    schema *rollup.Decider
 }

-func newQuerySamples(ctx context.Context, qr *pgxQuerier) *querySamples {
-    return &querySamples{qr, ctx}
+func newQuerySamples(ctx context.Context, qr *pgxQuerier, schema *rollup.Decider) *querySamples {
+    return &querySamples{qr, ctx, schema}
 }

 // Select implements the SamplesQuerier interface. It is the entry point for our
@@ -39,6 +42,24 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH
         return nil, nil, fmt.Errorf("get evaluation metadata: %w", err)
     }

+    var useRollups bool
+    if q.schema != nil {
+        // Querying via rollups is available.
+        supported := q.schema.SupportsRollup(metadata.metric) // Ensure that rollups for the given metric are supported.
+        if !supported {
+            // Rollups for the given metric aren't supported yet. Let's refresh and check again.
+            if err := q.schema.Refresh(); err != nil {
+                log.Error("msg", "error refreshing schema decider", "error", err.Error())
+            }
+            supported = q.schema.SupportsRollup(metadata.metric) // If supported is still false, then rollups really don't exist for 'metadata.metric'.
+        }
+        if supported {
+            schemaName := q.schema.Decide(mint, maxt)
+            useRollups = schemaName != rollup.DefaultSchema
+        }
+    }
+    _ = useRollups // Avoids an unused-variable error; this will be used in follow-up PRs for querying rollups.
+
     filter := metadata.timeFilter
     if metadata.isSingleMetric {
         // Single vector selector case.
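Restating the decision flow above as a standalone sketch of the Decider API introduced by this commit. SupportsRollup, Refresh, Decide, and DefaultSchema appear in the diff; the helper itself is illustrative.

    // chooseSchema returns the schema a sample query should read from.
    func chooseSchema(d *rollup.Decider, metric string, mint, maxt int64) string {
        if d == nil {
            return rollup.DefaultSchema // rollups disabled: query raw data
        }
        if !d.SupportsRollup(metric) {
            // The metric may have gained rollups since the last refresh; retry once.
            if err := d.Refresh(); err != nil {
                log.Error("msg", "error refreshing schema decider", "error", err.Error())
            }
            if !d.SupportsRollup(metric) {
                return rollup.DefaultSchema
            }
        }
        // Decide picks a resolution for [mint, maxt], estimating the sample count
        // from the configured scrape interval.
        return d.Decide(mint, maxt)
    }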
1 change: 1 addition & 0 deletions pkg/rollup/config.go
@@ -17,6 +17,7 @@ import (
 )

 const (
+    DefaultScrapeInterval        = time.Second * 30
     setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)"

     // short and long represent system resolutions.
