This repository has been archived by the owner on Apr 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 169
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement rollup resolution decider for incoming queries.
Signed-off-by: Harkishen-Singh <[email protected]>
- Loading branch information
1 parent
ec7c1d5
commit 916ff43
Showing
19 changed files
with
477 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
// This file and its contents are licensed under the Apache License 2.0. | ||
// Please see the included NOTICE for copyright information and | ||
// LICENSE for a copy of the license. | ||
|
||
package rollup | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sort" | ||
"sync" | ||
"time" | ||
|
||
"github.com/timescale/promscale/pkg/log" | ||
"github.com/timescale/promscale/pkg/pgxconn" | ||
) | ||
|
||
const ( | ||
originalSchema = "prom_data" | ||
upperLimit = 5000 // Maximum samples allowed | ||
assumedScrapeInterval = time.Second * 30 | ||
refreshRollupResolution = time.Minute * 30 | ||
) | ||
|
||
type rollupInfo struct { | ||
schemaName string | ||
resolution time.Duration | ||
} | ||
|
||
type QueryHelper struct { | ||
conn pgxconn.PgxConn | ||
refreshMtx sync.RWMutex | ||
metricType map[string]string // metric_name: metric_type | ||
resolutionsASC []rollupInfo // {schemaName, resolution} in ascending order of resolution. | ||
downsamplingEnabled bool | ||
} | ||
|
||
func NewQueryHelper(ctx context.Context, conn pgxconn.PgxConn) (*QueryHelper, error) { | ||
helper := &QueryHelper{conn: conn} | ||
if err := helper.runRefreshRoutine(ctx, refreshRollupResolution); err != nil { | ||
return nil, fmt.Errorf("refresh: %w", err) | ||
} | ||
return helper, nil | ||
} | ||
|
||
// DecideSchema returns the schema name of the rollups that should be used for querying. | ||
// The returned schema represents a downsampled resolution that should be an optimal | ||
// resolution for querying. | ||
// | ||
// If no rollups exists or if downsampling is disabled, "prom_data" is returned. | ||
func (h *QueryHelper) DecideSchema(min, max int64) string { | ||
h.refreshMtx.RLock() | ||
defer h.refreshMtx.RUnlock() | ||
|
||
if !h.downsamplingEnabled || len(h.resolutionsASC) == 0 { | ||
return originalSchema | ||
} | ||
estimateSamples := func(resolution time.Duration) int64 { | ||
return int64(float64(max-min) / resolution.Seconds()) | ||
} | ||
|
||
numRawSamples := estimateSamples(assumedScrapeInterval) | ||
if numRawSamples < upperLimit { | ||
return originalSchema | ||
} | ||
|
||
for _, info := range h.resolutionsASC { | ||
samples := estimateSamples(info.resolution) | ||
if samples < upperLimit { | ||
// The first highest resolution that is below upper limit is our answer, | ||
// since it provides the highest granularity at the expected samples. | ||
return info.schemaName | ||
} | ||
} | ||
// All rollups are above upper limit. Hence, send the schema of the lowest resolution | ||
// as this is the best we can do. | ||
lowestRollup := h.resolutionsASC[len(h.resolutionsASC)-1] | ||
return lowestRollup.schemaName | ||
} | ||
|
||
func (h *QueryHelper) ContainsMetricType(metricName string) bool { | ||
_, present := h.metricType[metricName] | ||
return present | ||
} | ||
|
||
func (h *QueryHelper) Refresh() error { | ||
h.refreshMtx.Lock() | ||
defer h.refreshMtx.Unlock() | ||
|
||
if err := h.refreshDownsamplingState(); err != nil { | ||
return fmt.Errorf("downsampling state: %w", err) | ||
} | ||
if err := h.refreshMetricTypes(); err != nil { | ||
return fmt.Errorf("metric-type: %w", err) | ||
} | ||
if err := h.refreshRollupResolutions(); err != nil { | ||
return fmt.Errorf("rollup resolutions: %w", err) | ||
} | ||
return nil | ||
} | ||
|
||
func (h *QueryHelper) runRefreshRoutine(ctx context.Context, refreshInterval time.Duration) error { | ||
if err := h.Refresh(); err != nil { | ||
return fmt.Errorf("refreshing rollup resolution: %w", err) | ||
} | ||
go func() { | ||
t := time.NewTicker(refreshInterval) | ||
defer t.Stop() | ||
for range t.C { | ||
if err := h.Refresh(); err != nil { | ||
log.Error("msg", "error refreshing rollup resolution", "error", err.Error()) | ||
} | ||
} | ||
}() | ||
return nil | ||
} | ||
|
||
func (h *QueryHelper) refreshDownsamplingState() error { | ||
var state bool | ||
if err := h.conn.QueryRow(context.Background(), "SELECT prom_api.get_automatic_downsample()::BOOLEAN").Scan(&state); err != nil { | ||
return fmt.Errorf("fetching automatic downsampling state: %w", err) | ||
} | ||
h.downsamplingEnabled = state | ||
return nil | ||
} | ||
|
||
func (h *QueryHelper) refreshMetricTypes() error { | ||
var metricName, metricType []string | ||
err := h.conn.QueryRow(context.Background(), | ||
"select array_agg(metric_family), array_agg(type) from _prom_catalog.metadata").Scan(&metricName, &metricType) | ||
if err != nil { | ||
return fmt.Errorf("fetching metric metadata: %w", err) | ||
} | ||
h.metricType = make(map[string]string) // metric_name: metric_type | ||
for i := range metricName { | ||
h.metricType[metricName[i]] = metricType[i] | ||
} | ||
return nil | ||
} | ||
|
||
func (h *QueryHelper) refreshRollupResolutions() error { | ||
rows, err := h.conn.Query(context.Background(), "SELECT schema_name, resolution FROM _prom_catalog.rollup") | ||
if err != nil { | ||
return fmt.Errorf("fetching rollup resolutions: %w", err) | ||
} | ||
h.resolutionsASC = []rollupInfo{} | ||
for rows.Next() { | ||
var ( | ||
schemaName string | ||
resolution time.Duration | ||
) | ||
if err = rows.Scan(&schemaName, &resolution); err != nil { | ||
return fmt.Errorf("error scanning rows: %w", err) | ||
} | ||
h.resolutionsASC = append(h.resolutionsASC, rollupInfo{schemaName: schemaName, resolution: resolution}) | ||
} | ||
sort.Sort(sortRollupInfo(h.resolutionsASC)) | ||
return nil | ||
} | ||
|
||
type sortRollupInfo []rollupInfo | ||
|
||
func (s sortRollupInfo) Len() int { | ||
return len(s) | ||
} | ||
|
||
func (s sortRollupInfo) Less(i, j int) bool { | ||
return s[i].resolution.Seconds() < s[j].resolution.Seconds() | ||
} | ||
|
||
func (s sortRollupInfo) Swap(i, j int) { | ||
s[i], s[j] = s[j], s[i] | ||
} |
Oops, something went wrong.