diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go
index ba00a6e811..bf9ba75dbd 100644
--- a/tsdb/chunkenc/xor.go
+++ b/tsdb/chunkenc/xor.go
@@ -131,11 +131,27 @@ func (c *XORChunk) iterator(it Iterator) *xorIterator {
 	}
 }
 
+func (c *XORChunk) iteratorWithoutSampleLimit() *xorIteratorWithoutSampleLimit {
+	return &xorIteratorWithoutSampleLimit{
+		// The first 2 bytes contain chunk headers.
+		// We skip that for actual samples.
+		br:       newBReader(c.b.bytes()[2:]),
+		numTotal: binary.BigEndian.Uint16(c.b.bytes()),
+		t:        math.MinInt64,
+	}
+}
+
 // Iterator implements the Chunk interface.
 func (c *XORChunk) Iterator(it Iterator) Iterator {
 	return c.iterator(it)
 }
 
+// IteratorWithoutSampleLimit is like Iterator but ignores the sample count
+// in the chunk header, reading until the bitstream is exhausted.
+func (c *XORChunk) IteratorWithoutSampleLimit() Iterator {
+	return c.iteratorWithoutSampleLimit()
+}
+
 type xorAppender struct {
 	b *bstream
 
diff --git a/tsdb/chunkenc/xor_without_sample_limit.go b/tsdb/chunkenc/xor_without_sample_limit.go
new file mode 100644
index 0000000000..c919572a22
--- /dev/null
+++ b/tsdb/chunkenc/xor_without_sample_limit.go
@@ -0,0 +1,229 @@
+package chunkenc
+
+import (
+	"encoding/binary"
+	"math"
+)
+
+type xorIteratorWithoutSampleLimit struct {
+	br       bstreamReader
+	numTotal uint16
+	numRead  uint16
+
+	t   int64
+	val float64
+
+	leading  uint8
+	trailing uint8
+
+	tDelta uint64
+	err    error
+}
+
+func (it *xorIteratorWithoutSampleLimit) Seek(t int64) bool {
+	if it.err != nil {
+		return false
+	}
+
+	for t > it.t || it.numRead == 0 {
+		if !it.Next() {
+			return false
+		}
+	}
+	return true
+}
+
+func (it *xorIteratorWithoutSampleLimit) At() (int64, float64) {
+	return it.t, it.val
+}
+
+func (it *xorIteratorWithoutSampleLimit) Err() error {
+	return it.err
+}
+
+func (it *xorIteratorWithoutSampleLimit) Reset(b []byte) {
+	// The first 2 bytes contain chunk headers.
+	// We skip that for actual samples.
+	it.br = newBReader(b[2:])
+	it.numTotal = binary.BigEndian.Uint16(b)
+
+	it.numRead = 0
+	it.t = 0
+	it.val = 0
+	it.leading = 0
+	it.trailing = 0
+	it.tDelta = 0
+	it.err = nil
+}
+
+func (it *xorIteratorWithoutSampleLimit) Next() bool {
+	if it.err != nil {
+		return false
+	}
+	// Unlike xorIterator, there is deliberately no it.numRead == it.numTotal
+	// guard here: this iterator keeps reading until the underlying bitstream
+	// is exhausted and the reader returns io.EOF.
+
+	if it.numRead == 0 {
+		t, err := binary.ReadVarint(&it.br)
+		if err != nil {
+			it.err = err
+			return false
+		}
+		v, err := it.br.readBits(64)
+		if err != nil {
+			it.err = err
+			return false
+		}
+		it.t = t
+		it.val = math.Float64frombits(v)
+
+		it.numRead++
+		return true
+	}
+	if it.numRead == 1 {
+		tDelta, err := binary.ReadUvarint(&it.br)
+		if err != nil {
+			it.err = err
+			return false
+		}
+		it.tDelta = tDelta
+		it.t = it.t + int64(it.tDelta)
+
+		return it.readValue()
+	}
+
+	var d byte
+	// read delta-of-delta
+	for i := 0; i < 4; i++ {
+		d <<= 1
+		bit, err := it.br.readBitFast()
+		if err != nil {
+			bit, err = it.br.readBit()
+		}
+		if err != nil {
+			it.err = err
+			return false
+		}
+		if bit == zero {
+			break
+		}
+		d |= 1
+	}
+	var sz uint8
+	var dod int64
+	switch d {
+	case 0b0:
+		// dod == 0
+	case 0b10:
+		sz = 14
+	case 0b110:
+		sz = 17
+	case 0b1110:
+		sz = 20
+	case 0b1111:
+		// Do not use fast because it's very unlikely it will succeed.
+		bits, err := it.br.readBits(64)
+		if err != nil {
+			it.err = err
+			return false
+		}
+
+		dod = int64(bits)
+	}
+
+	if sz != 0 {
+		bits, err := it.br.readBitsFast(sz)
+		if err != nil {
+			bits, err = it.br.readBits(sz)
+		}
+		if err != nil {
+			it.err = err
+			return false
+		}
+
+		// Account for negative numbers, which come back as high unsigned numbers.
+		// See docs/bstream.md.
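+		// E.g. for sz == 14, a raw value of 16383 (all ones) is greater than
+		// 1<<13, so it decodes to 16383 - 1<<14 = -1, the two's-complement
+		// reading of the 14-bit pattern.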
+		if bits > (1 << (sz - 1)) {
+			bits -= 1 << sz
+		}
+		dod = int64(bits)
+	}
+
+	it.tDelta = uint64(int64(it.tDelta) + dod)
+	it.t = it.t + int64(it.tDelta)
+
+	return it.readValue()
+}
+
+func (it *xorIteratorWithoutSampleLimit) readValue() bool {
+	bit, err := it.br.readBitFast()
+	if err != nil {
+		bit, err = it.br.readBit()
+	}
+	if err != nil {
+		it.err = err
+		return false
+	}
+
+	if bit == zero {
+		// it.val = it.val
+	} else {
+		bit, err := it.br.readBitFast()
+		if err != nil {
+			bit, err = it.br.readBit()
+		}
+		if err != nil {
+			it.err = err
+			return false
+		}
+		if bit == zero {
+			// reuse leading/trailing zero bits
+			// it.leading, it.trailing = it.leading, it.trailing
+		} else {
+			bits, err := it.br.readBitsFast(5)
+			if err != nil {
+				bits, err = it.br.readBits(5)
+			}
+			if err != nil {
+				it.err = err
+				return false
+			}
+			it.leading = uint8(bits)
+
+			bits, err = it.br.readBitsFast(6)
+			if err != nil {
+				bits, err = it.br.readBits(6)
+			}
+			if err != nil {
+				it.err = err
+				return false
+			}
+			mbits := uint8(bits)
+			// 0 significant bits here means we overflowed and we actually need 64; see comment in encoder.
+			if mbits == 0 {
+				mbits = 64
+			}
+			it.trailing = 64 - it.leading - mbits
+		}
+
+		mbits := 64 - it.leading - it.trailing
+		bits, err := it.br.readBitsFast(mbits)
+		if err != nil {
+			bits, err = it.br.readBits(mbits)
+		}
+		if err != nil {
+			it.err = err
+			return false
+		}
+		vbits := math.Float64bits(it.val)
+		vbits ^= bits << it.trailing
+		it.val = math.Float64frombits(vbits)
+	}
+
+	it.numRead++
+	return true
+}
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 8c882ab7b2..1b008e6c7b 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -28,6 +28,7 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"text/tabwriter"
 	"time"
 
 	"github.com/pkg/errors"
@@ -40,6 +41,7 @@ import (
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/model/exemplar"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/timestamp"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
@@ -3211,3 +3213,181 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
 	require.Equal(t, 0, idx)
 	require.Greater(t, offset, 0)
 }
+
+func TestWritingHighFreqSamples(t *testing.T) {
+	sampleCount := 12000 // a chunk is expected to hold at most 120 samples, so this should create at least 100 chunks.
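+	// Note: 1000 is not a multiple of the ~120-samples-per-chunk cutoff, so
+	// head compactions regularly land in the middle of an open chunk.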
+	compactInterval := 1000        // compact after every 1k samples
+	sampleRate := time.Millisecond // time delta between two consecutive samples
+	metricName := "my_test_metric"
+	startTs := time.Now()
+	untilTs := startTs.Add(time.Duration(sampleCount) * sampleRate)
+	sampleTs := startTs
+
+	dir := t.TempDir()
+	db, err := Open(dir, nil, nil, &Options{
+		RetentionDuration:              time.Hour.Milliseconds() * 24,
+		MinBlockDuration:               time.Hour.Milliseconds() * 2,
+		MaxBlockDuration:               time.Hour.Milliseconds() * 2,
+		NoLockfile:                     true,
+		StripeSize:                     16384,
+		HeadChunksWriteBufferSize:      chunks.DefaultWriteBufferSize,
+		HeadChunksEndTimeVariance:      0,
+		WALCompression:                 false,
+		WALSegmentSize:                 wal.DefaultSegmentSize,
+		EnableExemplarStorage:          true,
+		MaxExemplars:                   100000,
+		EnableMemorySnapshotOnShutdown: false,
+		IsolationDisabled:              true,
+	}, nil)
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, db.Close())
+	}()
+
+	compact := func() {
+		h := db.Head()
+		require.NoError(t, db.CompactHead(NewRangeHead(h, h.MinTime(), h.MaxTime())))
+	}
+
+	for sampleIdx := 0; sampleIdx < sampleCount; sampleIdx++ {
+		app := db.Appender(context.Background())
+		_, err = app.Append(0, labels.Labels{{Name: "__name__", Value: metricName}}, sampleTs.UnixMilli(), 10)
+		require.NoError(t, err)
+		require.NoError(t, app.Commit())
+
+		if sampleIdx%compactInterval == 0 {
+			compact()
+		}
+
+		sampleTs = sampleTs.Add(sampleRate)
+	}
+	compact()
+
+	cq, err := db.ChunkQuerier(context.Background(), startTs.UnixMilli(), untilTs.UnixMilli())
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, cq.Close())
+	}()
+
+	type sample struct {
+		t int64
+		v float64
+	}
+
+	matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", metricName)
+	require.NoError(t, err)
+	css := cq.Select(false, nil, matcher)
+	require.NoError(t, css.Err())
+
+	samplesPerChunkWithSampleLimit := make(map[uint64][]sample)
+	for css.Next() {
+		series := css.At()
+		sit := series.Iterator()
+		for sit.Next() {
+			meta := sit.At()
+
+			cit := meta.Chunk.Iterator(nil)
+			for cit.Err() == nil && cit.Next() {
+				ts, val := cit.At()
+				samplesPerChunkWithSampleLimit[uint64(meta.Ref)] = append(
+					samplesPerChunkWithSampleLimit[uint64(meta.Ref)],
+					sample{
+						t: ts, v: val,
+					},
+				)
+			}
+
+			require.NoError(t, cit.Err())
+		}
+	}
+
+	css = cq.Select(false, nil, matcher)
+	require.NoError(t, css.Err())
+
+	samplesPerChunkWithoutSampleLimit := make(map[uint64][]sample)
+	for css.Next() {
+		series := css.At()
+		sit := series.Iterator()
+		for sit.Next() {
+			meta := sit.At()
+
+			cit := meta.Chunk.(*chunkenc.XORChunk).IteratorWithoutSampleLimit()
+			for cit.Err() == nil && cit.Next() {
+				ts, val := cit.At()
+				samplesPerChunkWithoutSampleLimit[uint64(meta.Ref)] = append(
+					samplesPerChunkWithoutSampleLimit[uint64(meta.Ref)],
+					sample{
+						t: ts, v: val,
+					},
+				)
+			}
+
+			err = cit.Err()
+			// Ignore io.EOF: it is expected, since this iterator is no longer bounded by the chunk's sample count.
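+			// Note: trailing zero padding in the last byte of the bitstream
+			// decodes as dod == 0 / "value unchanged" samples, so this
+			// iterator can legitimately yield a few phantom samples past
+			// numTotal before running out of bits.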
+			if err != io.EOF {
+				require.NoError(t, err)
+			}
+
+			if len(samplesPerChunkWithSampleLimit[uint64(meta.Ref)]) != len(samplesPerChunkWithoutSampleLimit[uint64(meta.Ref)]) {
+				fmt.Println("--- new chunk ---")
+				w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
+
+				samplesWith := samplesPerChunkWithSampleLimit[uint64(meta.Ref)]
+				samplesWithout := samplesPerChunkWithoutSampleLimit[uint64(meta.Ref)]
+				var idxWith, idxWithout int
+				var vWith, vWithout float64
+				var tWith, tWithout int64
+				for {
+					if idxWith >= len(samplesWith) && idxWithout >= len(samplesWithout) {
+						break
+					}
+
+					onlyWith := func() {
+						vWithout = 0
+						tWithout = 0
+
+						vWith = samplesWith[idxWith].v
+						tWith = samplesWith[idxWith].t
+						idxWith++
+					}
+					onlyWithout := func() {
+						vWith = 0
+						tWith = 0
+
+						vWithout = samplesWithout[idxWithout].v
+						tWithout = samplesWithout[idxWithout].t
+						idxWithout++
+					}
+
+					if idxWith >= len(samplesWith) {
+						onlyWithout()
+					} else if idxWithout >= len(samplesWithout) {
+						onlyWith()
+					} else if samplesWith[idxWith].t > samplesWithout[idxWithout].t {
+						onlyWithout()
+					} else if samplesWith[idxWith].t < samplesWithout[idxWithout].t {
+						onlyWith()
+					} else {
+						vWith = samplesWith[idxWith].v
+						tWith = samplesWith[idxWith].t
+						idxWith++
+
+						vWithout = samplesWithout[idxWithout].v
+						tWithout = samplesWithout[idxWithout].t
+						idxWithout++
+					}
+
+					fmt.Fprintf(w, "%f\t%d (%s) \t|\t%f\t%d (%s)\n", vWith, tWith, timestamp.Time(tWith).UTC().Format(time.RFC3339Nano), vWithout, tWithout, timestamp.Time(tWithout).UTC().Format(time.RFC3339Nano))
+				}
+
+				require.NoError(t, w.Flush())
+			}
+		}
+	}
+}