Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Update rollups creation/deletion/updation using dataset-config.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <[email protected]>
  • Loading branch information
Harkishen-Singh committed Dec 6, 2022
1 parent 8281a59 commit ec7c1d5
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 209 deletions.
19 changes: 10 additions & 9 deletions pkg/dataset/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/timescale/promscale/pkg/internal/day"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/rollup"
)

const (
Expand Down Expand Up @@ -44,12 +45,12 @@ type Config struct {

// Metrics contains dataset configuration options for metrics data.
type Metrics struct {
ChunkInterval day.Duration `yaml:"default_chunk_interval"`
Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set.
HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"`
HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"`
RetentionPeriod day.Duration `yaml:"default_retention_period"`
Downsample *Downsample `yaml:"downsample,omitempty"`
ChunkInterval day.Duration `yaml:"default_chunk_interval"`
Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set.
HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"`
HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"`
RetentionPeriod day.Duration `yaml:"default_retention_period"`
Rollups *rollup.Config `yaml:"rollups,omitempty"`
}

// Traces contains dataset configuration options for traces data.
Expand All @@ -64,11 +65,11 @@ func NewConfig(contents string) (cfg Config, err error) {
}

// Apply applies the configuration to the database via the supplied DB connection.
func (c *Config) Apply(conn *pgx.Conn) error {
func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
c.applyDefaults()

if c.Metrics.Downsample != nil {
if err := c.Metrics.Downsample.Apply(conn); err != nil {
if c.Metrics.Rollups != nil {
if err := c.Metrics.Rollups.Apply(ctx, conn); err != nil {
return fmt.Errorf("error applying configuration for downsampling: %w", err)
}
}
Expand Down
68 changes: 0 additions & 68 deletions pkg/dataset/downsample.go

This file was deleted.

97 changes: 97 additions & 0 deletions pkg/rollup/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package rollup

import (
"context"
"fmt"
"strings"
"time"

"github.com/jackc/pgx/v4"

"github.com/timescale/promscale/pkg/internal/day"
"github.com/timescale/promscale/pkg/log"
)

const (
setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)"

// short and long represent system resolutions.
short = "short"
long = "long"
)

var (
defaultDownsampleState = false
useDefaultResolution = false
systemResolution = map[string]Definition{
short: {
Resolution: day.Duration(5 * time.Minute),
Retention: day.Duration(90 * 24 * time.Hour),
},
long: {
Resolution: day.Duration(time.Hour),
Retention: day.Duration(395 * 24 * time.Hour),
},
}
)

type Config struct {
Enabled *bool `yaml:"enabled,omitempty"`
UseDefaultResolution *bool `yaml:"use_default_resolution"`
Resolutions `yaml:"resolutions,omitempty"`
}

type Definition struct {
Resolution day.Duration `yaml:"resolution"`
Retention day.Duration `yaml:"retention"`
Delete bool `yaml:"delete"`
}

type Resolutions map[string]Definition

func (d *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
d.applyDefaults()

if containsSystemResolutions(d.Resolutions) {
return fmt.Errorf("'short' and 'long' are system resolutions. These cannot be applied as rollup labels")
}

log.Info("msg", fmt.Sprintf("Setting automatic metric downsample to %t", *d.Enabled))
if _, err := conn.Exec(context.Background(), setDefaultDownsampleStateSQL, d.Enabled); err != nil {
return err
}

if *d.Enabled {
if *d.UseDefaultResolution {
d.Resolutions["short"] = systemResolution["short"]
d.Resolutions["long"] = systemResolution["long"]
}
if err := Sync(ctx, conn, d.Resolutions); err != nil {
return fmt.Errorf("ensure rollup with: %w", err)
}
}
return nil
}

func (d *Config) applyDefaults() {
if d.Enabled == nil {
d.Enabled = &defaultDownsampleState
}
if d.UseDefaultResolution == nil {
d.UseDefaultResolution = &useDefaultResolution
}
}

func containsSystemResolutions(r Resolutions) bool {
for k := range r {
k = strings.ToLower(k)
if k == short || k == long {
return true
}
}
return false
}
118 changes: 67 additions & 51 deletions pkg/rollup/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,93 +14,109 @@ import (
"github.com/timescale/promscale/pkg/internal/day"
)

type DownsampleResolution struct {
Label string `yaml:"label"`
Resolution day.Duration `yaml:"resolution"`
Retention day.Duration `yaml:"retention"`
}

// EnsureRollupWith ensures "strictly" that the given new resolutions are applied in the database.
//
// Note: It follows a "strict" behaviour, meaning any existing resolutions of downsampling in
// the database will be removed, so that the all downsampling data in the database strictly
// matches the provided newResolutions.
//
// Example: If the DB already contains metric rollups for `short` and `long`, and in dataset-config,
// connector sees `very_short` and `long`, then EnsureRollupWith will remove the `short` downsampled data
// and create `very_short`, while not touching `long`.
func EnsureRollupWith(conn *pgx.Conn, newResolutions []DownsampleResolution) error {
// Sync updates the rollups in the DB in accordance with the given resolutions. It handles:
// 1. Creating of new rollups
// 2. Deletion of rollups that have `delete: true`
// 3. Update retention duration of rollups that have same label name but different retention duration. If resolution of
// existing rollups are updated, an error is returned
func Sync(ctx context.Context, conn *pgx.Conn, r Resolutions) error {
rows, err := conn.Query(context.Background(), "SELECT name, resolution, retention FROM _prom_catalog.rollup")
if err != nil {
return fmt.Errorf("querying existing resolutions: %w", err)
}
defer rows.Close()

var existingResolutions []DownsampleResolution
existingResolutions := make(Resolutions)
for rows.Next() {
var lName string
var resolution, retention time.Duration
if err := rows.Scan(&lName, &resolution, &retention); err != nil {
return fmt.Errorf("error scanning output rows for existing resolutions: %w", err)
}
existingResolutions = append(existingResolutions, DownsampleResolution{Label: lName, Resolution: day.Duration(resolution), Retention: day.Duration(retention)})
existingResolutions[lName] = Definition{Resolution: day.Duration(resolution), Retention: day.Duration(retention)}
}

// Determine which resolutions need to be created and deleted from the DB.
pendingCreation := diff(newResolutions, existingResolutions)
pendingDeletion := diff(existingResolutions, newResolutions)
if err := errOnResolutionMismatch(existingResolutions, r); err != nil {
return fmt.Errorf("error on existing resolution mismatch: %w", err)
}

if err := updateExistingRollups(ctx, conn, existingResolutions, r); err != nil {
return fmt.Errorf("update existing rollups: %w", err)
}

// Delete rollups that are no longer required.
if err = deleteRollups(conn, pendingDeletion); err != nil {
if err = deleteRollups(ctx, conn, existingResolutions, r); err != nil {
return fmt.Errorf("delete rollups: %w", err)
}

// Create new rollups.
if err = createRollups(conn, pendingCreation); err != nil {
if err = createRollups(ctx, conn, existingResolutions, r); err != nil {
return fmt.Errorf("create rollups: %w", err)
}
return nil
}

func createRollups(conn *pgx.Conn, res []DownsampleResolution) error {
for _, r := range res {
_, err := conn.Exec(context.Background(), "CALL _prom_catalog.create_rollup($1, $2, $3)", r.Label, time.Duration(r.Resolution), time.Duration(r.Retention))
if err != nil {
return fmt.Errorf("error creating rollup for %s: %w", r.Label, err)
// errOnResolutionMismatch returns an error if a given resolution exists in the DB with a different resolution duration.
func errOnResolutionMismatch(existing, r Resolutions) error {
for labelName, res := range r {
if oldRes, exists := existing[labelName]; exists {
if oldRes.Resolution != res.Resolution {
return fmt.Errorf("existing rollup resolutions cannot be updated. Either keep the resolution of existing rollup labels same or remove them")
}
}
}
return nil
}

func deleteRollups(conn *pgx.Conn, res []DownsampleResolution) error {
for _, r := range res {
_, err := conn.Exec(context.Background(), "CALL _prom_catalog.delete_rollup($1)", r.Label)
if err != nil {
return fmt.Errorf("error deleting rollup for %s: %w", r.Label, err)
// updateExistingRollups updates the existing rollups retention if the new resolutions with a same name has
// different retention duration.
func updateExistingRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error {
var batch pgx.Batch
for labelName, res := range r {
if oldRes, exists := existingRes[labelName]; exists && oldRes.Retention != res.Retention {
batch.Queue("UPDATE _prom_catalog.rollup SET retention = $1 WHERE name = $2", time.Duration(res.Retention), labelName)
}
}
if batch.Len() > 0 {
results := conn.SendBatch(ctx, &batch)
if err := results.Close(); err != nil {
return fmt.Errorf("error closing batch: %w", err)
}
}
return nil
}

// diff returns the elements of a that are not in b.
//
// We need this since we want to support a "strict" behaviour in downsampling. This basically means, to have the exact
// downsampling data in the DB based on what's mentioned in the dataset-config.
//
// See the comment for EnsureRollupWith for example.
func diff(a, b []DownsampleResolution) []DownsampleResolution {
var difference []DownsampleResolution
for i := range a {
found := false
for j := range b {
if a[i].Label == b[j].Label {
found = true
break
}
func createRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error {
var batch pgx.Batch
for lName, res := range r {
_, exists := existingRes[lName]
if !exists && !res.Delete {
batch.Queue("CALL _prom_catalog.create_rollup($1, $2, $3)", lName, time.Duration(res.Resolution), time.Duration(res.Retention))
}
if !found {
difference = append(difference, a[i])
}
if batch.Len() > 0 {
results := conn.SendBatch(ctx, &batch)
if err := results.Close(); err != nil {
return fmt.Errorf("error creating new rollups: %w", err)
}
}
return difference
return nil
}

func deleteRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error {
var batch pgx.Batch
for lName, res := range r {
_, exists := existingRes[lName]
if exists && res.Delete {
// Delete the rollup only if it exists in the DB.
batch.Queue("CALL _prom_catalog.delete_rollup($1)", lName)
}
}
if batch.Len() > 0 {
results := conn.SendBatch(ctx, &batch)
if err := results.Close(); err != nil {
return fmt.Errorf("error deleting new rollups: %w", err)
}
}
return nil
}
Loading

0 comments on commit ec7c1d5

Please sign in to comment.