Check cached postings TTL before returning from cache + metrics #822

Merged 8 commits on Jan 23, 2025
6 changes: 3 additions & 3 deletions tsdb/block.go
@@ -352,11 +352,11 @@ type Block struct {
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(logger *slog.Logger, dir string, pool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory) (pb *Block, err error) {
- return OpenBlockWithOptions(logger, dir, pool, postingsDecoderFactory, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce)
+ return OpenBlockWithOptions(logger, dir, pool, postingsDecoderFactory, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce, NewPostingsForMatchersCacheMetrics(nil))
}

// OpenBlockWithOptions is like OpenBlock but allows to pass a cache provider and sharding function.
- func OpenBlockWithOptions(logger *slog.Logger, dir string, pool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory, cache index.ReaderCacheProvider, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool) (pb *Block, err error) {
+ func OpenBlockWithOptions(logger *slog.Logger, dir string, pool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory, cache index.ReaderCacheProvider, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool, postingsCacheMetrics *PostingsForMatchersCacheMetrics) (pb *Block, err error) {
if logger == nil {
logger = promslog.NewNopLogger()
}
@@ -385,7 +385,7 @@ func OpenBlockWithOptions(logger *slog.Logger, dir string, pool chunkenc.Pool, p
if err != nil {
return nil, err
}
- pfmc := NewPostingsForMatchersCache(postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce)
+ pfmc := NewPostingsForMatchersCache(postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce, postingsCacheMetrics)
ir := indexReaderWithPostingsForMatchers{indexReader, pfmc}
closers = append(closers, ir)

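For callers, the only change to OpenBlockWithOptions is the new trailing metrics parameter. Below is a minimal sketch of opening a block with the cache metrics attached to a registry; it assumes NewPostingsForMatchersCacheMetrics accepts a prometheus.Registerer (the diff only ever calls it with nil) and uses the upstream Prometheus module path:

```go
package main

import (
	"log/slog"
	"os"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	reg := prometheus.NewRegistry()

	// Assumption: passing a registerer (instead of nil, as the package
	// defaults do) registers the cache's collectors with it.
	metrics := tsdb.NewPostingsForMatchersCacheMetrics(reg)

	block, err := tsdb.OpenBlockWithOptions(
		logger,
		"/path/to/block", // hypothetical block directory
		chunkenc.NewPool(),
		tsdb.DefaultPostingsDecoderFactory,
		nil, // no series hash cache provider
		tsdb.DefaultPostingsForMatchersCacheTTL,
		tsdb.DefaultPostingsForMatchersCacheMaxItems,
		tsdb.DefaultPostingsForMatchersCacheMaxBytes,
		tsdb.DefaultPostingsForMatchersCacheForce,
		metrics, // new parameter added by this PR
	)
	if err != nil {
		logger.Error("opening block", "err", err)
		os.Exit(1)
	}
	defer block.Close()
}
```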
17 changes: 13 additions & 4 deletions tsdb/db.go
@@ -100,10 +100,12 @@ func DefaultOptions() *Options {
HeadPostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems,
HeadPostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes,
HeadPostingsForMatchersCacheForce: DefaultPostingsForMatchersCacheForce,
+ HeadPostingsForMatchersCacheMetrics: NewPostingsForMatchersCacheMetrics(nil),
BlockPostingsForMatchersCacheTTL: DefaultPostingsForMatchersCacheTTL,
BlockPostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems,
BlockPostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes,
BlockPostingsForMatchersCacheForce: DefaultPostingsForMatchersCacheForce,
+ BlockPostingsForMatchersCacheMetrics: NewPostingsForMatchersCacheMetrics(nil),
}
}

@@ -259,6 +261,9 @@ type Options struct {
// HeadPostingsForMatchersCacheForce forces the usage of postings for matchers cache for all calls on Head and OOOHead regardless of the `concurrent` param.
HeadPostingsForMatchersCacheForce bool

+ // HeadPostingsForMatchersCacheMetrics holds the metrics tracked by PostingsForMatchers cache when querying the Head.
+ HeadPostingsForMatchersCacheMetrics *PostingsForMatchersCacheMetrics

// BlockPostingsForMatchersCacheTTL is the TTL of the postings for matchers cache of each compacted block.
// If it's 0, the cache will only deduplicate in-flight requests, deleting the results once the first request has finished.
BlockPostingsForMatchersCacheTTL time.Duration
@@ -275,6 +280,9 @@
// regardless of the `concurrent` param.
BlockPostingsForMatchersCacheForce bool

+ // BlockPostingsForMatchersCacheMetrics holds the metrics tracked by PostingsForMatchers cache when querying blocks.
+ BlockPostingsForMatchersCacheMetrics *PostingsForMatchersCacheMetrics

// SecondaryHashFunction is an optional function that is applied to each series in the Head.
// Values returned from this function are preserved and available by calling ForEachSecondaryHash function on the Head.
SecondaryHashFunction func(labels.Labels) uint32
@@ -702,7 +710,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
return nil, ErrClosed
default:
}
- loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce)
+ loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory, nil, DefaultPostingsForMatchersCacheTTL, DefaultPostingsForMatchersCacheMaxItems, DefaultPostingsForMatchersCacheMaxBytes, DefaultPostingsForMatchersCacheForce, NewPostingsForMatchersCacheMetrics(nil))
if err != nil {
return nil, err
}
@@ -1043,6 +1051,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
headOpts.PostingsForMatchersCacheMaxItems = opts.HeadPostingsForMatchersCacheMaxItems
headOpts.PostingsForMatchersCacheMaxBytes = opts.HeadPostingsForMatchersCacheMaxBytes
headOpts.PostingsForMatchersCacheForce = opts.HeadPostingsForMatchersCacheForce
+ headOpts.PostingsForMatchersCacheMetrics = opts.HeadPostingsForMatchersCacheMetrics
headOpts.SecondaryHashFunction = opts.SecondaryHashFunction
if opts.WALReplayConcurrency > 0 {
headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
@@ -1685,7 +1694,7 @@ func (db *DB) reloadBlocks() (err error) {
db.mtx.Lock()
defer db.mtx.Unlock()

- loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory, db.opts.SeriesHashCache, db.opts.BlockPostingsForMatchersCacheTTL, db.opts.BlockPostingsForMatchersCacheMaxItems, db.opts.BlockPostingsForMatchersCacheMaxBytes, db.opts.BlockPostingsForMatchersCacheForce)
+ loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory, db.opts.SeriesHashCache, db.opts.BlockPostingsForMatchersCacheTTL, db.opts.BlockPostingsForMatchersCacheMaxItems, db.opts.BlockPostingsForMatchersCacheMaxBytes, db.opts.BlockPostingsForMatchersCacheForce, db.opts.BlockPostingsForMatchersCacheMetrics)
if err != nil {
return err
}
@@ -1780,7 +1789,7 @@ func (db *DB) reloadBlocks() (err error) {
return nil
}

- func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory, cache *hashcache.SeriesHashCache, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
+ func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory, cache *hashcache.SeriesHashCache, postingsCacheTTL time.Duration, postingsCacheMaxItems int, postingsCacheMaxBytes int64, postingsCacheForce bool, postingsCacheMetrics *PostingsForMatchersCacheMetrics) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
bDirs, err := blockDirs(dir)
if err != nil {
return nil, nil, fmt.Errorf("find blocks: %w", err)
@@ -1802,7 +1811,7 @@ func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.
cacheProvider = cache.GetBlockCacheProvider(meta.ULID.String())
}

- block, err = OpenBlockWithOptions(l, bDir, chunkPool, postingsDecoderFactory, cacheProvider, postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce)
+ block, err = OpenBlockWithOptions(l, bDir, chunkPool, postingsDecoderFactory, cacheProvider, postingsCacheTTL, postingsCacheMaxItems, postingsCacheMaxBytes, postingsCacheForce, postingsCacheMetrics)
if err != nil {
corrupted[meta.ULID] = err
continue
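Code that builds Options by hand, rather than through DefaultOptions, now needs to populate the two metrics fields itself, as the test change below illustrates. A sketch of wiring both caches to a single registry; the WrapRegistererWithPrefix calls are an assumption to keep the two identically named metric sets apart, not something this diff prescribes:

```go
package example

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/promslog"
	"github.com/prometheus/prometheus/tsdb"
)

func openDB(dir string) (*tsdb.DB, error) {
	reg := prometheus.NewRegistry()

	opts := tsdb.DefaultOptions()
	// Swap the default (nil-registered) metrics for ones attached to reg.
	// The prefixes are assumed necessary so the head and block caches do
	// not register identically named collectors on the same registry.
	opts.HeadPostingsForMatchersCacheMetrics = tsdb.NewPostingsForMatchersCacheMetrics(
		prometheus.WrapRegistererWithPrefix("head_", reg))
	opts.BlockPostingsForMatchersCacheMetrics = tsdb.NewPostingsForMatchersCacheMetrics(
		prometheus.WrapRegistererWithPrefix("block_", reg))

	return tsdb.Open(dir, promslog.NewNopLogger(), reg, opts, nil)
}
```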
12 changes: 7 additions & 5 deletions tsdb/db_test.go
@@ -3118,11 +3118,13 @@ func TestCompactHead(t *testing.T) {

// Open a DB and append data to the WAL.
tsdbCfg := &Options{
- RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond),
- NoLockfile: true,
- MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
- MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
- WALCompression: wlog.CompressionSnappy,
+ RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond),
+ NoLockfile: true,
+ MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
+ MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
+ WALCompression: wlog.CompressionSnappy,
+ HeadPostingsForMatchersCacheMetrics: NewPostingsForMatchersCacheMetrics(nil),
+ BlockPostingsForMatchersCacheMetrics: NewPostingsForMatchersCacheMetrics(nil),
}

db, err := Open(dbDir, promslog.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil)
4 changes: 3 additions & 1 deletion tsdb/head.go
@@ -193,6 +193,7 @@ type HeadOptions struct {
PostingsForMatchersCacheMaxItems int
PostingsForMatchersCacheMaxBytes int64
PostingsForMatchersCacheForce bool
+ PostingsForMatchersCacheMetrics *PostingsForMatchersCacheMetrics

// Optional hash function applied to each new series. Computed hash value is preserved for each series in the head,
// and values can be iterated by using Head.ForEachSecondaryHash method.
@@ -222,6 +223,7 @@ func DefaultHeadOptions() *HeadOptions {
PostingsForMatchersCacheMaxItems: DefaultPostingsForMatchersCacheMaxItems,
PostingsForMatchersCacheMaxBytes: DefaultPostingsForMatchersCacheMaxBytes,
PostingsForMatchersCacheForce: DefaultPostingsForMatchersCacheForce,
+ PostingsForMatchersCacheMetrics: NewPostingsForMatchersCacheMetrics(nil),
WALReplayConcurrency: defaultWALReplayConcurrency,
}
ho.OutOfOrderCapMax.Store(DefaultOutOfOrderCapMax)
@@ -296,7 +298,7 @@ func NewHead(r prometheus.Registerer, l *slog.Logger, wal, wbl *wlog.WL, opts *H
stats: stats,
reg: r,
secondaryHashFunc: shf,
- pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheMaxItems, opts.PostingsForMatchersCacheMaxBytes, opts.PostingsForMatchersCacheForce),
+ pfmc: NewPostingsForMatchersCache(opts.PostingsForMatchersCacheTTL, opts.PostingsForMatchersCacheMaxItems, opts.PostingsForMatchersCacheMaxBytes, opts.PostingsForMatchersCacheForce, opts.PostingsForMatchersCacheMetrics),
}
if err := h.resetInMemoryState(); err != nil {
return nil, err
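DefaultHeadOptions gets the same nil-registered default, so only code constructing HeadOptions manually has to change. A short sketch, under the same Registerer assumption as above:

```go
package example

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
)

// newHeadOptions returns head options with the postings-for-matchers cache
// metrics attached to the caller's registerer instead of the nil default.
func newHeadOptions(reg prometheus.Registerer) *tsdb.HeadOptions {
	opts := tsdb.DefaultHeadOptions() // defaults to NewPostingsForMatchersCacheMetrics(nil)
	opts.PostingsForMatchersCacheMetrics = tsdb.NewPostingsForMatchersCacheMetrics(reg)
	return opts
}
```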