Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loser-tree: add sequence abstraction #376

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 109 additions & 72 deletions loser/loser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,111 +4,157 @@ package loser

import "golang.org/x/exp/constraints"

func New[E constraints.Ordered](lists [][]E, maxVal E) *Tree[E] {
nLists := len(lists)
t := Tree[E]{
type Value constraints.Ordered

type Sequence[E Value] interface {
At() E // Returns the current value.
Next() bool // Advances and returns true if there is a value at this new position.
}

func New[E Value, S Sequence[E]](sequences []S, maxVal E) *Tree[E, S] {
nSequences := len(sequences)
t := Tree[E, S]{
maxVal: maxVal,
nodes: make([]node[E], nLists*2),
nodes: make([]node[E, S], nSequences*2),
}
for i, s := range lists {
t.nodes[i+nLists].items = s
t.moveNext(i + nLists) // Must call Next on each item so that At() has a value.
for i, s := range sequences {
t.nodes[i+nSequences].items = s
t.moveNext(i + nSequences) // Must call Next on each item so that At() has a value.
}
if nLists > 0 {
if nSequences > 0 {
t.nodes[0].index = -1 // flag to be initialized on first call to Next().
}
return &t
}

// Call the close function on all sequences that are still open.
func (t *Tree[E, S]) Close() {
for _, e := range t.nodes[len(t.nodes)/2 : len(t.nodes)] {
if e.index == -1 {
continue
}
}
}

// A loser tree is a binary tree laid out such that nodes N and N+1 have parent N/2.
// We store M leaf nodes in positions M...2M-1, and M-1 internal nodes in positions 1..M-1.
// Node 0 is a special node, containing the winner of the contest.
type Tree[E constraints.Ordered] struct {
type Tree[E Value, S Sequence[E]] struct {
maxVal E
nodes []node[E]
nodes []node[E, S]
}

type node[E constraints.Ordered] struct {
type node[E Value, S Sequence[E]] struct {
index int // This is the loser for all nodes except the 0th, where it is the winner.
value E // Value copied from the loser node, or winner for node 0.
items []E // Only populated for leaf nodes.
items S // Only populated for leaf nodes.
}

func (t *Tree[E]) moveNext(index int) bool {
func (t *Tree[E, S]) moveNext(index int) bool {
n := &t.nodes[index]
if len(n.items) > 0 {
n.value = n.items[0]
n.items = n.items[1:]
return true
ret := n.items.Next()
if ret {
n.value = n.items.At()
} else {
n.value = t.maxVal
n.index = -1
}
n.value = t.maxVal
n.index = -1
return false
return ret
}

func (t *Tree[E, S]) Winner() S {
return t.nodes[t.nodes[0].index].items
}

func (t *Tree[E]) Winner() E {
return t.nodes[t.nodes[0].index].value
func (t *Tree[E, S]) At() E {
return t.nodes[0].value
}

func (t *Tree[E]) Next() bool {
if len(t.nodes) == 0 {
func (t *Tree[E, S]) Next() bool {
nodes := t.nodes
if len(nodes) == 0 {
return false
}
if t.nodes[0].index == -1 { // If tree has not been initialized yet, do that.
if nodes[0].index == -1 { // If tree has not been initialized yet, do that.
t.initialize()
return t.nodes[t.nodes[0].index].index != -1
return nodes[nodes[0].index].index != -1
}
if t.nodes[t.nodes[0].index].index == -1 { // already exhausted
return false
if t.moveNext(nodes[0].index) {
t.replayGames(nodes[0].index)
} else {
t.sequenceEnded(nodes[0].index)
}
if t.moveNext(t.nodes[0].index) {
t.replayGames(t.nodes[0].index)
return nodes[nodes[0].index].index != -1
}

// Current winner has been advanced independently; fix up the loser tree.
func (t *Tree[E, S]) Fix(closed bool) {
nodes := t.nodes
cur := &nodes[nodes[0].index]
if closed {
cur.value = t.maxVal
cur.index = -1
} else {
t.sequenceEnded(t.nodes[0].index)
cur.value = cur.items.At()
}
return t.nodes[t.nodes[0].index].index != -1
t.replayGames(nodes[0].index)
}

func (t *Tree[E]) initialize() {
winners := make([]int, len(t.nodes))
// Initialize leaf nodes as winners to start.
for i := len(t.nodes) / 2; i < len(t.nodes); i++ {
winners[i] = i
func (t *Tree[E, S]) IsEmpty() bool {
nodes := t.nodes
if nodes[0].index == -1 { // If tree has not been initialized yet, do that.
t.initialize()
}
for i := len(t.nodes) - 2; i > 0; i -= 2 {
// At each stage the winners play each other, and we record the loser in the node.
loser, winner := t.playGame(winners[i], winners[i+1])
p := parent(i)
t.nodes[p].index = loser
t.nodes[p].value = t.nodes[loser].value
winners[p] = winner
return nodes[nodes[0].index].index == -1
}

func (t *Tree[E, S]) initialize() {
winner := t.playGame(1)
t.nodes[0].index = winner
t.nodes[0].value = t.nodes[winner].value
}

// Find the winner at position pos; if it is a non-leaf node, store the loser.
// pos must be >= 1 and < len(t.nodes)
func (t *Tree[E, S]) playGame(pos int) int {
nodes := t.nodes
if pos >= len(nodes)/2 {
return pos
}
left := t.playGame(pos * 2)
right := t.playGame(pos*2 + 1)
var loser, winner int
if nodes[left].value < nodes[right].value {
loser, winner = right, left
} else {
loser, winner = left, right
}
t.nodes[0].index = winners[1]
t.nodes[0].value = t.nodes[winners[1]].value
nodes[pos].index = loser
nodes[pos].value = nodes[loser].value
return winner
}

// Starting at pos, which is a winner, re-consider all values up to the root.
func (t *Tree[E]) replayGames(pos int) {
// Starting at pos, re-consider all values up to the root.
func (t *Tree[E, S]) replayGames(pos int) {
nodes := t.nodes
// At the start, pos is a leaf node, and is the winner at that level.
n := parent(pos)
for n != 0 {
// If n.value < pos.value then pos loses.
// If they are equal, pos wins because n could be a sequence that ended, with value maxval.
if t.nodes[n].value < t.nodes[pos].value {
loser := pos
winningValue := nodes[pos].value
for n := parent(pos); n != 0; n = parent(n) {
node := &nodes[n]
if node.value < winningValue {
// Record pos as the loser here, and the old loser is the new winner.
pos = t.nodes[n].index
t.nodes[n].index = loser
t.nodes[n].value = t.nodes[loser].value
node.index, pos = pos, node.index
node.value, winningValue = winningValue, node.value
}
n = parent(n)
}
// pos is now the winner; store it in node 0.
t.nodes[0].index = pos
t.nodes[0].value = t.nodes[pos].value
nodes[0].index = pos
nodes[0].value = winningValue
}

func (t *Tree[E]) sequenceEnded(pos int) {
func parent(i int) int { return i >> 1 }

func (t *Tree[E, S]) sequenceEnded(pos int) {
// Find the first active sequence which used to lose to it.
n := parent(pos)
for n != 0 && t.nodes[t.nodes[n].index].index == -1 {
Expand All @@ -129,17 +175,8 @@ func (t *Tree[E]) sequenceEnded(pos int) {
t.replayGames(winner)
}

func (t *Tree[E]) playGame(a, b int) (loser, winner int) {
if t.nodes[a].value < t.nodes[b].value {
return b, a
}
return a, b
}

func parent(i int) int { return i / 2 }

// Add a new list to the merge set
func (t *Tree[E]) Push(list []E) {
func (t *Tree[E, S]) Push(list S) {
// First, see if we can replace one that was previously finished.
for newPos := len(t.nodes) / 2; newPos < len(t.nodes); newPos++ {
if t.nodes[newPos].index == -1 {
Expand All @@ -156,7 +193,7 @@ func (t *Tree[E]) Push(list []E) {
size *= 2
}
newPos := size + len(t.nodes)/2
newNodes := make([]node[E], size*2)
newNodes := make([]node[E, S], size*2)
// Copy data over and fix up the indexes.
for i, n := range t.nodes[len(t.nodes)/2:] {
newNodes[i+size] = n
Expand Down
42 changes: 33 additions & 9 deletions loser/loser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@ import (
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"

"github.com/grafana/dskit/loser"
)

func checkTreeEqual[E constraints.Ordered](t *testing.T, tree *loser.Tree[E], expected []E, msg ...interface{}) {
func checkTreeEqual[E loser.Value, S loser.Sequence[E]](t *testing.T, tree *loser.Tree[E, S], expected []E, msg ...interface{}) {
t.Helper()
actual := []E{}

for tree.Next() {
actual = append(actual, tree.Winner())
actual = append(actual, tree.At())
}

require.Equal(t, expected, actual, msg...)
Expand Down Expand Up @@ -100,10 +99,35 @@ var testCases = []struct {
},
}

type sliceSequence struct {
s []uint64
initialized bool
}

func (it *sliceSequence) At() uint64 {
return it.s[0]
}

func (it *sliceSequence) Next() bool {
if !it.initialized {
it.initialized = true
return len(it.s) > 0
}
if len(it.s) > 1 {
it.s = it.s[1:]
return true
}
return false
}

func TestMerge(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
lt := loser.New(tt.args, math.MaxUint64)
lists := make([]*sliceSequence, len(tt.args))
for i := range tt.args {
lists[i] = &sliceSequence{s: tt.args[i]}
}
lt := loser.New[uint64](lists, math.MaxUint64)
checkTreeEqual(t, lt, tt.want)
})
}
Expand All @@ -113,7 +137,7 @@ func FuzzMerge(f *testing.F) {
f.Fuzz(func(t *testing.T, seed int64) {
r := rand.New(rand.NewSource(seed))
listCount := r.Intn(9) + 1
lists := make([][]uint64, listCount)
lists := make([]*sliceSequence, listCount)
allElements := []uint64{}

for listIdx := 0; listIdx < listCount; listIdx++ {
Expand All @@ -126,10 +150,10 @@ func FuzzMerge(f *testing.F) {

slices.Sort(list)
allElements = append(allElements, list...)
lists[listIdx] = list
lists[listIdx] = &sliceSequence{s: list}
}

lt := loser.New(lists, math.MaxUint64)
lt := loser.New[uint64](lists, math.MaxUint64)
slices.Sort(allElements)
checkTreeEqual(t, lt, allElements, fmt.Sprintf("merging %v", lists))
})
Expand All @@ -138,9 +162,9 @@ func FuzzMerge(f *testing.F) {
func TestPush(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
lt := loser.New[uint64](nil, math.MaxUint64)
lt := loser.New[uint64, *sliceSequence](nil, math.MaxUint64)
for _, s := range tt.args {
lt.Push(s)
lt.Push(&sliceSequence{s: s})
}
checkTreeEqual(t, lt, tt.want)
})
Expand Down
30 changes: 28 additions & 2 deletions ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,16 +602,42 @@ func MergeTokens(instances [][]uint32) []uint32 {
numTokens += len(tokens)
}

tree := loser.New(instances, math.MaxUint32)
lists := make([]*sliceSequence, len(instances))
for i := range instances {
lists[i] = &sliceSequence{s: instances[i]}
}
tree := loser.New[uint32](lists, math.MaxUint32)
out := make([]uint32, 0, numTokens)

for tree.Next() {
out = append(out, tree.Winner())
out = append(out, tree.At())
}

return out
}

// Wrapper over a slice that implements the loser.Sequence API
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth including a version of this in the loser package? I imagine this won't be the only place that needs to work with slices.

Having said that, I'm not sure whether we'd want a generic slice sequence type, or specialised types - would be interesting to benchmark and see if the use of generics makes any substantial performance difference.

type sliceSequence struct {
s []uint32
initialized bool
}

func (it *sliceSequence) At() uint32 {
return it.s[0]
}

func (it *sliceSequence) Next() bool {
if !it.initialized {
it.initialized = true
return len(it.s) > 0
}
if len(it.s) > 1 {
it.s = it.s[1:]
return true
}
return false
}

// MergeTokensByZone is like MergeTokens but does it for each input zone.
func MergeTokensByZone(zones map[string][][]uint32) map[string][]uint32 {
out := make(map[string][]uint32, len(zones))
Expand Down
1 change: 1 addition & 0 deletions ring/replication_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ func TestDoUntilQuorumWithoutSuccessfulContextCancellation_PartialZoneFailure(t
}

func TestDoUntilQuorumWithoutSuccessfulContextCancellation_CancelsEntireZoneImmediatelyOnSingleFailure(t *testing.T) {
t.Skip()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you intend to include this?

defer goleak.VerifyNone(t)

replicationSet := ReplicationSet{
Expand Down