Skip to content

Commit

Permalink
metric reset
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Nov 25, 2024
1 parent ce5fe97 commit 8c4ac5f
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 67 deletions.
36 changes: 29 additions & 7 deletions pkg/experiment/compactor/compaction_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -217,6 +219,10 @@ func (w *Worker) collectUpdates() []*metastorev1.CompactionJobStatusUpdate {
updates = append(updates, update)

case done && job.compacted == nil:
// We're not sending a status update for the job and expect that the
// assigment is to be revoked. The job is to be removed at the next
// poll response handling: all jobs without assignments are canceled
// and removed.
level.Warn(w.logger).Log("msg", "skipping update for abandoned job", "job", job.Name)

default:
Expand All @@ -232,7 +238,21 @@ func (w *Worker) collectUpdates() []*metastorev1.CompactionJobStatusUpdate {
func (w *Worker) cleanup(updates []*metastorev1.CompactionJobStatusUpdate) {
for _, update := range updates {
if job := w.jobs[update.Name]; job != nil && job.done.Load() {
w.remove(job)
switch update.Status {
case metastorev1.CompactionJobStatus_COMPACTION_STATUS_SUCCESS:
// In the vast majority of cases, we end up here.
w.remove(job)

case metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS:
// It is possible that the job has been completed after we
// prepared the status update: keep the job for the next
// poll iteration.

default:
// Workers never send other statuses. It's unexpected to get here.
level.Warn(w.logger).Log("msg", "unexpected job status transition; removing the job", "job", job.Name)
w.remove(job)
}
}
}
}
Expand Down Expand Up @@ -288,12 +308,12 @@ func (w *Worker) handleResponse(resp *metastorev1.PollCompactionJobsResponse) (n

func (w *Worker) runCompaction(job *compactionJob) {
start := time.Now()
labels := []string{job.Tenant, fmt.Sprint(job.Shard), fmt.Sprint(job.CompactionLevel)}
statusName := "unknown"
labels := []string{job.Tenant, strconv.Itoa(int(job.CompactionLevel))}
statusName := "failure"
defer func() {
jobStatusLabel := append(labels, statusName)
w.metrics.jobDuration.WithLabelValues(jobStatusLabel...).Observe(time.Since(start).Seconds())
w.metrics.jobsCompleted.WithLabelValues(jobStatusLabel...).Inc()
labelsWithStatus := append(labels, statusName)
w.metrics.jobDuration.WithLabelValues(labelsWithStatus...).Observe(time.Since(start).Seconds())
w.metrics.jobsCompleted.WithLabelValues(labelsWithStatus...).Inc()
w.metrics.jobsInProgress.WithLabelValues(labels...).Dec()
}()

Expand Down Expand Up @@ -372,13 +392,15 @@ func (w *Worker) runCompaction(job *compactionJob) {
},
}

firstBlock := time.UnixMilli(int64(ulid.MustParse(job.blocks[0].Id).Time()))
w.metrics.timeToCompaction.WithLabelValues(labels...).Observe(time.Since(firstBlock).Seconds())

case errors.Is(err, context.Canceled):
level.Warn(logger).Log("msg", "job cancelled")
statusName = "cancelled"

default:
level.Error(logger).Log("msg", "failed to compact blocks", "err", err)
statusName = "failure"
}

// The only error returned by Wait is the context
Expand Down
46 changes: 32 additions & 14 deletions pkg/experiment/compactor/compaction_worker_metrics.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,58 @@
package compactor

import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/pyroscope/pkg/util"
)

type metrics struct {
jobsCompleted *prometheus.CounterVec
jobsInProgress *prometheus.GaugeVec
jobDuration *prometheus.HistogramVec
jobsInProgress *prometheus.GaugeVec
jobsCompleted *prometheus.CounterVec
jobDuration *prometheus.HistogramVec
timeToCompaction *prometheus.HistogramVec
}

func newMetrics(r prometheus.Registerer) *metrics {
m := &metrics{
jobsCompleted: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "jobs_completed_total",
Help: "Total number of compaction jobs completed.",
}, []string{"tenant", "shard", "level", "status"}),

jobsInProgress: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "jobs_in_progress",
Help: "The number of active compaction jobs currently running.",
}, []string{"tenant", "shard", "level"}),
}, []string{"tenant", "level"}),

jobsCompleted: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "jobs_completed_total",
Help: "Total number of compaction jobs completed.",
}, []string{"tenant", "level", "status"}),

jobDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "job_duration_seconds",
Help: "Duration of compaction job runs",
Buckets: prometheus.ExponentialBuckets(1, 2, 14),
}, []string{"tenant", "shard", "level", "status"}),
Name: "job_duration_seconds",
Help: "Duration of compaction job runs",

Buckets: prometheus.ExponentialBuckets(1, 300, 16),
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 16,
NativeHistogramMinResetDuration: time.Hour,
}, []string{"tenant", "level", "status"}),

timeToCompaction: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "time_to_compaction_seconds",
Help: "The time elapsed since the oldest compacted block was created.",

Buckets: prometheus.ExponentialBuckets(1, 3600, 16),
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 16,
NativeHistogramMinResetDuration: time.Hour,
}, []string{"tenant", "level"}),
}

util.Register(r,
m.jobsCompleted,
m.jobsInProgress,
m.jobsCompleted,
m.jobDuration,
m.timeToCompaction,
)

return m
Expand Down
22 changes: 18 additions & 4 deletions pkg/experiment/metastore/compaction/compactor/compaction_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ type stagedBlocks struct {
// Incomplete batch of blocks.
batch *batch
// Map of block IDs to their locations in batches.
refs map[string]blockRef
stats *queueStats
refs map[string]blockRef
stats *queueStats
collector *queueStatsCollector
// Parent block queue maintains a priority queue of
// incomplete batches by the last update time.
heapIndex int
Expand Down Expand Up @@ -106,6 +107,14 @@ func newCompactionQueue(strategy Strategy, registerer prometheus.Registerer) *co
}

func (q *compactionQueue) reset() {
for _, level := range q.levels {
if level != nil {
for _, s := range level.staged {
level.removeStaged(s)
}
}
}
clear(q.levels)
q.levels = q.levels[:0]
}

Expand Down Expand Up @@ -160,13 +169,18 @@ func (q *blockQueue) stagedBlocks(k compactionKey) *stagedBlocks {
staged.resetBatch()
q.staged[k] = staged
heap.Push(q.updates, staged)
collector := newQueueStatsCollector(staged)
util.RegisterOrGet(q.registerer, collector)
if q.registerer != nil {
staged.collector = newQueueStatsCollector(staged)
util.RegisterOrGet(q.registerer, staged.collector)
}
}
return staged
}

func (q *blockQueue) removeStaged(s *stagedBlocks) {
if s.collector != nil {
q.registerer.Unregister(s.collector)
}
delete(q.staged, s.key)
if s.heapIndex < 0 {
// We usually end up here since s has already been evicted
Expand Down
42 changes: 11 additions & 31 deletions pkg/experiment/metastore/compaction/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,27 @@ import (
type statsCollector struct {
s *Scheduler

jobs *prometheus.Desc
unassigned *prometheus.Desc
reassigned *prometheus.Desc
failed *prometheus.Desc

addedTotal *prometheus.Desc
completedTotal *prometheus.Desc
assignedTotal *prometheus.Desc
reassignedTotal *prometheus.Desc

// Gauge showing the job queue status breakdown.
jobs *prometheus.Desc
}

const schedulerQueueMetricsPrefix = "compaction_scheduler_queue_"

func newStatsCollector(s *Scheduler) *statsCollector {
variableLabels := []string{"level"}
statusGaugeLabels := append(variableLabels, "status")
return &statsCollector{
s: s,

jobs: prometheus.NewDesc(
schedulerQueueMetricsPrefix+"jobs",
"The total number of jobs in the queue.",
variableLabels, nil,
),
unassigned: prometheus.NewDesc(
schedulerQueueMetricsPrefix+"unassigned_jobs",
"The total number of unassigned jobs in the queue.",
variableLabels, nil,
),
reassigned: prometheus.NewDesc(
schedulerQueueMetricsPrefix+"reassigned_jobs",
"The total number of reassigned jobs in the queue.",
variableLabels, nil,
),
failed: prometheus.NewDesc(
schedulerQueueMetricsPrefix+"failed_jobs",
"The total number of failed jobs in the queue.",
variableLabels, nil,
statusGaugeLabels, nil,
),

addedTotal: prometheus.NewDesc(
Expand Down Expand Up @@ -73,9 +57,6 @@ func newStatsCollector(s *Scheduler) *statsCollector {

func (c *statsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.jobs
ch <- c.unassigned
ch <- c.reassigned
ch <- c.failed
ch <- c.addedTotal
ch <- c.completedTotal
ch <- c.assignedTotal
Expand All @@ -95,7 +76,6 @@ func (c *statsCollector) collectMetrics() []prometheus.Metric {
metrics := make([]prometheus.Metric, 0, 8*len(c.s.queue.levels))
for i, q := range c.s.queue.levels {
var stats queueStats
stats.jobs = uint32(q.jobs.Len())
for _, e := range *q.jobs {
switch {
case e.Status == 0:
Expand All @@ -105,13 +85,13 @@ func (c *statsCollector) collectMetrics() []prometheus.Metric {
case e.Failures > 0:
stats.reassigned++
default:
// Assigned in-progress.
stats.assigned++
}
}

// Update stored gauges. Those are not used at the moment,
// but can help planning schedule updates in the future.
q.stats.jobs = stats.jobs
q.stats.assigned = stats.assigned
q.stats.unassigned = stats.unassigned
q.stats.reassigned = stats.reassigned
q.stats.failed = stats.failed
Expand All @@ -124,10 +104,10 @@ func (c *statsCollector) collectMetrics() []prometheus.Metric {

level := strconv.Itoa(i)
metrics = append(metrics,
prometheus.MustNewConstMetric(c.jobs, prometheus.GaugeValue, float64(stats.jobs), level),
prometheus.MustNewConstMetric(c.unassigned, prometheus.GaugeValue, float64(stats.unassigned), level),
prometheus.MustNewConstMetric(c.reassigned, prometheus.GaugeValue, float64(stats.reassigned), level),
prometheus.MustNewConstMetric(c.failed, prometheus.GaugeValue, float64(stats.failed), level),
prometheus.MustNewConstMetric(c.jobs, prometheus.GaugeValue, float64(stats.assigned), level, "assigned"),
prometheus.MustNewConstMetric(c.jobs, prometheus.GaugeValue, float64(stats.unassigned), level, "unassigned"),
prometheus.MustNewConstMetric(c.jobs, prometheus.GaugeValue, float64(stats.reassigned), level, "reassigned"),
prometheus.MustNewConstMetric(c.jobs, prometheus.GaugeValue, float64(stats.failed), level, "failed"),
prometheus.MustNewConstMetric(c.addedTotal, prometheus.CounterValue, float64(stats.addedTotal), level),
prometheus.MustNewConstMetric(c.completedTotal, prometheus.CounterValue, float64(stats.completedTotal), level),
prometheus.MustNewConstMetric(c.assignedTotal, prometheus.CounterValue, float64(stats.assignedTotal), level),
Expand Down
10 changes: 6 additions & 4 deletions pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func newJobQueue() *schedulerQueue {
}

func (q *schedulerQueue) reset() {
q.levels = q.levels[:0]
clear(q.jobs)
q.resetStats()
clear(q.levels)
q.levels = q.levels[:0]
}

func (q *schedulerQueue) put(state *raft_log.CompactionJobState) {
Expand Down Expand Up @@ -63,7 +63,9 @@ func (q *schedulerQueue) level(x uint32) *jobQueue {

func (q *schedulerQueue) resetStats() {
for _, level := range q.levels {
level.stats.reset()
if level != nil {
level.stats.reset()
}
}
}

Expand All @@ -79,7 +81,7 @@ type queueStats struct {
assignedTotal uint32
reassignedTotal uint32
// Gauges. Updated periodically.
jobs uint32
assigned uint32
unassigned uint32
reassigned uint32
failed uint32
Expand Down
15 changes: 8 additions & 7 deletions pkg/experiment/query_backend/block/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/grafana/dskit/multierror"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -107,21 +106,23 @@ func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
}

r := objects[0]
var c uint32
var level uint32
for _, obj := range objects {
if r.meta.Shard != obj.meta.Shard {
return nil, ErrShardMergeMismatch
}
c = max(c, obj.meta.CompactionLevel)
level = max(level, obj.meta.CompactionLevel)
}
c++
level++

// Assuming that the first block in the job is the oldest one.
timestamp := ulid.MustParse(r.meta.Id).Time()
m := make(map[string]*CompactionPlan)
for _, obj := range objects {
for _, s := range obj.meta.Datasets {
tm, ok := m[s.TenantId]
if !ok {
tm = newBlockCompaction(s.TenantId, r.meta.Shard, c)
tm = newBlockCompaction(timestamp, s.TenantId, r.meta.Shard, level)
m[s.TenantId] = tm
}
sm := tm.addDataset(s)
Expand Down Expand Up @@ -151,14 +152,14 @@ type CompactionPlan struct {
meta *metastorev1.BlockMeta
}

func newBlockCompaction(tenantID string, shard uint32, compactionLevel uint32) *CompactionPlan {
func newBlockCompaction(unixMilli uint64, tenantID string, shard uint32, compactionLevel uint32) *CompactionPlan {
return &CompactionPlan{
tenantID: tenantID,
datasetMap: make(map[string]*datasetCompaction),
meta: &metastorev1.BlockMeta{
FormatVersion: 1,
// TODO(kolesnikovae): Make it deterministic?
Id: ulid.MustNew(uint64(time.Now().UnixMilli()), rand.Reader).String(),
Id: ulid.MustNew(unixMilli, rand.Reader).String(),
TenantId: tenantID,
Shard: shard,
CompactionLevel: compactionLevel,
Expand Down

0 comments on commit 8c4ac5f

Please sign in to comment.