Skip to content

Commit

Permalink
Merge pull request #94 from vinted/pull_listing_strategies
Browse files Browse the repository at this point in the history
Allow using different listing strategies (thanos-io#7134)
  • Loading branch information
GiedriusS authored Apr 4, 2024
2 parents aa493ea + 204e4a9 commit f5b0c74
Show file tree
Hide file tree
Showing 17 changed files with 182 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7082](https://github.com/thanos-io/thanos/pull/7082) Stores: fix label values edge case when requesting external label values with matchers
- [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early
- [#7132](https://github.com/thanos-io/thanos/pull/7132) Documentation: fix broken helm installation instruction
- [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred.

### Added
- [#7105](https://github.com/thanos-io/thanos/pull/7105) Rule: add flag `--query.enable-x-functions` to allow usage of extended promql functions (xrate, xincrease, xdelta) in loaded rules
Expand Down
16 changes: 14 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,16 @@ func runCompact(
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)

baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
blockLister = block.NewConcurrentLister(logger, insBkt)
case recursiveDiscovery:
blockLister = block.NewRecursiveLister(logger, insBkt)
default:
return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
}
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down Expand Up @@ -693,6 +701,7 @@ type compactConfig struct {
wait bool
waitInterval time.Duration
disableDownsampling bool
blockListStrategy string
blockMetaFetchConcurrency int
blockFilesConcurrency int
blockViewerSyncBlockInterval time.Duration
Expand Down Expand Up @@ -754,6 +763,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway").
Default("false").BoolVar(&cc.disableDownsampling)

strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
Default(string(concurrentDiscovery)).StringVar(&cc.blockListStrategy)
cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&cc.blockMetaFetchConcurrency)
cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage.").
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func RunDownsample(
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// While fetching blocks, filter out blocks that were marked for no downsample.
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
Expand Down
5 changes: 3 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestRegression4960_Deadlock(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

Expand Down Expand Up @@ -197,7 +198,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

Expand Down
25 changes: 23 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/alecthomas/units"
Expand Down Expand Up @@ -56,6 +57,13 @@ const (
retryIntervalDuration = 10
)

type syncStrategy string

const (
concurrentDiscovery syncStrategy = "concurrent"
recursiveDiscovery syncStrategy = "recursive"
)

type storeConfig struct {
indexCacheConfigs extflag.PathOrContent
objStoreConfig extflag.PathOrContent
Expand All @@ -74,6 +82,7 @@ type storeConfig struct {
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
blockListStrategy string
blockSyncConcurrency int
blockMetaFetchConcurrency int
filterConf *store.FilterConfig
Expand Down Expand Up @@ -137,6 +146,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Default("3m").DurationVar(&sc.syncInterval)

strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
Default(string(concurrentDiscovery)).StringVar(&sc.blockListStrategy)

cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1.").
Default("20").IntVar(&sc.blockSyncConcurrency)

Expand Down Expand Up @@ -345,9 +358,17 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
blockLister = block.NewConcurrentLister(logger, insBkt)
case recursiveDiscovery:
blockLister = block.NewRecursiveLister(logger, insBkt)
default:
return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
}
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
Expand Down
12 changes: 6 additions & 6 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path

// We ignore any block that has the deletion marker file.
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)}
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
Expand Down Expand Up @@ -408,7 +408,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)
filters = append(filters, ignoreDeletionMarkFilter)
}
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
Expand Down Expand Up @@ -510,7 +510,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
Expand Down Expand Up @@ -654,7 +654,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
return err
}
// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
Expand Down Expand Up @@ -833,7 +833,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat

var sy *compact.Syncer
{
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down Expand Up @@ -1376,7 +1376,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P

var sy *compact.Syncer
{
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down
9 changes: 9 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,15 @@ usage: thanos compact [<flags>]
Continuously compacts blocks in an object store bucket.
Flags:
--block-discovery-strategy="concurrent"
One of concurrent, recursive. When set to
concurrent, stores will concurrently issue
one call per directory to discover active
blocks in the bucket. The recursive strategy
iterates through all objects in the bucket,
recursively traversing into each directory.
This avoids N+1 calls at the expense of having
slower bucket iterations.
--block-files-concurrency=1
Number of goroutines to use when
fetching/uploading block files from object
Expand Down
9 changes: 9 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ Store node giving access to blocks in a bucket provider. Now supported GCS, S3,
Azure, Swift, Tencent COS and Aliyun OSS.
Flags:
--block-discovery-strategy="concurrent"
One of concurrent, recursive. When set to
concurrent, stores will concurrently issue
one call per directory to discover active
blocks in the bucket. The recursive strategy
iterates through all objects in the bucket,
recursively traversing into each directory.
This avoids N+1 calls at the expense of having
slower bucket iterations.
--block-meta-fetch-concurrency=32
Number of goroutines to use when fetching block
metadata from object storage.
Expand Down
Loading

0 comments on commit f5b0c74

Please sign in to comment.