Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r324] Distributor: don't return errors when discarding samples with duplicated timestamps #10434

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* [CHANGE] Ruler: cap the rate of retries for remote query evaluation to 170/sec. This is configurable via `-ruler.query-frontend.max-retries-rate`. #10375 #10403
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by `cortex_discarded_samples_total` metrics with the reason `sample_duplicate_timestamp`. #10145 #10430
* [ENHANCEMENT] Ruler: Add `cortex_prometheus_rule_group_last_rule_duration_sum_seconds` metric to track the total evaluation duration of a rule group regardless of concurrency. #10189
* [ENHANCEMENT] Distributor: Add native histogram support for `electedReplicaPropagationTime` metric in ha_tracker. #10264
* [ENHANCEMENT] Ingester: More efficient CPU/memory utilization-based read request limiting. #10325
Expand Down
4 changes: 0 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/ingest"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/extract"
"github.com/grafana/mimir/pkg/util/globalerror"
mimir_limiter "github.com/grafana/mimir/pkg/util/limiter"
util_math "github.com/grafana/mimir/pkg/util/math"
Expand Down Expand Up @@ -895,11 +894,8 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
d.validateExemplars(ts, userID, minExemplarTS, maxExemplarTS)

deduplicatedSamplesAndHistograms := totalSamplesAndHistograms - len(ts.Samples) - len(ts.Histograms)

if deduplicatedSamplesAndHistograms > 0 {
d.sampleValidationMetrics.duplicateTimestamp.WithLabelValues(userID, group).Add(float64(deduplicatedSamplesAndHistograms))
unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ts.Labels)
return false, fmt.Errorf(duplicateTimestampMsgFormat, deduplicatedSamplesAndHistograms, unsafeMetricName)
}

return false, nil
Expand Down
39 changes: 4 additions & 35 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,6 @@ func TestDistributor_SampleDuplicateTimestamp(t *testing.T) {
testCases := map[string]struct {
req *mimirpb.WriteRequest
expectedSamples []mimirpb.PreallocTimeseries
expectedErrors []error
expectedMetrics string
}{
"do not deduplicate if there are no duplicated timestamps": {
Expand All @@ -1494,10 +1493,6 @@ func TestDistributor_SampleDuplicateTimestamp(t *testing.T) {
makeTimeseries(labels, append(makeSamples(10, 1), makeSamples(20, 2)...), nil, nil),
makeTimeseries(labels, nil, append(makeHistograms(30, generateTestHistogram(0)), makeHistograms(40, generateTestHistogram(1))...), nil),
},
expectedErrors: []error{
fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 2, "series"),
fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 2, "series"),
},
expectedMetrics: `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
Expand All @@ -1515,11 +1510,6 @@ func TestDistributor_SampleDuplicateTimestamp(t *testing.T) {
makeTimeseries(labels, makeSamples(10, 3), makeHistograms(20, generateTestHistogram(1)), nil),
makeTimeseries(labels, makeSamples(10, 4), append(makeHistograms(20, generateTestHistogram(3)), makeHistograms(30, generateTestHistogram(4))...), nil),
},
expectedErrors: []error{
fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 1, "series"),
fmt.Errorf("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s' (err-mimir-sample-duplicate-timestamp)", 1, "series"),
nil,
},
expectedMetrics: `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
Expand All @@ -1541,19 +1531,10 @@ func TestDistributor_SampleDuplicateTimestamp(t *testing.T) {
require.Len(t, regs, 1)

now := mtime.Now()
for i, ts := range tc.req.Timeseries {
for _, ts := range tc.req.Timeseries {
shouldRemove, err := ds[0].validateSeries(now, &ts, "user", "test-group", true, true, 0, 0)
require.False(t, shouldRemove)
if len(tc.expectedErrors) == 0 {
require.NoError(t, err)
} else {
if tc.expectedErrors[i] == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Equal(t, tc.expectedErrors[i], err)
}
}
require.NoError(t, err)
}

assert.Equal(t, tc.expectedSamples, tc.req.Timeseries)
Expand All @@ -1574,8 +1555,7 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
timestamp := now.UnixMilli()

testCases := map[string]struct {
setup func(int) [][]mimirpb.PreallocTimeseries
expectedErrors bool
setup func(int) [][]mimirpb.PreallocTimeseries
}{
"one timeseries with one sample": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
Expand All @@ -1588,7 +1568,6 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with one histogram": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
Expand All @@ -1601,7 +1580,6 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with one sample and one histogram": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
Expand All @@ -1614,7 +1592,6 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with two samples": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
Expand All @@ -1627,7 +1604,6 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with two histograms": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
Expand All @@ -1640,7 +1616,6 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with two samples and two histograms": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
Expand All @@ -1653,7 +1628,6 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
}
return timeseries
},
expectedErrors: false,
},
"one timeseries with 80_000 samples with duplicated timestamps": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
Expand All @@ -1674,7 +1648,6 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
}
return timeseries
},
expectedErrors: true,
},
"one timeseries with 80_000 histograms with duplicated timestamps": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
Expand All @@ -1694,7 +1667,6 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
}
return timeseries
},
expectedErrors: true,
},
"one timeseries with 80_000 samples and 80_000 histograms with duplicated timestamps": {
setup: func(n int) [][]mimirpb.PreallocTimeseries {
Expand All @@ -1717,7 +1689,6 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
}
return timeseries
},
expectedErrors: true,
},
}

Expand All @@ -1739,10 +1710,8 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
for n := 0; n < b.N; n++ {
for _, ts := range timeseries[n] {
_, err := ds[0].validateSeries(now, &ts, "user", "test-group", true, true, 0, 0)
if !tc.expectedErrors && err != nil {
if err != nil {
b.Fatal(err)
} else if tc.expectedErrors && err == nil {
b.Fatal("an error was expected")
}
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/distributor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ var (
"received a sample whose timestamp is too far in the past, timestamp: %d series: '%.200s'",
validation.PastGracePeriodFlag,
)
duplicateTimestampMsgFormat = globalerror.SampleDuplicateTimestamp.Message("samples with duplicated timestamps have been discarded, discarded samples: %d series: '%.200s'")
exemplarEmptyLabelsMsgFormat = globalerror.ExemplarLabelsMissing.Message(
"received an exemplar with no valid labels, timestamp: %d series: %s labels: %s",
)
Expand Down
Loading