From 308c157b566ee3bbc1ba2b0885b8056ce3729461 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Sun, 17 Mar 2024 15:41:15 +0100 Subject: [PATCH] statedb: Use table positions instead of names in txn MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WriteTxn() is fairly slow and partially the reason is the heaviness of using hash maps to look up the tables that are being modified. Since the set of tables is static at runtime we can just use integer index to look up. This can later be extended to the indexTxn as well. Before: goos: linux goarch: amd64 pkg: github.com/cilium/statedb cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz BenchmarkDB_WriteTxn_1-8 132523 9268 ns/op 107899 objects/sec BenchmarkDB_WriteTxn_10-8 367114 3044 ns/op 328531 objects/sec BenchmarkDB_WriteTxn_100-8 430207 2459 ns/op 406751 objects/sec BenchmarkDB_WriteTxn_100_SecondaryIndex-8 503071 2358 ns/op 424111 objects/sec BenchmarkDB_RandomInsert-8 945 1288636 ns/op 776014 objects/sec BenchmarkDB_RandomReplace-8 226 5256999 ns/op 190223 objects/sec BenchmarkDB_SequentialInsert-8 370 3580799 ns/op 279267 objects/sec ... BenchmarkDB_DeleteTracker_Baseline-8 356 3314962 ns/op 301663 objects/sec BenchmarkDB_DeleteTracker-8 181 6609844 ns/op 151290 objects/sec BenchmarkDB_RandomLookup-8 3289 354713 ns/op 2819181 objects/sec BenchmarkDB_SequentialLookup-8 3519 334955 ns/op 2985479 objects/sec BenchmarkDB_FullIteration_All-8 88718 12817 ns/op 78022832 objects/sec BenchmarkDB_FullIteration_Get-8 71965 15974 ns/op 62599999 objects/sec BenchmarkDB_PropagationDelay-8 188659 6398 ns/op 55.00 50th_µs 74.00 90th_µs 277.0 99th_µs PASS ok github.com/cilium/statedb 33.132s After: goos: linux goarch: amd64 pkg: github.com/cilium/statedb cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz BenchmarkDB_WriteTxn_1-8 160879 7232 ns/op 138271 objects/sec BenchmarkDB_WriteTxn_10-8 442489 2597 ns/op 385104 objects/sec BenchmarkDB_WriteTxn_100-8 511983 2201 ns/op 454271 objects/sec BenchmarkDB_WriteTxn_100_SecondaryIndex-8 511678 2205 ns/op 453489 objects/sec BenchmarkDB_RandomInsert-8 1045 1134981 ns/op 881072 objects/sec BenchmarkDB_RandomReplace-8 246 4879081 ns/op 204957 objects/sec BenchmarkDB_SequentialInsert-8 387 3108060 ns/op 321744 objects/sec ... BenchmarkDB_DeleteTracker_Baseline-8 374 3167598 ns/op 315697 objects/sec BenchmarkDB_DeleteTracker-8 182 6409534 ns/op 156018 objects/sec BenchmarkDB_RandomLookup-8 3505 317084 ns/op 3153747 objects/sec BenchmarkDB_SequentialLookup-8 3951 293740 ns/op 3404371 objects/sec BenchmarkDB_FullIteration_All-8 98962 12085 ns/op 82749863 objects/sec BenchmarkDB_FullIteration_Get-8 81453 14711 ns/op 67978410 objects/sec BenchmarkDB_PropagationDelay-8 206851 5742 ns/op 50.00 50th_µs 64.00 90th_µs 261.0 99th_µs PASS ok github.com/cilium/statedb 31.966s Signed-off-by: Jussi Maki --- db.go | 58 +++++++++------- deletetracker.go | 4 +- graveyard.go | 18 +++-- metrics.go | 6 +- table.go | 23 +++++-- txn.go | 169 ++++++++++++++++++++--------------------------- types.go | 4 +- 7 files changed, 136 insertions(+), 146 deletions(-) diff --git a/db.go b/db.go index ec6a6b5..1bbee01 100644 --- a/db.go +++ b/db.go @@ -9,6 +9,7 @@ import ( "net/http" "reflect" "runtime" + "slices" "strings" "sync" "sync/atomic" @@ -87,29 +88,29 @@ import ( // the lowest revision of all delete trackers. type DB struct { mu sync.Mutex // protects 'tables' and sequences modifications to the root tree - tables map[TableName]TableMeta ctx context.Context cancel context.CancelFunc - root atomic.Pointer[iradix.Tree[tableEntry]] + root atomic.Pointer[dbRoot] gcTrigger chan struct{} // trigger for graveyard garbage collection gcExited chan struct{} gcRateLimitInterval time.Duration metrics Metrics } +type dbRoot []tableEntry + func NewDB(tables []TableMeta, metrics Metrics) (*DB, error) { - txn := iradix.New[tableEntry]().Txn() db := &DB{ - tables: make(map[TableName]TableMeta), metrics: metrics, gcRateLimitInterval: defaultGCRateLimitInterval, } + root := make(dbRoot, 0, len(tables)) for _, t := range tables { - if err := db.registerTable(t, txn); err != nil { + if err := db.registerTable(t, &root); err != nil { return nil, err } } - db.root.Store(txn.CommitOnly()) + db.root.Store(&root) return db, nil } @@ -128,25 +129,31 @@ func (db *DB) RegisterTable(table TableMeta, tables ...TableMeta) error { db.mu.Lock() defer db.mu.Unlock() - txn := db.root.Load().Txn() - if err := db.registerTable(table, txn); err != nil { + root := slices.Clone(*db.root.Load()) + + if err := db.registerTable(table, &root); err != nil { return err } for _, t := range tables { - if err := db.registerTable(t, txn); err != nil { + if err := db.registerTable(t, &root); err != nil { return err } } - db.root.Store(txn.CommitOnly()) + db.root.Store(&root) return nil } -func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error { +func (db *DB) registerTable(table TableMeta, root *dbRoot) error { name := table.Name() - if _, ok := db.tables[name]; ok { - return tableError(name, ErrDuplicateTable) + for _, t := range *root { + if t.meta.Name() == name { + return tableError(name, ErrDuplicateTable) + } } - db.tables[name] = table + + pos := len(*root) + table.setTablePos(pos) + var entry tableEntry entry.meta = table entry.deleteTrackers = iradix.New[deleteTracker]() @@ -159,7 +166,8 @@ func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error indexTxn.Insert([]byte(index), indexEntry{iradix.New[object](), indexer.unique}) } entry.indexes = indexTxn.CommitOnly() - txn.Insert(table.tableKey(), entry) + + *root = append(*root, entry) return nil } @@ -169,8 +177,8 @@ func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error // ReadTxn is not thread-safe! func (db *DB) ReadTxn() ReadTxn { return &txn{ - db: db, - rootReadTxn: db.root.Load().Txn(), + db: db, + root: *db.root.Load(), } } @@ -192,15 +200,12 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { smus.Lock() acquiredAt := time.Now() - rootReadTxn := db.root.Load().Txn() - tableEntries := make(map[TableName]*tableEntry, len(tables)) + root := *db.root.Load() + tableEntries := make([]*tableEntry, len(root)) var tableNames []string for _, table := range allTables { - tableEntry, ok := rootReadTxn.Get(table.tableKey()) - if !ok { - panic("BUG: Table '" + table.Name() + "' not found") - } - tableEntries[table.Name()] = &tableEntry + tableEntry := root[table.tablePos()] + tableEntries[table.tablePos()] = &tableEntry tableNames = append(tableNames, table.Name()) db.metrics.WriteTxnTableAcquisition( @@ -217,7 +222,7 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { txn := &txn{ db: db, - rootReadTxn: rootReadTxn, + root: root, modifiedTables: tableEntries, writeTxns: make(map[tableIndex]indexTxn), smus: smus, @@ -278,6 +283,7 @@ var ciliumPackagePrefix = func() string { func callerPackage() string { var callerPkg string pc, _, _, ok := runtime.Caller(2) + if ok { f := runtime.FuncForPC(pc) if f != nil { @@ -288,7 +294,9 @@ func callerPackage() string { callerPkg = "unknown" } } else { + callerPkg = "unknown" } + return callerPkg } diff --git a/deletetracker.go b/deletetracker.go index 5ccd371..8f87914 100644 --- a/deletetracker.go +++ b/deletetracker.go @@ -36,7 +36,7 @@ func (dt *DeleteTracker[Obj]) getRevision() uint64 { // 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is // called! func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] { - indexTxn := txn.getTxn().mustIndexReadTxn(dt.table.Name(), GraveyardRevisionIndex) + indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndex) iter := indexTxn.Root().Iterator() iter.SeekLowerBound(index.Uint64(minRevision)) return &iterator[Obj]{iter} @@ -57,7 +57,7 @@ func (dt *DeleteTracker[Obj]) Close() { // Remove the delete tracker from the table. txn := dt.db.WriteTxn(dt.table).getTxn() db := txn.db - table := txn.modifiedTables[dt.table.Name()] + table := txn.modifiedTables[dt.table.tablePos()] if table == nil { panic("BUG: Table missing from write transaction") } diff --git a/graveyard.go b/graveyard.go index 2f7c5f6..f18a116 100644 --- a/graveyard.go +++ b/graveyard.go @@ -39,9 +39,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Do a lockless read transaction to find potential dead objects. txn := db.ReadTxn().getTxn() - tableIter := txn.rootReadTxn.Root().Iterator() - for nameKey, table, ok := tableIter.Next(); ok; nameKey, table, ok = tableIter.Next() { - tableName := string(nameKey) + for _, table := range txn.root { + tableName := table.meta.Name() start := time.Now() // Find the low watermark @@ -64,7 +63,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Find objects to be deleted by iterating over the graveyard revision index up // to the low watermark. - indexTree := txn.mustIndexReadTxn(tableName, GraveyardRevisionIndex) + indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndex) objIter := indexTree.Root().Iterator() for key, obj, ok := objIter.Next(); ok; key, obj, ok = objIter.Next() { @@ -93,12 +92,12 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat tableName := meta.Name() start := time.Now() for _, key := range deadObjs { - oldObj, existed := txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Delete(key) + oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete(key) if existed { // The dead object still existed (and wasn't replaced by a create->delete), // delete it from the primary index. key = meta.primary().fromObject(oldObj).First() - txn.mustIndexWriteTxn(tableName, GraveyardIndex).Delete(key) + txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(key) } } cleaningTimes[tableName] = time.Since(start) @@ -114,8 +113,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Update object count metrics. txn = db.ReadTxn().getTxn() - tableIter = txn.rootReadTxn.Root().Iterator() - for name, table, ok := tableIter.Next(); ok; name, table, ok = tableIter.Next() { + for _, table := range txn.root { + name := table.meta.Name() db.metrics.GraveyardObjectCount(string(name), table.numDeletedObjects()) db.metrics.ObjectCount(string(name), table.numObjects()) } @@ -126,8 +125,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Used in tests. func (db *DB) graveyardIsEmpty() bool { txn := db.ReadTxn().getTxn() - tableIter := txn.rootReadTxn.Root().Iterator() - for _, table, ok := tableIter.Next(); ok; _, table, ok = tableIter.Next() { + for _, table := range txn.root { indexEntry, ok := table.indexes.Get([]byte(GraveyardIndex)) if !ok { panic("BUG: GraveyardIndex not found from table") diff --git a/metrics.go b/metrics.go index 0af0d29..3613cf2 100644 --- a/metrics.go +++ b/metrics.go @@ -10,7 +10,7 @@ import ( type Metrics interface { WriteTxnTableAcquisition(tableName string, acquire time.Duration) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) - WriteTxnDuration(goPackage string, s []string, acquire time.Duration) + WriteTxnDuration(goPackage string, acquire time.Duration) GraveyardLowWatermark(tableName string, lowWatermark Revision) GraveyardCleaningDuration(tableName string, duration time.Duration) @@ -121,7 +121,7 @@ func (m *ExpVarMetrics) ObjectCount(name string, numObjects int) { m.ObjectCountVar.Set(name, &intVar) } -func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, s []string, acquire time.Duration) { +func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) { m.WriteTxnDurationVar.AddFloat(goPackage, acquire.Seconds()) } @@ -162,7 +162,7 @@ func (*NopMetrics) Revision(tableName string, revision uint64) { } // WriteTxnDuration implements Metrics. -func (*NopMetrics) WriteTxnDuration(goPackage string, s []string, acquire time.Duration) { +func (*NopMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) { } // WriteTxnTableAcquisition implements Metrics. diff --git a/table.go b/table.go index 121f138..fa4c11c 100644 --- a/table.go +++ b/table.go @@ -88,6 +88,7 @@ func MustNewTable[Obj any]( } type genTable[Obj any] struct { + pos int table TableName smu internal.SortableMutex primaryIndexer Indexer[Obj] @@ -95,6 +96,14 @@ type genTable[Obj any] struct { secondaryAnyIndexers map[string]anyIndexer } +func (t *genTable[Obj]) setTablePos(pos int) { + t.pos = pos +} + +func (t *genTable[Obj]) tablePos() int { + return t.pos +} + func (t *genTable[Obj]) tableKey() []byte { return []byte(t.table) } @@ -120,11 +129,11 @@ func (t *genTable[Obj]) ToTable() Table[Obj] { } func (t *genTable[Obj]) Revision(txn ReadTxn) Revision { - return txn.getTxn().GetRevision(t.table) + return txn.getTxn().getRevision(t) } func (t *genTable[Obj]) NumObjects(txn ReadTxn) int { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, t.primaryAnyIndexer.name) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name) return indexTxn.entry.tree.Len() } @@ -134,7 +143,7 @@ func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint } func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) iter := indexTxn.Root().Iterator() watch = iter.SeekPrefixWatch(q.key) @@ -172,7 +181,7 @@ func (t *genTable[Obj]) Last(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint6 } func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) iter := indexTxn.Root().ReverseIterator() watch = iter.SeekPrefixWatch(q.key) @@ -205,7 +214,7 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) root := indexTxn.Root() // Since LowerBound query may be invalidated by changes in another branch @@ -218,7 +227,7 @@ func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <- } func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, t.primaryAnyIndexer.name) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name) root := indexTxn.Root() // Grab the watch channel for the root node watchCh, _, _ := root.GetWatch(nil) @@ -226,7 +235,7 @@ func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { } func (t *genTable[Obj]) Get(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) iter := indexTxn.Root().Iterator() watchCh := iter.SeekPrefixWatch(q.key) diff --git a/txn.go b/txn.go index 8f06ef3..98defd0 100644 --- a/txn.go +++ b/txn.go @@ -14,7 +14,6 @@ import ( "time" iradix "github.com/hashicorp/go-immutable-radix/v2" - "golang.org/x/exp/maps" "github.com/cilium/statedb/index" "github.com/cilium/statedb/internal" @@ -22,17 +21,17 @@ import ( type txn struct { db *DB - rootReadTxn *iradix.Txn[tableEntry] // read transaction onto the tree of tables - writeTxns map[tableIndex]indexTxn // opened per-index write transactions - modifiedTables map[TableName]*tableEntry // table entries being modified - smus internal.SortableMutexes // the (sorted) table locks - acquiredAt time.Time // the time at which the transaction acquired the locks - packageName string // name of the package that created the transaction + root dbRoot + writeTxns map[tableIndex]indexTxn // opened per-index write transactions + modifiedTables []*tableEntry // table entries being modified + smus internal.SortableMutexes // the (sorted) table locks + acquiredAt time.Time // the time at which the transaction acquired the locks + packageName string // name of the package that created the transaction } type tableIndex struct { - table TableName - index IndexName + tablePos int + index IndexName } type indexTxn struct { @@ -58,33 +57,28 @@ func (txn *txn) getTxn() *txn { // has been Aborted or Committed. This is a safeguard against forgetting to // Abort/Commit which would cause the table to be locked forever. func txnFinalizer(txn *txn) { - if txn.writeTxns != nil { - panic(fmt.Sprintf("WriteTxn from package %q against tables %v was never Abort()'d or Commit()'d", txn.packageName, maps.Keys(txn.modifiedTables))) + if txn.db != nil { + panic(fmt.Sprintf("WriteTxn acquired by package %q was never Abort()'d or Commit()'d", txn.packageName)) } } -func (txn *txn) GetRevision(name TableName) Revision { - if table, ok := txn.modifiedTables[name]; ok { - // This is a write transaction preparing to modify the table with a - // new revision. - return table.revision - } - - // This is either a read transaction, or a write transaction to tables - // other than this table. Look up the revision from the index. - table, ok := txn.rootReadTxn.Get([]byte(name)) - if !ok { - panic("BUG: Table " + name + " not found") +func (txn *txn) getRevision(meta TableMeta) Revision { + if txn.modifiedTables != nil { + entry := txn.modifiedTables[meta.tablePos()] + if entry != nil { + return entry.revision + } } - return table.revision + return txn.root[meta.tablePos()].revision } // indexReadTxn returns a transaction to read from the specific index. // If the table or index is not found this returns nil & error. -func (txn *txn) indexReadTxn(name TableName, index IndexName) (indexTxn, error) { +func (txn *txn) indexReadTxn(meta TableMeta, index IndexName) (indexTxn, error) { if txn.writeTxns != nil { - if _, ok := txn.modifiedTables[name]; ok { - itxn, err := txn.indexWriteTxn(name, index) + entry := txn.modifiedTables[meta.tablePos()] + if entry != nil { + itxn, err := txn.indexWriteTxn(meta, index) if err == nil { return itxn.Clone(), nil } @@ -92,13 +86,10 @@ func (txn *txn) indexReadTxn(name TableName, index IndexName) (indexTxn, error) } } - table, ok := txn.rootReadTxn.Get([]byte(name)) - if !ok { - return indexTxn{}, fmt.Errorf("table %q not found", name) - } + table := txn.root[meta.tablePos()] indexEntry, ok := table.indexes.Get([]byte(index)) if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, name) + return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, meta.Name()) } return indexTxn{ @@ -108,29 +99,26 @@ func (txn *txn) indexReadTxn(name TableName, index IndexName) (indexTxn, error) // indexWriteTxn returns a transaction to read/write to a specific index. // The created transaction is memoized and used for subsequent reads and/or writes. -func (txn *txn) indexWriteTxn(name TableName, index IndexName) (indexTxn, error) { - if indexTreeTxn, ok := txn.writeTxns[tableIndex{name, index}]; ok { +func (txn *txn) indexWriteTxn(meta TableMeta, index IndexName) (indexTxn, error) { + if indexTreeTxn, ok := txn.writeTxns[tableIndex{meta.tablePos(), index}]; ok { return indexTreeTxn, nil } - table, ok := txn.modifiedTables[name] - if !ok { - return indexTxn{}, fmt.Errorf("table %q not found", name) - } + table := txn.modifiedTables[meta.tablePos()] indexEntry, ok := table.indexes.Get([]byte(index)) if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, name) + return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, meta.Name()) } itxn := indexEntry.tree.Txn() itxn.TrackMutate(true) indexWriteTxn := indexTxn{itxn, indexEntry} - txn.writeTxns[tableIndex{name, index}] = indexWriteTxn + txn.writeTxns[tableIndex{meta.tablePos(), index}] = indexWriteTxn return indexWriteTxn, nil } // mustIndexReadTxn returns a transaction to read from the specific index. // Panics if table or index are not found. -func (txn *txn) mustIndexReadTxn(name TableName, index IndexName) indexTxn { - indexTxn, err := txn.indexReadTxn(name, index) +func (txn *txn) mustIndexReadTxn(meta TableMeta, index IndexName) indexTxn { + indexTxn, err := txn.indexReadTxn(meta, index) if err != nil { panic(err) } @@ -139,8 +127,8 @@ func (txn *txn) mustIndexReadTxn(name TableName, index IndexName) indexTxn { // mustIndexReadTxn returns a transaction to read or write from the specific index. // Panics if table or index not found. -func (txn *txn) mustIndexWriteTxn(name TableName, index IndexName) indexTxn { - indexTxn, err := txn.indexWriteTxn(name, index) +func (txn *txn) mustIndexWriteTxn(meta TableMeta, index IndexName) indexTxn { + indexTxn, err := txn.indexWriteTxn(meta, index) if err != nil { panic(err) } @@ -148,14 +136,14 @@ func (txn *txn) mustIndexWriteTxn(name TableName, index IndexName) indexTxn { } func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object, bool, error) { - if txn.rootReadTxn == nil { + if txn.db == nil { return object{}, false, ErrTransactionClosed } // Look up table and allocate a new revision. tableName := meta.Name() - table, ok := txn.modifiedTables[tableName] - if !ok { + table := txn.modifiedTables[meta.tablePos()] + if table == nil { return object{}, false, tableError(tableName, ErrTableNotLockedForWriting) } oldRevision := table.revision @@ -169,7 +157,7 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object // Update the primary index first idKey := meta.primary().fromObject(obj).First() - idIndexTxn := txn.mustIndexWriteTxn(tableName, meta.primary().name) + idIndexTxn := txn.mustIndexWriteTxn(meta, meta.primary().name) oldObj, oldExists := idIndexTxn.Insert(idKey, obj) // Sanity check: is the same object being inserted back and thus the @@ -206,27 +194,26 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index - revIndexTxn := txn.mustIndexWriteTxn(tableName, RevisionIndex) + revIndexTxn := txn.mustIndexWriteTxn(meta, RevisionIndex) if oldExists { _, ok := revIndexTxn.Delete([]byte(index.Uint64(oldObj.revision))) if !ok { panic("BUG: Old revision index entry not found") } - } revIndexTxn.Insert([]byte(index.Uint64(revision)), obj) // If it's new, possibly remove an older deleted object with the same // primary key from the graveyard. - if !oldExists && txn.hasDeleteTrackers(tableName) { - if old, existed := txn.mustIndexWriteTxn(tableName, GraveyardIndex).Delete(idKey); existed { - txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Delete([]byte(index.Uint64(old.revision))) + if !oldExists && txn.hasDeleteTrackers(meta) { + if old, existed := txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(idKey); existed { + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete([]byte(index.Uint64(old.revision))) } } // Then update secondary indexes for idx, indexer := range meta.secondary() { - indexTxn := txn.mustIndexWriteTxn(tableName, idx) + indexTxn := txn.mustIndexWriteTxn(meta, idx) newKeys := indexer.fromObject(obj) if oldExists { @@ -255,29 +242,19 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object return oldObj, oldExists, nil } -func (txn *txn) hasDeleteTrackers(name TableName) bool { - // Table is being modified, return the entry we're mutating, - // so we can read the latest changes. - table, ok := txn.modifiedTables[name] - if !ok { - // Table is not being modified, look it up from the root. - if t, ok := txn.rootReadTxn.Get([]byte(name)); ok { - table = &t - } else { - panic(fmt.Sprintf("BUG: table %q not found", name)) - } +func (txn *txn) hasDeleteTrackers(meta TableMeta) bool { + table := txn.modifiedTables[meta.tablePos()] + if table != nil { + return table.deleteTrackers.Len() > 0 } - return table.deleteTrackers.Len() > 0 + return txn.root[meta.tablePos()].deleteTrackers.Len() > 0 } func (txn *txn) addDeleteTracker(meta TableMeta, trackerName string, dt deleteTracker) error { - if txn.rootReadTxn == nil { + if txn.db == nil { return ErrTransactionClosed } - table, ok := txn.modifiedTables[meta.Name()] - if !ok { - return tableError(meta.Name(), ErrTableNotLockedForWriting) - } + table := txn.modifiedTables[meta.tablePos()] table.deleteTrackers, _, _ = table.deleteTrackers.Insert([]byte(trackerName), dt) txn.db.metrics.DeleteTrackerCount(meta.Name(), table.deleteTrackers.Len()) return nil @@ -285,14 +262,14 @@ func (txn *txn) addDeleteTracker(meta TableMeta, trackerName string, dt deleteTr } func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object, bool, error) { - if txn.rootReadTxn == nil { + if txn.db == nil { return object{}, false, ErrTransactionClosed } // Look up table and allocate a new revision. tableName := meta.Name() - table, ok := txn.modifiedTables[tableName] - if !ok { + table := txn.modifiedTables[meta.tablePos()] + if table == nil { return object{}, false, tableError(tableName, ErrTableNotLockedForWriting) } oldRevision := table.revision @@ -303,7 +280,7 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object // We assume that "data" has only enough defined fields to // compute the primary key. idKey := meta.primary().fromObject(object{data: data}).First() - idIndexTree := txn.mustIndexWriteTxn(tableName, meta.primary().name) + idIndexTree := txn.mustIndexWriteTxn(meta, meta.primary().name) obj, existed := idIndexTree.Delete(idKey) if !existed { return object{}, false, nil @@ -320,7 +297,7 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index. - indexTree := txn.mustIndexWriteTxn(tableName, RevisionIndex) + indexTree := txn.mustIndexWriteTxn(meta, RevisionIndex) if _, ok := indexTree.Delete(index.Uint64(obj.revision)); !ok { panic("BUG: Object to be deleted not found from revision index") } @@ -331,18 +308,18 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object if !indexer.unique { key = encodeNonUniqueKey(idKey, key) } - txn.mustIndexWriteTxn(tableName, idx).Delete(key) + txn.mustIndexWriteTxn(meta, idx).Delete(key) }) } // And finally insert the object into the graveyard. - if txn.hasDeleteTrackers(tableName) { - graveyardIndex := txn.mustIndexWriteTxn(tableName, GraveyardIndex) + if txn.hasDeleteTrackers(meta) { + graveyardIndex := txn.mustIndexWriteTxn(meta, GraveyardIndex) obj.revision = revision if _, existed := graveyardIndex.Insert(idKey, obj); existed { panic("BUG: Double deletion! Deleted object already existed in graveyard") } - txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Insert(index.Uint64(revision), obj) + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Insert(index.Uint64(revision), obj) } return obj, true, nil @@ -399,7 +376,6 @@ func (txn *txn) Abort() { txn.smus.Unlock() txn.db.metrics.WriteTxnDuration( txn.packageName, - maps.Keys(txn.modifiedTables), time.Since(txn.acquiredAt)) *txn = zeroTxn @@ -438,18 +414,16 @@ func (txn *txn) Commit() { // We don't notify yet (CommitOnly) as the root needs to be updated // first as otherwise readers would wake up too early. for tableIndex, subTxn := range txn.writeTxns { - table, ok := txn.modifiedTables[tableIndex.table] - if !ok { - panic("BUG: Table " + tableIndex.table + " in writeTxns, but not in modifiedTables") - } + table := txn.modifiedTables[tableIndex.tablePos] subTxn.entry.tree = subTxn.CommitOnly() table.indexes, _, _ = table.indexes.Insert([]byte(tableIndex.index), subTxn.entry) // Update metrics - db.metrics.GraveyardObjectCount(tableIndex.table, table.numDeletedObjects()) - db.metrics.ObjectCount(tableIndex.table, table.numObjects()) - db.metrics.Revision(tableIndex.table, table.revision) + name := table.meta.Name() + db.metrics.GraveyardObjectCount(name, table.numDeletedObjects()) + db.metrics.ObjectCount(name, table.numObjects()) + db.metrics.Revision(name, table.revision) } // Acquire the lock on the root tree to sequence the updates to it. We can acquire @@ -460,17 +434,18 @@ func (txn *txn) Commit() { // Since the root may have changed since the pointer was last read in WriteTxn(), // load it again and modify the latest version that we now have immobilised by // the root lock. - rootTxn := db.root.Load().Txn() + root := *db.root.Load() // Insert the modified tables into the root tree of tables. - for name, table := range txn.modifiedTables { - rootTxn.Insert([]byte(name), *table) + for pos, table := range txn.modifiedTables { + if table != nil { + root[pos] = *table + } } // Commit the transaction to build the new root tree and then // atomically store it. - newRoot := rootTxn.CommitOnly() - db.root.Store(newRoot) + db.root.Store(&root) db.mu.Unlock() // With the root pointer updated, we can now release the tables for the next write transaction. @@ -481,11 +456,9 @@ func (txn *txn) Commit() { for _, subTxn := range txn.writeTxns { subTxn.Notify() } - rootTxn.Notify() txn.db.metrics.WriteTxnDuration( txn.packageName, - maps.Keys(txn.modifiedTables), time.Since(txn.acquiredAt)) // Zero out the transaction to make it inert. @@ -497,18 +470,18 @@ func (txn *txn) WriteJSON(w io.Writer) error { buf := bufio.NewWriter(w) buf.WriteString("{\n") first := true - for _, table := range txn.db.tables { + for _, table := range txn.root { if !first { buf.WriteString(",\n") } else { first = false } - indexTxn := txn.getTxn().mustIndexReadTxn(table.Name(), table.primary().name) + indexTxn := txn.getTxn().mustIndexReadTxn(table.meta, table.meta.primary().name) root := indexTxn.Root() iter := root.Iterator() - buf.WriteString(" \"" + table.Name() + "\": [\n") + buf.WriteString(" \"" + table.meta.Name() + "\": [\n") _, obj, ok := iter.Next() for ok { diff --git a/types.go b/types.go index 9fde16a..466a28b 100644 --- a/types.go +++ b/types.go @@ -149,7 +149,9 @@ type RWTable[Obj any] interface { // TableMeta provides information about the table that is independent of // the object type (the 'Obj' constraint). type TableMeta interface { - Name() TableName // The name of the table + Name() TableName // The name of the table + tablePos() int + setTablePos(int) tableKey() []byte // The radix key for the table in the root tree primary() anyIndexer // The untyped primary indexer for the table secondary() map[string]anyIndexer // Secondary indexers (if any)