Skip to content

Commit

Permalink
statedb: Use table positions instead of names in txn
Browse files Browse the repository at this point in the history
WriteTxn() is fairly slow, and part of the reason is the cost of using
hash maps to look up the tables being modified. Since the set of tables
is static at runtime, we can instead use an integer index for the
lookups.

This can later be extended to the indexTxn as well.

Before:

goos: linux
goarch: amd64
pkg: github.com/cilium/statedb
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkDB_WriteTxn_1-8                                          132523              9268 ns/op            107899 objects/sec
BenchmarkDB_WriteTxn_10-8                                         367114              3044 ns/op            328531 objects/sec
BenchmarkDB_WriteTxn_100-8                                        430207              2459 ns/op            406751 objects/sec
BenchmarkDB_WriteTxn_100_SecondaryIndex-8                         503071              2358 ns/op            424111 objects/sec
BenchmarkDB_RandomInsert-8                                           945           1288636 ns/op            776014 objects/sec
BenchmarkDB_RandomReplace-8                                          226           5256999 ns/op            190223 objects/sec
BenchmarkDB_SequentialInsert-8                                       370           3580799 ns/op            279267 objects/sec
...
BenchmarkDB_DeleteTracker_Baseline-8                                 356           3314962 ns/op            301663 objects/sec
BenchmarkDB_DeleteTracker-8                                          181           6609844 ns/op            151290 objects/sec
BenchmarkDB_RandomLookup-8                                          3289            354713 ns/op           2819181 objects/sec
BenchmarkDB_SequentialLookup-8                                      3519            334955 ns/op           2985479 objects/sec
BenchmarkDB_FullIteration_All-8                                    88718             12817 ns/op          78022832 objects/sec
BenchmarkDB_FullIteration_Get-8                                    71965             15974 ns/op          62599999 objects/sec
BenchmarkDB_PropagationDelay-8                                    188659              6398 ns/op
55.00 50th_µs           74.00 90th_µs          277.0 99th_µs
PASS
ok      github.com/cilium/statedb       33.132s

After:

goos: linux
goarch: amd64
pkg: github.com/cilium/statedb
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkDB_WriteTxn_1-8                                          160879              7232 ns/op            138271 objects/sec
BenchmarkDB_WriteTxn_10-8                                         442489              2597 ns/op            385104 objects/sec
BenchmarkDB_WriteTxn_100-8                                        511983              2201 ns/op            454271 objects/sec
BenchmarkDB_WriteTxn_100_SecondaryIndex-8                         511678              2205 ns/op            453489 objects/sec
BenchmarkDB_RandomInsert-8                                          1045           1134981 ns/op            881072 objects/sec
BenchmarkDB_RandomReplace-8                                          246           4879081 ns/op            204957 objects/sec
BenchmarkDB_SequentialInsert-8                                       387           3108060 ns/op            321744 objects/sec
...
BenchmarkDB_DeleteTracker_Baseline-8                                 374           3167598 ns/op            315697 objects/sec
BenchmarkDB_DeleteTracker-8                                          182           6409534 ns/op            156018 objects/sec
BenchmarkDB_RandomLookup-8                                          3505            317084 ns/op           3153747 objects/sec
BenchmarkDB_SequentialLookup-8                                      3951            293740 ns/op           3404371 objects/sec
BenchmarkDB_FullIteration_All-8                                    98962             12085 ns/op          82749863 objects/sec
BenchmarkDB_FullIteration_Get-8                                    81453             14711 ns/op          67978410 objects/sec
BenchmarkDB_PropagationDelay-8                                    206851              5742 ns/op
50.00 50th_µs           64.00 90th_µs          261.0 99th_µs
PASS
ok      github.com/cilium/statedb       31.966s

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Mar 25, 2024
1 parent 0c84efc commit 308c157
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 146 deletions.
58 changes: 33 additions & 25 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"reflect"
"runtime"
"slices"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -87,29 +88,29 @@ import (
// the lowest revision of all delete trackers.
type DB struct {
mu sync.Mutex // protects 'tables' and sequences modifications to the root tree
tables map[TableName]TableMeta
ctx context.Context
cancel context.CancelFunc
root atomic.Pointer[iradix.Tree[tableEntry]]
root atomic.Pointer[dbRoot]
gcTrigger chan struct{} // trigger for graveyard garbage collection
gcExited chan struct{}
gcRateLimitInterval time.Duration
metrics Metrics
}

type dbRoot []tableEntry

func NewDB(tables []TableMeta, metrics Metrics) (*DB, error) {
txn := iradix.New[tableEntry]().Txn()
db := &DB{
tables: make(map[TableName]TableMeta),
metrics: metrics,
gcRateLimitInterval: defaultGCRateLimitInterval,
}
root := make(dbRoot, 0, len(tables))
for _, t := range tables {
if err := db.registerTable(t, txn); err != nil {
if err := db.registerTable(t, &root); err != nil {
return nil, err
}
}
db.root.Store(txn.CommitOnly())
db.root.Store(&root)

return db, nil
}
Expand All @@ -128,25 +129,31 @@ func (db *DB) RegisterTable(table TableMeta, tables ...TableMeta) error {
db.mu.Lock()
defer db.mu.Unlock()

txn := db.root.Load().Txn()
if err := db.registerTable(table, txn); err != nil {
root := slices.Clone(*db.root.Load())

if err := db.registerTable(table, &root); err != nil {
return err
}
for _, t := range tables {
if err := db.registerTable(t, txn); err != nil {
if err := db.registerTable(t, &root); err != nil {
return err
}
}
db.root.Store(txn.CommitOnly())
db.root.Store(&root)
return nil
}

func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error {
func (db *DB) registerTable(table TableMeta, root *dbRoot) error {
name := table.Name()
if _, ok := db.tables[name]; ok {
return tableError(name, ErrDuplicateTable)
for _, t := range *root {
if t.meta.Name() == name {
return tableError(name, ErrDuplicateTable)
}
}
db.tables[name] = table

pos := len(*root)
table.setTablePos(pos)

var entry tableEntry
entry.meta = table
entry.deleteTrackers = iradix.New[deleteTracker]()
Expand All @@ -159,7 +166,8 @@ func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error
indexTxn.Insert([]byte(index), indexEntry{iradix.New[object](), indexer.unique})
}
entry.indexes = indexTxn.CommitOnly()
txn.Insert(table.tableKey(), entry)

*root = append(*root, entry)
return nil
}

Expand All @@ -169,8 +177,8 @@ func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error
// ReadTxn is not thread-safe!
func (db *DB) ReadTxn() ReadTxn {
return &txn{
db: db,
rootReadTxn: db.root.Load().Txn(),
db: db,
root: *db.root.Load(),
}
}

Expand All @@ -192,15 +200,12 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
smus.Lock()
acquiredAt := time.Now()

rootReadTxn := db.root.Load().Txn()
tableEntries := make(map[TableName]*tableEntry, len(tables))
root := *db.root.Load()
tableEntries := make([]*tableEntry, len(root))
var tableNames []string
for _, table := range allTables {
tableEntry, ok := rootReadTxn.Get(table.tableKey())
if !ok {
panic("BUG: Table '" + table.Name() + "' not found")
}
tableEntries[table.Name()] = &tableEntry
tableEntry := root[table.tablePos()]
tableEntries[table.tablePos()] = &tableEntry
tableNames = append(tableNames, table.Name())

db.metrics.WriteTxnTableAcquisition(
Expand All @@ -217,7 +222,7 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {

txn := &txn{
db: db,
rootReadTxn: rootReadTxn,
root: root,
modifiedTables: tableEntries,
writeTxns: make(map[tableIndex]indexTxn),
smus: smus,
Expand Down Expand Up @@ -278,6 +283,7 @@ var ciliumPackagePrefix = func() string {
func callerPackage() string {
var callerPkg string
pc, _, _, ok := runtime.Caller(2)

if ok {
f := runtime.FuncForPC(pc)
if f != nil {
Expand All @@ -288,7 +294,9 @@ func callerPackage() string {
callerPkg = "unknown"
}
} else {

callerPkg = "unknown"
}

return callerPkg
}
4 changes: 2 additions & 2 deletions deletetracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (dt *DeleteTracker[Obj]) getRevision() uint64 {
// 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is
// called!
func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] {
indexTxn := txn.getTxn().mustIndexReadTxn(dt.table.Name(), GraveyardRevisionIndex)
indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndex)
iter := indexTxn.Root().Iterator()
iter.SeekLowerBound(index.Uint64(minRevision))
return &iterator[Obj]{iter}
Expand All @@ -57,7 +57,7 @@ func (dt *DeleteTracker[Obj]) Close() {
// Remove the delete tracker from the table.
txn := dt.db.WriteTxn(dt.table).getTxn()
db := txn.db
table := txn.modifiedTables[dt.table.Name()]
table := txn.modifiedTables[dt.table.tablePos()]
if table == nil {
panic("BUG: Table missing from write transaction")
}
Expand Down
18 changes: 8 additions & 10 deletions graveyard.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat

// Do a lockless read transaction to find potential dead objects.
txn := db.ReadTxn().getTxn()
tableIter := txn.rootReadTxn.Root().Iterator()
for nameKey, table, ok := tableIter.Next(); ok; nameKey, table, ok = tableIter.Next() {
tableName := string(nameKey)
for _, table := range txn.root {
tableName := table.meta.Name()
start := time.Now()

// Find the low watermark
Expand All @@ -64,7 +63,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat

// Find objects to be deleted by iterating over the graveyard revision index up
// to the low watermark.
indexTree := txn.mustIndexReadTxn(tableName, GraveyardRevisionIndex)
indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndex)

objIter := indexTree.Root().Iterator()
for key, obj, ok := objIter.Next(); ok; key, obj, ok = objIter.Next() {
Expand Down Expand Up @@ -93,12 +92,12 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat
tableName := meta.Name()
start := time.Now()
for _, key := range deadObjs {
oldObj, existed := txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Delete(key)
oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete(key)
if existed {
// The dead object still existed (and wasn't replaced by a create->delete),
// delete it from the primary index.
key = meta.primary().fromObject(oldObj).First()
txn.mustIndexWriteTxn(tableName, GraveyardIndex).Delete(key)
txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(key)
}
}
cleaningTimes[tableName] = time.Since(start)
Expand All @@ -114,8 +113,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat

// Update object count metrics.
txn = db.ReadTxn().getTxn()
tableIter = txn.rootReadTxn.Root().Iterator()
for name, table, ok := tableIter.Next(); ok; name, table, ok = tableIter.Next() {
for _, table := range txn.root {
name := table.meta.Name()
db.metrics.GraveyardObjectCount(string(name), table.numDeletedObjects())
db.metrics.ObjectCount(string(name), table.numObjects())
}
Expand All @@ -126,8 +125,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat
// Used in tests.
func (db *DB) graveyardIsEmpty() bool {
txn := db.ReadTxn().getTxn()
tableIter := txn.rootReadTxn.Root().Iterator()
for _, table, ok := tableIter.Next(); ok; _, table, ok = tableIter.Next() {
for _, table := range txn.root {
indexEntry, ok := table.indexes.Get([]byte(GraveyardIndex))
if !ok {
panic("BUG: GraveyardIndex not found from table")
Expand Down
6 changes: 3 additions & 3 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type Metrics interface {
WriteTxnTableAcquisition(tableName string, acquire time.Duration)
WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration)
WriteTxnDuration(goPackage string, s []string, acquire time.Duration)
WriteTxnDuration(goPackage string, acquire time.Duration)

GraveyardLowWatermark(tableName string, lowWatermark Revision)
GraveyardCleaningDuration(tableName string, duration time.Duration)
Expand Down Expand Up @@ -121,7 +121,7 @@ func (m *ExpVarMetrics) ObjectCount(name string, numObjects int) {
m.ObjectCountVar.Set(name, &intVar)
}

func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, s []string, acquire time.Duration) {
func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) {
m.WriteTxnDurationVar.AddFloat(goPackage, acquire.Seconds())
}

Expand Down Expand Up @@ -162,7 +162,7 @@ func (*NopMetrics) Revision(tableName string, revision uint64) {
}

// WriteTxnDuration implements Metrics.
func (*NopMetrics) WriteTxnDuration(goPackage string, s []string, acquire time.Duration) {
func (*NopMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) {
}

// WriteTxnTableAcquisition implements Metrics.
Expand Down
23 changes: 16 additions & 7 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,22 @@ func MustNewTable[Obj any](
}

type genTable[Obj any] struct {
pos int
table TableName
smu internal.SortableMutex
primaryIndexer Indexer[Obj]
primaryAnyIndexer anyIndexer
secondaryAnyIndexers map[string]anyIndexer
}

func (t *genTable[Obj]) setTablePos(pos int) {
t.pos = pos
}

func (t *genTable[Obj]) tablePos() int {
return t.pos
}

func (t *genTable[Obj]) tableKey() []byte {
return []byte(t.table)
}
Expand All @@ -120,11 +129,11 @@ func (t *genTable[Obj]) ToTable() Table[Obj] {
}

func (t *genTable[Obj]) Revision(txn ReadTxn) Revision {
return txn.getTxn().GetRevision(t.table)
return txn.getTxn().getRevision(t)
}

func (t *genTable[Obj]) NumObjects(txn ReadTxn) int {
indexTxn := txn.getTxn().mustIndexReadTxn(t.table, t.primaryAnyIndexer.name)
indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name)
return indexTxn.entry.tree.Len()
}

Expand All @@ -134,7 +143,7 @@ func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint
}

func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) {
indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index)
indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index)
iter := indexTxn.Root().Iterator()
watch = iter.SeekPrefixWatch(q.key)

Expand Down Expand Up @@ -172,7 +181,7 @@ func (t *genTable[Obj]) Last(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint6
}

func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) {
indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index)
indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index)
iter := indexTxn.Root().ReverseIterator()
watch = iter.SeekPrefixWatch(q.key)

Expand Down Expand Up @@ -205,7 +214,7 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision
}

func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) {
indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index)
indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index)
root := indexTxn.Root()

// Since LowerBound query may be invalidated by changes in another branch
Expand All @@ -218,15 +227,15 @@ func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-
}

func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) {
indexTxn := txn.getTxn().mustIndexReadTxn(t.table, t.primaryAnyIndexer.name)
indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name)
root := indexTxn.Root()
// Grab the watch channel for the root node
watchCh, _, _ := root.GetWatch(nil)
return &iterator[Obj]{root.Iterator()}, watchCh
}

func (t *genTable[Obj]) Get(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) {
indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index)
indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index)
iter := indexTxn.Root().Iterator()
watchCh := iter.SeekPrefixWatch(q.key)

Expand Down
Loading

0 comments on commit 308c157

Please sign in to comment.