From 3b8d993500cb715f27e5618d90afe5c8ac9d501d Mon Sep 17 00:00:00 2001 From: Matt Veitas Date: Mon, 6 Jan 2025 08:28:17 -0500 Subject: [PATCH] feat: Skip writeback for chunks fetched by queriers older than a duration (#15605) --- docs/sources/shared/configuration.md | 5 ++ pkg/storage/chunk/cache/cache_test.go | 2 +- pkg/storage/chunk/fetcher/fetcher.go | 27 ++++++--- pkg/storage/chunk/fetcher/fetcher_test.go | 59 ++++++++++++------- pkg/storage/config/store.go | 8 ++- pkg/storage/store.go | 2 +- pkg/storage/stores/series_store_write_test.go | 2 +- pkg/storage/util_test.go | 2 +- 8 files changed, 69 insertions(+), 38 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index c8bc0e9aaeac5..d352cd6960082 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1732,6 +1732,11 @@ The `chunk_store_config` block configures how chunks will be cached and how long # The CLI flags prefix for this block configuration is: store.index-cache-write [write_dedupe_cache_config: <cache_config>] +# Chunks fetched from queriers before this duration will not be written to the +# cache. A value of 0 will write all chunks to the cache +# CLI flag: -store.skip-query-writeback-older-than +[skip_query_writeback_cache_older_than: <duration> | default = 0s] + # Chunks will be handed off to the L2 cache after this duration. 0 to disable L2 # cache. 
# CLI flag: -store.chunks-cache-l2.handoff diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index 3ff473934cdb1..2607595b30cb9 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -132,7 +132,7 @@ func testChunkFetcher(t *testing.T, c cache.Cache, chunks []chunk.Chunk) { }, } - fetcher, err := fetcher.New(c, nil, false, s, nil, 0) + fetcher, err := fetcher.New(c, nil, false, s, nil, 0, 0) require.NoError(t, err) defer fetcher.Stop() diff --git a/pkg/storage/chunk/fetcher/fetcher.go b/pkg/storage/chunk/fetcher/fetcher.go index 45b6970045a91..cd0e4ce6a90b4 100644 --- a/pkg/storage/chunk/fetcher/fetcher.go +++ b/pkg/storage/chunk/fetcher/fetcher.go @@ -49,7 +49,8 @@ type Fetcher struct { cachel2 cache.Cache cacheStubs bool - l2CacheHandoff time.Duration + l2CacheHandoff time.Duration + skipQueryWritebackCacheOlderThan time.Duration wait sync.WaitGroup decodeRequests chan decodeRequest @@ -69,15 +70,16 @@ type decodeResponse struct { } // New makes a new ChunkFetcher. 
-func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration) (*Fetcher, error) { +func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration, skipQueryWritebackOlderThan time.Duration) (*Fetcher, error) { c := &Fetcher{ - schema: schema, - storage: storage, - cache: cache, - cachel2: cachel2, - l2CacheHandoff: l2CacheHandoff, - cacheStubs: cacheStubs, - decodeRequests: make(chan decodeRequest), + schema: schema, + storage: storage, + cache: cache, + cachel2: cachel2, + l2CacheHandoff: l2CacheHandoff, + cacheStubs: cacheStubs, + skipQueryWritebackCacheOlderThan: skipQueryWritebackOlderThan, + decodeRequests: make(chan decodeRequest), } c.wait.Add(chunkDecodeParallelism) @@ -138,6 +140,9 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun l2OnlyChunks := make([]chunk.Chunk, 0, len(chunks)) for _, m := range chunks { + if c.skipQueryWritebackCacheOlderThan > 0 && m.From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) { + continue + } // Similar to below, this is an optimization to not bother looking in the l1 cache if there isn't a reasonable // expectation to find it there. 
if c.l2CacheHandoff > 0 && m.From.Time().Before(time.Now().UTC().Add(-extendedHandoff)) { @@ -230,6 +235,10 @@ func (c *Fetcher) WriteBackCache(ctx context.Context, chunks []chunk.Chunk) erro keysL2 := make([]string, 0, len(chunks)) bufsL2 := make([][]byte, 0, len(chunks)) for i := range chunks { + if c.skipQueryWritebackCacheOlderThan > 0 && chunks[i].From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) { + continue + } + var encoded []byte var err error if !c.cacheStubs { diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go index 2251c93022b29..5aceefb53c691 100644 --- a/pkg/storage/chunk/fetcher/fetcher_test.go +++ b/pkg/storage/chunk/fetcher/fetcher_test.go @@ -25,16 +25,17 @@ import ( func Test(t *testing.T) { now := time.Now() tests := []struct { - name string - handoff time.Duration - storeStart []chunk.Chunk - l1Start []chunk.Chunk - l2Start []chunk.Chunk - fetch []chunk.Chunk - l1KeysRequested int - l1End []chunk.Chunk - l2KeysRequested int - l2End []chunk.Chunk + name string + handoff time.Duration + skipQueryWriteback time.Duration + storeStart []chunk.Chunk + l1Start []chunk.Chunk + l2Start []chunk.Chunk + fetch []chunk.Chunk + l1KeysRequested int + l1End []chunk.Chunk + l2KeysRequested int + l2End []chunk.Chunk }{ { name: "all found in L1 cache", @@ -82,6 +83,19 @@ func Test(t *testing.T) { l2KeysRequested: 3, l2End: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}), }, + { + name: "skipQueryWriteback", + handoff: 24 * time.Hour, + skipQueryWriteback: 3 * 24 * time.Hour, + storeStart: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}), + l1Start: []chunk.Chunk{}, + l2Start: []chunk.Chunk{}, + fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * 
time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}), + l1KeysRequested: 3, + l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}), + l2KeysRequested: 0, + l2End: []chunk.Chunk{}, + }, { name: "writeback l1", handoff: 24 * time.Hour, @@ -194,7 +208,7 @@ func Test(t *testing.T) { assert.NoError(t, chunkClient.PutChunks(context.Background(), test.storeStart)) // Build fetcher - f, err := New(c1, c2, false, sc, chunkClient, test.handoff) + f, err := New(c1, c2, false, sc, chunkClient, test.handoff, test.skipQueryWriteback) assert.NoError(t, err) // Run the test @@ -235,16 +249,17 @@ func BenchmarkFetch(b *testing.B) { fetch = append(fetch, storeStart...) test := struct { - name string - handoff time.Duration - storeStart []chunk.Chunk - l1Start []chunk.Chunk - l2Start []chunk.Chunk - fetch []chunk.Chunk - l1KeysRequested int - l1End []chunk.Chunk - l2KeysRequested int - l2End []chunk.Chunk + name string + handoff time.Duration + skipQueryWriteback time.Duration + storeStart []chunk.Chunk + l1Start []chunk.Chunk + l2Start []chunk.Chunk + fetch []chunk.Chunk + l1KeysRequested int + l1End []chunk.Chunk + l2KeysRequested int + l2End []chunk.Chunk }{ name: "some in L1, some in L2", handoff: time.Duration(numchunks/3+100) * time.Hour, @@ -291,7 +306,7 @@ func BenchmarkFetch(b *testing.B) { _ = chunkClient.PutChunks(context.Background(), test.storeStart) // Build fetcher - f, _ := New(c1, c2, false, sc, chunkClient, test.handoff) + f, _ := New(c1, c2, false, sc, chunkClient, test.handoff, test.skipQueryWriteback) for i := 0; i < b.N; i++ { _, err := f.FetchChunks(context.Background(), test.fetch) diff --git a/pkg/storage/config/store.go b/pkg/storage/config/store.go index 8dbd57cdc2503..27c48d4b08a37 100644 --- a/pkg/storage/config/store.go +++ b/pkg/storage/config/store.go @@ -10,9 +10,10 @@ import ( ) type ChunkStoreConfig struct { 
- ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"` - ChunkCacheConfigL2 cache.Config `yaml:"chunk_cache_config_l2"` - WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config" doc:"description=Write dedupe cache is deprecated along with legacy index types (aws, aws-dynamo, bigtable, bigtable-hashed, cassandra, gcp, gcp-columnkey, grpc-store).\nConsider using TSDB index which does not require a write dedupe cache."` + ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"` + ChunkCacheConfigL2 cache.Config `yaml:"chunk_cache_config_l2"` + WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config" doc:"description=Write dedupe cache is deprecated along with legacy index types (aws, aws-dynamo, bigtable, bigtable-hashed, cassandra, gcp, gcp-columnkey, grpc-store).\nConsider using TSDB index which does not require a write dedupe cache."` + SkipQueryWritebackOlderThan time.Duration `yaml:"skip_query_writeback_cache_older_than"` L2ChunkCacheHandoff time.Duration `yaml:"l2_chunk_cache_handoff"` CacheLookupsOlderThan model.Duration `yaml:"cache_lookups_older_than"` @@ -38,6 +39,7 @@ func (cfg *ChunkStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.L2ChunkCacheHandoff, "store.chunks-cache-l2.handoff", 0, "Chunks will be handed off to the L2 cache after this duration. 0 to disable L2 cache.") f.BoolVar(&cfg.chunkCacheStubs, "store.chunks-cache.cache-stubs", false, "If true, don't write the full chunk to cache, just a stub entry.") cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "", f) + f.DurationVar(&cfg.SkipQueryWritebackOlderThan, "store.skip-query-writeback-older-than", 0, "Chunks fetched from queriers before this duration will not be written to the cache. A value of 0 will write all chunks to the cache") f.Var(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", "Cache index entries older than this period. 
0 to disable.") } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index a8e6a1add3239..8daf27ce265f8 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -198,7 +198,7 @@ func (s *LokiStore) init() error { if err != nil { return err } - f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff) + f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff, s.storeCfg.SkipQueryWritebackOlderThan) if err != nil { return err } diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go index 5ff8a00d99706..a6e0a9a55cb93 100644 --- a/pkg/storage/stores/series_store_write_test.go +++ b/pkg/storage/stores/series_store_write_test.go @@ -160,7 +160,7 @@ func TestChunkWriter_PutOne(t *testing.T) { idx := &mockIndexWriter{} client := &mockChunksClient{} - f, err := fetcher.New(cache, nil, false, schemaConfig, client, 0) + f, err := fetcher.New(cache, nil, false, schemaConfig, client, 0, 0) require.NoError(t, err) cw := NewChunkWriter(f, schemaConfig, idx, true) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index a0dc75999692f..cc9ded1c53447 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -261,7 +261,7 @@ func (m *mockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time, panic(err) } - f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 0) + f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 0, 0) if err != nil { panic(err) }