diff --git a/db.go b/db.go index 1bbee01..372c1f8 100644 --- a/db.go +++ b/db.go @@ -15,8 +15,6 @@ import ( "sync/atomic" "time" - iradix "github.com/hashicorp/go-immutable-radix/v2" - "github.com/cilium/hive/cell" "github.com/cilium/statedb/internal" ) @@ -153,21 +151,7 @@ func (db *DB) registerTable(table TableMeta, root *dbRoot) error { pos := len(*root) table.setTablePos(pos) - - var entry tableEntry - entry.meta = table - entry.deleteTrackers = iradix.New[deleteTracker]() - indexTxn := iradix.New[indexEntry]().Txn() - indexTxn.Insert([]byte(table.primary().name), indexEntry{iradix.New[object](), true}) - indexTxn.Insert([]byte(RevisionIndex), indexEntry{iradix.New[object](), true}) - indexTxn.Insert([]byte(GraveyardIndex), indexEntry{iradix.New[object](), true}) - indexTxn.Insert([]byte(GraveyardRevisionIndex), indexEntry{iradix.New[object](), true}) - for index, indexer := range table.secondary() { - indexTxn.Insert([]byte(index), indexEntry{iradix.New[object](), indexer.unique}) - } - entry.indexes = indexTxn.CommitOnly() - - *root = append(*root, entry) + *root = append(*root, table.tableEntry()) return nil } @@ -205,6 +189,7 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { var tableNames []string for _, table := range allTables { tableEntry := root[table.tablePos()] + tableEntry.indexes = slices.Clone(tableEntry.indexes) tableEntries[table.tablePos()] = &tableEntry tableNames = append(tableNames, table.Name()) @@ -224,7 +209,6 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { db: db, root: root, modifiedTables: tableEntries, - writeTxns: make(map[tableIndex]indexTxn), smus: smus, acquiredAt: acquiredAt, packageName: callerPkg, diff --git a/db_test.go b/db_test.go index 01143e5..143d337 100644 --- a/db_test.go +++ b/db_test.go @@ -424,7 +424,7 @@ func TestDB_All(t *testing.T) { select { case <-watch: - t.Fatalf("expected All() watch channel to not close before changes") + t.Fatalf("expected All() watch channel to not close before delete") default: } @@ -434,10 +434,15 @@ func TestDB_All(t *testing.T) { txn.Commit() } + // Prior read transaction not affected by delete. + iter, _ = table.All(txn) + objs = Collect(iter) + require.Len(t, objs, 3) + select { case <-watch: case <-time.After(time.Second): - t.Fatalf("expceted All() watch channel to close after changes") + t.Fatalf("expected All() watch channel to close after delete") } } diff --git a/deletetracker.go b/deletetracker.go index 8f87914..096902c 100644 --- a/deletetracker.go +++ b/deletetracker.go @@ -36,7 +36,7 @@ func (dt *DeleteTracker[Obj]) getRevision() uint64 { // 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is // called! func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] { - indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndex) + indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndexPos) iter := indexTxn.Root().Iterator() iter.SeekLowerBound(index.Uint64(minRevision)) return &iterator[Obj]{iter} diff --git a/graveyard.go b/graveyard.go index f18a116..0b95fb7 100644 --- a/graveyard.go +++ b/graveyard.go @@ -63,7 +63,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Find objects to be deleted by iterating over the graveyard revision index up // to the low watermark. - indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndex) + indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndexPos) objIter := indexTree.Root().Iterator() for key, obj, ok := objIter.Next(); ok; key, obj, ok = objIter.Next() { @@ -92,12 +92,12 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat tableName := meta.Name() start := time.Now() for _, key := range deadObjs { - oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete(key) + oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndexPos).Delete(key) if existed { // The dead object still existed (and wasn't replaced by a create->delete), // delete it from the primary index. key = meta.primary().fromObject(oldObj).First() - txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(key) + txn.mustIndexWriteTxn(meta, GraveyardIndexPos).Delete(key) } } cleaningTimes[tableName] = time.Since(start) @@ -126,10 +126,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat func (db *DB) graveyardIsEmpty() bool { txn := db.ReadTxn().getTxn() for _, table := range txn.root { - indexEntry, ok := table.indexes.Get([]byte(GraveyardIndex)) - if !ok { - panic("BUG: GraveyardIndex not found from table") - } + indexEntry := table.indexes[table.meta.indexPos(GraveyardIndex)] if indexEntry.tree.Len() != 0 { return false } diff --git a/table.go b/table.go index fa4c11c..9dcbcac 100644 --- a/table.go +++ b/table.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "github.com/cilium/statedb/internal" + iradix "github.com/hashicorp/go-immutable-radix/v2" "github.com/cilium/statedb/index" ) @@ -46,10 +47,23 @@ func NewTable[Obj any]( primaryAnyIndexer: toAnyIndexer(primaryIndexer), primaryIndexer: primaryIndexer, secondaryAnyIndexers: make(map[string]anyIndexer, len(secondaryIndexers)), + indexPositions: make(map[string]int), } + table.indexPositions[primaryIndexer.indexName()] = PrimaryIndexPos + + // Internal indexes + table.indexPositions[RevisionIndex] = RevisionIndexPos + table.indexPositions[GraveyardIndex] = GraveyardIndexPos + table.indexPositions[GraveyardRevisionIndex] = GraveyardRevisionIndexPos + + indexPos := SecondaryIndexStartPos for _, indexer := range secondaryIndexers { - table.secondaryAnyIndexers[indexer.indexName()] = toAnyIndexer(indexer) + anyIndexer := toAnyIndexer(indexer) + anyIndexer.pos = indexPos + table.secondaryAnyIndexers[indexer.indexName()] = anyIndexer + table.indexPositions[indexer.indexName()] = indexPos + indexPos++ } // Primary index must always be unique @@ -94,6 +108,23 @@ type genTable[Obj any] struct { primaryIndexer Indexer[Obj] primaryAnyIndexer anyIndexer secondaryAnyIndexers map[string]anyIndexer + indexPositions map[string]int +} + +func (t *genTable[Obj]) tableEntry() tableEntry { + var entry tableEntry + entry.meta = t + entry.deleteTrackers = iradix.New[deleteTracker]() + entry.indexes = make([]indexEntry, len(t.indexPositions)) + entry.indexes[t.indexPositions[t.primaryIndexer.indexName()]] = indexEntry{iradix.New[object](), nil, true} + + for index, indexer := range t.secondaryAnyIndexers { + entry.indexes[t.indexPositions[index]] = indexEntry{iradix.New[object](), nil, indexer.unique} + } + entry.indexes[t.indexPositions[RevisionIndex]] = indexEntry{iradix.New[object](), nil, true} + entry.indexes[t.indexPositions[GraveyardIndex]] = indexEntry{iradix.New[object](), nil, true} + entry.indexes[t.indexPositions[GraveyardRevisionIndex]] = indexEntry{iradix.New[object](), nil, true} + return entry } func (t *genTable[Obj]) setTablePos(pos int) { @@ -108,6 +139,13 @@ func (t *genTable[Obj]) tableKey() []byte { return []byte(t.table) } +func (t *genTable[Obj]) indexPos(name string) int { + if t.primaryAnyIndexer.name == name { + return PrimaryIndexPos + } + return t.indexPositions[name] +} + func (t *genTable[Obj]) PrimaryIndexer() Indexer[Obj] { return t.primaryIndexer } @@ -133,8 +171,8 @@ func (t *genTable[Obj]) Revision(txn ReadTxn) Revision { } func (t *genTable[Obj]) NumObjects(txn ReadTxn) int { - indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name) - return indexTxn.entry.tree.Len() + table := &txn.getTxn().root[t.tablePos()] + return table.indexes[PrimaryIndexPos].tree.Len() } func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, ok bool) { @@ -143,11 +181,22 @@ func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint } func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) + var iobj object + if indexTxn.unique { + // On a unique index we can do a direct get rather than a prefix search. + watch, iobj, ok = indexTxn.Root().GetWatch(q.key) + if !ok { + return + } + obj = iobj.data.(Obj) + revision = iobj.revision + return + } + + // For a non-unique index we need to do a prefix search. iter := indexTxn.Root().Iterator() watch = iter.SeekPrefixWatch(q.key) - - var iobj object for { var key []byte key, iobj, ok = iter.Next() @@ -156,14 +205,8 @@ func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } // Check that we have a full match on the key - var match bool - if indexTxn.entry.unique { - match = len(key) == len(q.key) - } else { - _, secondary := decodeNonUniqueKey(key) - match = len(secondary) == len(q.key) - } - if match { + _, secondary := decodeNonUniqueKey(key) + if len(secondary) == len(q.key) { break } } @@ -181,11 +224,21 @@ func (t *genTable[Obj]) Last(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint6 } func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) + var iobj object + if indexTxn.unique { + // On a unique index we can do a direct get rather than a prefix search. + watch, iobj, ok = indexTxn.Root().GetWatch(q.key) + if !ok { + return + } + obj = iobj.data.(Obj) + revision = iobj.revision + return + } + iter := indexTxn.Root().ReverseIterator() watch = iter.SeekPrefixWatch(q.key) - - var iobj object for { var key []byte key, iobj, ok = iter.Previous() @@ -194,14 +247,8 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } // Check that we have a full match on the key - var match bool - if indexTxn.entry.unique { - match = len(key) == len(q.key) - } else { - _, secondary := decodeNonUniqueKey(key) - match = len(secondary) == len(q.key) - } - if match { + _, secondary := decodeNonUniqueKey(key) + if len(secondary) == len(q.key) { break } } @@ -214,7 +261,7 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) root := indexTxn.Root() // Since LowerBound query may be invalidated by changes in another branch @@ -227,7 +274,7 @@ func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <- } func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name) + indexTxn := txn.getTxn().mustIndexReadTxn(t, PrimaryIndexPos) root := indexTxn.Root() // Grab the watch channel for the root node watchCh, _, _ := root.GetWatch(nil) @@ -235,11 +282,11 @@ func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { } func (t *genTable[Obj]) Get(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) iter := indexTxn.Root().Iterator() watchCh := iter.SeekPrefixWatch(q.key) - if indexTxn.entry.unique { + if indexTxn.unique { return &uniqueIterator[Obj]{iter, q.key}, watchCh } return &nonUniqueIterator[Obj]{iter, q.key}, watchCh diff --git a/txn.go b/txn.go index 98defd0..a44ee1a 100644 --- a/txn.go +++ b/txn.go @@ -11,6 +11,7 @@ import ( "io" "reflect" "runtime" + "slices" "time" iradix "github.com/hashicorp/go-immutable-radix/v2" @@ -22,7 +23,6 @@ import ( type txn struct { db *DB root dbRoot - writeTxns map[tableIndex]indexTxn // opened per-index write transactions modifiedTables []*tableEntry // table entries being modified smus internal.SortableMutexes // the (sorted) table locks acquiredAt time.Time // the time at which the transaction acquired the locks @@ -34,16 +34,19 @@ type tableIndex struct { index IndexName } -type indexTxn struct { - *iradix.Txn[object] - entry indexEntry +// rooter is implemented by both iradix.Txn and iradix.Tree. +type rooter interface { + Root() *iradix.Node[object] } -func (i indexTxn) Clone() indexTxn { - return indexTxn{ - i.Txn.Clone(), - i.entry, - } +type indexReadTxn struct { + rooter + unique bool +} + +type indexTxn struct { + *iradix.Txn[object] + unique bool } var zeroTxn = txn{} @@ -74,51 +77,42 @@ func (txn *txn) getRevision(meta TableMeta) Revision { // indexReadTxn returns a transaction to read from the specific index. // If the table or index is not found this returns nil & error. -func (txn *txn) indexReadTxn(meta TableMeta, index IndexName) (indexTxn, error) { - if txn.writeTxns != nil { +func (txn *txn) indexReadTxn(meta TableMeta, indexPos int) (indexReadTxn, error) { + if txn.modifiedTables != nil { entry := txn.modifiedTables[meta.tablePos()] if entry != nil { - itxn, err := txn.indexWriteTxn(meta, index) - if err == nil { - return itxn.Clone(), nil + itxn, err := txn.indexWriteTxn(meta, indexPos) + if err != nil { + return indexReadTxn{}, err } - return indexTxn{}, err + // Since iradix reuses nodes when mutating we need to return a clone + // so that iterators don't become invalid. + return indexReadTxn{itxn.Txn.Clone(), itxn.unique}, nil } } - - table := txn.root[meta.tablePos()] - indexEntry, ok := table.indexes.Get([]byte(index)) - if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, meta.Name()) - } - - return indexTxn{ - indexEntry.tree.Txn(), - indexEntry}, nil + indexEntry := txn.root[meta.tablePos()].indexes[indexPos] + return indexReadTxn{indexEntry.tree, indexEntry.unique}, nil } // indexWriteTxn returns a transaction to read/write to a specific index. // The created transaction is memoized and used for subsequent reads and/or writes. -func (txn *txn) indexWriteTxn(meta TableMeta, index IndexName) (indexTxn, error) { - if indexTreeTxn, ok := txn.writeTxns[tableIndex{meta.tablePos(), index}]; ok { - return indexTreeTxn, nil - } +func (txn *txn) indexWriteTxn(meta TableMeta, indexPos int) (indexTxn, error) { table := txn.modifiedTables[meta.tablePos()] - indexEntry, ok := table.indexes.Get([]byte(index)) - if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, meta.Name()) - } - itxn := indexEntry.tree.Txn() - itxn.TrackMutate(true) - indexWriteTxn := indexTxn{itxn, indexEntry} - txn.writeTxns[tableIndex{meta.tablePos(), index}] = indexWriteTxn - return indexWriteTxn, nil + if table == nil { + return indexTxn{}, tableError(meta.Name(), ErrTableNotLockedForWriting) + } + indexEntry := &table.indexes[indexPos] + if indexEntry.txn == nil { + indexEntry.txn = indexEntry.tree.Txn() + indexEntry.txn.TrackMutate(true) + } + return indexTxn{indexEntry.txn, indexEntry.unique}, nil } // mustIndexReadTxn returns a transaction to read from the specific index. // Panics if table or index are not found. -func (txn *txn) mustIndexReadTxn(meta TableMeta, index IndexName) indexTxn { - indexTxn, err := txn.indexReadTxn(meta, index) +func (txn *txn) mustIndexReadTxn(meta TableMeta, indexPos int) indexReadTxn { + indexTxn, err := txn.indexReadTxn(meta, indexPos) if err != nil { panic(err) } @@ -127,8 +121,8 @@ func (txn *txn) mustIndexReadTxn(meta TableMeta, index IndexName) indexTxn { // mustIndexReadTxn returns a transaction to read or write from the specific index. // Panics if table or index not found. -func (txn *txn) mustIndexWriteTxn(meta TableMeta, index IndexName) indexTxn { - indexTxn, err := txn.indexWriteTxn(meta, index) +func (txn *txn) mustIndexWriteTxn(meta TableMeta, indexPos int) indexTxn { + indexTxn, err := txn.indexWriteTxn(meta, indexPos) if err != nil { panic(err) } @@ -157,7 +151,7 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object // Update the primary index first idKey := meta.primary().fromObject(obj).First() - idIndexTxn := txn.mustIndexWriteTxn(meta, meta.primary().name) + idIndexTxn := txn.mustIndexWriteTxn(meta, PrimaryIndexPos) oldObj, oldExists := idIndexTxn.Insert(idKey, obj) // Sanity check: is the same object being inserted back and thus the @@ -194,26 +188,30 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index - revIndexTxn := txn.mustIndexWriteTxn(meta, RevisionIndex) + revIndexTxn := txn.mustIndexWriteTxn(meta, RevisionIndexPos) if oldExists { - _, ok := revIndexTxn.Delete([]byte(index.Uint64(oldObj.revision))) + var revKey [8]byte + binary.BigEndian.PutUint64(revKey[:], oldObj.revision) + _, ok := revIndexTxn.Delete(revKey[:]) if !ok { panic("BUG: Old revision index entry not found") } } - revIndexTxn.Insert([]byte(index.Uint64(revision)), obj) + var revKey [8]byte + binary.BigEndian.PutUint64(revKey[:], obj.revision) + revIndexTxn.Insert(revKey[:], obj) // If it's new, possibly remove an older deleted object with the same // primary key from the graveyard. if !oldExists && txn.hasDeleteTrackers(meta) { - if old, existed := txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(idKey); existed { - txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete([]byte(index.Uint64(old.revision))) + if old, existed := txn.mustIndexWriteTxn(meta, GraveyardIndexPos).Delete(idKey); existed { + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndexPos).Delete([]byte(index.Uint64(old.revision))) } } // Then update secondary indexes - for idx, indexer := range meta.secondary() { - indexTxn := txn.mustIndexWriteTxn(meta, idx) + for _, indexer := range meta.secondary() { + indexTxn := txn.mustIndexWriteTxn(meta, indexer.pos) newKeys := indexer.fromObject(obj) if oldExists { @@ -280,7 +278,7 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object // We assume that "data" has only enough defined fields to // compute the primary key. idKey := meta.primary().fromObject(object{data: data}).First() - idIndexTree := txn.mustIndexWriteTxn(meta, meta.primary().name) + idIndexTree := txn.mustIndexWriteTxn(meta, PrimaryIndexPos) obj, existed := idIndexTree.Delete(idKey) if !existed { return object{}, false, nil @@ -297,29 +295,31 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index. - indexTree := txn.mustIndexWriteTxn(meta, RevisionIndex) - if _, ok := indexTree.Delete(index.Uint64(obj.revision)); !ok { + indexTree := txn.mustIndexWriteTxn(meta, RevisionIndexPos) + var revKey [8]byte + binary.BigEndian.PutUint64(revKey[:], obj.revision) + if _, ok := indexTree.Delete(index.Key(revKey[:])); !ok { panic("BUG: Object to be deleted not found from revision index") } // Then update secondary indexes. - for idx, indexer := range meta.secondary() { + for _, indexer := range meta.secondary() { indexer.fromObject(obj).Foreach(func(key index.Key) { if !indexer.unique { key = encodeNonUniqueKey(idKey, key) } - txn.mustIndexWriteTxn(meta, idx).Delete(key) + txn.mustIndexWriteTxn(meta, indexer.pos).Delete(key) }) } // And finally insert the object into the graveyard. if txn.hasDeleteTrackers(meta) { - graveyardIndex := txn.mustIndexWriteTxn(meta, GraveyardIndex) + graveyardIndex := txn.mustIndexWriteTxn(meta, GraveyardIndexPos) obj.revision = revision if _, existed := graveyardIndex.Insert(idKey, obj); existed { panic("BUG: Double deletion! Deleted object already existed in graveyard") } - txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Insert(index.Uint64(revision), obj) + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndexPos).Insert(index.Uint64(revision), obj) } return obj, true, nil @@ -369,7 +369,7 @@ func (txn *txn) Abort() { // // txn.Commit() // - if txn.writeTxns == nil { + if txn.db == nil { return } @@ -402,9 +402,9 @@ func (txn *txn) Commit() { // a reader can acquire an immutable snapshot of all data in the database with a // simpler atomic pointer load. - // If writeTxns is nil, this transaction has already been committed or aborted, and + // If db is nil, this transaction has already been committed or aborted, and // thus there is nothing to do. - if txn.writeTxns == nil { + if txn.db == nil { return } @@ -413,11 +413,19 @@ func (txn *txn) Commit() { // Commit each individual changed index to each table. // We don't notify yet (CommitOnly) as the root needs to be updated // first as otherwise readers would wake up too early. - for tableIndex, subTxn := range txn.writeTxns { - table := txn.modifiedTables[tableIndex.tablePos] - subTxn.entry.tree = subTxn.CommitOnly() - table.indexes, _, _ = - table.indexes.Insert([]byte(tableIndex.index), subTxn.entry) + txnToNotify := []*iradix.Txn[object]{} + for _, table := range txn.modifiedTables { + if table == nil { + continue + } + for i := range table.indexes { + txn := table.indexes[i].txn + if txn != nil { + table.indexes[i].tree = txn.CommitOnly() + table.indexes[i].txn = nil + txnToNotify = append(txnToNotify, txn) + } + } // Update metrics name := table.meta.Name() @@ -435,6 +443,7 @@ func (txn *txn) Commit() { // load it again and modify the latest version that we now have immobilised by // the root lock. root := *db.root.Load() + root = slices.Clone(root) // Insert the modified tables into the root tree of tables. for pos, table := range txn.modifiedTables { @@ -453,8 +462,8 @@ func (txn *txn) Commit() { // Now that new root is committed, we can notify readers by closing the watch channels of // mutated radix tree nodes in all changed indexes and on the root itself. - for _, subTxn := range txn.writeTxns { - subTxn.Notify() + for _, txn := range txnToNotify { + txn.Notify() } txn.db.metrics.WriteTxnDuration( @@ -477,7 +486,7 @@ func (txn *txn) WriteJSON(w io.Writer) error { first = false } - indexTxn := txn.getTxn().mustIndexReadTxn(table.meta, table.meta.primary().name) + indexTxn := txn.getTxn().mustIndexReadTxn(table.meta, PrimaryIndexPos) root := indexTxn.Root() iter := root.Iterator() diff --git a/types.go b/types.go index 466a28b..2225015 100644 --- a/types.go +++ b/types.go @@ -150,8 +150,10 @@ type RWTable[Obj any] interface { // the object type (the 'Obj' constraint). type TableMeta interface { Name() TableName // The name of the table + tableEntry() tableEntry tablePos() int setTablePos(int) + indexPos(string) int tableKey() []byte // The radix key for the table in the root tree primary() anyIndexer // The untyped primary indexer for the table secondary() map[string]anyIndexer // Secondary indexers (if any) @@ -279,10 +281,17 @@ type TableWritable interface { // const ( - reservedIndexPrefix = "__" - RevisionIndex = "__revision__" - GraveyardIndex = "__graveyard__" - GraveyardRevisionIndex = "__graveyard_revision__" + PrimaryIndexPos = 0 + + reservedIndexPrefix = "__" + RevisionIndex = "__revision__" + RevisionIndexPos = 1 + GraveyardIndex = "__graveyard__" + GraveyardIndexPos = 2 + GraveyardRevisionIndex = "__graveyard_revision__" + GraveyardRevisionIndexPos = 3 + + SecondaryIndexStartPos = 4 ) // object is the format in which data is stored in the tables. @@ -305,6 +314,9 @@ type anyIndexer struct { // values returned by fromObject. If false the primary // key of the object will be appended to the key. unique bool + + // pos is the position of the index in [tableEntry.indexes] + pos int } type deleteTracker interface { @@ -314,28 +326,23 @@ type deleteTracker interface { type indexEntry struct { tree *iradix.Tree[object] + txn *iradix.Txn[object] unique bool } type tableEntry struct { meta TableMeta - indexes *iradix.Tree[indexEntry] + indexes []indexEntry deleteTrackers *iradix.Tree[deleteTracker] revision uint64 } func (t *tableEntry) numObjects() int { - indexEntry, ok := t.indexes.Get([]byte(RevisionIndex)) - if ok { - return indexEntry.tree.Len() - } - return 0 + indexEntry := t.indexes[t.meta.indexPos(RevisionIndex)] + return indexEntry.tree.Len() } func (t *tableEntry) numDeletedObjects() int { - indexEntry, ok := t.indexes.Get([]byte(GraveyardIndex)) - if ok { - return indexEntry.tree.Len() - } - return 0 + indexEntry := t.indexes[t.meta.indexPos(GraveyardIndex)] + return indexEntry.tree.Len() }