Skip to content

Commit

Permalink
Fix Prefix and LowerBound on non-unique indexes
Browse files Browse the repository at this point in the history
Prefix and LowerBound searches did not properly deal with non-unique
indexes. In these indexes the keys are encoded as <secondary><primary><secondary len>,
and prefix searching needs to make sure the secondary key length on the results
is equal or longer than the search key as otherwise we might match into the primary
key.

For example if the object is struct{A, B string}, with A being primary and we have
{"a", "a"}, then the secondary index is key'd as "aa<1>". A prefix search with "aa"
must not match since the secondary index contains only an object with key "a".

Fix this by using a special iteration on non-unique indexes that checks the length
of the secondary key and ignores any other matches that are due to matching into the
primary key.

Another issue with non-unique indexes was due to having no separator between the
secondary and primary key, leading to the primary key having an effect on the iteration
order. Fix this by adding '\0' as a separator and base64+sha256'ing the primary key.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Oct 7, 2024
1 parent af31f0a commit 62157fc
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 58 deletions.
12 changes: 9 additions & 3 deletions any_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type AnyTable struct {

func (t AnyTable) All(txn ReadTxn) iter.Seq2[any, Revision] {
indexTxn := txn.getTxn().mustIndexReadTxn(t.Meta, PrimaryIndexPos)
return anySeq(indexTxn.Iterator())
return partSeq[any](indexTxn.Iterator())
}

func (t AnyTable) UnmarshalYAML(data []byte) (any, error) {
Expand Down Expand Up @@ -41,13 +41,19 @@ func (t AnyTable) Delete(txn WriteTxn, obj any) (old any, hadOld bool, err error
func (t AnyTable) Prefix(txn ReadTxn, key string) iter.Seq2[any, Revision] {
indexTxn := txn.getTxn().mustIndexReadTxn(t.Meta, PrimaryIndexPos)
iter, _ := indexTxn.Prefix([]byte(key))
return anySeq(iter)
if indexTxn.unique {
return partSeq[any](iter)
}
return nonUniqueSeq[any](iter, true, []byte(key))
}

func (t AnyTable) LowerBound(txn ReadTxn, key string) iter.Seq2[any, Revision] {
indexTxn := txn.getTxn().mustIndexReadTxn(t.Meta, PrimaryIndexPos)
iter := indexTxn.LowerBound([]byte(key))
return anySeq(iter)
if indexTxn.unique {
return partSeq[any](iter)
}
return nonUniqueLowerBoundSeq[any](iter, []byte(key))
}

func (t AnyTable) TableHeader() []string {
Expand Down
25 changes: 25 additions & 0 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,31 @@ func BenchmarkDB_SequentialLookup(b *testing.B) {
b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec")
}

func BenchmarkDB_Prefix_SecondaryIndex(b *testing.B) {
db, table := newTestDBWithMetrics(b, &NopMetrics{}, tagsIndex)
tagSet := part.NewSet("test")
txn := db.WriteTxn(table)
for i := 0; i < numObjectsToInsert; i++ {
_, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: tagSet})
require.NoError(b, err)
}
rtxn := txn.Commit()
b.ResetTimer()

q := tagsIndex.Query("test")
for n := 0; n < b.N; n++ {
count := 0
for range table.Prefix(rtxn, q) {
count++
}
if count != numObjectsToInsert {
b.Fatalf("wrong number of objects, expected %d, got %d", numObjectsToInsert, count)
}
}

b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec")
}

const numObjectsIteration = 100000

func BenchmarkDB_FullIteration_All(b *testing.B) {
Expand Down
18 changes: 11 additions & 7 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,27 +1061,31 @@ func TestWriteJSON(t *testing.T) {
func Test_nonUniqueKey(t *testing.T) {
// empty keys
key := encodeNonUniqueKey(nil, nil)
primary, secondary := decodeNonUniqueKey(key)
assert.Len(t, primary, 0)
secondary, _ := decodeNonUniqueKey(key)
assert.Len(t, secondary, 0)

// empty primary
key = encodeNonUniqueKey(nil, []byte("foo"))
primary, secondary = decodeNonUniqueKey(key)
assert.Len(t, primary, 0)
secondary, _ = decodeNonUniqueKey(key)
assert.Equal(t, string(secondary), "foo")

// empty secondary
key = encodeNonUniqueKey([]byte("quux"), []byte{})
primary, secondary = decodeNonUniqueKey(key)
assert.Equal(t, string(primary), "quux")
secondary, _ = decodeNonUniqueKey(key)
assert.Len(t, secondary, 0)

// non-empty
key = encodeNonUniqueKey([]byte("foo"), []byte("quux"))
primary, secondary = decodeNonUniqueKey(key)
secondary, primary := decodeNonUniqueKey(key)
assert.EqualValues(t, secondary, "quux")
assert.EqualValues(t, primary, "foo")

// non-empty, primary with substitutions:
// 0x0 => 0xfe, 0xfe => 0xfd01, 0xfd => 0xfd00
key = encodeNonUniqueKey([]byte{0x0, 0xfd, 0xfe}, []byte("quux"))
secondary, primary = decodeNonUniqueKey(key)
assert.EqualValues(t, secondary, "quux")
assert.EqualValues(t, primary, []byte{0xfe, 0xfd, 0x01, 0xfd, 0x00})
}

func Test_validateTableName(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion http.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func runQuery(indexTxn indexReadTxn, lowerbound bool, queryKey []byte, onObject
match = func(k []byte) bool { return len(k) == len(queryKey) }
default:
match = func(k []byte) bool {
_, secondary := decodeNonUniqueKey(k)
secondary, _ := decodeNonUniqueKey(k)
return len(secondary) == len(queryKey)
}
}
Expand Down
97 changes: 69 additions & 28 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package statedb

import (
"bytes"
"fmt"
"iter"
"slices"
Expand Down Expand Up @@ -71,29 +72,11 @@ func partSeq[Obj any](iter *part.Iterator[object]) iter.Seq2[Obj, Revision] {
}
}

// anySeq returns a sequence of objects from a part Iterator.
func anySeq(iter *part.Iterator[object]) iter.Seq2[any, Revision] {
return func(yield func(any, Revision) bool) {
// Iterate over a clone of the original iterator to allow the sequence to be iterated
// from scratch multiple times.
it := iter.Clone()
for {
_, iobj, ok := it.Next()
if !ok {
break
}
if !yield(iobj.data, iobj.revision) {
break
}
}
}
}

// nonUniqueSeq returns a sequence of objects for a non-unique index.
// Non-unique indexes work by concatenating the secondary key with the
// primary key and then prefix searching for the items:
//
// <secondary><primary><secondary length>
// <secondary>\0<primary><secondary length>
// ^^^^^^^^^^^
//
// Since the primary key can be of any length and we're prefix searching,
Expand All @@ -102,36 +85,94 @@ func anySeq(iter *part.Iterator[object]) iter.Seq2[any, Revision] {
// For example if we search for the key "aaaa", then we might have the following
// matches (_ is just delimiting, not part of the key):
//
// aaaa_bbb4
// aaa_abab3
// aaaa_ccc4
// aaaa\0bbb4
// aaa\0abab3
// aaaa\0ccc4
//
// We yield "aaaa_bbb4", skip "aaa_abab3" and yield "aaaa_ccc4".
func nonUniqueSeq[Obj any](iter *part.Iterator[object], searchKey []byte) iter.Seq2[Obj, Revision] {
// We yield "aaaa\0bbb4", skip "aaa\0abab3" and yield "aaaa\0ccc4".
func nonUniqueSeq[Obj any](iter *part.Iterator[object], prefixSearch bool, searchKey []byte) iter.Seq2[Obj, Revision] {
return func(yield func(Obj, Revision) bool) {
// Clone the iterator to allow multiple iterations over the sequence.
it := iter.Clone()

var visited map[string]struct{}
if prefixSearch {
// When prefix searching, keep track of objects we've already seen as
// multiple keys in a non-unique index may map to a single object.
// When just doing a List() on a non-unique index we will see each object
// only once and do not need to track this.
//
// This of course makes iterating over a non-unique index with a prefix
// (or lowerbound search) about 20x slower than normal!
visited = map[string]struct{}{}
}

for {
key, iobj, ok := it.Next()
if !ok {
break
}

_, secondary := decodeNonUniqueKey(key)
secondary, primary := decodeNonUniqueKey(key)

// The secondary key doesn't match the search key. Since the primary
// key length can vary, we need to continue the prefix search.
if len(secondary) != len(searchKey) {
// The secondary key is shorter than what we're looking for, e.g.
// we match into the primary key. Keep searching for matching secondary
// keys.
switch {
case !prefixSearch && len(secondary) != len(searchKey):
continue
case prefixSearch && len(secondary) < len(searchKey):
continue
}

if prefixSearch {
// When doing a prefix search on a non-unique index we may see the
// same object multiple times since multiple keys may point to it.
// Skip if we've already seen this object.
if _, found := visited[string(primary)]; found {
continue
}
visited[string(primary)] = struct{}{}
}

if !yield(iobj.data.(Obj), iobj.revision) {
break
}
}
}
}

func nonUniqueLowerBoundSeq[Obj any](iter *part.Iterator[object], searchKey []byte) iter.Seq2[Obj, Revision] {
return func(yield func(Obj, Revision) bool) {
// Clone the iterator to allow multiple uses.
iter = iter.Clone()

// Keep track of objects we've already seen as multiple keys in a non-unique
// index may map to a single object.
visited := map[string]struct{}{}
for {
key, iobj, ok := iter.Next()
if !ok {
break
}
// With a non-unique index we have a composite key <secondary><primary><secondary len>.
// This means we need to check, for every candidate key, that its secondary part is larger than or equal to the search key.
// Just seeking to the first one isn't enough as the secondary key length may vary.
secondary, primary := decodeNonUniqueKey(key)
if bytes.Compare(secondary, searchKey) >= 0 {
if _, found := visited[string(primary)]; found {
continue
}
visited[string(primary)] = struct{}{}

if !yield(iobj.data.(Obj), iobj.revision) {
return
}
}
}
}
}

// iterator adapts the "any" object iterator to a typed object.
type iterator[Obj any] struct {
iter interface{ Next() ([]byte, object, bool) }
Expand Down
23 changes: 18 additions & 5 deletions quick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func isOrdered[A cmp.Ordered, B any](t *testing.T, it iter.Seq2[A, B]) bool {
return true
}

func seqLen[A cmp.Ordered, B any](it iter.Seq2[A, B]) int {
func seqLen[A, B any](it iter.Seq2[A, B]) int {
n := 0
for range it {
n++
Expand Down Expand Up @@ -167,13 +167,13 @@ func TestDB_Quick(t *testing.T) {
// Check against the secondary (non-unique index)
//

// Non-unique indexes return at least as many objects as we've inserted.
if numExpected > seqLen(Map(table.Prefix(rtxn, bIndex.Query("")), quickObj.getB)) {
// Non-unique indexes return the same number of objects as we've inserted.
if numExpected != seqLen(table.Prefix(rtxn, bIndex.Query(""))) {
t.Logf("Prefix() via bIndex wrong length")
return false
}
if numExpected > seqLen(Map(table.LowerBound(rtxn, bIndex.Query("")), quickObj.getB)) {
t.Logf("LowerBOund() via bIndex wrong length")
if numExpected != seqLen(table.LowerBound(rtxn, bIndex.Query(""))) {
t.Logf("LowerBound() via bIndex wrong length")
return false
}

Expand All @@ -200,17 +200,30 @@ func TestDB_Quick(t *testing.T) {
return false
}

visited := map[string]struct{}{}
for obj := range table.Prefix(rtxn, bIndex.Query(b)) {
if !strings.HasPrefix(obj.B, b) {
t.Logf("Prefix() via bIndex has wrong prefix")
return false
}
if _, found := visited[obj.A]; found {
t.Logf("Prefix() visited object %q twice", obj.A)
return false
}
visited[obj.A] = struct{}{}
}

visited = map[string]struct{}{}
for obj := range table.LowerBound(rtxn, bIndex.Query(b)) {
if cmp.Compare(obj.B, b) < 0 {
t.Logf("LowerBound() via bIndex has wrong objects, expected %v >= %v", []byte(obj.B), []byte(b))
return false
}
if _, found := visited[obj.A]; found {
t.Logf("Prefix() visited object %q twice", obj.A)
return false
}
visited[obj.A] = struct{}{}
}

// Iterating over the secondary index returns the objects in order
Expand Down
14 changes: 10 additions & 4 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (t *genTable[Obj]) GetWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision u
}

// Check that we have a full match on the key
_, secondary := decodeNonUniqueKey(key)
secondary, _ := decodeNonUniqueKey(key)
if len(secondary) == len(q.key) {
break
}
Expand All @@ -318,7 +318,10 @@ func (t *genTable[Obj]) LowerBoundWatch(txn ReadTxn, q Query[Obj]) (iter.Seq2[Ob
// we watch the whole table for changes.
watch := indexTxn.RootWatch()
iter := indexTxn.LowerBound(q.key)
return partSeq[Obj](iter), watch
if indexTxn.unique {
return partSeq[Obj](iter), watch
}
return nonUniqueLowerBoundSeq[Obj](iter, q.key), watch
}

func (t *genTable[Obj]) Prefix(txn ReadTxn, q Query[Obj]) iter.Seq2[Obj, Revision] {
Expand All @@ -329,7 +332,10 @@ func (t *genTable[Obj]) Prefix(txn ReadTxn, q Query[Obj]) iter.Seq2[Obj, Revisio
func (t *genTable[Obj]) PrefixWatch(txn ReadTxn, q Query[Obj]) (iter.Seq2[Obj, Revision], <-chan struct{}) {
indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index))
iter, watch := indexTxn.Prefix(q.key)
return partSeq[Obj](iter), watch
if indexTxn.unique {
return partSeq[Obj](iter), watch
}
return nonUniqueSeq[Obj](iter, true, q.key), watch
}

func (t *genTable[Obj]) All(txn ReadTxn) iter.Seq2[Obj, Revision] {
Expand Down Expand Up @@ -366,7 +372,7 @@ func (t *genTable[Obj]) ListWatch(txn ReadTxn, q Query[Obj]) (iter.Seq2[Obj, Rev
// iteration will continue until key length mismatches, e.g. we hit a
// longer key sharing the same prefix.
iter, watch := indexTxn.Prefix(q.key)
return nonUniqueSeq[Obj](iter, q.key), watch
return nonUniqueSeq[Obj](iter, false, q.key), watch
}

func (t *genTable[Obj]) Insert(txn WriteTxn, obj Obj) (oldObj Obj, hadOld bool, err error) {
Expand Down
Loading

0 comments on commit 62157fc

Please sign in to comment.