From af6b87b6a2ece39255d2b116d2d587e00e8397cb Mon Sep 17 00:00:00 2001 From: StevenYCChou <3055688+StevenYCChou@users.noreply.github.com> Date: Tue, 5 Mar 2019 15:48:51 +0800 Subject: [PATCH 1/6] Avoid retry on non-recoverable errors. Only retry when calling seriesCache.get(). --- retrieval/manager.go | 29 ++++---------------------- retrieval/transform.go | 46 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 42 insertions(+), 33 deletions(-) diff --git a/retrieval/manager.go b/retrieval/manager.go index ff3501b2..0402dd71 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -162,7 +162,10 @@ func (r *PrometheusReader) Run(ctx context.Context, startOffset int) error { ) go seriesCache.run(ctx) - builder := &sampleBuilder{series: seriesCache} + builder := &sampleBuilder{ + logger: r.logger, + series: seriesCache, + } // NOTE(fabxc): wrap the tailer into a buffered reader once we become concerned // with performance. The WAL reader will do a lot of tiny reads otherwise. @@ -217,7 +220,6 @@ Outer: level.Error(r.logger).Log("error", err) continue } - backoff := time.Duration(0) // Do not increment the metric for produced samples each time but rather // once at the end. // Otherwise it will increase CPU usage by ~10%. @@ -229,19 +231,11 @@ Outer: break Outer default: } - // We intentionally don't use time.After in the select statement above - // since we'd unnecessarily spawn a new goroutine for each sample - // we process even when there are no errors. - if backoff > 0 { - time.Sleep(backoff) - } - var outputSample *monitoring_pb.TimeSeries var hash uint64 outputSample, hash, samples, err = builder.next(ctx, samples) if err != nil { level.Warn(r.logger).Log("msg", "Failed to build sample", "err", err) - backoff = exponential(backoff) continue } if outputSample == nil { @@ -339,18 +333,3 @@ func hashSeries(s *monitoring_pb.TimeSeries) uint64 { } return h } - -func exponential(d time.Duration) time.Duration { - const ( - min = 10 * time.Millisecond - max = 2 * time.Second - ) - d *= 2 - if d < min { - d = min - } - if d > max { - d = max - } - return d -} diff --git a/retrieval/transform.go b/retrieval/transform.go index 9668895b..0cfdc7de 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -22,6 +22,8 @@ import ( "time" timestamp_pb "github.com/golang/protobuf/ptypes/timestamp" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/tsdb" @@ -31,6 +33,7 @@ import ( ) type sampleBuilder struct { + logger log.Logger series seriesGetter } @@ -39,10 +42,7 @@ type sampleBuilder struct { func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) { sample := samples[0] - entry, ok, err := b.series.get(ctx, sample.Ref) - if err != nil { - return nil, 0, samples, errors.Wrap(err, "get series information") - } + entry, ok := b.seriesGetWithRetry(ctx, sample) if !ok { return nil, 0, samples[1:], nil } @@ -57,6 +57,7 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo } ts.Points = append(ts.Points, point) + var err error var resetTimestamp int64 switch entry.metadata.Type { @@ -120,6 +121,23 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo return &ts, entry.hash, samples[1:], nil } +func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefSample) (*seriesCacheEntry, bool) { + backoff := time.Duration(0) + entry, ok, err := b.series.get(ctx, sample.Ref) + for { + if err == nil { + break + } + level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err) + backoff = exponential(backoff) + if backoff > 0 { + time.Sleep(backoff) + } + entry, ok, err = b.series.get(ctx, sample.Ref) + } + return entry, ok +} + const ( metricSuffixBucket = "_bucket" metricSuffixSum = "_sum" @@ -198,10 +216,7 @@ func (b *sampleBuilder) buildDistribution( // until we hit a new metric. Loop: for i, s := range samples { - e, ok, err := b.series.get(ctx, s.Ref) - if err != nil { - return nil, 0, samples, err - } + e, ok := b.seriesGetWithRetry(ctx, s) if !ok { consumed++ // TODO(fabxc): increment metric. @@ -337,3 +352,18 @@ func histogramLabelsEqual(a, b tsdbLabels.Labels) bool { // If one label set still has labels left, they are not equal. return i == len(a) && j == len(b) } + +func exponential(d time.Duration) time.Duration { + const ( + min = 10 * time.Millisecond + max = 2 * time.Second + ) + d *= 2 + if d < min { + d = min + } + if d > max { + d = max + } + return d +} From 3905d554267431f5e9afdeab622df9b4dd1df801 Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Sun, 10 Mar 2019 18:20:05 +0000 Subject: [PATCH 2/6] Rename function; consider error from series cache; consider context.Done(). --- retrieval/transform.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/retrieval/transform.go b/retrieval/transform.go index 0cfdc7de..8fe95825 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -42,7 +42,10 @@ type sampleBuilder struct { func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*monitoring_pb.TimeSeries, uint64, []tsdb.RefSample, error) { sample := samples[0] - entry, ok := b.seriesGetWithRetry(ctx, sample) + entry, ok, err := b.getSeriesWithRetry(ctx, sample) + if err != nil { + return nil, 0, samples, err + } if !ok { return nil, 0, samples[1:], nil } @@ -57,7 +60,6 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo } ts.Points = append(ts.Points, point) - var err error var resetTimestamp int64 switch entry.metadata.Type { @@ -121,10 +123,15 @@ func (b *sampleBuilder) next(ctx context.Context, samples []tsdb.RefSample) (*mo return &ts, entry.hash, samples[1:], nil } -func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefSample) (*seriesCacheEntry, bool) { +func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefSample) (entry *seriesCacheEntry, ok bool, err error) { backoff := time.Duration(0) - entry, ok, err := b.series.get(ctx, sample.Ref) for { + select { + case <-ctx.Done(): + return nil, false, ctx.Err() + default: + } + entry, ok, err = b.series.get(ctx, sample.Ref) if err == nil { break } @@ -133,9 +140,8 @@ func (b *sampleBuilder) seriesGetWithRetry(ctx context.Context, sample tsdb.RefS if backoff > 0 { time.Sleep(backoff) } - entry, ok, err = b.series.get(ctx, sample.Ref) } - return entry, ok + return entry, ok, nil } const ( @@ -216,7 +222,10 @@ func (b *sampleBuilder) buildDistribution( // until we hit a new metric. Loop: for i, s := range samples { - e, ok := b.seriesGetWithRetry(ctx, s) + e, ok, err := b.getSeriesWithRetry(ctx, s) + if err != nil { + return nil, 0, samples, err + } if !ok { consumed++ // TODO(fabxc): increment metric. From 2ae281d0482273a14174abafe63f00cd856ca138 Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Sun, 10 Mar 2019 18:21:00 +0000 Subject: [PATCH 3/6] Do not retry on unrecoverable errors. For error caused by unexpected metric name suffix or by unexpected metric type, it should return as unrecoverable error because retrying doesn't help the situation. --- retrieval/series_cache.go | 8 ++++++-- retrieval/transform.go | 3 +++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/retrieval/series_cache.go b/retrieval/series_cache.go index 4dfe33fd..7313bd49 100644 --- a/retrieval/series_cache.go +++ b/retrieval/series_cache.go @@ -44,6 +44,10 @@ var ( keyReason, _ = tag.NewKey("reason") ) +type unrecoverableError struct { + error +} + func init() { if err := view.Register(&view.View{ Name: "prometheus_sidecar/dropped_series", @@ -422,14 +426,14 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { ts.MetricKind = metric_pb.MetricDescriptor_GAUGE ts.ValueType = metric_pb.MetricDescriptor_DOUBLE default: - return errors.Errorf("unexpected metric name suffix %q", suffix) + return unrecoverableError{errors.Errorf("unexpected metric name suffix %q", suffix)} } case textparse.MetricTypeHistogram: ts.Metric.Type = c.getMetricType(c.metricsPrefix, baseMetricName) ts.MetricKind = metric_pb.MetricDescriptor_CUMULATIVE ts.ValueType = metric_pb.MetricDescriptor_DISTRIBUTION default: - return errors.Errorf("unexpected metric type %s", metadata.Type) + return unrecoverableError{errors.Errorf("unexpected metric type %s", metadata.Type)} } entry.proto = ts diff --git a/retrieval/transform.go b/retrieval/transform.go index 8fe95825..4fe1edbd 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -135,6 +135,9 @@ func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefS if err == nil { break } + if _, ok := err.(unrecoverableError); ok { + return nil, false, err + } level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err) backoff = exponential(backoff) if backoff > 0 { From 1b9a48ac257d5dfab193be1550f54a0d5e36c658 Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Sun, 10 Mar 2019 18:21:22 +0000 Subject: [PATCH 4/6] Reorder import. --- retrieval/transform.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/retrieval/transform.go b/retrieval/transform.go index 4fe1edbd..207da67c 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -21,9 +21,9 @@ import ( "strings" "time" - timestamp_pb "github.com/golang/protobuf/ptypes/timestamp" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + timestamp_pb "github.com/golang/protobuf/ptypes/timestamp" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/tsdb" From caae5cd54345a4390de93fb2624a2486e1a73900 Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Sun, 10 Mar 2019 18:22:38 +0000 Subject: [PATCH 5/6] Remove redundant condition checking for Sleep(). --- retrieval/transform.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/retrieval/transform.go b/retrieval/transform.go index 207da67c..72198ed6 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -140,9 +140,7 @@ func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefS } level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err) backoff = exponential(backoff) - if backoff > 0 { - time.Sleep(backoff) - } + time.Sleep(backoff) } return entry, ok, nil } From 7786bc0047cf13cc7cb399e85cb32bfccc442582 Mon Sep 17 00:00:00 2001 From: Yen-Cheng Chou Date: Tue, 12 Mar 2019 14:39:16 +0000 Subject: [PATCH 6/6] Rename error. --- retrieval/series_cache.go | 6 +++--- retrieval/transform.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/retrieval/series_cache.go b/retrieval/series_cache.go index 7313bd49..e3d7bdec 100644 --- a/retrieval/series_cache.go +++ b/retrieval/series_cache.go @@ -44,7 +44,7 @@ var ( keyReason, _ = tag.NewKey("reason") ) -type unrecoverableError struct { +type unknownMetricError struct { error } @@ -426,14 +426,14 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error { ts.MetricKind = metric_pb.MetricDescriptor_GAUGE ts.ValueType = metric_pb.MetricDescriptor_DOUBLE default: - return unrecoverableError{errors.Errorf("unexpected metric name suffix %q", suffix)} + return unknownMetricError{errors.Errorf("unexpected metric name suffix %q", suffix)} } case textparse.MetricTypeHistogram: ts.Metric.Type = c.getMetricType(c.metricsPrefix, baseMetricName) ts.MetricKind = metric_pb.MetricDescriptor_CUMULATIVE ts.ValueType = metric_pb.MetricDescriptor_DISTRIBUTION default: - return unrecoverableError{errors.Errorf("unexpected metric type %s", metadata.Type)} + return unknownMetricError{errors.Errorf("unexpected metric type %s", metadata.Type)} } entry.proto = ts diff --git a/retrieval/transform.go b/retrieval/transform.go index 72198ed6..dc3dfd9b 100644 --- a/retrieval/transform.go +++ b/retrieval/transform.go @@ -135,7 +135,7 @@ func (b *sampleBuilder) getSeriesWithRetry(ctx context.Context, sample tsdb.RefS if err == nil { break } - if _, ok := err.(unrecoverableError); ok { + if _, ok := err.(unknownMetricError); ok { return nil, false, err } level.Warn(b.logger).Log("msg", "failed to get seriesCacheEntry", "err", err)