diff --git a/benchmarks_test.go b/benchmarks_test.go index 8da59c5..357ab36 100644 --- a/benchmarks_test.go +++ b/benchmarks_test.go @@ -19,25 +19,32 @@ import ( "github.com/cilium/statedb/index" ) +// Number of objects to insert in tests that do repeated inserts. +const numObjectsToInsert = 1000 + func BenchmarkDB_WriteTxn_1(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) for i := 0; i < b.N; i++ { txn := db.WriteTxn(table) _, _, err := table.Insert(txn, testObject{ID: 123, Tags: nil}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } txn.Commit() } b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_WriteTxn_10(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) n := b.N for n > 0 { txn := db.WriteTxn(table) for j := 0; j < 10; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: nil}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() n -= 10 @@ -45,20 +52,24 @@ func BenchmarkDB_WriteTxn_10(b *testing.B) { txn := db.WriteTxn(table) for j := 0; j < n; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: nil}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_WriteTxn_100(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) n := b.N for n > 0 { txn := db.WriteTxn(table) for j := 0; j < 100; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: nil}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() n -= 100 @@ -66,21 +77,25 @@ func BenchmarkDB_WriteTxn_100(b *testing.B) { txn := db.WriteTxn(table) for j := 0; j < n; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: nil}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_WriteTxn_100_SecondaryIndex(b *testing.B) { - db, table, _ := newTestDB(b, tagsIndex) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) n := b.N tags := []string{"test"} for n > 0 { txn := db.WriteTxn(table) for j := 0; j < 100; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: tags}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() n -= 100 @@ -88,168 +103,264 @@ func BenchmarkDB_WriteTxn_100_SecondaryIndex(b *testing.B) { txn := db.WriteTxn(table) for j := 0; j < n; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: tags}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_RandomInsert(b *testing.B) { - db, table, _ := newTestDB(b) - + db, table := newTestDBWithMetrics(b, &NopMetrics{}) ids := []uint64{} - for i := 0; i < b.N; i++ { + for i := 0; i < numObjectsToInsert; i++ { ids = append(ids, uint64(i)) } - rand.Shuffle(b.N, func(i, j int) { + rand.Shuffle(numObjectsToInsert, func(i, j int) { ids[i], ids[j] = ids[j], ids[i] }) b.ResetTimer() + + for j := 0; j < b.N; j++ { + txn := db.WriteTxn(table) + for _, id := range ids { + _, _, err := table.Insert(txn, testObject{ID: id, Tags: nil}) + if err != nil { + 
b.Fatalf("Insert error: %s", err) + } + } + txn.Abort() + } + b.StopTimer() + + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") +} + +// BenchmarkDB_RandomReplace is like BenchmarkDB_RandomInsert, but instead of +// always inserting a new value this test replaces an existing value. +// This mainly shows the cost of the revision index delete and insert. +// +// This also uses a secondary index to make this a more realistic. +func BenchmarkDB_RandomReplace(b *testing.B) { + db, table := newTestDBWithMetrics(b, &NopMetrics{}, tagsIndex) + ids := []uint64{} txn := db.WriteTxn(table) - for _, id := range ids { - _, _, err := table.Insert(txn, testObject{ID: id, Tags: nil}) - require.NoError(b, err) + for i := 0; i < numObjectsToInsert; i++ { + tag := "odd" + if i%2 == 0 { + tag = "even" + } + table.Insert(txn, testObject{ID: uint64(i), Tags: []string{tag}}) + ids = append(ids, uint64(i)) } txn.Commit() + rand.Shuffle(numObjectsToInsert, func(i, j int) { + ids[i], ids[j] = ids[j], ids[i] + }) + b.ResetTimer() + + for j := 0; j < b.N; j++ { + txn := db.WriteTxn(table) + for _, id := range ids { + tag := "odd" + if id%2 == 0 { + tag = "even" + } + _, _, err := table.Insert(txn, testObject{ID: id, Tags: []string{tag}}) + if err != nil { + b.Fatalf("Insert error: %s", err) + } + } + txn.Abort() + } b.StopTimer() - iter, _ := table.All(db.ReadTxn()) - require.Len(b, Collect(iter), b.N) - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_SequentialInsert(b *testing.B) { - db, table, _ := newTestDB(b) - + db, table := newTestDBWithMetrics(b, &NopMetrics{}) b.ResetTimer() - txn := db.WriteTxn(table) - for id := uint64(0); id < uint64(b.N); id++ { - _, _, err := table.Insert(txn, testObject{ID: id, Tags: nil}) - require.NoError(b, err) + + for j := 0; j < b.N; j++ { + txn := db.WriteTxn(table) + for id := uint64(0); id < uint64(numObjectsToInsert); id++ { + _, _, err := table.Insert(txn, testObject{ID: id, Tags: nil}) + require.NoError(b, err) + } + txn.Commit() } - txn.Commit() b.StopTimer() - iter, _ := table.All(db.ReadTxn()) - require.Len(b, Collect(iter), b.N) - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + require.EqualValues(b, table.NumObjects(db.ReadTxn()), numObjectsToInsert) + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_Baseline_SingleRadix_Insert(b *testing.B) { + for i := 0; i < b.N; i++ { + tree := iradix.New[uint64]() + txn := tree.Txn() + for j := uint64(0); j < numObjectsToInsert; j++ { + txn.Insert(index.Uint64(j), j) + } + tree = txn.Commit() + require.Equal(b, tree.Len(), numObjectsToInsert) + } + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") +} + +func BenchmarkDB_Baseline_SingleRadix_TrackMutate_Insert(b *testing.B) { + for i := 0; i < b.N; i++ { + tree := iradix.New[uint64]() + txn := tree.Txn() + txn.TrackMutate(true) // Enable the watch channels + for j := uint64(0); j < numObjectsToInsert; j++ { + txn.Insert(index.Uint64(j), j) + } + tree = txn.Commit() // Commit and notify + require.Equal(b, tree.Len(), numObjectsToInsert) + } + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") +} +func BenchmarkDB_Baseline_SingleRadix_Lookup(b *testing.B) { tree := iradix.New[uint64]() - txn := tree.Txn() - for i := uint64(0); i < uint64(b.N); i++ { - txn.Insert(index.Uint64(i), i) + 
for j := uint64(0); j < numObjectsToInsert; j++ { + tree, _, _ = tree.Insert(index.Uint64(j), j) } - txn.Commit() - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := uint64(0); j < numObjectsToInsert; j++ { + v, ok := tree.Get(index.Uint64(j)) + if v != j || !ok { + b.Fatalf("impossible: %d != %d || %v", v, j, ok) + } + } + + } + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_Baseline_Hashmap_Insert(b *testing.B) { - m := map[uint64]uint64{} - for i := uint64(0); i < uint64(b.N); i++ { - m[i] = i + for i := 0; i < b.N; i++ { + m := map[uint64]uint64{} + for j := uint64(0); j < numObjectsToInsert; j++ { + m[j] = j + } + if len(m) != numObjectsToInsert { + b.Fatalf("%d != %d", len(m), numObjectsToInsert) + } } - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_Baseline_Hashmap_Lookup(b *testing.B) { m := map[uint64]uint64{} - for i := uint64(0); i < uint64(b.N); i++ { - m[i] = i + for j := uint64(0); j < numObjectsToInsert; j++ { + m[j] = j } b.ResetTimer() - for i := uint64(0); i < uint64(b.N); i++ { - require.Equal(b, m[i], i) + for i := 0; i < b.N; i++ { + for j := uint64(0); j < numObjectsToInsert; j++ { + if m[j] != j { + b.Fatalf("impossible: %d != %d", m[j], j) + } + } } - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_DeleteTracker_Baseline(b *testing.B) { - db, table, _ := newTestDB(b) - - // Create b.N objects - txn := db.WriteTxn(table) - for i := 0; i < b.N; i++ { - _, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: nil}) - require.NoError(b, err) - } - txn.Commit() + db, table := newTestDBWithMetrics(b, &NopMetrics{}) b.ResetTimer() + for n := 0; n < b.N; n++ { + txn := db.WriteTxn(table) + for i := uint64(0); i < numObjectsToInsert; i++ { + _, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: nil}) + if err != nil { + b.Fatalf("Insert: %s", err) + } + } + txn.Commit() - // Start the timer and delete all objects to time - // the baseline without deletion tracking. - txn = db.WriteTxn(table) - table.DeleteAll(txn) - txn.Commit() - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + // Delete all objects to time the baseline without deletion tracking. 
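A note on the benchmark restructuring above: instead of scaling the workload with `b.N` directly, each benchmark now performs a fixed batch of `numObjectsToInsert` operations per iteration and converts the total into a throughput metric. A minimal standalone sketch of the pattern (the `doWork` placeholder is hypothetical):

```go
package bench

import "testing"

const batchSize = 1000 // mirrors numObjectsToInsert

func doWork(int) {} // hypothetical unit of work being measured

func BenchmarkBatch(b *testing.B) {
	for i := 0; i < b.N; i++ {
		for j := 0; j < batchSize; j++ {
			doWork(j)
		}
	}
	// batchSize*b.N operations ran in b.Elapsed() wall time.
	b.ReportMetric(float64(batchSize*b.N)/b.Elapsed().Seconds(), "objects/sec")
}
```

This keeps the per-iteration cost constant, so the runner's choice of `b.N` no longer changes what is being measured; the old code inserted `b.N` objects in a single transaction, so larger `b.N` meant measuring ever-larger batches.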
+ txn = db.WriteTxn(table) + table.DeleteAll(txn) + txn.Commit() + } + b.ReportMetric(float64(b.N*numObjectsToInsert)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_DeleteTracker(b *testing.B) { - db, table, _ := newTestDB(b) - - // Start tracking deletions from the start - - // Create b.N objects - txn := db.WriteTxn(table) - dt, err := table.DeleteTracker(txn, "test") - require.NoError(b, err) - defer dt.Close() - for i := 0; i < b.N; i++ { - _, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: nil}) - require.NoError(b, err) - } - txn.Commit() + db, table := newTestDBWithMetrics(b, &NopMetrics{}) b.ResetTimer() + for n := 0; n < b.N; n++ { + // Create objects + txn := db.WriteTxn(table) + dt, err := table.DeleteTracker(txn, "test") + require.NoError(b, err) + defer dt.Close() + for i := 0; i < numObjectsToInsert; i++ { + _, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: nil}) + if err != nil { + b.Fatalf("Insert: %s", err) + } + } + txn.Commit() - // Start the timer and delete all objects to time the cost for - // deletion tracking. - txn = db.WriteTxn(table) - table.DeleteAll(txn) - txn.Commit() + // Delete all objects to time the cost for deletion tracking. + txn = db.WriteTxn(table) + table.DeleteAll(txn) + txn.Commit() - nDeleted := 0 - dt.Iterate( - db.ReadTxn(), - func(obj testObject, deleted bool, _ Revision) { - nDeleted++ - }) - require.EqualValues(b, nDeleted, b.N) + // Iterate over the deleted objects + nDeleted := 0 + dt.Iterate( + db.ReadTxn(), + func(obj testObject, deleted bool, _ Revision) { + nDeleted++ + }) + require.EqualValues(b, nDeleted, numObjectsToInsert) + dt.Close() + } b.StopTimer() - - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") eventuallyGraveyardIsEmpty(b, db) + b.ReportMetric(float64(b.N*numObjectsToInsert)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_RandomLookup(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) wtxn := db.WriteTxn(table) - ids := []uint64{} - for i := 0; i < b.N; i++ { - ids = append(ids, uint64(i)) + queries := []Query[testObject]{} + for i := 0; i < numObjectsToInsert; i++ { + queries = append(queries, idIndex.Query(uint64(i))) _, _, err := table.Insert(wtxn, testObject{ID: uint64(i), Tags: nil}) require.NoError(b, err) } wtxn.Commit() - rand.Shuffle(b.N, func(i, j int) { - ids[i], ids[j] = ids[j], ids[i] + rand.Shuffle(numObjectsToInsert, func(i, j int) { + queries[i], queries[j] = queries[j], queries[i] }) b.ResetTimer() - txn := db.ReadTxn() - for _, id := range ids { - _, _, ok := table.First(txn, idIndex.Query(id)) - require.True(b, ok) + for j := 0; j < b.N; j++ { + txn := db.ReadTxn() + for _, q := range queries { + _, _, ok := table.First(txn, q) + if !ok { + b.Fatal("object not found") + } + } } - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_SequentialLookup(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) wtxn := db.WriteTxn(table) ids := []uint64{} - for i := 0; i < b.N; i++ { + for i := 0; i < numObjectsToInsert; i++ { ids = append(ids, uint64(i)) _, _, err := table.Insert(wtxn, testObject{ID: uint64(i), Tags: nil}) require.NoError(b, err) @@ -258,52 +369,68 @@ func BenchmarkDB_SequentialLookup(b *testing.B) { b.ResetTimer() txn := db.ReadTxn() - for _, id := range ids { - obj, _, ok := table.First(txn, idIndex.Query(id)) - 
require.True(b, ok) - require.Equal(b, obj.ID, id) + for n := 0; n < b.N; n++ { + for _, id := range ids { + obj, _, ok := table.First(txn, idIndex.Query(id)) + if !ok { + b.Fatalf("Object not found") + } + if obj.ID != id { + b.Fatalf("expected ID %d, got %d", id, obj.ID) + } + } } - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_FullIteration_All(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) wtxn := db.WriteTxn(table) - for i := 0; i < b.N; i++ { + for i := 0; i < numObjectsToInsert; i++ { _, _, err := table.Insert(wtxn, testObject{ID: uint64(i), Tags: nil}) require.NoError(b, err) } wtxn.Commit() b.ResetTimer() - txn := db.ReadTxn() - iter, _ := table.All(txn) - i := uint64(0) - for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() { - require.Equal(b, obj.ID, i) - i++ + for j := 0; j < b.N; j++ { + txn := db.ReadTxn() + iter, _ := table.All(txn) + i := uint64(0) + for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() { + if obj.ID != i { + b.Fatalf("expected ID %d, got %d", i, obj.ID) + } + i++ + } + require.EqualValues(b, i, numObjectsToInsert) } - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_FullIteration_Get(b *testing.B) { - db, table, _ := newTestDB(b, tagsIndex) + db, table := newTestDBWithMetrics(b, &NopMetrics{}, tagsIndex) wtxn := db.WriteTxn(table) - for i := 0; i < b.N; i++ { + for i := 0; i < numObjectsToInsert; i++ { _, _, err := table.Insert(wtxn, testObject{ID: uint64(i), Tags: []string{"foo"}}) require.NoError(b, err) } wtxn.Commit() b.ResetTimer() - txn := db.ReadTxn() - iter, _ := table.Get(txn, tagsIndex.Query("foo")) - i := uint64(0) - for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() { - require.Equal(b, obj.ID, i) - i++ + for j := 0; j < b.N; j++ { + txn := db.ReadTxn() + iter, _ := table.Get(txn, tagsIndex.Query("foo")) + i := uint64(0) + for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() { + if obj.ID != i { + b.Fatalf("expected ID %d, got %d", i, obj.ID) + } + i++ + } + require.EqualValues(b, i, numObjectsToInsert) } - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } type testObject2 testObject diff --git a/db.go b/db.go index ec6a6b5..6878e81 100644 --- a/db.go +++ b/db.go @@ -7,15 +7,12 @@ import ( "context" "errors" "net/http" - "reflect" "runtime" - "strings" + "slices" "sync" "sync/atomic" "time" - iradix "github.com/hashicorp/go-immutable-radix/v2" - "github.com/cilium/hive/cell" "github.com/cilium/statedb/internal" ) @@ -87,29 +84,31 @@ import ( // the lowest revision of all delete trackers. 
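The structural change at the heart of this diff follows: the root of the database becomes a flat `dbRoot` slice behind an `atomic.Pointer`, replacing the iradix tree of tables. Readers take a snapshot with a single atomic load; writers clone the slice under `db.mu` and publish the clone. A distilled sketch of that copy-on-write scheme, with a simplified `tableEntry` stand-in rather than the real one:

```go
package main

import (
	"fmt"
	"slices"
	"sync"
	"sync/atomic"
)

type tableEntry struct{ name string }
type dbRoot []tableEntry

type DB struct {
	mu   sync.Mutex             // sequences modifications to the root
	root atomic.Pointer[dbRoot] // immutable snapshot, swapped atomically
}

// register clones the root, appends, and publishes the clone; readers
// holding the old *dbRoot keep a fully consistent view.
func (db *DB) register(name string) {
	db.mu.Lock()
	defer db.mu.Unlock()
	root := slices.Clone(*db.root.Load())
	root = append(root, tableEntry{name: name})
	db.root.Store(&root)
}

func main() {
	db := &DB{}
	db.root.Store(&dbRoot{})
	snap := *db.root.Load() // what ReadTxn captures
	db.register("test")
	fmt.Println(len(snap), len(*db.root.Load())) // 0 1
}
```

Since a table's position in the slice is fixed at registration time, lookups elsewhere become plain slice indexing instead of radix-key searches.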
type DB struct { mu sync.Mutex // protects 'tables' and sequences modifications to the root tree - tables map[TableName]TableMeta ctx context.Context cancel context.CancelFunc - root atomic.Pointer[iradix.Tree[tableEntry]] + root atomic.Pointer[dbRoot] gcTrigger chan struct{} // trigger for graveyard garbage collection gcExited chan struct{} gcRateLimitInterval time.Duration metrics Metrics + defaultHandle Handle } +type dbRoot []tableEntry + func NewDB(tables []TableMeta, metrics Metrics) (*DB, error) { - txn := iradix.New[tableEntry]().Txn() db := &DB{ - tables: make(map[TableName]TableMeta), metrics: metrics, gcRateLimitInterval: defaultGCRateLimitInterval, } + db.defaultHandle = Handle{db, "DB"} + root := make(dbRoot, 0, len(tables)) for _, t := range tables { - if err := db.registerTable(t, txn); err != nil { + if err := db.registerTable(t, &root); err != nil { return nil, err } } - db.root.Store(txn.CommitOnly()) + db.root.Store(&root) return db, nil } @@ -128,50 +127,40 @@ func (db *DB) RegisterTable(table TableMeta, tables ...TableMeta) error { db.mu.Lock() defer db.mu.Unlock() - txn := db.root.Load().Txn() - if err := db.registerTable(table, txn); err != nil { + root := slices.Clone(*db.root.Load()) + + if err := db.registerTable(table, &root); err != nil { return err } for _, t := range tables { - if err := db.registerTable(t, txn); err != nil { + if err := db.registerTable(t, &root); err != nil { return err } } - db.root.Store(txn.CommitOnly()) + db.root.Store(&root) return nil } -func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error { +func (db *DB) registerTable(table TableMeta, root *dbRoot) error { name := table.Name() - if _, ok := db.tables[name]; ok { - return tableError(name, ErrDuplicateTable) - } - db.tables[name] = table - var entry tableEntry - entry.meta = table - entry.deleteTrackers = iradix.New[deleteTracker]() - indexTxn := iradix.New[indexEntry]().Txn() - indexTxn.Insert([]byte(table.primary().name), indexEntry{iradix.New[object](), true}) - indexTxn.Insert([]byte(RevisionIndex), indexEntry{iradix.New[object](), true}) - indexTxn.Insert([]byte(GraveyardIndex), indexEntry{iradix.New[object](), true}) - indexTxn.Insert([]byte(GraveyardRevisionIndex), indexEntry{iradix.New[object](), true}) - for index, indexer := range table.secondary() { - indexTxn.Insert([]byte(index), indexEntry{iradix.New[object](), indexer.unique}) + for _, t := range *root { + if t.meta.Name() == name { + return tableError(name, ErrDuplicateTable) + } } - entry.indexes = indexTxn.CommitOnly() - txn.Insert(table.tableKey(), entry) + + pos := len(*root) + table.setTablePos(pos) + *root = append(*root, table.tableEntry()) return nil } // ReadTxn constructs a new read transaction for performing reads against // a snapshot of the database. // -// ReadTxn is not thread-safe! +// The returned ReadTxn is not thread-safe. func (db *DB) ReadTxn() ReadTxn { - return &txn{ - db: db, - rootReadTxn: db.root.Load().Txn(), - } + return db.defaultHandle.ReadTxn() } // WriteTxn constructs a new write transaction against the given set of tables. @@ -179,53 +168,9 @@ func (db *DB) ReadTxn() ReadTxn { // The modifications performed in the write transaction are not visible outside // it until Commit() is called. To discard the changes call Abort(). // -// WriteTxn is not thread-safe! +// The returned WriteTxn is not thread-safe. 
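Both transaction constructors now delegate to a default `Handle` (introduced below), and the finalizer still panics if a `WriteTxn` is leaked without being committed or aborted. The calling discipline this implies, sketched as it would look alongside the tests in this package (hypothetical helper; `Abort` after a successful `Commit` is harmless because `Commit` zeroes out the transaction):

```go
func insertOne(db *DB, table RWTable[testObject]) error {
	txn := db.WriteTxn(table)
	defer txn.Abort() // always release the table locks, even on error
	if _, _, err := table.Insert(txn, testObject{ID: 1}); err != nil {
		return err
	}
	txn.Commit() // the deferred Abort is now a no-op
	return nil
}
```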
func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { - callerPkg := callerPackage() - - allTables := append(tables, table) - smus := internal.SortableMutexes{} - for _, table := range allTables { - smus = append(smus, table.sortableMutex()) - } - lockAt := time.Now() - smus.Lock() - acquiredAt := time.Now() - - rootReadTxn := db.root.Load().Txn() - tableEntries := make(map[TableName]*tableEntry, len(tables)) - var tableNames []string - for _, table := range allTables { - tableEntry, ok := rootReadTxn.Get(table.tableKey()) - if !ok { - panic("BUG: Table '" + table.Name() + "' not found") - } - tableEntries[table.Name()] = &tableEntry - tableNames = append(tableNames, table.Name()) - - db.metrics.WriteTxnTableAcquisition( - table.Name(), - table.sortableMutex().AcquireDuration(), - ) - } - - db.metrics.WriteTxnTotalAcquisition( - callerPkg, - tableNames, - acquiredAt.Sub(lockAt), - ) - - txn := &txn{ - db: db, - rootReadTxn: rootReadTxn, - modifiedTables: tableEntries, - writeTxns: make(map[tableIndex]indexTxn), - smus: smus, - acquiredAt: acquiredAt, - packageName: callerPkg, - } - runtime.SetFinalizer(txn, txnFinalizer) - return txn + return db.defaultHandle.WriteTxn(table, tables...) } func (db *DB) Start(cell.HookContext) error { @@ -266,29 +211,73 @@ func (db *DB) setGCRateLimitInterval(interval time.Duration) { db.gcRateLimitInterval = interval } -var ciliumPackagePrefix = func() string { - sentinel := func() {} - name := runtime.FuncForPC(reflect.ValueOf(sentinel).Pointer()).Name() - if idx := strings.LastIndex(name, "/"); idx >= 0 { - return name[:idx+1] +// NewHandle returns a named handle to the DB. The handle has the same ReadTxn and +// WriteTxn methods as DB, but annotated with the given name for more accurate +// cost accounting in e.g. metrics. +func (db *DB) NewHandle(name string) Handle { + return Handle{db, name} +} + +// Handle is a named handle to the database for constructing read or write +// transactions. 
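Handles replace the `runtime.Caller`-based package-name guessing removed in this hunk with an explicit, cheap label. A subsystem derives its handle once and all of its transaction metrics come out keyed as `<handle>/<table>`, as exercised by the updated `TestDB_CommitAbort` below. Illustrative usage (the handle name here is made up):

```go
h := db.NewHandle("endpoint-manager")

wtxn := h.WriteTxn(table) // contention recorded under "endpoint-manager/<table>"
// ... modifications ...
wtxn.Commit()

rtxn := h.ReadTxn() // identical snapshot semantics to db.ReadTxn()
```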
+type Handle struct { + db *DB + name string +} + +func (h Handle) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { + db := h.db + allTables := append(tables, table) + smus := internal.SortableMutexes{} + for _, table := range allTables { + smus = append(smus, table.sortableMutex()) } - return "" -}() + lockAt := time.Now() + smus.Lock() + acquiredAt := time.Now() -func callerPackage() string { - var callerPkg string - pc, _, _, ok := runtime.Caller(2) - if ok { - f := runtime.FuncForPC(pc) - if f != nil { - callerPkg = f.Name() - callerPkg, _ = strings.CutPrefix(callerPkg, ciliumPackagePrefix) - callerPkg = strings.SplitN(callerPkg, ".", 2)[0] - } else { - callerPkg = "unknown" - } - } else { - callerPkg = "unknown" + root := *db.root.Load() + tableEntries := make([]*tableEntry, len(root)) + var tableNames []string + for _, table := range allTables { + tableEntry := root[table.tablePos()] + tableEntry.indexes = slices.Clone(tableEntry.indexes) + tableEntries[table.tablePos()] = &tableEntry + tableNames = append(tableNames, table.Name()) + + db.metrics.WriteTxnTableAcquisition( + h.name, + table.Name(), + table.sortableMutex().AcquireDuration(), + ) + } + + db.metrics.WriteTxnTotalAcquisition( + h.name, + tableNames, + acquiredAt.Sub(lockAt), + ) + + txn := &txn{ + db: db, + root: root, + modifiedTables: tableEntries, + smus: smus, + acquiredAt: acquiredAt, + tableNames: tableNames, + handle: h.name, + } + runtime.SetFinalizer(txn, txnFinalizer) + return txn +} + +// ReadTxn constructs a new read transaction for performing reads against +// a snapshot of the database. +// +// The returned ReadTxn is not thread-safe. +func (h Handle) ReadTxn() ReadTxn { + return &txn{ + db: h.db, + root: *h.db.root.Load(), } - return callerPkg } diff --git a/db_test.go b/db_test.go index 77a470f..7e8c4ee 100644 --- a/db_test.go +++ b/db_test.go @@ -69,6 +69,12 @@ var logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ })) func newTestDB(t testing.TB, secondaryIndexers ...Indexer[testObject]) (*DB, RWTable[testObject], *ExpVarMetrics) { + metrics := NewExpVarMetrics(false) + db, table := newTestDBWithMetrics(t, metrics, secondaryIndexers...) + return db, table, metrics +} + +func newTestDBWithMetrics(t testing.TB, metrics Metrics, secondaryIndexers ...Indexer[testObject]) (*DB, RWTable[testObject]) { var ( db *DB ) @@ -79,8 +85,6 @@ func newTestDB(t testing.TB, secondaryIndexers ...Indexer[testObject]) (*DB, RWT ) require.NoError(t, err, "NewTable[testObject]") - metrics := NewExpVarMetrics(false) - h := hive.NewWithOptions( hive.Options{Logger: logger}, @@ -100,7 +104,7 @@ func newTestDB(t testing.TB, secondaryIndexers ...Indexer[testObject]) (*DB, RWT t.Cleanup(func() { assert.NoError(t, h.Stop(context.TODO())) }) - return db, table, metrics + return db, table } func TestDB_Insert_SamePointer(t *testing.T) { @@ -136,7 +140,7 @@ func TestDB_Insert_SamePointer(t *testing.T) { func TestDB_LowerBound_ByRevision(t *testing.T) { t.Parallel() - db, table, _ := newTestDB(t, tagsIndex) + db, table := newTestDBWithMetrics(t, &NopMetrics{}, tagsIndex) { txn := db.WriteTxn(table) @@ -420,7 +424,7 @@ func TestDB_All(t *testing.T) { select { case <-watch: - t.Fatalf("expected All() watch channel to not close before changes") + t.Fatalf("expected All() watch channel to not close before delete") default: } @@ -430,10 +434,15 @@ func TestDB_All(t *testing.T) { txn.Commit() } + // Prior read transaction not affected by delete. 
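The assertion added in this test pins down the isolation guarantee that the flat-root design preserves: a `ReadTxn` captures the root pointer once, and a commit only ever publishes a new root, so data reachable from an old snapshot is never mutated in place. In miniature:

```go
rtxn := db.ReadTxn() // pins the current immutable root

wtxn := db.WriteTxn(table)
table.DeleteAll(wtxn)
wtxn.Commit() // publishes a new root; rtxn's snapshot is untouched

iter, _ := table.All(rtxn)
objs := Collect(iter) // still the full pre-delete contents
```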
+ iter, _ = table.All(txn) + objs = Collect(iter) + require.Len(t, objs, 3) + select { case <-watch: case <-time.After(time.Second): - t.Fatalf("expceted All() watch channel to close after changes") + t.Fatalf("expected All() watch channel to close after delete") } } @@ -598,7 +607,8 @@ func TestDB_GetFirstLast(t *testing.T) { func TestDB_CommitAbort(t *testing.T) { t.Parallel() - db, table, metrics := newTestDB(t, tagsIndex) + dbX, table, metrics := newTestDB(t, tagsIndex) + db := dbX.NewHandle("test-handle") txn := db.WriteTxn(table) _, _, err := table.Insert(txn, testObject{ID: 123, Tags: nil}) @@ -607,8 +617,8 @@ func TestDB_CommitAbort(t *testing.T) { assert.EqualValues(t, table.Revision(db.ReadTxn()), expvarInt(metrics.RevisionVar.Get("test")), "Revision") assert.EqualValues(t, 1, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount") - assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("statedb")), 0.0, "WriteTxnAcquisition") - assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("statedb")), 0.0, "WriteTxnDuration") + assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("test-handle/test")), 0.0, "WriteTxnAcquisition") + assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("test-handle/test")), 0.0, "WriteTxnDuration") obj, rev, ok := table.First(db.ReadTxn(), idIndex.Query(123)) require.True(t, ok, "expected First(1) to return result") @@ -770,15 +780,6 @@ func TestWriteJSON(t *testing.T) { txn.Commit() } -func Test_callerPackage(t *testing.T) { - t.Parallel() - - pkg := func() string { - return callerPackage() - }() - require.Equal(t, "statedb", pkg) -} - func Test_nonUniqueKey(t *testing.T) { // empty keys key := encodeNonUniqueKey(nil, nil) diff --git a/deletetracker.go b/deletetracker.go index 5ccd371..096902c 100644 --- a/deletetracker.go +++ b/deletetracker.go @@ -36,7 +36,7 @@ func (dt *DeleteTracker[Obj]) getRevision() uint64 { // 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is // called! func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] { - indexTxn := txn.getTxn().mustIndexReadTxn(dt.table.Name(), GraveyardRevisionIndex) + indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndexPos) iter := indexTxn.Root().Iterator() iter.SeekLowerBound(index.Uint64(minRevision)) return &iterator[Obj]{iter} @@ -57,7 +57,7 @@ func (dt *DeleteTracker[Obj]) Close() { // Remove the delete tracker from the table. txn := dt.db.WriteTxn(dt.table).getTxn() db := txn.db - table := txn.modifiedTables[dt.table.Name()] + table := txn.modifiedTables[dt.table.tablePos()] if table == nil { panic("BUG: Table missing from write transaction") } diff --git a/graveyard.go b/graveyard.go index 2f7c5f6..0b95fb7 100644 --- a/graveyard.go +++ b/graveyard.go @@ -39,9 +39,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Do a lockless read transaction to find potential dead objects. txn := db.ReadTxn().getTxn() - tableIter := txn.rootReadTxn.Root().Iterator() - for nameKey, table, ok := tableIter.Next(); ok; nameKey, table, ok = tableIter.Next() { - tableName := string(nameKey) + for _, table := range txn.root { + tableName := table.meta.Name() start := time.Now() // Find the low watermark @@ -64,7 +63,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Find objects to be deleted by iterating over the graveyard revision index up // to the low watermark. 
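For context, the "low watermark" mentioned above comes from the delete trackers: each tracker records the revision up to which its consumer has processed deletions, and the graveyard may only drop objects at or below the minimum across all trackers. A hedged sketch of the computation performed by the elided lines (names follow the types in this diff, but treat this as pseudocode rather than the exact implementation):

```go
// Start from the table's current revision: with no trackers, every
// graveyard entry is collectable.
lowWatermark := table.revision
table.deleteTrackers.Root().Walk(func(_ []byte, dt deleteTracker) bool {
	if rev := dt.getRevision(); rev < lowWatermark {
		lowWatermark = rev
	}
	return false // false = continue walking
})
// Graveyard entries with revision <= lowWatermark are garbage.
```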
- indexTree := txn.mustIndexReadTxn(tableName, GraveyardRevisionIndex) + indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndexPos) objIter := indexTree.Root().Iterator() for key, obj, ok := objIter.Next(); ok; key, obj, ok = objIter.Next() { @@ -93,12 +92,12 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat tableName := meta.Name() start := time.Now() for _, key := range deadObjs { - oldObj, existed := txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Delete(key) + oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndexPos).Delete(key) if existed { // The dead object still existed (and wasn't replaced by a create->delete), // delete it from the primary index. key = meta.primary().fromObject(oldObj).First() - txn.mustIndexWriteTxn(tableName, GraveyardIndex).Delete(key) + txn.mustIndexWriteTxn(meta, GraveyardIndexPos).Delete(key) } } cleaningTimes[tableName] = time.Since(start) @@ -114,8 +113,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Update object count metrics. txn = db.ReadTxn().getTxn() - tableIter = txn.rootReadTxn.Root().Iterator() - for name, table, ok := tableIter.Next(); ok; name, table, ok = tableIter.Next() { + for _, table := range txn.root { + name := table.meta.Name() db.metrics.GraveyardObjectCount(string(name), table.numDeletedObjects()) db.metrics.ObjectCount(string(name), table.numObjects()) } @@ -126,12 +125,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Used in tests. func (db *DB) graveyardIsEmpty() bool { txn := db.ReadTxn().getTxn() - tableIter := txn.rootReadTxn.Root().Iterator() - for _, table, ok := tableIter.Next(); ok; _, table, ok = tableIter.Next() { - indexEntry, ok := table.indexes.Get([]byte(GraveyardIndex)) - if !ok { - panic("BUG: GraveyardIndex not found from table") - } + for _, table := range txn.root { + indexEntry := table.indexes[table.meta.indexPos(GraveyardIndex)] if indexEntry.tree.Len() != 0 { return false } diff --git a/metrics.go b/metrics.go index 9aca528..e723502 100644 --- a/metrics.go +++ b/metrics.go @@ -8,9 +8,9 @@ import ( ) type Metrics interface { - WriteTxnTableAcquisition(tableName string, acquire time.Duration) - WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) - WriteTxnDuration(goPackage string, s []string, acquire time.Duration) + WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) + WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) + WriteTxnDuration(handle string, tables []string, acquire time.Duration) GraveyardLowWatermark(tableName string, lowWatermark Revision) GraveyardCleaningDuration(tableName string, duration time.Duration) @@ -121,16 +121,56 @@ func (m *ExpVarMetrics) ObjectCount(name string, numObjects int) { m.ObjectCountVar.Set(name, &intVar) } -func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, s []string, acquire time.Duration) { - m.WriteTxnDurationVar.AddFloat(goPackage, acquire.Seconds()) +func (m *ExpVarMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration) { + m.WriteTxnDurationVar.AddFloat(handle+"/"+strings.Join(tables, "+"), acquire.Seconds()) } -func (m *ExpVarMetrics) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) { - m.WriteTxnAcquisitionVar.AddFloat(goPackage, acquire.Seconds()) +func (m *ExpVarMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) { + 
m.WriteTxnAcquisitionVar.AddFloat(handle+"/"+strings.Join(tables, "+"), acquire.Seconds()) } -func (m *ExpVarMetrics) WriteTxnTableAcquisition(name string, acquire time.Duration) { - m.LockContentionVar.AddFloat(name, acquire.Seconds()) +func (m *ExpVarMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) { + m.LockContentionVar.AddFloat(handle+"/"+tableName, acquire.Seconds()) } var _ Metrics = &ExpVarMetrics{} + +type NopMetrics struct{} + +// DeleteTrackerCount implements Metrics. +func (*NopMetrics) DeleteTrackerCount(tableName string, numTrackers int) { +} + +// GraveyardCleaningDuration implements Metrics. +func (*NopMetrics) GraveyardCleaningDuration(tableName string, duration time.Duration) { +} + +// GraveyardLowWatermark implements Metrics. +func (*NopMetrics) GraveyardLowWatermark(tableName string, lowWatermark uint64) { +} + +// GraveyardObjectCount implements Metrics. +func (*NopMetrics) GraveyardObjectCount(tableName string, numDeletedObjects int) { +} + +// ObjectCount implements Metrics. +func (*NopMetrics) ObjectCount(tableName string, numObjects int) { +} + +// Revision implements Metrics. +func (*NopMetrics) Revision(tableName string, revision uint64) { +} + +// WriteTxnDuration implements Metrics. +func (*NopMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration) { +} + +// WriteTxnTableAcquisition implements Metrics. +func (*NopMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) { +} + +// WriteTxnTotalAcquisition implements Metrics. +func (*NopMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) { +} + +var _ Metrics = &NopMetrics{} diff --git a/table.go b/table.go index 121f138..39264fe 100644 --- a/table.go +++ b/table.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "github.com/cilium/statedb/internal" + iradix "github.com/hashicorp/go-immutable-radix/v2" "github.com/cilium/statedb/index" ) @@ -46,10 +47,24 @@ func NewTable[Obj any]( primaryAnyIndexer: toAnyIndexer(primaryIndexer), primaryIndexer: primaryIndexer, secondaryAnyIndexers: make(map[string]anyIndexer, len(secondaryIndexers)), + indexPositions: make(map[string]int), } + table.indexPositions[primaryIndexer.indexName()] = PrimaryIndexPos + + // Internal indexes + table.indexPositions[RevisionIndex] = RevisionIndexPos + table.indexPositions[GraveyardIndex] = GraveyardIndexPos + table.indexPositions[GraveyardRevisionIndex] = GraveyardRevisionIndexPos + + indexPos := SecondaryIndexStartPos for _, indexer := range secondaryIndexers { - table.secondaryAnyIndexers[indexer.indexName()] = toAnyIndexer(indexer) + name := indexer.indexName() + anyIndexer := toAnyIndexer(indexer) + anyIndexer.pos = indexPos + table.secondaryAnyIndexers[name] = anyIndexer + table.indexPositions[name] = indexPos + indexPos++ } // Primary index must always be unique @@ -88,17 +103,50 @@ func MustNewTable[Obj any]( } type genTable[Obj any] struct { + pos int table TableName smu internal.SortableMutex primaryIndexer Indexer[Obj] primaryAnyIndexer anyIndexer secondaryAnyIndexers map[string]anyIndexer + indexPositions map[string]int +} + +func (t *genTable[Obj]) tableEntry() tableEntry { + var entry tableEntry + entry.meta = t + entry.deleteTrackers = iradix.New[deleteTracker]() + entry.indexes = make([]indexEntry, len(t.indexPositions)) + entry.indexes[t.indexPositions[t.primaryIndexer.indexName()]] = indexEntry{iradix.New[object](), nil, true} + + for index, indexer := range 
t.secondaryAnyIndexers { + entry.indexes[t.indexPositions[index]] = indexEntry{iradix.New[object](), nil, indexer.unique} + } + entry.indexes[t.indexPositions[RevisionIndex]] = indexEntry{iradix.New[object](), nil, true} + entry.indexes[t.indexPositions[GraveyardIndex]] = indexEntry{iradix.New[object](), nil, true} + entry.indexes[t.indexPositions[GraveyardRevisionIndex]] = indexEntry{iradix.New[object](), nil, true} + return entry +} + +func (t *genTable[Obj]) setTablePos(pos int) { + t.pos = pos +} + +func (t *genTable[Obj]) tablePos() int { + return t.pos } func (t *genTable[Obj]) tableKey() []byte { return []byte(t.table) } +func (t *genTable[Obj]) indexPos(name string) int { + if t.primaryAnyIndexer.name == name { + return PrimaryIndexPos + } + return t.indexPositions[name] +} + func (t *genTable[Obj]) PrimaryIndexer() Indexer[Obj] { return t.primaryIndexer } @@ -120,12 +168,12 @@ func (t *genTable[Obj]) ToTable() Table[Obj] { } func (t *genTable[Obj]) Revision(txn ReadTxn) Revision { - return txn.getTxn().GetRevision(t.table) + return txn.getTxn().getRevision(t) } func (t *genTable[Obj]) NumObjects(txn ReadTxn) int { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, t.primaryAnyIndexer.name) - return indexTxn.entry.tree.Len() + table := &txn.getTxn().root[t.tablePos()] + return table.indexes[PrimaryIndexPos].tree.Len() } func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, ok bool) { @@ -134,11 +182,22 @@ func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint } func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) + var iobj object + if indexTxn.unique { + // On a unique index we can do a direct get rather than a prefix search. + watch, iobj, ok = indexTxn.Root().GetWatch(q.key) + if !ok { + return + } + obj = iobj.data.(Obj) + revision = iobj.revision + return + } + + // For a non-unique index we need to do a prefix search. iter := indexTxn.Root().Iterator() watch = iter.SeekPrefixWatch(q.key) - - var iobj object for { var key []byte key, iobj, ok = iter.Next() @@ -147,14 +206,8 @@ func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } // Check that we have a full match on the key - var match bool - if indexTxn.entry.unique { - match = len(key) == len(q.key) - } else { - _, secondary := decodeNonUniqueKey(key) - match = len(secondary) == len(q.key) - } - if match { + _, secondary := decodeNonUniqueKey(key) + if len(secondary) == len(q.key) { break } } @@ -172,11 +225,21 @@ func (t *genTable[Obj]) Last(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint6 } func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) + var iobj object + if indexTxn.unique { + // On a unique index we can do a direct get rather than a prefix search. 
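The unique/non-unique split in `FirstWatch` and `LastWatch` exists because non-unique indexes store composite keys: the secondary key followed by the object's primary key, with enough trailing information for `decodeNonUniqueKey` to split them apart again. A prefix seek on `q.key` can therefore surface entries whose secondary key merely starts with the query (say, "foobar" for query "foo"), which is why the non-unique loops compare the decoded secondary length against `len(q.key)`. A hypothetical encoding in that spirit (the package's real `encodeNonUniqueKey` may lay the length out differently):

```go
// encodeComposite appends primary after secondary plus a one-byte
// secondary length (assumes len(secondary) < 256; illustrative only).
func encodeComposite(secondary, primary []byte) []byte {
	key := append([]byte{}, secondary...)
	key = append(key, primary...)
	return append(key, byte(len(secondary)))
}

// decodeComposite recovers both parts, so a caller can check
// len(secondary) == len(query) for an exact (non-prefix) match.
func decodeComposite(key []byte) (secondary, primary []byte) {
	n := int(key[len(key)-1])
	return key[:n], key[n : len(key)-1]
}
```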
+ watch, iobj, ok = indexTxn.Root().GetWatch(q.key) + if !ok { + return + } + obj = iobj.data.(Obj) + revision = iobj.revision + return + } + iter := indexTxn.Root().ReverseIterator() watch = iter.SeekPrefixWatch(q.key) - - var iobj object for { var key []byte key, iobj, ok = iter.Previous() @@ -185,14 +248,8 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } // Check that we have a full match on the key - var match bool - if indexTxn.entry.unique { - match = len(key) == len(q.key) - } else { - _, secondary := decodeNonUniqueKey(key) - match = len(secondary) == len(q.key) - } - if match { + _, secondary := decodeNonUniqueKey(key) + if len(secondary) == len(q.key) { break } } @@ -205,7 +262,7 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) root := indexTxn.Root() // Since LowerBound query may be invalidated by changes in another branch @@ -218,7 +275,7 @@ func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <- } func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, t.primaryAnyIndexer.name) + indexTxn := txn.getTxn().mustIndexReadTxn(t, PrimaryIndexPos) root := indexTxn.Root() // Grab the watch channel for the root node watchCh, _, _ := root.GetWatch(nil) @@ -226,11 +283,11 @@ func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { } func (t *genTable[Obj]) Get(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) iter := indexTxn.Root().Iterator() watchCh := iter.SeekPrefixWatch(q.key) - if indexTxn.entry.unique { + if indexTxn.unique { return &uniqueIterator[Obj]{iter, q.key}, watchCh } return &nonUniqueIterator[Obj]{iter, q.key}, watchCh diff --git a/txn.go b/txn.go index 8f06ef3..23cf151 100644 --- a/txn.go +++ b/txn.go @@ -11,10 +11,10 @@ import ( "io" "reflect" "runtime" + "slices" "time" iradix "github.com/hashicorp/go-immutable-radix/v2" - "golang.org/x/exp/maps" "github.com/cilium/statedb/index" "github.com/cilium/statedb/internal" @@ -22,29 +22,32 @@ import ( type txn struct { db *DB - rootReadTxn *iradix.Txn[tableEntry] // read transaction onto the tree of tables - writeTxns map[tableIndex]indexTxn // opened per-index write transactions - modifiedTables map[TableName]*tableEntry // table entries being modified - smus internal.SortableMutexes // the (sorted) table locks - acquiredAt time.Time // the time at which the transaction acquired the locks - packageName string // name of the package that created the transaction + handle string + root dbRoot + modifiedTables []*tableEntry // table entries being modified + smus internal.SortableMutexes // the (sorted) table locks + acquiredAt time.Time // the time at which the transaction acquired the locks + tableNames []string } type tableIndex struct { - table TableName - index IndexName + tablePos int + index IndexName } -type indexTxn struct { - *iradix.Txn[object] - entry indexEntry +// rooter is implemented by both iradix.Txn and iradix.Tree. 
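The `rooter` abstraction defined next is what lets one read-transaction type serve two cases: plain reads hand back the committed `*iradix.Tree` as-is (no transaction allocated at all), while reads inside a write transaction hand back a `Clone` of the in-flight `*iradix.Txn`. Spelling the comment out as code, both implementations can be checked at compile time:

```go
// Compile-time check: both types expose Root() *iradix.Node[object].
var (
	_ rooter = (*iradix.Tree[object])(nil)
	_ rooter = (*iradix.Txn[object])(nil)
)
```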
+type rooter interface { + Root() *iradix.Node[object] } -func (i indexTxn) Clone() indexTxn { - return indexTxn{ - i.Txn.Clone(), - i.entry, - } +type indexReadTxn struct { + rooter + unique bool +} + +type indexTxn struct { + *iradix.Txn[object] + unique bool } var zeroTxn = txn{} @@ -58,79 +61,59 @@ func (txn *txn) getTxn() *txn { // has been Aborted or Committed. This is a safeguard against forgetting to // Abort/Commit which would cause the table to be locked forever. func txnFinalizer(txn *txn) { - if txn.writeTxns != nil { - panic(fmt.Sprintf("WriteTxn from package %q against tables %v was never Abort()'d or Commit()'d", txn.packageName, maps.Keys(txn.modifiedTables))) + if txn.db != nil { + panic(fmt.Sprintf("WriteTxn from handle %s against tables %v was never Abort()'d or Commit()'d", txn.handle, txn.tableNames)) } } -func (txn *txn) GetRevision(name TableName) Revision { - if table, ok := txn.modifiedTables[name]; ok { - // This is a write transaction preparing to modify the table with a - // new revision. - return table.revision - } - - // This is either a read transaction, or a write transaction to tables - // other than this table. Look up the revision from the index. - table, ok := txn.rootReadTxn.Get([]byte(name)) - if !ok { - panic("BUG: Table " + name + " not found") +func (txn *txn) getRevision(meta TableMeta) Revision { + if txn.modifiedTables != nil { + entry := txn.modifiedTables[meta.tablePos()] + if entry != nil { + return entry.revision + } } - return table.revision + return txn.root[meta.tablePos()].revision } // indexReadTxn returns a transaction to read from the specific index. // If the table or index is not found this returns nil & error. -func (txn *txn) indexReadTxn(name TableName, index IndexName) (indexTxn, error) { - if txn.writeTxns != nil { - if _, ok := txn.modifiedTables[name]; ok { - itxn, err := txn.indexWriteTxn(name, index) - if err == nil { - return itxn.Clone(), nil +func (txn *txn) indexReadTxn(meta TableMeta, indexPos int) (indexReadTxn, error) { + if txn.modifiedTables != nil { + entry := txn.modifiedTables[meta.tablePos()] + if entry != nil { + itxn, err := txn.indexWriteTxn(meta, indexPos) + if err != nil { + return indexReadTxn{}, err } - return indexTxn{}, err + // Since iradix reuses nodes when mutating we need to return a clone + // so that iterators don't become invalid. + return indexReadTxn{itxn.Txn.Clone(), itxn.unique}, nil } } - - table, ok := txn.rootReadTxn.Get([]byte(name)) - if !ok { - return indexTxn{}, fmt.Errorf("table %q not found", name) - } - indexEntry, ok := table.indexes.Get([]byte(index)) - if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, name) - } - - return indexTxn{ - indexEntry.tree.Txn(), - indexEntry}, nil + indexEntry := txn.root[meta.tablePos()].indexes[indexPos] + return indexReadTxn{indexEntry.tree, indexEntry.unique}, nil } // indexWriteTxn returns a transaction to read/write to a specific index. // The created transaction is memoized and used for subsequent reads and/or writes. 
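Two details in this pair of functions are easy to miss. First, each index's write transaction is created lazily and memoized in `indexEntry.txn`, so an index a write transaction never touches costs nothing. Second, reads against such a memoized transaction must go through `Clone`: iradix reuses nodes in place while a transaction mutates, so an iterator taken directly on the live `Txn` could observe later writes. A small demonstration of the second point, assuming the iradix v2 API used throughout this diff:

```go
tree := iradix.New[int]()
txn := tree.Txn()
txn.Insert([]byte("a"), 1)

snap := txn.Clone()        // freeze a point-in-time view
txn.Insert([]byte("b"), 2) // a later write, invisible to snap

_, okA := snap.Get([]byte("a"))
_, okB := snap.Get([]byte("b"))
fmt.Println(okA, okB) // true false: the clone is a stable snapshot
```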
-func (txn *txn) indexWriteTxn(name TableName, index IndexName) (indexTxn, error) { - if indexTreeTxn, ok := txn.writeTxns[tableIndex{name, index}]; ok { - return indexTreeTxn, nil - } - table, ok := txn.modifiedTables[name] - if !ok { - return indexTxn{}, fmt.Errorf("table %q not found", name) - } - indexEntry, ok := table.indexes.Get([]byte(index)) - if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, name) - } - itxn := indexEntry.tree.Txn() - itxn.TrackMutate(true) - indexWriteTxn := indexTxn{itxn, indexEntry} - txn.writeTxns[tableIndex{name, index}] = indexWriteTxn - return indexWriteTxn, nil +func (txn *txn) indexWriteTxn(meta TableMeta, indexPos int) (indexTxn, error) { + table := txn.modifiedTables[meta.tablePos()] + if table == nil { + return indexTxn{}, tableError(meta.Name(), ErrTableNotLockedForWriting) + } + indexEntry := &table.indexes[indexPos] + if indexEntry.txn == nil { + indexEntry.txn = indexEntry.tree.Txn() + indexEntry.txn.TrackMutate(true) + } + return indexTxn{indexEntry.txn, indexEntry.unique}, nil } // mustIndexReadTxn returns a transaction to read from the specific index. // Panics if table or index are not found. -func (txn *txn) mustIndexReadTxn(name TableName, index IndexName) indexTxn { - indexTxn, err := txn.indexReadTxn(name, index) +func (txn *txn) mustIndexReadTxn(meta TableMeta, indexPos int) indexReadTxn { + indexTxn, err := txn.indexReadTxn(meta, indexPos) if err != nil { panic(err) } @@ -139,8 +122,8 @@ func (txn *txn) mustIndexReadTxn(name TableName, index IndexName) indexTxn { // mustIndexReadTxn returns a transaction to read or write from the specific index. // Panics if table or index not found. -func (txn *txn) mustIndexWriteTxn(name TableName, index IndexName) indexTxn { - indexTxn, err := txn.indexWriteTxn(name, index) +func (txn *txn) mustIndexWriteTxn(meta TableMeta, indexPos int) indexTxn { + indexTxn, err := txn.indexWriteTxn(meta, indexPos) if err != nil { panic(err) } @@ -148,14 +131,14 @@ func (txn *txn) mustIndexWriteTxn(name TableName, index IndexName) indexTxn { } func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object, bool, error) { - if txn.rootReadTxn == nil { + if txn.db == nil { return object{}, false, ErrTransactionClosed } // Look up table and allocate a new revision. 
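Every insert and delete bumps the table's revision and re-files the object under the revision index, which is what makes incremental consumption possible. A sketch of how a consumer would use that, hedged: `ByRevision` is the package's revision query as exercised by `TestDB_LowerBound_ByRevision`, and `process` is a hypothetical callback:

```go
rev := table.Revision(db.ReadTxn()) // high-water mark processed so far
for {
	iter, watch := table.LowerBound(db.ReadTxn(), ByRevision[testObject](rev+1))
	for obj, r, ok := iter.Next(); ok; obj, r, ok = iter.Next() {
		process(obj)
		rev = r
	}
	<-watch // closes on any change to the table; loop and re-query
}
```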
tableName := meta.Name() - table, ok := txn.modifiedTables[tableName] - if !ok { + table := txn.modifiedTables[meta.tablePos()] + if table == nil { return object{}, false, tableError(tableName, ErrTableNotLockedForWriting) } oldRevision := table.revision @@ -169,7 +152,7 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object // Update the primary index first idKey := meta.primary().fromObject(obj).First() - idIndexTxn := txn.mustIndexWriteTxn(tableName, meta.primary().name) + idIndexTxn := txn.mustIndexWriteTxn(meta, PrimaryIndexPos) oldObj, oldExists := idIndexTxn.Insert(idKey, obj) // Sanity check: is the same object being inserted back and thus the @@ -206,27 +189,30 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index - revIndexTxn := txn.mustIndexWriteTxn(tableName, RevisionIndex) + revIndexTxn := txn.mustIndexWriteTxn(meta, RevisionIndexPos) if oldExists { - _, ok := revIndexTxn.Delete([]byte(index.Uint64(oldObj.revision))) + var revKey [8]byte + binary.BigEndian.PutUint64(revKey[:], oldObj.revision) + _, ok := revIndexTxn.Delete(revKey[:]) if !ok { panic("BUG: Old revision index entry not found") } - } - revIndexTxn.Insert([]byte(index.Uint64(revision)), obj) + var revKey [8]byte + binary.BigEndian.PutUint64(revKey[:], obj.revision) + revIndexTxn.Insert(revKey[:], obj) // If it's new, possibly remove an older deleted object with the same // primary key from the graveyard. - if !oldExists && txn.hasDeleteTrackers(tableName) { - if old, existed := txn.mustIndexWriteTxn(tableName, GraveyardIndex).Delete(idKey); existed { - txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Delete([]byte(index.Uint64(old.revision))) + if !oldExists && txn.hasDeleteTrackers(meta) { + if old, existed := txn.mustIndexWriteTxn(meta, GraveyardIndexPos).Delete(idKey); existed { + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndexPos).Delete([]byte(index.Uint64(old.revision))) } } // Then update secondary indexes - for idx, indexer := range meta.secondary() { - indexTxn := txn.mustIndexWriteTxn(tableName, idx) + for _, indexer := range meta.secondary() { + indexTxn := txn.mustIndexWriteTxn(meta, indexer.pos) newKeys := indexer.fromObject(obj) if oldExists { @@ -255,29 +241,19 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object return oldObj, oldExists, nil } -func (txn *txn) hasDeleteTrackers(name TableName) bool { - // Table is being modified, return the entry we're mutating, - // so we can read the latest changes. - table, ok := txn.modifiedTables[name] - if !ok { - // Table is not being modified, look it up from the root. 
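This rewrite shows the idiom that table positions buy: "is this table part of my write set?" collapses from map lookups with existence checks into a single slice index, with `nil` marking tables the transaction did not lock. The same shape recurs in `getRevision`, `indexWriteTxn`, `Insert` and `Delete`; factored out, it would look roughly like this (hypothetical helper, not part of the diff):

```go
// tableFor returns the transaction's private entry when the table is
// locked for writing, else the entry from the immutable root snapshot.
func (txn *txn) tableFor(meta TableMeta) *tableEntry {
	if txn.modifiedTables != nil {
		if entry := txn.modifiedTables[meta.tablePos()]; entry != nil {
			return entry
		}
	}
	return &txn.root[meta.tablePos()]
}
```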
- if t, ok := txn.rootReadTxn.Get([]byte(name)); ok { - table = &t - } else { - panic(fmt.Sprintf("BUG: table %q not found", name)) - } +func (txn *txn) hasDeleteTrackers(meta TableMeta) bool { + table := txn.modifiedTables[meta.tablePos()] + if table != nil { + return table.deleteTrackers.Len() > 0 } - return table.deleteTrackers.Len() > 0 + return txn.root[meta.tablePos()].deleteTrackers.Len() > 0 } func (txn *txn) addDeleteTracker(meta TableMeta, trackerName string, dt deleteTracker) error { - if txn.rootReadTxn == nil { + if txn.db == nil { return ErrTransactionClosed } - table, ok := txn.modifiedTables[meta.Name()] - if !ok { - return tableError(meta.Name(), ErrTableNotLockedForWriting) - } + table := txn.modifiedTables[meta.tablePos()] table.deleteTrackers, _, _ = table.deleteTrackers.Insert([]byte(trackerName), dt) txn.db.metrics.DeleteTrackerCount(meta.Name(), table.deleteTrackers.Len()) return nil @@ -285,14 +261,14 @@ func (txn *txn) addDeleteTracker(meta TableMeta, trackerName string, dt deleteTr } func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object, bool, error) { - if txn.rootReadTxn == nil { + if txn.db == nil { return object{}, false, ErrTransactionClosed } // Look up table and allocate a new revision. tableName := meta.Name() - table, ok := txn.modifiedTables[tableName] - if !ok { + table := txn.modifiedTables[meta.tablePos()] + if table == nil { return object{}, false, tableError(tableName, ErrTableNotLockedForWriting) } oldRevision := table.revision @@ -303,7 +279,7 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object // We assume that "data" has only enough defined fields to // compute the primary key. idKey := meta.primary().fromObject(object{data: data}).First() - idIndexTree := txn.mustIndexWriteTxn(tableName, meta.primary().name) + idIndexTree := txn.mustIndexWriteTxn(meta, PrimaryIndexPos) obj, existed := idIndexTree.Delete(idKey) if !existed { return object{}, false, nil @@ -320,29 +296,31 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index. - indexTree := txn.mustIndexWriteTxn(tableName, RevisionIndex) - if _, ok := indexTree.Delete(index.Uint64(obj.revision)); !ok { + indexTree := txn.mustIndexWriteTxn(meta, RevisionIndexPos) + var revKey [8]byte + binary.BigEndian.PutUint64(revKey[:], obj.revision) + if _, ok := indexTree.Delete(index.Key(revKey[:])); !ok { panic("BUG: Object to be deleted not found from revision index") } // Then update secondary indexes. - for idx, indexer := range meta.secondary() { + for _, indexer := range meta.secondary() { indexer.fromObject(obj).Foreach(func(key index.Key) { if !indexer.unique { key = encodeNonUniqueKey(idKey, key) } - txn.mustIndexWriteTxn(tableName, idx).Delete(key) + txn.mustIndexWriteTxn(meta, indexer.pos).Delete(key) }) } // And finally insert the object into the graveyard. - if txn.hasDeleteTrackers(tableName) { - graveyardIndex := txn.mustIndexWriteTxn(tableName, GraveyardIndex) + if txn.hasDeleteTrackers(meta) { + graveyardIndex := txn.mustIndexWriteTxn(meta, GraveyardIndexPos) obj.revision = revision if _, existed := graveyardIndex.Insert(idKey, obj); existed { panic("BUG: Double deletion! 
Deleted object already existed in graveyard") } - txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Insert(index.Uint64(revision), obj) + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndexPos).Insert(index.Uint64(revision), obj) } return obj, true, nil @@ -392,14 +370,14 @@ func (txn *txn) Abort() { // // txn.Commit() // - if txn.writeTxns == nil { + if txn.db == nil { return } txn.smus.Unlock() txn.db.metrics.WriteTxnDuration( - txn.packageName, - maps.Keys(txn.modifiedTables), + txn.handle, + txn.tableNames, time.Since(txn.acquiredAt)) *txn = zeroTxn @@ -426,9 +404,9 @@ func (txn *txn) Commit() { // a reader can acquire an immutable snapshot of all data in the database with a // simpler atomic pointer load. - // If writeTxns is nil, this transaction has already been committed or aborted, and + // If db is nil, this transaction has already been committed or aborted, and // thus there is nothing to do. - if txn.writeTxns == nil { + if txn.db == nil { return } @@ -437,19 +415,25 @@ func (txn *txn) Commit() { // Commit each individual changed index to each table. // We don't notify yet (CommitOnly) as the root needs to be updated // first as otherwise readers would wake up too early. - for tableIndex, subTxn := range txn.writeTxns { - table, ok := txn.modifiedTables[tableIndex.table] - if !ok { - panic("BUG: Table " + tableIndex.table + " in writeTxns, but not in modifiedTables") + txnToNotify := []*iradix.Txn[object]{} + for _, table := range txn.modifiedTables { + if table == nil { + continue + } + for i := range table.indexes { + txn := table.indexes[i].txn + if txn != nil { + table.indexes[i].tree = txn.CommitOnly() + table.indexes[i].txn = nil + txnToNotify = append(txnToNotify, txn) + } } - subTxn.entry.tree = subTxn.CommitOnly() - table.indexes, _, _ = - table.indexes.Insert([]byte(tableIndex.index), subTxn.entry) // Update metrics - db.metrics.GraveyardObjectCount(tableIndex.table, table.numDeletedObjects()) - db.metrics.ObjectCount(tableIndex.table, table.numObjects()) - db.metrics.Revision(tableIndex.table, table.revision) + name := table.meta.Name() + db.metrics.GraveyardObjectCount(name, table.numDeletedObjects()) + db.metrics.ObjectCount(name, table.numObjects()) + db.metrics.Revision(name, table.revision) } // Acquire the lock on the root tree to sequence the updates to it. We can acquire @@ -460,17 +444,19 @@ func (txn *txn) Commit() { // Since the root may have changed since the pointer was last read in WriteTxn(), // load it again and modify the latest version that we now have immobilised by // the root lock. - rootTxn := db.root.Load().Txn() + root := *db.root.Load() + root = slices.Clone(root) // Insert the modified tables into the root tree of tables. - for name, table := range txn.modifiedTables { - rootTxn.Insert([]byte(name), *table) + for pos, table := range txn.modifiedTables { + if table != nil { + root[pos] = *table + } } // Commit the transaction to build the new root tree and then // atomically store it. - newRoot := rootTxn.CommitOnly() - db.root.Store(newRoot) + db.root.Store(&root) db.mu.Unlock() // With the root pointer updated, we can now release the tables for the next write transaction. @@ -478,14 +464,13 @@ func (txn *txn) Commit() { // Now that new root is committed, we can notify readers by closing the watch channels of // mutated radix tree nodes in all changed indexes and on the root itself. 
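The order of operations in `Commit` is load-bearing, and worth restating as a condensed sketch (simplified; error and nil handling elided):

```go
// 1. Build the new index trees without waking readers.
for _, itxn := range indexTxns {
	itxn.CommitOnly()
}
// 2. Under db.mu, re-load the latest root (it may have moved since
//    WriteTxn), splice in the modified tables, publish atomically.
db.mu.Lock()
root := slices.Clone(*db.root.Load())
// root[pos] = *modifiedTable ...
db.root.Store(&root)
db.mu.Unlock()
// 3. Release the table locks so the next writer can proceed.
smus.Unlock()
// 4. Only now close watch channels: a woken reader taking a fresh
//    ReadTxn is guaranteed to observe the new root.
for _, itxn := range indexTxns {
	itxn.Notify()
}
```

Notifying before the store would let a watcher re-read through a stale root and miss the very change it was woken for.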
- for _, subTxn := range txn.writeTxns { - subTxn.Notify() + for _, txn := range txnToNotify { + txn.Notify() } - rootTxn.Notify() txn.db.metrics.WriteTxnDuration( - txn.packageName, - maps.Keys(txn.modifiedTables), + txn.handle, + txn.tableNames, time.Since(txn.acquiredAt)) // Zero out the transaction to make it inert. @@ -497,18 +482,18 @@ func (txn *txn) WriteJSON(w io.Writer) error { buf := bufio.NewWriter(w) buf.WriteString("{\n") first := true - for _, table := range txn.db.tables { + for _, table := range txn.root { if !first { buf.WriteString(",\n") } else { first = false } - indexTxn := txn.getTxn().mustIndexReadTxn(table.Name(), table.primary().name) + indexTxn := txn.getTxn().mustIndexReadTxn(table.meta, PrimaryIndexPos) root := indexTxn.Root() iter := root.Iterator() - buf.WriteString(" \"" + table.Name() + "\": [\n") + buf.WriteString(" \"" + table.meta.Name() + "\": [\n") _, obj, ok := iter.Next() for ok { diff --git a/types.go b/types.go index 9fde16a..2225015 100644 --- a/types.go +++ b/types.go @@ -149,7 +149,11 @@ type RWTable[Obj any] interface { // TableMeta provides information about the table that is independent of // the object type (the 'Obj' constraint). type TableMeta interface { - Name() TableName // The name of the table + Name() TableName // The name of the table + tableEntry() tableEntry + tablePos() int + setTablePos(int) + indexPos(string) int tableKey() []byte // The radix key for the table in the root tree primary() anyIndexer // The untyped primary indexer for the table secondary() map[string]anyIndexer // Secondary indexers (if any) @@ -277,10 +281,17 @@ type TableWritable interface { // const ( - reservedIndexPrefix = "__" - RevisionIndex = "__revision__" - GraveyardIndex = "__graveyard__" - GraveyardRevisionIndex = "__graveyard_revision__" + PrimaryIndexPos = 0 + + reservedIndexPrefix = "__" + RevisionIndex = "__revision__" + RevisionIndexPos = 1 + GraveyardIndex = "__graveyard__" + GraveyardIndexPos = 2 + GraveyardRevisionIndex = "__graveyard_revision__" + GraveyardRevisionIndexPos = 3 + + SecondaryIndexStartPos = 4 ) // object is the format in which data is stored in the tables. @@ -303,6 +314,9 @@ type anyIndexer struct { // values returned by fromObject. If false the primary // key of the object will be appended to the key. unique bool + + // pos is the position of the index in [tableEntry.indexes] + pos int } type deleteTracker interface { @@ -312,28 +326,23 @@ type deleteTracker interface { type indexEntry struct { tree *iradix.Tree[object] + txn *iradix.Txn[object] unique bool } type tableEntry struct { meta TableMeta - indexes *iradix.Tree[indexEntry] + indexes []indexEntry deleteTrackers *iradix.Tree[deleteTracker] revision uint64 } func (t *tableEntry) numObjects() int { - indexEntry, ok := t.indexes.Get([]byte(RevisionIndex)) - if ok { - return indexEntry.tree.Len() - } - return 0 + indexEntry := t.indexes[t.meta.indexPos(RevisionIndex)] + return indexEntry.tree.Len() } func (t *tableEntry) numDeletedObjects() int { - indexEntry, ok := t.indexes.Get([]byte(GraveyardIndex)) - if ok { - return indexEntry.tree.Len() - } - return 0 + indexEntry := t.indexes[t.meta.indexPos(GraveyardIndex)] + return indexEntry.tree.Len() }
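Finally, the constants added to types.go fix a uniform layout for every table's `indexes` slice, which all of the positional lookups above rely on. For a table with one secondary index (such as the tests' `tagsIndex`), the layout works out to:

```go
// indexes[0] primary index (e.g. "id")   PrimaryIndexPos
// indexes[1] "__revision__"              RevisionIndexPos
// indexes[2] "__graveyard__"             GraveyardIndexPos
// indexes[3] "__graveyard_revision__"    GraveyardRevisionIndexPos
// indexes[4] "tags"                      SecondaryIndexStartPos + 0
//
// Hot paths can then index directly instead of hashing a name,
// as NumObjects does above:
n := txn.root[meta.tablePos()].indexes[PrimaryIndexPos].tree.Len()
```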