Skip to content

Commit

Permalink
Merge pull request #791 from grafana/jvp/reimplement-quiet-zeros
Browse files Browse the repository at this point in the history
TSDB: Reintroduce 'quiet zeros' for otel start time handling
  • Loading branch information
jesusvazquez authored Jan 2, 2025
2 parents 0c24d16 + d8187ec commit 93fa761
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 20 deletions.
3 changes: 3 additions & 0 deletions model/value/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
// complicated values in the future. It is 2 rather than 1 to make
// it easier to distinguish from the NormalNaN by a human when debugging.
StaleNaN uint64 = 0x7ff0000000000002

// QuietZeroNaN signals TSDB to add a zero, but do nothing if there is already a value at that timestamp.
QuietZeroNaN uint64 = 0x7ff0000000000003
)

// IsStaleNaN returns true when the provided NaN value is a stale marker.
Expand Down
12 changes: 8 additions & 4 deletions storage/remote/otlptranslator/prometheusremotewrite/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,9 +595,10 @@ const defaultIntervalForStartTimestamps = int64(300_000)
// handleStartTime adds a zero sample at startTs only if startTs is within validIntervalForStartTimestamps of the sample timestamp.
// The reason for doing this is that PRW v1 doesn't support Created Timestamps. After switching to PRW v2's direct CT support,
// make use of its direct support for Created Timestamps instead.
// See https://github.com/prometheus/prometheus/issues/14600 for context.
// See https://opentelemetry.io/docs/specs/otel/metrics/data-model/#resets-and-gaps to know more about how OTel handles
// resets for cumulative metrics.
func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb.Label, settings Settings, typ string, value float64, logger *slog.Logger) {
func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb.Label, settings Settings, typ string, val float64, logger *slog.Logger) {
if !settings.EnableCreatedTimestampZeroIngestion {
return
}
Expand All @@ -619,10 +620,13 @@ func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb
return
}

logger.Debug("adding zero value at start_ts", "type", typ, "labels", labelsStringer(labels), "start_ts", startTs, "sample_ts", ts, "sample_value", value)
logger.Debug("adding zero value at start_ts", "type", typ, "labels", labelsStringer(labels), "start_ts", startTs, "sample_ts", ts, "sample_value", val)

// See https://github.com/prometheus/prometheus/issues/14600 for context.
c.addSample(&prompb.Sample{Timestamp: startTs}, labels)
var createdTimeValue float64
if settings.EnableStartTimeQuietZero {
createdTimeValue = math.Float64frombits(value.QuietZeroNaN)
}
c.addSample(&prompb.Sample{Timestamp: startTs, Value: createdTimeValue}, labels)
}

// handleHistogramStartTime is similar to the method above, but for native histograms.
Expand Down
20 changes: 15 additions & 5 deletions storage/remote/otlptranslator/prometheusremotewrite/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package prometheusremotewrite

import (
"context"
"math"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -29,7 +31,9 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"

"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/util/testutil"
)

func TestCreateAttributes(t *testing.T) {
Expand Down Expand Up @@ -313,14 +317,14 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
timeSeriesSignature(labels): {
Labels: labels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus2m30s)},
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus2m30s)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
timeSeriesSignature(sumLabels): {
Labels: sumLabels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus2m30s)},
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus2m30s)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
Expand Down Expand Up @@ -361,14 +365,14 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
timeSeriesSignature(labels): {
Labels: labels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus6m)},
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus6m)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
timeSeriesSignature(sumLabels): {
Labels: sumLabels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus6m)},
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus6m)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
Expand Down Expand Up @@ -474,19 +478,25 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
Settings{
ExportCreatedMetric: true,
EnableCreatedTimestampZeroIngestion: true,
EnableStartTimeQuietZero: true,
ValidIntervalCreatedTimestampZeroIngestion: tt.overrideValidInterval,
},
metric.Name(),
promslog.NewNopLogger(),
)
require.NoError(t, err)

assert.Equal(t, tt.want(), converter.unique)
testutil.RequireEqualWithOptions(t, tt.want(), converter.unique, []cmp.Option{cmp.Comparer(equalSamples)})
assert.Empty(t, converter.conflicts)
})
}
}

// equalSamples reports whether two samples have the same timestamp and
// bit-identical values. Comparing via math.Float64bits means two samples
// carrying the exact same NaN bit pattern (e.g. QuietZeroNaN) compare
// equal, unlike the == operator, which treats every NaN as unequal.
func equalSamples(a, b prompb.Sample) bool {
	if a.Timestamp != b.Timestamp {
		return false
	}
	return math.Float64bits(a.Value) == math.Float64bits(b.Value)
}

func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
ts := pcommon.Timestamp(time.Now().UnixNano())
tests := []struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Settings struct {

// Mimir specifics.
EnableCreatedTimestampZeroIngestion bool
EnableStartTimeQuietZero bool
ValidIntervalCreatedTimestampZeroIngestion time.Duration
}

Expand Down
9 changes: 9 additions & 0 deletions tsdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
return db
}

// queryHead is a helper to query the head for a given time range and labelset.
// It opens a block querier over [mint, maxt] and fully expands the series
// matching label exactly, returning the samples keyed by series string.
func queryHead(t testing.TB, head *Head, mint, maxt int64, label labels.Label) (map[string][]chunks.Sample, error) {
	querier, err := NewBlockQuerier(head, mint, maxt)
	if err != nil {
		return nil, err
	}
	matcher := labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)
	return query(t, querier, matcher), nil
}

// query runs a matcher query against the querier and fully expands its data.
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
ss := q.Select(context.Background(), false, nil, matchers...)
Expand Down
11 changes: 10 additions & 1 deletion tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,14 +497,18 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil {
return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v)
}
if math.Float64bits(s.lastValue) != math.Float64bits(v) {
if math.Float64bits(s.lastValue) != math.Float64bits(v) && math.Float64bits(v) != value.QuietZeroNaN {
return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v)
}
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
return false, 0, nil
}
}

if math.Float64bits(v) == value.QuietZeroNaN { // Say it's allowed; it will be dropped later in commitSamples.
return true, 0, nil
}

// The sample cannot go in the in-order chunk. Check if it can go in the out-of-order chunk.
if oooTimeWindow > 0 && t >= headMaxt-oooTimeWindow {
return true, headMaxt - t, nil
Expand Down Expand Up @@ -1144,6 +1148,8 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
switch {
case err != nil:
// Do nothing here.
case oooSample && math.Float64bits(s.V) == value.QuietZeroNaN:
// No-op: we don't store quiet zeros out-of-order.
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
Expand Down Expand Up @@ -1190,6 +1196,9 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
acc.floatsAppended--
}
default:
if math.Float64bits(s.V) == value.QuietZeroNaN {
s.V = 0 // Note that this is modifying the copy which is what will be appended but the WAL got the NaN already.
}
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
Expand Down
123 changes: 113 additions & 10 deletions tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,15 +547,6 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
})
}

// queryHead is a helper to query the head for a given time range and labelset.
queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]chunks.Sample, error) {
q, err := NewBlockQuerier(head, int64(mint), int64(maxt))
if err != nil {
return nil, err
}
return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
}

// readerTsCh will be used by the coordinator go routine to coordinate which timestamps the reader should read.
readerTsCh := make(chan uint64)

Expand Down Expand Up @@ -583,7 +574,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
lbls.Range(func(l labels.Label) {
lbl = l
})
samples, err := queryHead(ts-qryRange, ts, lbl)
samples, err := queryHead(t, head, int64(ts-qryRange), int64(ts), lbl)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -6098,6 +6089,118 @@ func TestCuttingNewHeadChunks(t *testing.T) {
}
}

// TestAppendQuietZeroDuplicates verifies QuietZeroNaN sentinel handling on
// in-order and out-of-order appends:
//   - at a timestamp that already has a sample, a QuietZeroNaN append is a
//     no-op rather than a duplicate-sample error;
//   - at a fresh in-order timestamp, it is stored as a literal zero;
//   - an out-of-order QuietZeroNaN sample is silently dropped.
func TestAppendQuietZeroDuplicates(t *testing.T) {
	ts := int64(1695209650)
	lbls := labels.FromStrings("foo", "bar")
	h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
	defer func() {
		require.NoError(t, h.Close())
	}()

	a := h.Appender(context.Background())
	_, err := a.Append(0, lbls, ts, 42.0)
	require.NoError(t, err)
	_, err = a.Append(0, lbls, ts, 42.0) // Exactly the same value.
	require.NoError(t, err)
	_, err = a.Append(0, lbls, ts, math.Float64frombits(value.QuietZeroNaN)) // Should be a no-op.
	require.NoError(t, err)
	require.NoError(t, a.Commit())

	// Only the single real sample should be visible: the quiet zero at the
	// occupied timestamp must leave no trace.
	result, err := queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
	require.NoError(t, err)
	expectedSamples := []chunks.Sample{sample{t: ts, f: 42.0}}
	require.Equal(t, expectedSamples, result[`{foo="bar"}`])

	a = h.Appender(context.Background())
	_, err = a.Append(0, lbls, ts+10, math.Float64frombits(value.QuietZeroNaN)) // This is at a different timestamp so should append a real zero.
	require.NoError(t, err)
	_, err = a.Append(0, lbls, ts+15, 5.0) // We append a normal value to reflect what would happen in reality.
	require.NoError(t, err)
	require.NoError(t, a.Commit())

	result, err = queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
	require.NoError(t, err)
	expectedSamples = []chunks.Sample{
		sample{t: ts, f: 42.0},
		sample{t: ts + 10, f: 0}, // The quiet zero materialized as a plain zero sample.
		sample{t: ts + 15, f: 5},
	}
	require.Equal(t, expectedSamples, result[`{foo="bar"}`])

	a = h.Appender(context.Background())
	_, err = a.Append(0, lbls, ts+5, math.Float64frombits(value.QuietZeroNaN)) // This is out-of-order, so should be dropped.
	require.NoError(t, err)
	require.NoError(t, a.Commit())

	result, err = queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
	require.NoError(t, err)
	require.Equal(t, expectedSamples, result[`{foo="bar"}`]) // Same expectedSamples as before.
}

// TestQuietZeroWALReplay verifies that QuietZeroNaN samples are written to the
// WAL verbatim (as their NaN bit pattern) and that replaying the WAL into a
// fresh head reproduces the same visible samples as the original ingestion:
// a real zero at the fresh in-order timestamp, and nothing for the no-op and
// out-of-order quiet zeros.
func TestQuietZeroWALReplay(t *testing.T) {
	ts := int64(1695209650)
	lbls := labels.FromStrings("foo", "bar")
	h, w := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, true)

	a := h.Appender(context.Background())
	_, err := a.Append(0, lbls, ts, 42.0)
	require.NoError(t, err)
	_, err = a.Append(0, lbls, ts, 42.0) // Exactly the same value.
	require.NoError(t, err)
	_, err = a.Append(0, lbls, ts, math.Float64frombits(value.QuietZeroNaN)) // Should be a no-op.
	require.NoError(t, err)
	_, err = a.Append(0, lbls, ts+10, math.Float64frombits(value.QuietZeroNaN)) // This is at a different timestamp so should append a real zero.
	require.NoError(t, err)
	_, err = a.Append(0, lbls, ts+5, math.Float64frombits(value.QuietZeroNaN)) // This is out-of-order, so should be dropped.
	require.NoError(t, err)
	require.NoError(t, a.Commit())

	result, err := queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
	require.NoError(t, err)
	expectedSamples := []chunks.Sample{
		sample{t: ts, f: 42.0},
		sample{t: ts + 10, f: 0}, // The only quiet zero that survives, stored as a plain zero.
	}
	require.Equal(t, expectedSamples, result[`{foo="bar"}`])

	require.NoError(t, h.Close())

	// Next we replay the WAL by creating a new head and then verify that previous samples are there as we expect them.
	w, err = wlog.New(nil, nil, w.Dir(), wlog.CompressionNone)
	require.NoError(t, err)
	opts := DefaultHeadOptions()
	opts.ChunkRange = 1000
	opts.ChunkDirRoot = w.Dir()
	h, err = NewHead(nil, nil, w, nil, opts, nil)
	require.NoError(t, err)
	require.NoError(t, h.Init(0))
	defer func() {
		require.NoError(t, h.Close())
	}()

	// Replay must yield exactly the same visible samples as before the restart.
	result, err = queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
	require.NoError(t, err)
	require.Equal(t, expectedSamples, result[`{foo="bar"}`])

	// For correctness, we also verify that the WAL contains the expected records.
	recs := readTestWAL(t, h.wal.Dir())
	require.NotEmpty(t, recs, "WAL should contain records")

	// NOTE(review): assumes recs[1] holds the sample records (with recs[0]
	// presumably the series record) — verify against readTestWAL's ordering.
	// All five appends reach the WAL, including the ones the head coalesced
	// or dropped; the quiet zeros are persisted as NaN bit patterns.
	if samples, ok := recs[1].([]record.RefSample); ok {
		require.Len(t, samples, 5)
		require.Equal(t, record.RefSample{Ref: 1, T: ts, V: 42.0}, samples[0])
		require.Equal(t, record.RefSample{Ref: 1, T: ts, V: 42.0}, samples[1])
		require.True(t, math.IsNaN(samples[2].V))
		require.Equal(t, ts, samples[2].T)
		require.True(t, math.IsNaN(samples[3].V))
		require.Equal(t, ts+10, samples[3].T)
		require.True(t, math.IsNaN(samples[4].V))
		require.Equal(t, ts+5, samples[4].T)
	} else {
		t.Fatalf("unexpected record type: %T", recs[1])
	}
}

// TestHeadDetectsDuplicateSampleAtSizeLimit tests a regression where a duplicate sample
// is appended to the head, right when the head chunk is at the size limit.
// The test adds all samples as duplicate, thus expecting that the result has
Expand Down
7 changes: 7 additions & 0 deletions tsdb/head_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
Expand Down Expand Up @@ -589,6 +590,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
if s.T <= ms.mmMaxTime {
continue
}
if math.Float64bits(s.V) == value.QuietZeroNaN {
s.V = 0
}
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
Expand Down Expand Up @@ -989,6 +993,9 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs, unknownHi
unknownRefs++
continue
}
if math.Float64bits(s.V) == value.QuietZeroNaN {
continue
}
ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
if chunkCreated {
h.metrics.chunksCreated.Inc()
Expand Down

0 comments on commit 93fa761

Please sign in to comment.