Skip to content

Commit

Permalink
Fall back to deduplication when block planning hits a problem (#2732)
Browse files Browse the repository at this point in the history
The panics fail a query hard when one of the assertions is not met.
For example, when the number of compactor shards changes, block planning
panics and fails the query. This commit changes the behaviour so that the
querier falls back to deduplication instead.
  • Loading branch information
simonswine committed Nov 20, 2023
1 parent d87e810 commit a4e0400
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions pkg/querier/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"encoding/json"
"fmt"
"sort"

"github.com/cespare/xxhash/v2"
Expand Down Expand Up @@ -201,7 +202,7 @@ func (r *replicasPerBlockID) removeBlock(ulid string) {
}

// this step removes sharded blocks that don't have all the shards present for a time window
func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() {
func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error {
type compactionKey struct {
level int32
minT int64
Expand All @@ -212,7 +213,7 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() {
for blockID := range r.m {
meta, ok := r.meta[blockID]
if !ok {
panic("meta missing")
return fmt.Errorf("meta missing for block id %s", blockID)
}
if !ok {
continue
Expand Down Expand Up @@ -240,7 +241,7 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() {
for _, block := range blocks {
meta, ok := r.meta[block]
if !ok {
panic("meta is missing")
return fmt.Errorf("meta missing for block id %s", block)
}

shardIdx, shards, ok := shardFromBlock(meta)
Expand All @@ -262,7 +263,7 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() {
}

if len(shardsSeen) != int(shards) {
panic("shard length mismatch")
return fmt.Errorf("shard length mismatch, shards seen: %d, shards as per label: %d", len(shardsSeen), shards)
}

shardsSeen[shardIdx] = true
Expand All @@ -285,14 +286,18 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() {
r.removeBlock(block)
}
}

return nil
}

// prunes blocks that are contained by a higher compaction level block
func (r *replicasPerBlockID) pruneSupersededBlocks() {
func (r *replicasPerBlockID) pruneSupersededBlocks() error {
for blockID := range r.m {
meta, ok := r.meta[blockID]
if !ok {
panic("meta missing")
if !ok {
return fmt.Errorf("meta missing for block id %s", blockID)
}
}
if meta.Compaction == nil {
continue
Expand All @@ -307,6 +312,7 @@ func (r *replicasPerBlockID) pruneSupersededBlocks() {
r.removeBlock(blockID)
}
}
return nil
}

type blockPlan map[string]*ingestv1.BlockHints
Expand All @@ -327,8 +333,14 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*ingestv1
smallestCompactionLevel = int32(0)
)

r.pruneIncompleteShardedBlocks()
r.pruneSupersededBlocks()
if err := r.pruneIncompleteShardedBlocks(); err != nil {
level.Warn(r.logger).Log("msg", "block planning failed to prune incomplete sharded blocks", "err", err)
return nil
}
if err := r.pruneSupersededBlocks(); err != nil {
level.Warn(r.logger).Log("msg", "block planning failed to prune superseded blocks", "err", err)
return nil
}

// now we go through all blocks and choose the replicas that we want to query
for blockID, replicas := range r.m {
Expand Down

0 comments on commit a4e0400

Please sign in to comment.