Skip to content

Commit

Permalink
Improve block plan creation for standalone binary mode (#3325)
Browse files Browse the repository at this point in the history
* Improve block plan handling for standalone binary mode

* Adapt tests to change in block plan struct

* Add check before querying store gateways
  • Loading branch information
aleks-p authored May 31, 2024
1 parent 8348529 commit 0c63617
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 45 deletions.
28 changes: 16 additions & 12 deletions pkg/querier/analyze_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ func (q *Querier) AnalyzeQuery(ctx context.Context, req *connect.Request[querier
}
addBlockStatsToQueryScope(blockStatsFromReplicas, ingesterQueryScope)

blockStatsFromReplicas, err = q.getBlockStatsFromStoreGateways(ctx, plan, storeGatewayQueryScope.blockIds)
if err != nil {
return nil, err
if q.storeGatewayQuerier != nil {
blockStatsFromReplicas, err = q.getBlockStatsFromStoreGateways(ctx, plan, storeGatewayQueryScope.blockIds)
if err != nil {
return nil, err
}
addBlockStatsToQueryScope(blockStatsFromReplicas, storeGatewayQueryScope)
}
addBlockStatsToQueryScope(blockStatsFromReplicas, storeGatewayQueryScope)

queriedSeries, err := q.getQueriedSeriesCount(ctx, req.Msg)
if err != nil {
Expand Down Expand Up @@ -66,14 +68,16 @@ func getDataFromPlan(plan blockPlan) (ingesterQueryScope *queryScope, storeGatew
deduplicationNeeded = false
for _, planEntry := range plan {
deduplicationNeeded = deduplicationNeeded || planEntry.Deduplication
if planEntry.InstanceType == ingesterInstance {
ingesterQueryScope.ComponentCount += 1
ingesterQueryScope.BlockCount += uint64(len(planEntry.Ulids))
ingesterQueryScope.blockIds = append(ingesterQueryScope.blockIds, planEntry.Ulids...)
} else {
storeGatewayQueryScope.ComponentCount += 1
storeGatewayQueryScope.BlockCount += uint64(len(planEntry.Ulids))
storeGatewayQueryScope.blockIds = append(storeGatewayQueryScope.blockIds, planEntry.Ulids...)
for _, t := range planEntry.InstanceTypes {
if t == ingesterInstance {
ingesterQueryScope.ComponentCount += 1
ingesterQueryScope.BlockCount += uint64(len(planEntry.Ulids))
ingesterQueryScope.blockIds = append(ingesterQueryScope.blockIds, planEntry.Ulids...)
} else {
storeGatewayQueryScope.ComponentCount += 1
storeGatewayQueryScope.BlockCount += uint64(len(planEntry.Ulids))
storeGatewayQueryScope.blockIds = append(storeGatewayQueryScope.blockIds, planEntry.Ulids...)
}
}
}
return ingesterQueryScope, storeGatewayQueryScope, deduplicationNeeded
Expand Down
44 changes: 34 additions & 10 deletions pkg/querier/analyze_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func Test_getDataFromPlan(t *testing.T) {
name: "plan with ingesters only",
plan: blockPlan{
"replica 1": &blockPlanEntry{
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block A", "block B"}, Deduplication: true},
InstanceType: ingesterInstance,
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block A", "block B"}, Deduplication: true},
InstanceTypes: []instanceType{ingesterInstance},
},
"replica 2": &blockPlanEntry{
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block C", "block D"}, Deduplication: true},
InstanceType: ingesterInstance,
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block C", "block D"}, Deduplication: true},
InstanceTypes: []instanceType{ingesterInstance},
},
},
verifyIngesterQueryScope: func(t *testing.T, scope *queryScope) {
Expand All @@ -66,16 +66,16 @@ func Test_getDataFromPlan(t *testing.T) {
name: "plan with ingesters and store gateways",
plan: blockPlan{
"replica 1": &blockPlanEntry{
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block A", "block B"}, Deduplication: true},
InstanceType: ingesterInstance,
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block A", "block B"}, Deduplication: true},
InstanceTypes: []instanceType{ingesterInstance},
},
"replica 2": &blockPlanEntry{
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block C", "block D"}, Deduplication: true},
InstanceType: ingesterInstance,
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block C", "block D"}, Deduplication: true},
InstanceTypes: []instanceType{ingesterInstance},
},
"replica 3": &blockPlanEntry{
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block E", "block F"}, Deduplication: true},
InstanceType: storeGatewayInstance,
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block E", "block F"}, Deduplication: true},
InstanceTypes: []instanceType{storeGatewayInstance},
},
},
verifyIngesterQueryScope: func(t *testing.T, scope *queryScope) {
Expand All @@ -94,6 +94,30 @@ func Test_getDataFromPlan(t *testing.T) {
},
wantDeduplicationNeeded: true,
},
{
name: "plan with a single replica with dual instance types (standalone binary)",
plan: blockPlan{
"replica 1": &blockPlanEntry{
BlockHints: &ingestv1.BlockHints{Ulids: []string{"block A"}, Deduplication: true},
InstanceTypes: []instanceType{ingesterInstance, storeGatewayInstance},
},
},
verifyIngesterQueryScope: func(t *testing.T, scope *queryScope) {
require.Equal(t, uint64(1), scope.ComponentCount)
require.Equal(t, uint64(1), scope.BlockCount)
for _, block := range []string{"block A"} {
require.True(t, slices.Contains(scope.blockIds, block))
}
},
verifyStoreGatewayQueryScope: func(t *testing.T, scope *queryScope) {
require.Equal(t, uint64(1), scope.ComponentCount)
require.Equal(t, uint64(1), scope.BlockCount)
for _, block := range []string{"block A"} {
require.True(t, slices.Contains(scope.blockIds, block))
}
},
wantDeduplicationNeeded: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ func Test_splitQueryToStores(t *testing.T) {
start: model.TimeFromUnixNano(int64(30 * time.Minute)),
end: model.TimeFromUnixNano(int64(45*time.Minute) + int64(3*time.Hour)),
queryStoreAfter: 30 * time.Minute,
plan: blockPlan{"replica-a": &blockPlanEntry{InstanceType: ingesterInstance, BlockHints: &ingestv1.BlockHints{Ulids: []string{"block-a", "block-b"}}}},
plan: blockPlan{"replica-a": &blockPlanEntry{InstanceTypes: []instanceType{ingesterInstance}, BlockHints: &ingestv1.BlockHints{Ulids: []string{"block-a", "block-b"}}}},

expected: storeQueries{
queryStoreAfter: 0,
Expand Down
50 changes: 28 additions & 22 deletions pkg/querier/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,26 +144,25 @@ const (
// map of block ID to replicas containing the block, when empty replicas, the
// block is already contained by a higher compaction level block in full.
type replicasPerBlockID struct {
m map[string][]string
meta map[string]*typesv1.BlockInfo
instanceType map[string]instanceType
logger log.Logger
m map[string][]string
meta map[string]*typesv1.BlockInfo
instanceTypes map[string][]instanceType
logger log.Logger
}

func newReplicasPerBlockID(logger log.Logger) *replicasPerBlockID {
return &replicasPerBlockID{
m: make(map[string][]string),
meta: make(map[string]*typesv1.BlockInfo),
instanceType: make(map[string]instanceType),
logger: logger,
m: make(map[string][]string),
meta: make(map[string]*typesv1.BlockInfo),
instanceTypes: make(map[string][]instanceType),
logger: logger,
}
}

func (r *replicasPerBlockID) add(result []ResponseFromReplica[[]*typesv1.BlockInfo], t instanceType) {
for _, replica := range result {
// mark the replica's instance type
// TODO: Figure out if that breaks in single binary mode
r.instanceType[replica.addr] = t
// mark the replica's instance types (in single binary we can have the same replica have multiple types)
r.instanceTypes[replica.addr] = append(r.instanceTypes[replica.addr], t)

for _, block := range replica.response {
// add block to map
Expand Down Expand Up @@ -322,7 +321,7 @@ func (r *replicasPerBlockID) pruneSupersededBlocks(sharded bool) error {

type blockPlanEntry struct {
*ingestv1.BlockHints
InstanceType instanceType
InstanceTypes []instanceType
}

type blockPlan map[string]*blockPlanEntry
Expand Down Expand Up @@ -397,11 +396,16 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*blockPla

// only get store gateways replicas
sgReplicas := lo.Filter(replicas, func(replica string, _ int) bool {
t, ok := r.instanceType[replica]
instanceTypes, ok := r.instanceTypes[replica]
if !ok {
return false
}
return t == storeGatewayInstance
for _, t := range instanceTypes {
if t == storeGatewayInstance {
return true
}
}
return false
})

if len(sgReplicas) > 0 {
Expand All @@ -423,8 +427,8 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*blockPla
p, exists := plan[selectedReplica]
if !exists {
p = &blockPlanEntry{
BlockHints: &ingestv1.BlockHints{},
InstanceType: r.instanceType[selectedReplica],
BlockHints: &ingestv1.BlockHints{},
InstanceTypes: r.instanceTypes[selectedReplica],
}
plan[selectedReplica] = p
}
Expand All @@ -443,15 +447,17 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*blockPla

var plannedIngesterBlocks, plannedStoreGatewayBlocks int
for replica, blocks := range plan {
t, ok := r.instanceType[replica]
instanceTypes, ok := r.instanceTypes[replica]
if !ok {
continue
}
if t == storeGatewayInstance {
plannedStoreGatewayBlocks += len(blocks.Ulids)
}
if t == ingesterInstance {
plannedIngesterBlocks += len(blocks.Ulids)
for _, t := range instanceTypes {
if t == storeGatewayInstance {
plannedStoreGatewayBlocks += len(blocks.Ulids)
}
if t == ingesterInstance {
plannedIngesterBlocks += len(blocks.Ulids)
}
}
}

Expand Down

0 comments on commit 0c63617

Please sign in to comment.