diff --git a/db.go b/db.go index ec6a6b5..1bbee01 100644 --- a/db.go +++ b/db.go @@ -9,6 +9,7 @@ import ( "net/http" "reflect" "runtime" + "slices" "strings" "sync" "sync/atomic" @@ -87,29 +88,29 @@ import ( // the lowest revision of all delete trackers. type DB struct { mu sync.Mutex // protects 'tables' and sequences modifications to the root tree - tables map[TableName]TableMeta ctx context.Context cancel context.CancelFunc - root atomic.Pointer[iradix.Tree[tableEntry]] + root atomic.Pointer[dbRoot] gcTrigger chan struct{} // trigger for graveyard garbage collection gcExited chan struct{} gcRateLimitInterval time.Duration metrics Metrics } +type dbRoot []tableEntry + func NewDB(tables []TableMeta, metrics Metrics) (*DB, error) { - txn := iradix.New[tableEntry]().Txn() db := &DB{ - tables: make(map[TableName]TableMeta), metrics: metrics, gcRateLimitInterval: defaultGCRateLimitInterval, } + root := make(dbRoot, 0, len(tables)) for _, t := range tables { - if err := db.registerTable(t, txn); err != nil { + if err := db.registerTable(t, &root); err != nil { return nil, err } } - db.root.Store(txn.CommitOnly()) + db.root.Store(&root) return db, nil } @@ -128,25 +129,31 @@ func (db *DB) RegisterTable(table TableMeta, tables ...TableMeta) error { db.mu.Lock() defer db.mu.Unlock() - txn := db.root.Load().Txn() - if err := db.registerTable(table, txn); err != nil { + root := slices.Clone(*db.root.Load()) + + if err := db.registerTable(table, &root); err != nil { return err } for _, t := range tables { - if err := db.registerTable(t, txn); err != nil { + if err := db.registerTable(t, &root); err != nil { return err } } - db.root.Store(txn.CommitOnly()) + db.root.Store(&root) return nil } -func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error { +func (db *DB) registerTable(table TableMeta, root *dbRoot) error { name := table.Name() - if _, ok := db.tables[name]; ok { - return tableError(name, ErrDuplicateTable) + for _, t := range *root { + if 
t.meta.Name() == name { + return tableError(name, ErrDuplicateTable) + } } - db.tables[name] = table + + pos := len(*root) + table.setTablePos(pos) + var entry tableEntry entry.meta = table entry.deleteTrackers = iradix.New[deleteTracker]() @@ -159,7 +166,8 @@ func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error indexTxn.Insert([]byte(index), indexEntry{iradix.New[object](), indexer.unique}) } entry.indexes = indexTxn.CommitOnly() - txn.Insert(table.tableKey(), entry) + + *root = append(*root, entry) return nil } @@ -169,8 +177,8 @@ func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error // ReadTxn is not thread-safe! func (db *DB) ReadTxn() ReadTxn { return &txn{ - db: db, - rootReadTxn: db.root.Load().Txn(), + db: db, + root: *db.root.Load(), } } @@ -192,15 +200,12 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { smus.Lock() acquiredAt := time.Now() - rootReadTxn := db.root.Load().Txn() - tableEntries := make(map[TableName]*tableEntry, len(tables)) + root := *db.root.Load() + tableEntries := make([]*tableEntry, len(root)) var tableNames []string for _, table := range allTables { - tableEntry, ok := rootReadTxn.Get(table.tableKey()) - if !ok { - panic("BUG: Table '" + table.Name() + "' not found") - } - tableEntries[table.Name()] = &tableEntry + tableEntry := root[table.tablePos()] + tableEntries[table.tablePos()] = &tableEntry tableNames = append(tableNames, table.Name()) db.metrics.WriteTxnTableAcquisition( @@ -217,7 +222,7 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { txn := &txn{ db: db, - rootReadTxn: rootReadTxn, + root: root, modifiedTables: tableEntries, writeTxns: make(map[tableIndex]indexTxn), smus: smus, @@ -278,6 +283,7 @@ var ciliumPackagePrefix = func() string { func callerPackage() string { var callerPkg string pc, _, _, ok := runtime.Caller(2) + if ok { f := runtime.FuncForPC(pc) if f != nil { @@ -288,7 +294,9 @@ func callerPackage() string { 
callerPkg = "unknown" } } else { + callerPkg = "unknown" } + return callerPkg } diff --git a/deletetracker.go b/deletetracker.go index 5ccd371..8f87914 100644 --- a/deletetracker.go +++ b/deletetracker.go @@ -36,7 +36,7 @@ func (dt *DeleteTracker[Obj]) getRevision() uint64 { // 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is // called! func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] { - indexTxn := txn.getTxn().mustIndexReadTxn(dt.table.Name(), GraveyardRevisionIndex) + indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndex) iter := indexTxn.Root().Iterator() iter.SeekLowerBound(index.Uint64(minRevision)) return &iterator[Obj]{iter} @@ -57,7 +57,7 @@ func (dt *DeleteTracker[Obj]) Close() { // Remove the delete tracker from the table. txn := dt.db.WriteTxn(dt.table).getTxn() db := txn.db - table := txn.modifiedTables[dt.table.Name()] + table := txn.modifiedTables[dt.table.tablePos()] if table == nil { panic("BUG: Table missing from write transaction") } diff --git a/graveyard.go b/graveyard.go index 2f7c5f6..f18a116 100644 --- a/graveyard.go +++ b/graveyard.go @@ -39,9 +39,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Do a lockless read transaction to find potential dead objects. txn := db.ReadTxn().getTxn() - tableIter := txn.rootReadTxn.Root().Iterator() - for nameKey, table, ok := tableIter.Next(); ok; nameKey, table, ok = tableIter.Next() { - tableName := string(nameKey) + for _, table := range txn.root { + tableName := table.meta.Name() start := time.Now() // Find the low watermark @@ -64,7 +63,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Find objects to be deleted by iterating over the graveyard revision index up // to the low watermark. 
- indexTree := txn.mustIndexReadTxn(tableName, GraveyardRevisionIndex) + indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndex) objIter := indexTree.Root().Iterator() for key, obj, ok := objIter.Next(); ok; key, obj, ok = objIter.Next() { @@ -93,12 +92,12 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat tableName := meta.Name() start := time.Now() for _, key := range deadObjs { - oldObj, existed := txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Delete(key) + oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete(key) if existed { // The dead object still existed (and wasn't replaced by a create->delete), // delete it from the primary index. key = meta.primary().fromObject(oldObj).First() - txn.mustIndexWriteTxn(tableName, GraveyardIndex).Delete(key) + txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(key) } } cleaningTimes[tableName] = time.Since(start) @@ -114,8 +113,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Update object count metrics. txn = db.ReadTxn().getTxn() - tableIter = txn.rootReadTxn.Root().Iterator() - for name, table, ok := tableIter.Next(); ok; name, table, ok = tableIter.Next() { + for _, table := range txn.root { + name := table.meta.Name() db.metrics.GraveyardObjectCount(string(name), table.numDeletedObjects()) db.metrics.ObjectCount(string(name), table.numObjects()) } @@ -126,8 +125,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Used in tests. 
func (db *DB) graveyardIsEmpty() bool { txn := db.ReadTxn().getTxn() - tableIter := txn.rootReadTxn.Root().Iterator() - for _, table, ok := tableIter.Next(); ok; _, table, ok = tableIter.Next() { + for _, table := range txn.root { indexEntry, ok := table.indexes.Get([]byte(GraveyardIndex)) if !ok { panic("BUG: GraveyardIndex not found from table") diff --git a/metrics.go b/metrics.go index 0af0d29..3613cf2 100644 --- a/metrics.go +++ b/metrics.go @@ -10,7 +10,7 @@ import ( type Metrics interface { WriteTxnTableAcquisition(tableName string, acquire time.Duration) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) - WriteTxnDuration(goPackage string, s []string, acquire time.Duration) + WriteTxnDuration(goPackage string, acquire time.Duration) GraveyardLowWatermark(tableName string, lowWatermark Revision) GraveyardCleaningDuration(tableName string, duration time.Duration) @@ -121,7 +121,7 @@ func (m *ExpVarMetrics) ObjectCount(name string, numObjects int) { m.ObjectCountVar.Set(name, &intVar) } -func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, s []string, acquire time.Duration) { +func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) { m.WriteTxnDurationVar.AddFloat(goPackage, acquire.Seconds()) } @@ -162,7 +162,7 @@ func (*NopMetrics) Revision(tableName string, revision uint64) { } // WriteTxnDuration implements Metrics. -func (*NopMetrics) WriteTxnDuration(goPackage string, s []string, acquire time.Duration) { +func (*NopMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) { } // WriteTxnTableAcquisition implements Metrics. 
diff --git a/table.go b/table.go index 121f138..fa4c11c 100644 --- a/table.go +++ b/table.go @@ -88,6 +88,7 @@ func MustNewTable[Obj any]( } type genTable[Obj any] struct { + pos int table TableName smu internal.SortableMutex primaryIndexer Indexer[Obj] @@ -95,6 +96,14 @@ type genTable[Obj any] struct { secondaryAnyIndexers map[string]anyIndexer } +func (t *genTable[Obj]) setTablePos(pos int) { + t.pos = pos +} + +func (t *genTable[Obj]) tablePos() int { + return t.pos +} + func (t *genTable[Obj]) tableKey() []byte { return []byte(t.table) } @@ -120,11 +129,11 @@ func (t *genTable[Obj]) ToTable() Table[Obj] { } func (t *genTable[Obj]) Revision(txn ReadTxn) Revision { - return txn.getTxn().GetRevision(t.table) + return txn.getTxn().getRevision(t) } func (t *genTable[Obj]) NumObjects(txn ReadTxn) int { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, t.primaryAnyIndexer.name) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name) return indexTxn.entry.tree.Len() } @@ -134,7 +143,7 @@ func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint } func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) iter := indexTxn.Root().Iterator() watch = iter.SeekPrefixWatch(q.key) @@ -172,7 +181,7 @@ func (t *genTable[Obj]) Last(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint6 } func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) iter := indexTxn.Root().ReverseIterator() watch = iter.SeekPrefixWatch(q.key) @@ -205,7 +214,7 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) 
(Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) root := indexTxn.Root() // Since LowerBound query may be invalidated by changes in another branch @@ -218,7 +227,7 @@ func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <- } func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, t.primaryAnyIndexer.name) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name) root := indexTxn.Root() // Grab the watch channel for the root node watchCh, _, _ := root.GetWatch(nil) @@ -226,7 +235,7 @@ func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { } func (t *genTable[Obj]) Get(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) iter := indexTxn.Root().Iterator() watchCh := iter.SeekPrefixWatch(q.key) diff --git a/txn.go b/txn.go index 8f06ef3..98defd0 100644 --- a/txn.go +++ b/txn.go @@ -14,7 +14,6 @@ import ( "time" iradix "github.com/hashicorp/go-immutable-radix/v2" - "golang.org/x/exp/maps" "github.com/cilium/statedb/index" "github.com/cilium/statedb/internal" @@ -22,17 +21,17 @@ import ( type txn struct { db *DB - rootReadTxn *iradix.Txn[tableEntry] // read transaction onto the tree of tables - writeTxns map[tableIndex]indexTxn // opened per-index write transactions - modifiedTables map[TableName]*tableEntry // table entries being modified - smus internal.SortableMutexes // the (sorted) table locks - acquiredAt time.Time // the time at which the transaction acquired the locks - packageName string // name of the package that created the transaction + root dbRoot + writeTxns map[tableIndex]indexTxn // opened per-index write transactions + modifiedTables []*tableEntry // table entries being modified + smus 
internal.SortableMutexes // the (sorted) table locks + acquiredAt time.Time // the time at which the transaction acquired the locks + packageName string // name of the package that created the transaction } type tableIndex struct { - table TableName - index IndexName + tablePos int + index IndexName } type indexTxn struct { @@ -58,33 +57,28 @@ func (txn *txn) getTxn() *txn { // has been Aborted or Committed. This is a safeguard against forgetting to // Abort/Commit which would cause the table to be locked forever. func txnFinalizer(txn *txn) { - if txn.writeTxns != nil { - panic(fmt.Sprintf("WriteTxn from package %q against tables %v was never Abort()'d or Commit()'d", txn.packageName, maps.Keys(txn.modifiedTables))) + if txn.db != nil { + panic(fmt.Sprintf("WriteTxn acquired by package %q was never Abort()'d or Commit()'d", txn.packageName)) } } -func (txn *txn) GetRevision(name TableName) Revision { - if table, ok := txn.modifiedTables[name]; ok { - // This is a write transaction preparing to modify the table with a - // new revision. - return table.revision - } - - // This is either a read transaction, or a write transaction to tables - // other than this table. Look up the revision from the index. - table, ok := txn.rootReadTxn.Get([]byte(name)) - if !ok { - panic("BUG: Table " + name + " not found") +func (txn *txn) getRevision(meta TableMeta) Revision { + if txn.modifiedTables != nil { + entry := txn.modifiedTables[meta.tablePos()] + if entry != nil { + return entry.revision + } } - return table.revision + return txn.root[meta.tablePos()].revision } // indexReadTxn returns a transaction to read from the specific index. // If the table or index is not found this returns nil & error. 
-func (txn *txn) indexReadTxn(name TableName, index IndexName) (indexTxn, error) { +func (txn *txn) indexReadTxn(meta TableMeta, index IndexName) (indexTxn, error) { if txn.writeTxns != nil { - if _, ok := txn.modifiedTables[name]; ok { - itxn, err := txn.indexWriteTxn(name, index) + entry := txn.modifiedTables[meta.tablePos()] + if entry != nil { + itxn, err := txn.indexWriteTxn(meta, index) if err == nil { return itxn.Clone(), nil } @@ -92,13 +86,10 @@ func (txn *txn) indexReadTxn(name TableName, index IndexName) (indexTxn, error) } } - table, ok := txn.rootReadTxn.Get([]byte(name)) - if !ok { - return indexTxn{}, fmt.Errorf("table %q not found", name) - } + table := txn.root[meta.tablePos()] indexEntry, ok := table.indexes.Get([]byte(index)) if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, name) + return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, meta.Name()) } return indexTxn{ @@ -108,29 +99,26 @@ func (txn *txn) indexReadTxn(name TableName, index IndexName) (indexTxn, error) // indexWriteTxn returns a transaction to read/write to a specific index. // The created transaction is memoized and used for subsequent reads and/or writes. 
-func (txn *txn) indexWriteTxn(name TableName, index IndexName) (indexTxn, error) { - if indexTreeTxn, ok := txn.writeTxns[tableIndex{name, index}]; ok { +func (txn *txn) indexWriteTxn(meta TableMeta, index IndexName) (indexTxn, error) { + if indexTreeTxn, ok := txn.writeTxns[tableIndex{meta.tablePos(), index}]; ok { return indexTreeTxn, nil } - table, ok := txn.modifiedTables[name] - if !ok { - return indexTxn{}, fmt.Errorf("table %q not found", name) - } + table := txn.modifiedTables[meta.tablePos()] indexEntry, ok := table.indexes.Get([]byte(index)) if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, name) + return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, meta.Name()) } itxn := indexEntry.tree.Txn() itxn.TrackMutate(true) indexWriteTxn := indexTxn{itxn, indexEntry} - txn.writeTxns[tableIndex{name, index}] = indexWriteTxn + txn.writeTxns[tableIndex{meta.tablePos(), index}] = indexWriteTxn return indexWriteTxn, nil } // mustIndexReadTxn returns a transaction to read from the specific index. // Panics if table or index are not found. -func (txn *txn) mustIndexReadTxn(name TableName, index IndexName) indexTxn { - indexTxn, err := txn.indexReadTxn(name, index) +func (txn *txn) mustIndexReadTxn(meta TableMeta, index IndexName) indexTxn { + indexTxn, err := txn.indexReadTxn(meta, index) if err != nil { panic(err) } @@ -139,8 +127,8 @@ func (txn *txn) mustIndexReadTxn(name TableName, index IndexName) indexTxn { // mustIndexReadTxn returns a transaction to read or write from the specific index. // Panics if table or index not found. 
-func (txn *txn) mustIndexWriteTxn(name TableName, index IndexName) indexTxn { - indexTxn, err := txn.indexWriteTxn(name, index) +func (txn *txn) mustIndexWriteTxn(meta TableMeta, index IndexName) indexTxn { + indexTxn, err := txn.indexWriteTxn(meta, index) if err != nil { panic(err) } @@ -148,14 +136,14 @@ func (txn *txn) mustIndexWriteTxn(name TableName, index IndexName) indexTxn { } func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object, bool, error) { - if txn.rootReadTxn == nil { + if txn.db == nil { return object{}, false, ErrTransactionClosed } // Look up table and allocate a new revision. tableName := meta.Name() - table, ok := txn.modifiedTables[tableName] - if !ok { + table := txn.modifiedTables[meta.tablePos()] + if table == nil { return object{}, false, tableError(tableName, ErrTableNotLockedForWriting) } oldRevision := table.revision @@ -169,7 +157,7 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object // Update the primary index first idKey := meta.primary().fromObject(obj).First() - idIndexTxn := txn.mustIndexWriteTxn(tableName, meta.primary().name) + idIndexTxn := txn.mustIndexWriteTxn(meta, meta.primary().name) oldObj, oldExists := idIndexTxn.Insert(idKey, obj) // Sanity check: is the same object being inserted back and thus the @@ -206,27 +194,26 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index - revIndexTxn := txn.mustIndexWriteTxn(tableName, RevisionIndex) + revIndexTxn := txn.mustIndexWriteTxn(meta, RevisionIndex) if oldExists { _, ok := revIndexTxn.Delete([]byte(index.Uint64(oldObj.revision))) if !ok { panic("BUG: Old revision index entry not found") } - } revIndexTxn.Insert([]byte(index.Uint64(revision)), obj) // If it's new, possibly remove an older deleted object with the same // primary key from the graveyard. 
- if !oldExists && txn.hasDeleteTrackers(tableName) { - if old, existed := txn.mustIndexWriteTxn(tableName, GraveyardIndex).Delete(idKey); existed { - txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Delete([]byte(index.Uint64(old.revision))) + if !oldExists && txn.hasDeleteTrackers(meta) { + if old, existed := txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(idKey); existed { + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete([]byte(index.Uint64(old.revision))) } } // Then update secondary indexes for idx, indexer := range meta.secondary() { - indexTxn := txn.mustIndexWriteTxn(tableName, idx) + indexTxn := txn.mustIndexWriteTxn(meta, idx) newKeys := indexer.fromObject(obj) if oldExists { @@ -255,29 +242,19 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object return oldObj, oldExists, nil } -func (txn *txn) hasDeleteTrackers(name TableName) bool { - // Table is being modified, return the entry we're mutating, - // so we can read the latest changes. - table, ok := txn.modifiedTables[name] - if !ok { - // Table is not being modified, look it up from the root. 
- if t, ok := txn.rootReadTxn.Get([]byte(name)); ok { - table = &t - } else { - panic(fmt.Sprintf("BUG: table %q not found", name)) - } +func (txn *txn) hasDeleteTrackers(meta TableMeta) bool { + table := txn.modifiedTables[meta.tablePos()] + if table != nil { + return table.deleteTrackers.Len() > 0 } - return table.deleteTrackers.Len() > 0 + return txn.root[meta.tablePos()].deleteTrackers.Len() > 0 } func (txn *txn) addDeleteTracker(meta TableMeta, trackerName string, dt deleteTracker) error { - if txn.rootReadTxn == nil { + if txn.db == nil { return ErrTransactionClosed } - table, ok := txn.modifiedTables[meta.Name()] - if !ok { - return tableError(meta.Name(), ErrTableNotLockedForWriting) - } + table := txn.modifiedTables[meta.tablePos()] table.deleteTrackers, _, _ = table.deleteTrackers.Insert([]byte(trackerName), dt) txn.db.metrics.DeleteTrackerCount(meta.Name(), table.deleteTrackers.Len()) return nil @@ -285,14 +262,14 @@ func (txn *txn) addDeleteTracker(meta TableMeta, trackerName string, dt deleteTr } func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object, bool, error) { - if txn.rootReadTxn == nil { + if txn.db == nil { return object{}, false, ErrTransactionClosed } // Look up table and allocate a new revision. tableName := meta.Name() - table, ok := txn.modifiedTables[tableName] - if !ok { + table := txn.modifiedTables[meta.tablePos()] + if table == nil { return object{}, false, tableError(tableName, ErrTableNotLockedForWriting) } oldRevision := table.revision @@ -303,7 +280,7 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object // We assume that "data" has only enough defined fields to // compute the primary key. 
idKey := meta.primary().fromObject(object{data: data}).First() - idIndexTree := txn.mustIndexWriteTxn(tableName, meta.primary().name) + idIndexTree := txn.mustIndexWriteTxn(meta, meta.primary().name) obj, existed := idIndexTree.Delete(idKey) if !existed { return object{}, false, nil @@ -320,7 +297,7 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index. - indexTree := txn.mustIndexWriteTxn(tableName, RevisionIndex) + indexTree := txn.mustIndexWriteTxn(meta, RevisionIndex) if _, ok := indexTree.Delete(index.Uint64(obj.revision)); !ok { panic("BUG: Object to be deleted not found from revision index") } @@ -331,18 +308,18 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object if !indexer.unique { key = encodeNonUniqueKey(idKey, key) } - txn.mustIndexWriteTxn(tableName, idx).Delete(key) + txn.mustIndexWriteTxn(meta, idx).Delete(key) }) } // And finally insert the object into the graveyard. - if txn.hasDeleteTrackers(tableName) { - graveyardIndex := txn.mustIndexWriteTxn(tableName, GraveyardIndex) + if txn.hasDeleteTrackers(meta) { + graveyardIndex := txn.mustIndexWriteTxn(meta, GraveyardIndex) obj.revision = revision if _, existed := graveyardIndex.Insert(idKey, obj); existed { panic("BUG: Double deletion! Deleted object already existed in graveyard") } - txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Insert(index.Uint64(revision), obj) + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Insert(index.Uint64(revision), obj) } return obj, true, nil @@ -399,7 +376,6 @@ func (txn *txn) Abort() { txn.smus.Unlock() txn.db.metrics.WriteTxnDuration( txn.packageName, - maps.Keys(txn.modifiedTables), time.Since(txn.acquiredAt)) *txn = zeroTxn @@ -438,18 +414,16 @@ func (txn *txn) Commit() { // We don't notify yet (CommitOnly) as the root needs to be updated // first as otherwise readers would wake up too early. 
for tableIndex, subTxn := range txn.writeTxns { - table, ok := txn.modifiedTables[tableIndex.table] - if !ok { - panic("BUG: Table " + tableIndex.table + " in writeTxns, but not in modifiedTables") - } + table := txn.modifiedTables[tableIndex.tablePos] subTxn.entry.tree = subTxn.CommitOnly() table.indexes, _, _ = table.indexes.Insert([]byte(tableIndex.index), subTxn.entry) // Update metrics - db.metrics.GraveyardObjectCount(tableIndex.table, table.numDeletedObjects()) - db.metrics.ObjectCount(tableIndex.table, table.numObjects()) - db.metrics.Revision(tableIndex.table, table.revision) + name := table.meta.Name() + db.metrics.GraveyardObjectCount(name, table.numDeletedObjects()) + db.metrics.ObjectCount(name, table.numObjects()) + db.metrics.Revision(name, table.revision) } // Acquire the lock on the root tree to sequence the updates to it. We can acquire @@ -460,17 +434,18 @@ func (txn *txn) Commit() { // Since the root may have changed since the pointer was last read in WriteTxn(), // load it again and modify the latest version that we now have immobilised by // the root lock. - rootTxn := db.root.Load().Txn() + root := append(dbRoot(nil), *db.root.Load()...) // clone first: open ReadTxns still alias the stored slice's backing array // Insert the modified tables into the root tree of tables. - for name, table := range txn.modifiedTables { - rootTxn.Insert([]byte(name), *table) + for pos, table := range txn.modifiedTables { + if table != nil { + root[pos] = *table + } } // Commit the transaction to build the new root tree and then // atomically store it. - newRoot := rootTxn.CommitOnly() - db.root.Store(newRoot) + db.root.Store(&root) db.mu.Unlock() // With the root pointer updated, we can now release the tables for the next write transaction. @@ -481,11 +456,9 @@ func (txn *txn) Commit() { for _, subTxn := range txn.writeTxns { subTxn.Notify() } - rootTxn.Notify() txn.db.metrics.WriteTxnDuration( txn.packageName, - maps.Keys(txn.modifiedTables), time.Since(txn.acquiredAt)) // Zero out the transaction to make it inert. 
@@ -497,18 +470,18 @@ func (txn *txn) WriteJSON(w io.Writer) error { buf := bufio.NewWriter(w) buf.WriteString("{\n") first := true - for _, table := range txn.db.tables { + for _, table := range txn.root { if !first { buf.WriteString(",\n") } else { first = false } - indexTxn := txn.getTxn().mustIndexReadTxn(table.Name(), table.primary().name) + indexTxn := txn.getTxn().mustIndexReadTxn(table.meta, table.meta.primary().name) root := indexTxn.Root() iter := root.Iterator() - buf.WriteString(" \"" + table.Name() + "\": [\n") + buf.WriteString(" \"" + table.meta.Name() + "\": [\n") _, obj, ok := iter.Next() for ok { diff --git a/types.go b/types.go index 9fde16a..466a28b 100644 --- a/types.go +++ b/types.go @@ -149,7 +149,9 @@ type RWTable[Obj any] interface { // TableMeta provides information about the table that is independent of // the object type (the 'Obj' constraint). type TableMeta interface { - Name() TableName // The name of the table + Name() TableName // The name of the table + tablePos() int + setTablePos(int) tableKey() []byte // The radix key for the table in the root tree primary() anyIndexer // The untyped primary indexer for the table secondary() map[string]anyIndexer // Secondary indexers (if any)