Skip to content

Commit

Permalink
Fallback to manual listing, when bucket index is not available (#2609)
Browse files Browse the repository at this point in the history
Since we merged the compactor, store-gateways rely on it to write the block index to quickly list blocks.

This will safely fallback to the previous behaviour, if the bucket index
is not existing, corrupt or too old.
  • Loading branch information
simonswine committed Oct 31, 2023
1 parent 568f33d commit 372f898
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 18 deletions.
6 changes: 5 additions & 1 deletion pkg/phlaredb/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ type MetaFetcher struct {

// NewMetaFetcher returns a MetaFetcher.
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
return NewMetaFetcherWithMetrics(logger, concurrency, bkt, dir, NewFetcherMetrics(reg, nil), filters)
}

func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, metrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -164,7 +168,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade
bkt: bkt,
cacheDir: cacheDir,
cached: map[ulid.ULID]*Meta{},
metrics: NewFetcherMetrics(reg, nil),
metrics: metrics,
filters: filters,
}, nil
}
Expand Down
66 changes: 50 additions & 16 deletions pkg/storegateway/bucket_index_metadata_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package storegateway

import (
"context"
"path/filepath"
"time"

"github.com/go-kit/log"
Expand All @@ -21,8 +22,9 @@ import (
)

const (
corruptedBucketIndex = "corrupted-bucket-index"
noBucketIndex = "no-bucket-index"
corruptedBucketIndex = "corrupted-bucket-index"
noBucketIndex = "no-bucket-index"
bucketIndexOlderThanHour = "bucket-index-older-than-hour"
)

// BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation leveraging on the Mimir bucket index.
Expand All @@ -33,6 +35,7 @@ type BucketIndexMetadataFetcher struct {
logger log.Logger
filters []block.MetadataFilter
metrics *block.FetcherMetrics
fallback block.MetadataFetcher
}

func NewBucketIndexMetadataFetcher(
Expand All @@ -53,39 +56,70 @@ func NewBucketIndexMetadataFetcher(
}
}

func (f *BucketIndexMetadataFetcher) fallbackFetch(ctx context.Context) (metas map[ulid.ULID]*block.Meta, partial map[ulid.ULID]error, err error) {
if f.fallback == nil {
userBucket := objstore.NewTenantBucketClient(f.userID, f.bkt, f.cfgProvider)
fetcher, err := block.NewMetaFetcherWithMetrics(f.logger, 16, userBucket, filepath.Join("./data-store-gateway", f.userID), f.metrics, f.filters)
if err != nil {
return nil, nil, err
}
f.fallback = fetcher
}

return f.fallback.Fetch(ctx)
}

// Fetch implements block.MetadataFetcher. Not goroutine-safe.
func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*block.Meta, partial map[ulid.ULID]error, err error) {
f.metrics.ResetTx()

start := time.Now()
defer func() {
f.metrics.SyncDuration.Observe(time.Since(start).Seconds())
if err != nil {
f.metrics.SyncFailures.Inc()
}
}()
f.metrics.Syncs.Inc()

// Fetch the bucket index.
idx, err := bucketindex.ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger)
if errors.Is(err, bucketindex.ErrIndexNotFound) {
// This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters
// and their bucket index has not been created yet.
f.metrics.Synced.WithLabelValues(noBucketIndex).Set(1)
f.metrics.Submit()
defer func() {
f.metrics.Synced.WithLabelValues(noBucketIndex).Set(1)
f.metrics.Submit()
}()

return nil, nil, nil
level.Warn(f.logger).Log("msg", "no bucket index found, falling back to fetching directly from bucket", "user", f.userID)
return f.fallbackFetch(ctx)
}
if errors.Is(err, bucketindex.ErrIndexCorrupted) {
// In case a single tenant bucket index is corrupted, we don't want the store-gateway to fail at startup
// because unable to fetch blocks metadata. We'll act as if the tenant has no bucket index, but the query
// will fail anyway in the querier (the querier fails in the querier if bucket index is corrupted).
level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err)
f.metrics.Synced.WithLabelValues(corruptedBucketIndex).Set(1)
f.metrics.Submit()
level.Error(f.logger).Log("msg", "corrupted bucket index found, falling back to fetching directly from bucket", "user", f.userID, "err", err)
defer func() {
f.metrics.Synced.WithLabelValues(corruptedBucketIndex).Set(1)
f.metrics.Submit()
}()

return f.fallbackFetch(ctx)
}

// check if index is older than 1 hour, fallback to metafetcher
if time.Unix(idx.UpdatedAt, 0).Before(start.Add(-1 * time.Hour)) {
defer func() {
f.metrics.Synced.WithLabelValues(bucketIndexOlderThanHour).Set(1)
f.metrics.Submit()
}()

return nil, nil, nil
level.Warn(f.logger).Log("msg", "bucket index is older than 1 hour, falling back to fetching directly from bucket", "user", f.userID)
return f.fallbackFetch(ctx)
}

defer func() {
f.metrics.SyncDuration.Observe(time.Since(start).Seconds())
if err != nil {
f.metrics.SyncFailures.Inc()
}
}()
f.metrics.Syncs.Inc()

if err != nil {
f.metrics.Synced.WithLabelValues(block.FailedMeta).Set(1)
f.metrics.Submit()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_index_metadata_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, metas)
assert.Empty(t, partials)
assert.Empty(t, logs)
assert.Contains(t, logs.String(), "no bucket index found, falling back to fetching directly from bucket")

assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures
Expand Down

0 comments on commit 372f898

Please sign in to comment.