diff --git a/pkg/losertree/tree.go b/pkg/losertree/tree.go deleted file mode 100644 index d0194d35ec..0000000000 --- a/pkg/losertree/tree.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -// Original version copyright Bryan Boreham, 2024. -// https://github.com/bboreham/go-loser/tree/any. -// Loser tree, from https://en.wikipedia.org/wiki/K-way_merge_algorithm#Tournament_Tree - -package losertree - -type Sequence interface { - Next() bool // Advances and returns true if there is a value at this new position. -} - -func New[E any, S Sequence](sequences []S, maxVal E, at func(S) E, less func(E, E) bool, close func(S)) *Tree[E, S] { - nSequences := len(sequences) - t := Tree[E, S]{ - maxVal: maxVal, - at: at, - less: less, - close: close, - nodes: make([]node[E, S], nSequences*2), - } - for i, s := range sequences { - t.nodes[i+nSequences].items = s - t.moveNext(i + nSequences) // Must call Next on each item so that At() has a value. - } - if nSequences > 0 { - t.nodes[0].index = -1 // flag to be initialized on first call to Next(). - } - return &t -} - -// Call the close function on all sequences that are still open. -func (t *Tree[E, S]) Close() { - for _, e := range t.nodes[len(t.nodes)/2 : len(t.nodes)] { - if e.index == -1 { - continue - } - t.close(e.items) - } -} - -// A loser tree is a binary tree laid out such that nodes N and N+1 have parent N/2. -// We store M leaf nodes in positions M...2M-1, and M-1 internal nodes in positions 1..M-1. -// Node 0 is a special node, containing the winner of the contest. -type Tree[E any, S Sequence] struct { - maxVal E - at func(S) E - less func(E, E) bool - close func(S) // Called when Next() returns false. - nodes []node[E, S] -} - -type node[E any, S Sequence] struct { - index int // This is the loser for all nodes except the 0th, where it is the winner. - value E // Value copied from the loser node, or winner for node 0. - items S // Only populated for leaf nodes. -} - -func (t *Tree[E, S]) moveNext(index int) bool { - n := &t.nodes[index] - if n.items.Next() { - n.value = t.at(n.items) - return true - } - t.close(n.items) // Next() returned false; close it and mark as finished. - n.value = t.maxVal - n.index = -1 - return false -} - -func (t *Tree[E, S]) Winner() S { - return t.nodes[t.nodes[0].index].items -} - -func (t *Tree[E, S]) At() E { - return t.nodes[0].value -} - -func (t *Tree[E, S]) Next() bool { - nodes := t.nodes - if len(nodes) == 0 { - return false - } - if nodes[0].index == -1 { // If tree has not been initialized yet, do that. - t.initialize() - return nodes[nodes[0].index].index != -1 - } - if nodes[nodes[0].index].index == -1 { // already exhausted. - return false - } - t.moveNext(nodes[0].index) - t.replayGames(nodes[0].index) - return nodes[nodes[0].index].index != -1 -} - -// Current winner has been advanced independently; fix up the loser tree. -func (t *Tree[E, S]) Fix(closed bool) { - nodes := t.nodes - cur := &nodes[nodes[0].index] - if closed { - cur.value = t.maxVal - cur.index = -1 - } else { - cur.value = t.at(cur.items) - } - t.replayGames(nodes[0].index) -} - -func (t *Tree[E, S]) IsEmpty() bool { - nodes := t.nodes - if nodes[0].index == -1 { // If tree has not been initialized yet, do that. - t.initialize() - } - return nodes[nodes[0].index].index == -1 -} - -func (t *Tree[E, S]) initialize() { - winner := t.playGame(1) - t.nodes[0].index = winner - t.nodes[0].value = t.nodes[winner].value -} - -// Find the winner at position pos; if it is a non-leaf node, store the loser. -// pos must be >= 1 and < len(t.nodes). -func (t *Tree[E, S]) playGame(pos int) int { - nodes := t.nodes - if pos >= len(nodes)/2 { - return pos - } - left := t.playGame(pos * 2) - right := t.playGame(pos*2 + 1) - var loser, winner int - if t.less(nodes[left].value, nodes[right].value) { - loser, winner = right, left - } else { - loser, winner = left, right - } - nodes[pos].index = loser - nodes[pos].value = nodes[loser].value - return winner -} - -// Starting at pos, which is a winner, re-consider all values up to the root. -func (t *Tree[E, S]) replayGames(pos int) { - nodes := t.nodes - winningValue := nodes[pos].value - for n := parent(pos); n != 0; n = parent(n) { - node := &nodes[n] - if t.less(node.value, winningValue) { - // Record pos as the loser here, and the old loser is the new winner. - node.index, pos = pos, node.index - node.value, winningValue = winningValue, node.value - } - } - // pos is now the winner; store it in node 0. - nodes[0].index = pos - nodes[0].value = winningValue -} - -func parent(i int) int { return i >> 1 } diff --git a/pkg/losertree/tree_test.go b/pkg/losertree/tree_test.go deleted file mode 100644 index 4144a81be4..0000000000 --- a/pkg/losertree/tree_test.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -// Original version copyright Bryan Boreham, 2024. -// https://github.com/bboreham/go-loser/tree/any. -package losertree - -import ( - "math" - "testing" -) - -type List struct { - list []uint64 - cur uint64 -} - -func NewList(list ...uint64) *List { - return &List{list: list} -} - -func (it *List) At() uint64 { - return it.cur -} - -func (it *List) Next() bool { - if len(it.list) > 0 { - it.cur = it.list[0] - it.list = it.list[1:] - return true - } - it.cur = 0 - return false -} - -func (it *List) Seek(val uint64) bool { - for it.cur < val && len(it.list) > 0 { - it.cur = it.list[0] - it.list = it.list[1:] - } - return len(it.list) > 0 -} - -func checkIterablesEqual[E any, S1 Sequence, S2 Sequence](t *testing.T, a S1, b S2, at1 func(S1) E, at2 func(S2) E, less func(E, E) bool) { - t.Helper() - count := 0 - for a.Next() { - count++ - if !b.Next() { - t.Fatalf("b ended before a after %d elements", count) - } - if less(at1(a), at2(b)) || less(at2(b), at1(a)) { - t.Fatalf("position %d: %v != %v", count, at1(a), at2(b)) - } - } - if b.Next() { - t.Fatalf("a ended before b after %d elements", count) - } -} - -var testCases = []struct { - name string - args []*List - want *List -}{ - { - name: "empty input", - want: NewList(), - }, - { - name: "one list", - args: []*List{NewList(1, 2, 3, 4)}, - want: NewList(1, 2, 3, 4), - }, - { - name: "two lists", - args: []*List{NewList(3, 4, 5), NewList(1, 2)}, - want: NewList(1, 2, 3, 4, 5), - }, - { - name: "two lists, first empty", - args: []*List{NewList(), NewList(1, 2)}, - want: NewList(1, 2), - }, - { - name: "two lists, second empty", - args: []*List{NewList(1, 2), NewList()}, - want: NewList(1, 2), - }, - { - name: "two lists b", - args: []*List{NewList(1, 2), NewList(3, 4, 5)}, - want: NewList(1, 2, 3, 4, 5), - }, - { - name: "two lists c", - args: []*List{NewList(1, 3), NewList(2, 4, 5)}, - want: NewList(1, 2, 3, 4, 5), - }, - { - name: "three lists", - args: []*List{NewList(1, 3), NewList(2, 4), NewList(5)}, - want: NewList(1, 2, 3, 4, 5), - }, -} - -func TestMerge(t *testing.T) { - at := func(s *List) uint64 { return s.At() } - less := func(a, b uint64) bool { return a < b } - at2 := func(s *Tree[uint64, *List]) uint64 { return s.Winner().At() } - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - numCloses := 0 - closeFn := func(_ *List) { - numCloses++ - } - lt := New(tt.args, math.MaxUint64, at, less, closeFn) - checkIterablesEqual(t, tt.want, lt, at, at2, less) - if numCloses != len(tt.args) { - t.Errorf("Expected %d closes, got %d", len(tt.args), numCloses) - } - }) - } -} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 1a58770778..3573431270 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1625,8 +1625,13 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store // Merge the sub-results from each selected block. tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) { + defer func() { + for _, resp := range respSets { + resp.Close() + } + }() begin := time.Now() - set := NewResponseDeduplicator(NewProxyResponseLoserTree(respSets...)) + set := NewDedupResponseHeap(NewProxyResponseHeap(respSets...)) for set.Next() { at := set.At() warn := at.GetWarning() diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 749f47e34e..ce4d391bf3 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -362,7 +362,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";")) - respHeap := NewResponseDeduplicator(NewProxyResponseLoserTree(storeResponses...)) + respHeap := NewDedupResponseHeap(NewProxyResponseHeap(storeResponses...)) for respHeap.Next() { resp := respHeap.At() diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_heap.go similarity index 85% rename from pkg/store/proxy_merge.go rename to pkg/store/proxy_heap.go index b6d7dc3792..b14f287494 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_heap.go @@ -4,6 +4,7 @@ package store import ( + "container/heap" "context" "fmt" "io" @@ -20,14 +21,13 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" - "github.com/thanos-io/thanos/pkg/losertree" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) -type responseDeduplicator struct { - h *losertree.Tree[*storepb.SeriesResponse, respSet] +type dedupResponseHeap struct { + h *ProxyResponseHeap bufferedSameSeries []*storepb.SeriesResponse @@ -38,22 +38,22 @@ type responseDeduplicator struct { ok bool } -// NewResponseDeduplicator returns a wrapper around a loser tree that merges duplicated series messages into one. +// NewDedupResponseHeap returns a wrapper around ProxyResponseHeap that merged duplicated series messages into one. // It also deduplicates identical chunks identified by the same checksum from each series message. -func NewResponseDeduplicator(h *losertree.Tree[*storepb.SeriesResponse, respSet]) *responseDeduplicator { +func NewDedupResponseHeap(h *ProxyResponseHeap) *dedupResponseHeap { ok := h.Next() var prev *storepb.SeriesResponse if ok { prev = h.At() } - return &responseDeduplicator{ + return &dedupResponseHeap{ h: h, ok: ok, prev: prev, } } -func (d *responseDeduplicator) Next() bool { +func (d *dedupResponseHeap) Next() bool { if d.buffRespI+1 < len(d.bufferedResp) { d.buffRespI++ return true @@ -153,43 +153,105 @@ func chainSeriesAndRemIdenticalChunks(series []*storepb.SeriesResponse) *storepb }) } -func (d *responseDeduplicator) At() *storepb.SeriesResponse { +func (d *dedupResponseHeap) At() *storepb.SeriesResponse { return d.bufferedResp[d.buffRespI] } -// NewProxyResponseLoserTree returns heap that k-way merge series together. +// ProxyResponseHeap is a heap for storepb.SeriesSets. +// It performs k-way merge between all of those sets. +// TODO(GiedriusS): can be improved with a tournament tree. +// This is O(n*logk) but can be Theta(n*logk). However, +// tournament trees need n-1 auxiliary nodes so there +// might not be much of a difference. +type ProxyResponseHeap struct { + nodes []ProxyResponseHeapNode +} + +func (h *ProxyResponseHeap) Less(i, j int) bool { + iResp := h.nodes[i].rs.At() + jResp := h.nodes[j].rs.At() + + if iResp.GetSeries() != nil && jResp.GetSeries() != nil { + iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels) + jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels) + + return labels.Compare(iLbls, jLbls) < 0 + } else if iResp.GetSeries() == nil && jResp.GetSeries() != nil { + return true + } else if iResp.GetSeries() != nil && jResp.GetSeries() == nil { + return false + } + + // If it is not a series then the order does not matter. What matters + // is that we get different types of responses one after another. + return false +} + +func (h *ProxyResponseHeap) Len() int { + return len(h.nodes) +} + +func (h *ProxyResponseHeap) Swap(i, j int) { + h.nodes[i], h.nodes[j] = h.nodes[j], h.nodes[i] +} + +func (h *ProxyResponseHeap) Push(x interface{}) { + h.nodes = append(h.nodes, x.(ProxyResponseHeapNode)) +} + +func (h *ProxyResponseHeap) Pop() (v interface{}) { + h.nodes, v = h.nodes[:h.Len()-1], h.nodes[h.Len()-1] + return +} + +func (h *ProxyResponseHeap) Empty() bool { + return h.Len() == 0 +} + +func (h *ProxyResponseHeap) Min() *ProxyResponseHeapNode { + return &h.nodes[0] +} + +type ProxyResponseHeapNode struct { + rs respSet +} + +// NewProxyResponseHeap returns heap that k-way merge series together. // It's agnostic to duplicates and overlaps, it forwards all duplicated series in random order. -func NewProxyResponseLoserTree(seriesSets ...respSet) *losertree.Tree[*storepb.SeriesResponse, respSet] { - var maxVal *storepb.SeriesResponse = storepb.NewSeriesResponse(nil) +func NewProxyResponseHeap(seriesSets ...respSet) *ProxyResponseHeap { + ret := ProxyResponseHeap{ + nodes: make([]ProxyResponseHeapNode, 0, len(seriesSets)), + } - less := func(a, b *storepb.SeriesResponse) bool { - if a == maxVal && b != maxVal { - return false - } - if a != maxVal && b == maxVal { - return true - } - if a == maxVal && b == maxVal { - return true + for _, ss := range seriesSets { + if ss.Empty() { + continue } - if a.GetSeries() != nil && b.GetSeries() != nil { - iLbls := labelpb.ZLabelsToPromLabels(a.GetSeries().Labels) - jLbls := labelpb.ZLabelsToPromLabels(b.GetSeries().Labels) + ss := ss + ret.Push(ProxyResponseHeapNode{rs: ss}) + } - return labels.Compare(iLbls, jLbls) < 0 - } else if a.GetSeries() == nil && b.GetSeries() != nil { - return true - } else if a.GetSeries() != nil && b.GetSeries() == nil { - return false - } - return false + heap.Init(&ret) + + return &ret +} + +func (h *ProxyResponseHeap) Next() bool { + return !h.Empty() +} + +func (h *ProxyResponseHeap) At() *storepb.SeriesResponse { + min := h.Min().rs + + atResp := min.At() + + if min.Next() { + heap.Fix(h, 0) + } else { + heap.Remove(h, 0) } - return losertree.New[*storepb.SeriesResponse, respSet](seriesSets, maxVal, func(s respSet) *storepb.SeriesResponse { - return s.At() - }, less, func(s respSet) { - s.Close() - }) + return atResp } func (l *lazyRespSet) StoreID() string { @@ -256,8 +318,6 @@ func (l *lazyRespSet) Next() bool { l.bufferedResponsesMtx.Lock() defer l.bufferedResponsesMtx.Unlock() - l.initialized = true - if l.noMoreData && len(l.bufferedResponses) == 0 { l.lastResp = nil @@ -273,9 +333,7 @@ func (l *lazyRespSet) Next() bool { if len(l.bufferedResponses) > 0 { l.lastResp = l.bufferedResponses[0] - if l.initialized { - l.bufferedResponses = l.bufferedResponses[1:] - } + l.bufferedResponses = l.bufferedResponses[1:] return true } @@ -284,10 +342,14 @@ func (l *lazyRespSet) Next() bool { } func (l *lazyRespSet) At() *storepb.SeriesResponse { + // We need to wait for at least one response so that we would be able to properly build the heap. if !l.initialized { - panic("please call Next before At") + l.Next() + l.initialized = true + return l.lastResp } + // Next() was called previously. return l.lastResp } @@ -717,9 +779,7 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] } func (l *eagerRespSet) Close() { - if l.closeSeries != nil { - l.closeSeries() - } + l.closeSeries() l.shardMatcher.Close() } @@ -730,7 +790,7 @@ func (l *eagerRespSet) At() *storepb.SeriesResponse { return nil } - return l.bufferedResponses[l.i-1] + return l.bufferedResponses[l.i] } func (l *eagerRespSet) Next() bool { @@ -738,7 +798,7 @@ func (l *eagerRespSet) Next() bool { l.i++ - return l.i <= len(l.bufferedResponses) + return l.i < len(l.bufferedResponses) } func (l *eagerRespSet) Empty() bool { diff --git a/pkg/store/proxy_merge_test.go b/pkg/store/proxy_heap_test.go similarity index 89% rename from pkg/store/proxy_merge_test.go rename to pkg/store/proxy_heap_test.go index 9d6f84a71e..50fe2d46be 100644 --- a/pkg/store/proxy_merge_test.go +++ b/pkg/store/proxy_heap_test.go @@ -4,7 +4,6 @@ package store import ( - "fmt" "sync" "testing" @@ -25,7 +24,7 @@ func TestRmLabelsCornerCases(t *testing.T) { }), labels.Labels{}) } -func TestProxyResponseTreeSort(t *testing.T) { +func TestProxyResponseHeapSort(t *testing.T) { for _, tcase := range []struct { title string input []respSet @@ -35,16 +34,14 @@ func TestProxyResponseTreeSort(t *testing.T) { title: "merge sets with different series and common labels", input: []respSet{ &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")), }, }, &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")), storeSeriesResponse(t, labelsFromStrings("a", "1", "d", "4")), @@ -62,8 +59,7 @@ func TestProxyResponseTreeSort(t *testing.T) { title: "merge sets with different series and labels", input: []respSet{ &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), storeSeriesResponse(t, labelsFromStrings("b", "2", "c", "3")), @@ -71,8 +67,7 @@ func TestProxyResponseTreeSort(t *testing.T) { }, }, &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")), storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5", "f", "6")), @@ -91,8 +86,7 @@ func TestProxyResponseTreeSort(t *testing.T) { title: "merge repeated series in stores with different external labels", input: []respSet{ &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), @@ -100,8 +94,7 @@ func TestProxyResponseTreeSort(t *testing.T) { storeLabels: map[string]struct{}{"ext2": {}}, }, &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), @@ -120,8 +113,7 @@ func TestProxyResponseTreeSort(t *testing.T) { title: "merge series with external labels at beginning of series", input: []respSet{ &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "2")), @@ -129,8 +121,7 @@ func TestProxyResponseTreeSort(t *testing.T) { storeLabels: map[string]struct{}{"a": {}}, }, &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")), storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), @@ -149,8 +140,7 @@ func TestProxyResponseTreeSort(t *testing.T) { title: "merge series in stores with external labels not present in series (e.g. stripped during dedup)", input: []respSet{ &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), @@ -158,8 +148,7 @@ func TestProxyResponseTreeSort(t *testing.T) { storeLabels: map[string]struct{}{"ext2": {}, "replica": {}}, }, &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), @@ -178,8 +167,7 @@ func TestProxyResponseTreeSort(t *testing.T) { title: "test", input: []respSet{ &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), @@ -188,8 +176,7 @@ func TestProxyResponseTreeSort(t *testing.T) { storeLabels: map[string]struct{}{"receive": {}, "tenant_id": {}, "thanos_replica": {}}, }, &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), @@ -209,13 +196,14 @@ func TestProxyResponseTreeSort(t *testing.T) { }, } { t.Run(tcase.title, func(t *testing.T) { - h := NewProxyResponseLoserTree(tcase.input...) - got := []*storepb.SeriesResponse{} - for h.Next() { - r := h.At() - got = append(got, r) + h := NewProxyResponseHeap(tcase.input...) + if !h.Empty() { + got := []*storepb.SeriesResponse{h.At()} + for h.Next() { + got = append(got, h.At()) + } + testutil.Equals(t, tcase.exp, got) } - testutil.Equals(t, tcase.exp, got) }) } } @@ -351,29 +339,3 @@ func BenchmarkSortWithoutLabels(b *testing.B) { sortWithoutLabels(resps, labelsToRemove) } } - -func BenchmarkKWayMerge(b *testing.B) { - for i := 0; i < b.N; i++ { - respSets := []respSet{} - for j := 0; j < 1000; j++ { - respSets = append(respSets, &eagerRespSet{ - closeSeries: func() {}, - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3")), - storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3", "d", "4")), - storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "4")), - }, - }) - } - lt := NewProxyResponseLoserTree(respSets...) - - got := []*storepb.SeriesResponse{} - for lt.Next() { - r := lt.At() - got = append(got, r) - } - - var _ = got - } -} diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 4c84ec4c1d..25f3e84102 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -2069,13 +2069,13 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { for _, tcase := range []struct { responses []*storepb.SeriesResponse - testFn func(responses []*storepb.SeriesResponse, h *responseDeduplicator) + testFn func(responses []*storepb.SeriesResponse, h *dedupResponseHeap) tname string }{ { tname: "edge case with zero responses", responses: []*storepb.SeriesResponse{}, - testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) { + testFn: func(responses []*storepb.SeriesResponse, h *dedupResponseHeap) { testutil.Equals(t, false, h.Next()) callAtExpectPanic := func() { @@ -2107,7 +2107,7 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { }, }, }, - testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) { + testFn: func(responses []*storepb.SeriesResponse, h *dedupResponseHeap) { testutil.Equals(t, true, h.Next()) resp := h.At() testutil.Equals(t, responses[0], resp) @@ -2149,7 +2149,7 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { }, }, }, - testFn: func(responses []*storepb.SeriesResponse, h *responseDeduplicator) { + testFn: func(responses []*storepb.SeriesResponse, h *dedupResponseHeap) { testutil.Equals(t, true, h.Next()) resp := h.At() testutil.Equals(t, responses[0], resp) @@ -2158,9 +2158,8 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { }, } { t.Run(tcase.tname, func(t *testing.T) { - h := NewResponseDeduplicator(NewProxyResponseLoserTree( + h := NewDedupResponseHeap(NewProxyResponseHeap( &eagerRespSet{ - closeSeries: func() {}, wg: &sync.WaitGroup{}, bufferedResponses: tcase.responses, }, diff --git a/pkg/store/storepb/shard_info.go b/pkg/store/storepb/shard_info.go index 28d559b49a..e69617dd2f 100644 --- a/pkg/store/storepb/shard_info.go +++ b/pkg/store/storepb/shard_info.go @@ -29,9 +29,6 @@ func (s *ShardMatcher) IsSharded() bool { } func (s *ShardMatcher) Close() { - if s == nil { - return - } if s.buffers != nil { s.buffers.Put(s.buf) } diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 10edefc652..e13f79a495 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -1257,6 +1257,6 @@ func TestStoreGatewayLazyExpandedPostingsEnabled(t *testing.T) { }) // Use greater or equal to handle flakiness. - testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.GreaterOrEqual(1), []string{"thanos_bucket_store_lazy_expanded_postings_total"}, e2emon.WaitMissingMetrics())) - testutil.Ok(t, s2.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"thanos_bucket_store_lazy_expanded_postings_total"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.GreaterOrEqual(1), "thanos_bucket_store_lazy_expanded_postings_total"), e2emon.WaitMissingMetrics()) + testutil.Ok(t, s2.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_lazy_expanded_postings_total"), e2emon.WaitMissingMetrics()) }