Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Adds support for rollup query helper based on query range inputs #1780

Open
wants to merge 7 commits into
base: feature_metric_rollup
Choose a base branch
from
40 changes: 27 additions & 13 deletions pkg/dataset/config.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package dataset

import (
Expand All @@ -6,8 +10,11 @@ import (
"time"

"github.com/jackc/pgx/v4"
"github.com/timescale/promscale/pkg/log"
"gopkg.in/yaml.v2"

"github.com/timescale/promscale/pkg/internal/day"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/rollup"
)

const (
Expand Down Expand Up @@ -38,16 +45,17 @@ type Config struct {

// Metrics contains dataset configuration options for metrics data.
type Metrics struct {
ChunkInterval DayDuration `yaml:"default_chunk_interval"`
Compression *bool `yaml:"compress_data"` // Using pointer to check if the the value was set.
HALeaseRefresh DayDuration `yaml:"ha_lease_refresh"`
HALeaseTimeout DayDuration `yaml:"ha_lease_timeout"`
RetentionPeriod DayDuration `yaml:"default_retention_period"`
ChunkInterval day.Duration `yaml:"default_chunk_interval"`
Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set.
HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"`
HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"`
RetentionPeriod day.Duration `yaml:"default_retention_period"`
Rollup *rollup.Config `yaml:"rollup,omitempty"`
}

// Traces contains dataset configuration options for traces data.
type Traces struct {
RetentionPeriod DayDuration `yaml:"default_retention_period"`
RetentionPeriod day.Duration `yaml:"default_retention_period"`
}

// NewConfig creates a new dataset config based on the configuration YAML contents.
Expand All @@ -57,9 +65,15 @@ func NewConfig(contents string) (cfg Config, err error) {
}

// Apply applies the configuration to the database via the supplied DB connection.
func (c *Config) Apply(conn *pgx.Conn) error {
func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
c.applyDefaults()

if c.Metrics.Rollup != nil {
if err := c.Metrics.Rollup.Apply(ctx, conn); err != nil {
return fmt.Errorf("error applying configuration for downsampling: %w", err)
}
}

log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval))
log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh))
Expand Down Expand Up @@ -87,21 +101,21 @@ func (c *Config) Apply(conn *pgx.Conn) error {

// applyDefaults fills in any unset metrics/traces settings with the
// package-level defaults. A duration is considered unset when it is
// zero or negative; Compression is a pointer so nil means "not set".
// Values explicitly provided by the user are left untouched.
func (c *Config) applyDefaults() {
	if c.Metrics.ChunkInterval <= 0 {
		c.Metrics.ChunkInterval = day.Duration(defaultMetricChunkInterval)
	}
	if c.Metrics.Compression == nil {
		c.Metrics.Compression = &defaultMetricCompressionVar
	}
	if c.Metrics.HALeaseRefresh <= 0 {
		c.Metrics.HALeaseRefresh = day.Duration(defaultMetricHALeaseRefresh)
	}
	if c.Metrics.HALeaseTimeout <= 0 {
		c.Metrics.HALeaseTimeout = day.Duration(defaultMetricHALeaseTimeout)
	}
	if c.Metrics.RetentionPeriod <= 0 {
		c.Metrics.RetentionPeriod = day.Duration(defaultMetricRetentionPeriod)
	}
	if c.Traces.RetentionPeriod <= 0 {
		c.Traces.RetentionPeriod = day.Duration(defaultTraceRetentionPeriod)
	}
}
33 changes: 17 additions & 16 deletions pkg/dataset/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/internal/day"
)

var testCompressionSetting = true
Expand Down Expand Up @@ -45,7 +46,7 @@ func TestNewConfig(t *testing.T) {
default_retention_period: 3d2h`,
cfg: Config{
Metrics: Metrics{
RetentionPeriod: DayDuration(((3 * 24) + 2) * time.Hour),
RetentionPeriod: day.Duration(((3 * 24) + 2) * time.Hour),
},
},
},
Expand All @@ -61,14 +62,14 @@ traces:
default_retention_period: 15d`,
cfg: Config{
Metrics: Metrics{
ChunkInterval: DayDuration(3 * time.Hour),
ChunkInterval: day.Duration(3 * time.Hour),
Compression: &testCompressionSetting,
HALeaseRefresh: DayDuration(2 * time.Minute),
HALeaseTimeout: DayDuration(5 * time.Second),
RetentionPeriod: DayDuration(30 * 24 * time.Hour),
HALeaseRefresh: day.Duration(2 * time.Minute),
HALeaseTimeout: day.Duration(5 * time.Second),
RetentionPeriod: day.Duration(30 * 24 * time.Hour),
},
Traces: Traces{
RetentionPeriod: DayDuration(15 * 24 * time.Hour),
RetentionPeriod: day.Duration(15 * 24 * time.Hour),
},
},
},
Expand Down Expand Up @@ -97,29 +98,29 @@ func TestApplyDefaults(t *testing.T) {
t,
Config{
Metrics: Metrics{
ChunkInterval: DayDuration(defaultMetricChunkInterval),
ChunkInterval: day.Duration(defaultMetricChunkInterval),
Compression: &defaultMetricCompressionVar,
HALeaseRefresh: DayDuration(defaultMetricHALeaseRefresh),
HALeaseTimeout: DayDuration(defaultMetricHALeaseTimeout),
RetentionPeriod: DayDuration(defaultMetricRetentionPeriod),
HALeaseRefresh: day.Duration(defaultMetricHALeaseRefresh),
HALeaseTimeout: day.Duration(defaultMetricHALeaseTimeout),
RetentionPeriod: day.Duration(defaultMetricRetentionPeriod),
},
Traces: Traces{
RetentionPeriod: DayDuration(defaultTraceRetentionPeriod),
RetentionPeriod: day.Duration(defaultTraceRetentionPeriod),
},
},
c,
)

untouched := Config{
Metrics: Metrics{
ChunkInterval: DayDuration(3 * time.Hour),
ChunkInterval: day.Duration(3 * time.Hour),
Compression: &testCompressionSetting,
HALeaseRefresh: DayDuration(2 * time.Minute),
HALeaseTimeout: DayDuration(5 * time.Second),
RetentionPeriod: DayDuration(30 * 24 * time.Hour),
HALeaseRefresh: day.Duration(2 * time.Minute),
HALeaseTimeout: day.Duration(5 * time.Second),
RetentionPeriod: day.Duration(30 * 24 * time.Hour),
},
Traces: Traces{
RetentionPeriod: DayDuration(15 * 24 * time.Hour),
RetentionPeriod: day.Duration(15 * 24 * time.Hour),
},
}

Expand Down
16 changes: 10 additions & 6 deletions pkg/dataset/duration.go → pkg/internal/day/duration.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
package dataset
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package day

import (
"fmt"
Expand All @@ -11,13 +15,13 @@ const (
unknownUnitDErrorPrefix = `time: unknown unit "d"`
)

// DayDuration acts like a time.Duration with support for "d" unit
// Duration acts like a time.Duration with support for "d" unit
// which is used for specifying number of days in duration.
type DayDuration time.Duration
type Duration time.Duration

// UnmarshalText unmarshals strings into Duration values while
// handling the day unit. It leans heavily into time.ParseDuration.
func (d *DayDuration) UnmarshalText(s []byte) error {
func (d *Duration) UnmarshalText(s []byte) error {
val, err := time.ParseDuration(string(s))
if err != nil {
// Check for specific error indicating we are using days unit.
Expand All @@ -30,7 +34,7 @@ func (d *DayDuration) UnmarshalText(s []byte) error {
return err
}
}
*d = DayDuration(val)
*d = Duration(val)
return nil
}

Expand Down Expand Up @@ -61,6 +65,6 @@ func handleDays(s []byte) (time.Duration, error) {
}

// String returns a string value of DayDuration.
func (d DayDuration) String() string {
func (d Duration) String() string {
return time.Duration(d).String()
}
15 changes: 9 additions & 6 deletions pkg/pgclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Client struct {
}

// NewClient creates a new PostgreSQL client
func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) {
func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly, useRollups bool) (*Client, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we add useRollups to cfg *Config? It reads a little strange that we have a config struct, but it doesn't have all the config options and we need extra parameters in the signature.

I haven't checked, but maybe we can't because of how we tied together our configs, CLI flags, and their namespaces.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked and we can't because rollup comes from dataset

var (
err error
dbMaxConns int
Expand Down Expand Up @@ -137,7 +137,7 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche
if err != nil {
return nil, fmt.Errorf("err creating reader connection pool: %w", err)
}
client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly)
client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly, useRollups)
if err != nil {
return client, err
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func getRedactedConnStr(s string) string {
}

// NewClientWithPool creates a new PostgreSQL client with an existing connection pool.
func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) {
func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly, useRollups bool) (*Client, error) {
sigClose := make(chan struct{})
metricsCache := cache.NewMetricCache(cfg.CacheConfig)
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
Expand All @@ -223,7 +223,11 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig)

labelsReader := lreader.NewLabelsReader(readerConn, labelsCache, mt.ReadAuthorizer())
dbQuerier := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer())
dbQuerier, err := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer(), cfg.MetricsScrapeInterval, useRollups)
if err != nil {
return nil, fmt.Errorf("error starting querier: %w", err)
}

queryable := query.NewQueryable(dbQuerier, labelsReader)

dbIngestor := ingestor.DBInserter(ingestor.ReadOnlyIngestor{})
Expand All @@ -232,8 +236,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
writerConn = pgxconn.NewPgxConn(writerPool)
dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, &c)
if err != nil {
log.Error("msg", "err starting the ingestor", "err", err)
return nil, err
return nil, fmt.Errorf("error starting ingestor: %w", err)
}
}
if maintPool != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/pgclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
"github.com/timescale/promscale/pkg/rollup"
"github.com/timescale/promscale/pkg/version"
)

Expand All @@ -34,6 +35,7 @@ type Config struct {
DbConnectionTimeout time.Duration
IgnoreCompressedChunks bool
MetricsAsyncAcks bool
MetricsScrapeInterval time.Duration
TracesAsyncAcks bool
WriteConnections int
WriterPoolSize int
Expand Down Expand Up @@ -107,6 +109,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB")
fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created")
fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.")
fs.DurationVar(&cfg.MetricsScrapeInterval, "metrics.rollup.scrape-interval", rollup.DefaultScrapeInterval, "Default scrape interval in Prometheus. This is used to estimate samples while choosing rollup for querying.")
return cfg
}

Expand Down
23 changes: 16 additions & 7 deletions pkg/pgmodel/metrics/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ type Engine interface {
}

type metricsEngineImpl struct {
conn pgxconn.PgxConn
ctx context.Context
isRunning atomic.Value
metrics []metricQueryWrap
conn pgxconn.PgxConn
ctx context.Context
isRunning atomic.Value
metrics []metricQueryWrap
metricSeries []metricsWithSeries
}

// NewEngine creates an engine that performs database metrics evaluation every evalInterval.
Expand All @@ -33,9 +34,10 @@ type metricsEngineImpl struct {
// will cause evaluation errors.
func NewEngine(ctx context.Context, conn pgxconn.PgxConn) *metricsEngineImpl {
engine := &metricsEngineImpl{
conn: conn,
ctx: ctx,
metrics: metrics,
conn: conn,
ctx: ctx,
metrics: metrics,
metricSeries: metricSeries,
}
engine.isRunning.Store(false)
return engine
Expand Down Expand Up @@ -136,6 +138,13 @@ func (e *metricsEngineImpl) Update() error {
return err
}
handleResults(results, batchMetrics)

for _, ms := range e.metricSeries {
if err = ms.update(e.conn); err != nil {
log.Warn("msg", "error evaluating metrics with series", "err", err.Error())
continue // We shouldn't stop everything if this fails.
}
}
return results.Close()
}

Expand Down
66 changes: 66 additions & 0 deletions pkg/pgmodel/metrics/database/metric_series.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package database

import (
"context"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/util"
)

var (
	// caggsRefreshTotal reports, per refresh interval, the total number of
	// continuous-aggregate refresh policy runs as read from the database.
	// NOTE(review): the "_total" suffix is conventionally reserved for
	// Prometheus counters; these are gauges set from sampled DB values —
	// confirm the naming is intentional.
	caggsRefreshTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: util.PromNamespace,
		Subsystem: "sql_database",
		Name:      "caggs_refresh_total",
		Help:      "Total number of caggs policy executed.",
	}, []string{"refresh_interval"})
	// caggsRefreshSuccess reports, per refresh interval, the number of
	// successful continuous-aggregate refresh policy runs.
	caggsRefreshSuccess = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: util.PromNamespace,
		Subsystem: "sql_database",
		Name:      "caggs_refresh_success",
		Help:      "Total number of caggs policy executed successfully.",
	}, []string{"refresh_interval"})
)

// init registers the continuous-aggregate refresh metrics with the
// default Prometheus registry. MustRegister panics on duplicate
// registration, which surfaces wiring mistakes at startup.
func init() {
	prometheus.MustRegister(caggsRefreshSuccess, caggsRefreshTotal)
}

// metricsWithSeries wraps a metric-evaluation routine whose number of
// emitted series (label sets) is determined at query time by the
// database, rather than being fixed up front.
type metricsWithSeries struct {
	// update queries the database over conn and sets the corresponding
	// Prometheus metrics; it returns an error if the query or scan fails.
	update func(conn pgxconn.PgxConn) error
}

// metricSeries lists the database metrics that produce a variable number
// of series. Each entry's update hook is run on every engine update cycle.
var metricSeries = []metricsWithSeries{
	{
		update: func(conn pgxconn.PgxConn) error {
			// Read per-job run statistics for the continuous-aggregate
			// refresh policy so success/total counts can be exported,
			// labeled by the policy's configured refresh interval.
			rows, err := conn.Query(context.Background(), `
SELECT
	total_successes,
	total_runs,
	(config ->> 'refresh_interval')::INTERVAL
FROM timescaledb_information.jobs j
INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.proc_name = 'execute_caggs_refresh_policy')
`)
			if err != nil {
				return fmt.Errorf("error running instrumentation for execute_caggs_refresh_policy: %w", err)
			}
			defer rows.Close()
			for rows.Next() {
				var (
					success, total  int64
					refreshInterval time.Duration
				)
				// NOTE(review): assumes pgx scans a Postgres INTERVAL
				// directly into time.Duration — confirm with the pgx
				// version in use.
				if err := rows.Scan(&success, &total, &refreshInterval); err != nil {
					return fmt.Errorf("error scanning values for execute_caggs_refresh_policy: %w", err)
				}
				caggsRefreshSuccess.With(prometheus.Labels{"refresh_interval": refreshInterval.String()}).Set(float64(success))
				caggsRefreshTotal.With(prometheus.Labels{"refresh_interval": refreshInterval.String()}).Set(float64(total))
			}
			// rows.Next() returns false on both exhaustion and error; the
			// original dropped iteration errors silently. Surface them.
			if err := rows.Err(); err != nil {
				return fmt.Errorf("error iterating rows for execute_caggs_refresh_policy: %w", err)
			}
			return nil
		},
	},
}
Loading