Skip to content

Commit

Permalink
fix: do not retain copy of chunk while indexing a new chunk in tsdb w…
Browse files Browse the repository at this point in the history
…hile processing delete requests (#15541)
  • Loading branch information
sandeepsukhani authored Jan 2, 2025
1 parent a431ee2 commit ff19955
Showing 1 changed file with 22 additions and 18 deletions.
40 changes: 22 additions & 18 deletions pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ type compactedIndex struct {
tableInterval model.Interval
periodConfig config.PeriodConfig

indexChunks []chunk.Chunk
indexChunks map[string][]tsdbindex.ChunkMeta
deleteChunks map[string][]tsdbindex.ChunkMeta
seriesToCleanup map[string]struct{}
}
Expand All @@ -282,6 +282,7 @@ func newCompactedIndex(ctx context.Context, tableName, userID, workingDir string
workingDir: workingDir,
periodConfig: periodConfig,
tableInterval: retention.ExtractIntervalFromTableName(tableName),
indexChunks: map[string][]tsdbindex.ChunkMeta{},
deleteChunks: map[string][]tsdbindex.ChunkMeta{},
seriesToCleanup: map[string]struct{}{},
}
Expand Down Expand Up @@ -338,7 +339,20 @@ func (c *compactedIndex) IndexChunk(chk chunk.Chunk) (bool, error) {
return false, nil
}

c.indexChunks = append(c.indexChunks, chk)
// TSDB doesnt need the __name__="log" convention the old chunk store index used.
b := labels.NewBuilder(chk.Metric)
b.Del(labels.MetricName)
ls := b.Labels().String()

approxKB := math.Round(float64(chk.Data.UncompressedSize()) / float64(1<<10))

c.indexChunks[ls] = append(c.indexChunks[ls], tsdbindex.ChunkMeta{
Checksum: chk.Checksum,
MinTime: int64(chk.From),
MaxTime: int64(chk.Through),
KB: uint32(approxKB),
Entries: uint32(chk.Data.Entries()),
})

return true, nil
}
Expand Down Expand Up @@ -372,22 +386,12 @@ func (c *compactedIndex) ToIndexFile() (shipperindex.Index, error) {
}
c.deleteChunks = nil

for _, chk := range c.indexChunks {
// TSDB doesnt need the __name__="log" convention the old chunk store index used.
b := labels.NewBuilder(chk.Metric)
b.Del(labels.MetricName)
ls := b.Labels()

approxKB := math.Round(float64(chk.Data.UncompressedSize()) / float64(1<<10))
err := c.builder.InsertChunk(ls.String(), tsdbindex.ChunkMeta{
Checksum: chk.Checksum,
MinTime: int64(chk.From),
MaxTime: int64(chk.Through),
KB: uint32(approxKB),
Entries: uint32(chk.Data.Entries()),
})
if err != nil {
return nil, err
for ls, metas := range c.indexChunks {
for i := range metas {
err := c.builder.InsertChunk(ls, metas[i])
if err != nil {
return nil, err
}
}
}
c.indexChunks = nil
Expand Down

0 comments on commit ff19955

Please sign in to comment.