Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] decode chunks without sample limit and compare against normal decoding #213

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions tsdb/chunkenc/xor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,25 @@ func (c *XORChunk) iterator(it Iterator) *xorIterator {
}
}

func (c *XORChunk) iteratorWithoutSampleLimit() *xorIteratorWithoutSampleLimit {
return &xorIteratorWithoutSampleLimit{
// The first 2 bytes contain chunk headers.
// We skip that for actual samples.
br: newBReader(c.b.bytes()[2:]),
numTotal: binary.BigEndian.Uint16(c.b.bytes()),
t: math.MinInt64,
}
}

// Iterator implements the Chunk interface.
func (c *XORChunk) Iterator(it Iterator) Iterator {
return c.iterator(it)
}

func (c *XORChunk) IteratorWithoutSampleLimit() Iterator {
return c.iteratorWithoutSampleLimit()
}

type xorAppender struct {
b *bstream

Expand Down
224 changes: 224 additions & 0 deletions tsdb/chunkenc/xor_without_sample_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package chunkenc

import (
"encoding/binary"
"math"
)

type xorIteratorWithoutSampleLimit struct {
br bstreamReader
numTotal uint16
numRead uint16

t int64
val float64

leading uint8
trailing uint8

tDelta uint64
err error
}

func (it *xorIteratorWithoutSampleLimit) Seek(t int64) bool {
if it.err != nil {
return false
}

for t > it.t || it.numRead == 0 {
if !it.Next() {
return false
}
}
return true
}

func (it *xorIteratorWithoutSampleLimit) At() (int64, float64) {
return it.t, it.val
}

func (it *xorIteratorWithoutSampleLimit) Err() error {
return it.err
}

func (it *xorIteratorWithoutSampleLimit) Reset(b []byte) {
// The first 2 bytes contain chunk headers.
// We skip that for actual samples.
it.br = newBReader(b[2:])
it.numTotal = binary.BigEndian.Uint16(b)

it.numRead = 0
it.t = 0
it.val = 0
it.leading = 0
it.trailing = 0
it.tDelta = 0
it.err = nil
}

func (it *xorIteratorWithoutSampleLimit) Next() bool {
// Comment this to not limit the sample count
/*if it.err != nil || it.numRead == it.numTotal {
return false
}*/

if it.numRead == 0 {
t, err := binary.ReadVarint(&it.br)
if err != nil {
it.err = err
return false
}
v, err := it.br.readBits(64)
if err != nil {
it.err = err
return false
}
it.t = t
it.val = math.Float64frombits(v)

it.numRead++
return true
}
if it.numRead == 1 {
tDelta, err := binary.ReadUvarint(&it.br)
if err != nil {
it.err = err
return false
}
it.tDelta = tDelta
it.t = it.t + int64(it.tDelta)

return it.readValue()
}

var d byte
// read delta-of-delta
for i := 0; i < 4; i++ {
d <<= 1
bit, err := it.br.readBitFast()
if err != nil {
bit, err = it.br.readBit()
}
if err != nil {
it.err = err
return false
}
if bit == zero {
break
}
d |= 1
}
var sz uint8
var dod int64
switch d {
case 0b0:
// dod == 0
case 0b10:
sz = 14
case 0b110:
sz = 17
case 0b1110:
sz = 20
case 0b1111:
// Do not use fast because it's very unlikely it will succeed.
bits, err := it.br.readBits(64)
if err != nil {
it.err = err
return false
}

dod = int64(bits)
}

if sz != 0 {
bits, err := it.br.readBitsFast(sz)
if err != nil {
bits, err = it.br.readBits(sz)
}
if err != nil {
it.err = err
return false
}

// Account for negative numbers, which come back as high unsigned numbers.
// See docs/bstream.md.
if bits > (1 << (sz - 1)) {
bits -= 1 << sz
}
dod = int64(bits)
}

it.tDelta = uint64(int64(it.tDelta) + dod)
it.t = it.t + int64(it.tDelta)

return it.readValue()
}

func (it *xorIteratorWithoutSampleLimit) readValue() bool {
bit, err := it.br.readBitFast()
if err != nil {
bit, err = it.br.readBit()
}
if err != nil {
it.err = err
return false
}

if bit == zero {
// it.val = it.val
} else {
bit, err := it.br.readBitFast()
if err != nil {
bit, err = it.br.readBit()
}
if err != nil {
it.err = err
return false
}
if bit == zero {
// reuse leading/trailing zero bits
// it.leading, it.trailing = it.leading, it.trailing
} else {
bits, err := it.br.readBitsFast(5)
if err != nil {
bits, err = it.br.readBits(5)
}
if err != nil {
it.err = err
return false
}
it.leading = uint8(bits)

bits, err = it.br.readBitsFast(6)
if err != nil {
bits, err = it.br.readBits(6)
}
if err != nil {
it.err = err
return false
}
mbits := uint8(bits)
// 0 significant bits here means we overflowed and we actually need 64; see comment in encoder
if mbits == 0 {
mbits = 64
}
it.trailing = 64 - it.leading - mbits
}

mbits := 64 - it.leading - it.trailing
bits, err := it.br.readBitsFast(mbits)
if err != nil {
bits, err = it.br.readBits(mbits)
}
if err != nil {
it.err = err
return false
}
vbits := math.Float64bits(it.val)
vbits ^= bits << it.trailing
it.val = math.Float64frombits(vbits)
}

it.numRead++
return true
}
Loading