Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
implement suggestion
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <[email protected]>
  • Loading branch information
Harkishen-Singh committed Dec 12, 2022
1 parent 45833de commit 0344071
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions pkg/rollup/decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ const (
)

type rollupInfo struct {
schemaName string
resolution time.Duration
schemaName string
refreshInterval time.Duration
}

type Decider struct {
Expand All @@ -34,7 +34,7 @@ type Decider struct {
downsamplingEnabled bool

supportedMetrics map[string]struct{}
rollups []rollupInfo // {schemaName, resolution} in ascending order of interval. Lesser the interval, more the granularity.
rollups []rollupInfo // {schemaName, refreshInterval} in ascending order of interval. Lesser the interval, more the granularity.
}

func NewDecider(ctx context.Context, conn pgxconn.PgxConn, scrapeInterval time.Duration) (*Decider, error) {
Expand All @@ -50,8 +50,8 @@ func NewDecider(ctx context.Context, conn pgxconn.PgxConn, scrapeInterval time.D
}

// Decide returns the schema name of the rollups that should be used for querying.
// The returned schema represents a downsampled resolution that should be an optimal
// resolution for querying.
// The returned schema represents a downsampled Prometheus data that should provide optimal
// granularity for querying.
//
// If no rollups exists or if downsampling is disabled, DefaultSchema (i.e., "prom_data") is returned.
func (h *Decider) Decide(minTs, maxTs int64) string {
Expand All @@ -61,8 +61,8 @@ func (h *Decider) Decide(minTs, maxTs int64) string {
if !h.downsamplingEnabled || len(h.rollups) == 0 {
return DefaultSchema
}
estimateSamples := func(resolution time.Duration) int64 {
return int64(float64(maxTs-minTs) / resolution.Seconds())
estimateSamples := func(interval time.Duration) int64 {
return int64(float64(maxTs-minTs) / interval.Seconds())
}

numRawSamples := estimateSamples(h.scrapeInterval)
Expand All @@ -71,7 +71,7 @@ func (h *Decider) Decide(minTs, maxTs int64) string {
}

for _, info := range h.rollups {
samples := estimateSamples(info.resolution)
samples := estimateSamples(info.refreshInterval) // Interval between 2 samples.
if samples < upperLimit {
// h.rollups is sorted by interval. So, the first rollup that is below upper limit is our answer.
// This is because it gives the maximum granularity while being in acceptable limits.
Expand Down Expand Up @@ -100,14 +100,14 @@ func (h *Decider) Refresh() error {
return fmt.Errorf("metric-type: %w", err)
}
if err := h.refreshRollup(); err != nil {
return fmt.Errorf("rollup resolutions: %w", err)
return fmt.Errorf("rollup: %w", err)
}
return nil
}

func (h *Decider) runRefreshRoutine(refreshInterval time.Duration) error {
if err := h.Refresh(); err != nil {
return fmt.Errorf("refreshing rollup resolution: %w", err)
return fmt.Errorf("refresh: %w", err)
}
go func() {
t := time.NewTicker(refreshInterval)
Expand All @@ -119,7 +119,7 @@ func (h *Decider) runRefreshRoutine(refreshInterval time.Duration) error {
case <-t.C:
}
if err := h.Refresh(); err != nil {
log.Error("msg", "error refreshing rollup resolution", "error", err.Error())
log.Error("msg", "error refreshing rollups", "error", err.Error())
}
}
}()
Expand Down Expand Up @@ -159,19 +159,19 @@ func (h *Decider) refreshSupportedMetrics() error {
func (h *Decider) refreshRollup() error {
rows, err := h.conn.Query(h.ctx, "SELECT schema_name, resolution FROM _prom_catalog.rollup ORDER BY resolution ASC")
if err != nil {
return fmt.Errorf("fetching rollup resolutions: %w", err)
return fmt.Errorf("fetching rollup: %w", err)
}
defer rows.Close()
h.rollups = []rollupInfo{}
for rows.Next() {
var (
schemaName string
resolution time.Duration
schemaName string
refreshInterval time.Duration
)
if err = rows.Scan(&schemaName, &resolution); err != nil {
if err = rows.Scan(&schemaName, &refreshInterval); err != nil {
return fmt.Errorf("error scanning rows: %w", err)
}
h.rollups = append(h.rollups, rollupInfo{schemaName: schemaName, resolution: resolution})
h.rollups = append(h.rollups, rollupInfo{schemaName: schemaName, refreshInterval: refreshInterval})
}
return nil
}

0 comments on commit 0344071

Please sign in to comment.