Skip to content

Commit

Permalink
Use arrays for index tree lookup
Browse files Browse the repository at this point in the history
Similarly to tables we have few enough indexes per table to not require
anything more complex than an array for storing them.

Before:

goos: linux
goarch: amd64
pkg: github.com/cilium/statedb
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkDB_WriteTxn_1-8                                          160879              7232 ns/op            138271 objects/sec
BenchmarkDB_WriteTxn_10-8                                         442489              2597 ns/op            385104 objects/sec
BenchmarkDB_WriteTxn_100-8                                        511983              2201 ns/op            454271 objects/sec
BenchmarkDB_WriteTxn_100_SecondaryIndex-8                         511678              2205 ns/op            453489 objects/sec
BenchmarkDB_RandomInsert-8                                          1045           1134981 ns/op            881072 objects/sec
BenchmarkDB_RandomReplace-8                                          246           4879081 ns/op            204957 objects/sec
BenchmarkDB_SequentialInsert-8                                       387           3108060 ns/op            321744 objects/sec
...
BenchmarkDB_DeleteTracker_Baseline-8                                 374           3167598 ns/op            315697 objects/sec
BenchmarkDB_DeleteTracker-8                                          182           6409534 ns/op            156018 objects/sec
BenchmarkDB_RandomLookup-8                                          3505            317084 ns/op           3153747 objects/sec
BenchmarkDB_SequentialLookup-8                                      3951            293740 ns/op           3404371 objects/sec
BenchmarkDB_FullIteration_All-8                                    98962             12085 ns/op          82749863 objects/sec
BenchmarkDB_FullIteration_Get-8                                    81453             14711 ns/op          67978410 objects/sec
BenchmarkDB_PropagationDelay-8                                    206851              5742 ns/op
50.00 50th_µs           64.00 90th_µs          261.0 99th_µs
PASS
ok      github.com/cilium/statedb       31.966s

After:

goos: linux
goarch: amd64
pkg: github.com/cilium/statedb
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkDB_WriteTxn_1-8                                          240938              4752 ns/op            210459 objects/sec
BenchmarkDB_WriteTxn_10-8                                         502363              2551 ns/op            392063 objects/sec
BenchmarkDB_WriteTxn_100-8                                        457850              2279 ns/op            438872 objects/sec
BenchmarkDB_WriteTxn_100_SecondaryIndex-8                         526416              2222 ns/op            450100 objects/sec
BenchmarkDB_RandomInsert-8                                          1012           1181665 ns/op            846264 objects/sec
BenchmarkDB_RandomReplace-8                                          216           5048896 ns/op            198063 objects/sec
BenchmarkDB_SequentialInsert-8                                       398           2996997 ns/op            333667 objects/sec
...
BenchmarkDB_DeleteTracker_Baseline-8                                 390           3036951 ns/op            329278 objects/sec
BenchmarkDB_DeleteTracker-8                                          141           8194663 ns/op            122031 objects/sec
BenchmarkDB_RandomLookup-8                                          8846            134745 ns/op           7421428 objects/sec
BenchmarkDB_SequentialLookup-8                                      8425            123284 ns/op           8111372 objects/sec
BenchmarkDB_FullIteration_All-8                                   103279             10996 ns/op          90941891 objects/sec
BenchmarkDB_FullIteration_Get-8                                    84451             13637 ns/op          73328686 objects/sec
BenchmarkDB_PropagationDelay-8                                    235146              5342 ns/op
48.00 50th_µs           57.00 90th_µs          215.0 99th_µs
PASS
ok      github.com/cilium/statedb       31.480s

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Mar 27, 2024
1 parent b519e34 commit 965dc5b
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 140 deletions.
20 changes: 2 additions & 18 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"sync/atomic"
"time"

iradix "github.com/hashicorp/go-immutable-radix/v2"

"github.com/cilium/hive/cell"
"github.com/cilium/statedb/internal"
)
Expand Down Expand Up @@ -153,21 +151,7 @@ func (db *DB) registerTable(table TableMeta, root *dbRoot) error {

pos := len(*root)
table.setTablePos(pos)

var entry tableEntry
entry.meta = table
entry.deleteTrackers = iradix.New[deleteTracker]()
indexTxn := iradix.New[indexEntry]().Txn()
indexTxn.Insert([]byte(table.primary().name), indexEntry{iradix.New[object](), true})
indexTxn.Insert([]byte(RevisionIndex), indexEntry{iradix.New[object](), true})
indexTxn.Insert([]byte(GraveyardIndex), indexEntry{iradix.New[object](), true})
indexTxn.Insert([]byte(GraveyardRevisionIndex), indexEntry{iradix.New[object](), true})
for index, indexer := range table.secondary() {
indexTxn.Insert([]byte(index), indexEntry{iradix.New[object](), indexer.unique})
}
entry.indexes = indexTxn.CommitOnly()

*root = append(*root, entry)
*root = append(*root, table.tableEntry())
return nil
}

Expand Down Expand Up @@ -205,6 +189,7 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
var tableNames []string
for _, table := range allTables {
tableEntry := root[table.tablePos()]
tableEntry.indexes = slices.Clone(tableEntry.indexes)
tableEntries[table.tablePos()] = &tableEntry
tableNames = append(tableNames, table.Name())

Expand All @@ -224,7 +209,6 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
db: db,
root: root,
modifiedTables: tableEntries,
writeTxns: make(map[tableIndex]indexTxn),
smus: smus,
acquiredAt: acquiredAt,
packageName: callerPkg,
Expand Down
9 changes: 7 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func TestDB_All(t *testing.T) {

select {
case <-watch:
t.Fatalf("expected All() watch channel to not close before changes")
t.Fatalf("expected All() watch channel to not close before delete")
default:
}

Expand All @@ -434,10 +434,15 @@ func TestDB_All(t *testing.T) {
txn.Commit()
}

// Prior read transaction not affected by delete.
iter, _ = table.All(txn)
objs = Collect(iter)
require.Len(t, objs, 3)

select {
case <-watch:
case <-time.After(time.Second):
t.Fatalf("expceted All() watch channel to close after changes")
t.Fatalf("expected All() watch channel to close after delete")
}
}

Expand Down
2 changes: 1 addition & 1 deletion deletetracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (dt *DeleteTracker[Obj]) getRevision() uint64 {
// 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is
// called!
func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] {
indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndex)
indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndexPos)
iter := indexTxn.Root().Iterator()
iter.SeekLowerBound(index.Uint64(minRevision))
return &iterator[Obj]{iter}
Expand Down
11 changes: 4 additions & 7 deletions graveyard.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat

// Find objects to be deleted by iterating over the graveyard revision index up
// to the low watermark.
indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndex)
indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndexPos)

objIter := indexTree.Root().Iterator()
for key, obj, ok := objIter.Next(); ok; key, obj, ok = objIter.Next() {
Expand Down Expand Up @@ -92,12 +92,12 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat
tableName := meta.Name()
start := time.Now()
for _, key := range deadObjs {
oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete(key)
oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndexPos).Delete(key)
if existed {
// The dead object still existed (and wasn't replaced by a create->delete),
// delete it from the primary index.
key = meta.primary().fromObject(oldObj).First()
txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(key)
txn.mustIndexWriteTxn(meta, GraveyardIndexPos).Delete(key)
}
}
cleaningTimes[tableName] = time.Since(start)
Expand Down Expand Up @@ -126,10 +126,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat
func (db *DB) graveyardIsEmpty() bool {
txn := db.ReadTxn().getTxn()
for _, table := range txn.root {
indexEntry, ok := table.indexes.Get([]byte(GraveyardIndex))
if !ok {
panic("BUG: GraveyardIndex not found from table")
}
indexEntry := table.indexes[table.meta.indexPos(GraveyardIndex)]
if indexEntry.tree.Len() != 0 {
return false
}
Expand Down
105 changes: 76 additions & 29 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"github.com/cilium/statedb/internal"
iradix "github.com/hashicorp/go-immutable-radix/v2"

"github.com/cilium/statedb/index"
)
Expand Down Expand Up @@ -46,10 +47,23 @@ func NewTable[Obj any](
primaryAnyIndexer: toAnyIndexer(primaryIndexer),
primaryIndexer: primaryIndexer,
secondaryAnyIndexers: make(map[string]anyIndexer, len(secondaryIndexers)),
indexPositions: make(map[string]int),
}

table.indexPositions[primaryIndexer.indexName()] = PrimaryIndexPos

// Internal indexes
table.indexPositions[RevisionIndex] = RevisionIndexPos
table.indexPositions[GraveyardIndex] = GraveyardIndexPos
table.indexPositions[GraveyardRevisionIndex] = GraveyardRevisionIndexPos

indexPos := SecondaryIndexStartPos
for _, indexer := range secondaryIndexers {
table.secondaryAnyIndexers[indexer.indexName()] = toAnyIndexer(indexer)
anyIndexer := toAnyIndexer(indexer)
anyIndexer.pos = indexPos
table.secondaryAnyIndexers[indexer.indexName()] = anyIndexer
table.indexPositions[indexer.indexName()] = indexPos
indexPos++
}

// Primary index must always be unique
Expand Down Expand Up @@ -94,6 +108,23 @@ type genTable[Obj any] struct {
primaryIndexer Indexer[Obj]
primaryAnyIndexer anyIndexer
secondaryAnyIndexers map[string]anyIndexer
indexPositions map[string]int
}

func (t *genTable[Obj]) tableEntry() tableEntry {
var entry tableEntry
entry.meta = t
entry.deleteTrackers = iradix.New[deleteTracker]()
entry.indexes = make([]indexEntry, len(t.indexPositions))
entry.indexes[t.indexPositions[t.primaryIndexer.indexName()]] = indexEntry{iradix.New[object](), nil, true}

for index, indexer := range t.secondaryAnyIndexers {
entry.indexes[t.indexPositions[index]] = indexEntry{iradix.New[object](), nil, indexer.unique}
}
entry.indexes[t.indexPositions[RevisionIndex]] = indexEntry{iradix.New[object](), nil, true}
entry.indexes[t.indexPositions[GraveyardIndex]] = indexEntry{iradix.New[object](), nil, true}
entry.indexes[t.indexPositions[GraveyardRevisionIndex]] = indexEntry{iradix.New[object](), nil, true}
return entry
}

func (t *genTable[Obj]) setTablePos(pos int) {
Expand All @@ -108,6 +139,13 @@ func (t *genTable[Obj]) tableKey() []byte {
return []byte(t.table)
}

func (t *genTable[Obj]) indexPos(name string) int {
if t.primaryAnyIndexer.name == name {
return PrimaryIndexPos
}
return t.indexPositions[name]
}

func (t *genTable[Obj]) PrimaryIndexer() Indexer[Obj] {
return t.primaryIndexer
}
Expand All @@ -133,8 +171,8 @@ func (t *genTable[Obj]) Revision(txn ReadTxn) Revision {
}

func (t *genTable[Obj]) NumObjects(txn ReadTxn) int {
indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name)
return indexTxn.entry.tree.Len()
table := &txn.getTxn().root[t.tablePos()]
return table.indexes[PrimaryIndexPos].tree.Len()
}

func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, ok bool) {
Expand All @@ -143,11 +181,22 @@ func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint
}

func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) {
indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index)
indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index))
var iobj object
if indexTxn.unique {
// On a unique index we can do a direct get rather than a prefix search.
watch, iobj, ok = indexTxn.Root().GetWatch(q.key)
if !ok {
return
}
obj = iobj.data.(Obj)
revision = iobj.revision
return
}

// For a non-unique index we need to do a prefix search.
iter := indexTxn.Root().Iterator()
watch = iter.SeekPrefixWatch(q.key)

var iobj object
for {
var key []byte
key, iobj, ok = iter.Next()
Expand All @@ -156,14 +205,8 @@ func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision
}

// Check that we have a full match on the key
var match bool
if indexTxn.entry.unique {
match = len(key) == len(q.key)
} else {
_, secondary := decodeNonUniqueKey(key)
match = len(secondary) == len(q.key)
}
if match {
_, secondary := decodeNonUniqueKey(key)
if len(secondary) == len(q.key) {
break
}
}
Expand All @@ -181,11 +224,21 @@ func (t *genTable[Obj]) Last(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint6
}

func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) {
indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index)
indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index))
var iobj object
if indexTxn.unique {
// On a unique index we can do a direct get rather than a prefix search.
watch, iobj, ok = indexTxn.Root().GetWatch(q.key)
if !ok {
return
}
obj = iobj.data.(Obj)
revision = iobj.revision
return
}

iter := indexTxn.Root().ReverseIterator()
watch = iter.SeekPrefixWatch(q.key)

var iobj object
for {
var key []byte
key, iobj, ok = iter.Previous()
Expand All @@ -194,14 +247,8 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision
}

// Check that we have a full match on the key
var match bool
if indexTxn.entry.unique {
match = len(key) == len(q.key)
} else {
_, secondary := decodeNonUniqueKey(key)
match = len(secondary) == len(q.key)
}
if match {
_, secondary := decodeNonUniqueKey(key)
if len(secondary) == len(q.key) {
break
}
}
Expand All @@ -214,7 +261,7 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision
}

func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) {
indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index)
indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index))
root := indexTxn.Root()

// Since LowerBound query may be invalidated by changes in another branch
Expand All @@ -227,19 +274,19 @@ func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-
}

func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) {
indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name)
indexTxn := txn.getTxn().mustIndexReadTxn(t, PrimaryIndexPos)
root := indexTxn.Root()
// Grab the watch channel for the root node
watchCh, _, _ := root.GetWatch(nil)
return &iterator[Obj]{root.Iterator()}, watchCh
}

func (t *genTable[Obj]) Get(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) {
indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index)
indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index))
iter := indexTxn.Root().Iterator()
watchCh := iter.SeekPrefixWatch(q.key)

if indexTxn.entry.unique {
if indexTxn.unique {
return &uniqueIterator[Obj]{iter, q.key}, watchCh
}
return &nonUniqueIterator[Obj]{iter, q.key}, watchCh
Expand Down
Loading

0 comments on commit 965dc5b

Please sign in to comment.