From 1cf3205de7faaed1ec6798b4b3b1cc6dc2aa15ca Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Mon, 25 Mar 2024 13:35:17 +0100 Subject: [PATCH 1/5] Add NopMetrics No-op metrics implementation for benchmarking etc. Signed-off-by: Jussi Maki --- db_test.go | 12 ++++++++---- metrics.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/db_test.go b/db_test.go index 77a470f..01143e5 100644 --- a/db_test.go +++ b/db_test.go @@ -69,6 +69,12 @@ var logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ })) func newTestDB(t testing.TB, secondaryIndexers ...Indexer[testObject]) (*DB, RWTable[testObject], *ExpVarMetrics) { + metrics := NewExpVarMetrics(false) + db, table := newTestDBWithMetrics(t, metrics, secondaryIndexers...) + return db, table, metrics +} + +func newTestDBWithMetrics(t testing.TB, metrics Metrics, secondaryIndexers ...Indexer[testObject]) (*DB, RWTable[testObject]) { var ( db *DB ) @@ -79,8 +85,6 @@ func newTestDB(t testing.TB, secondaryIndexers ...Indexer[testObject]) (*DB, RWT ) require.NoError(t, err, "NewTable[testObject]") - metrics := NewExpVarMetrics(false) - h := hive.NewWithOptions( hive.Options{Logger: logger}, @@ -100,7 +104,7 @@ func newTestDB(t testing.TB, secondaryIndexers ...Indexer[testObject]) (*DB, RWT t.Cleanup(func() { assert.NoError(t, h.Stop(context.TODO())) }) - return db, table, metrics + return db, table } func TestDB_Insert_SamePointer(t *testing.T) { @@ -136,7 +140,7 @@ func TestDB_Insert_SamePointer(t *testing.T) { func TestDB_LowerBound_ByRevision(t *testing.T) { t.Parallel() - db, table, _ := newTestDB(t, tagsIndex) + db, table := newTestDBWithMetrics(t, &NopMetrics{}, tagsIndex) { txn := db.WriteTxn(table) diff --git a/metrics.go b/metrics.go index 9aca528..0af0d29 100644 --- a/metrics.go +++ b/metrics.go @@ -134,3 +134,43 @@ func (m *ExpVarMetrics) WriteTxnTableAcquisition(name string, acquire time.Durat } var _ Metrics = &ExpVarMetrics{} + +type NopMetrics struct{} + +// DeleteTrackerCount implements Metrics. +func (*NopMetrics) DeleteTrackerCount(tableName string, numTrackers int) { +} + +// GraveyardCleaningDuration implements Metrics. +func (*NopMetrics) GraveyardCleaningDuration(tableName string, duration time.Duration) { +} + +// GraveyardLowWatermark implements Metrics. +func (*NopMetrics) GraveyardLowWatermark(tableName string, lowWatermark uint64) { +} + +// GraveyardObjectCount implements Metrics. +func (*NopMetrics) GraveyardObjectCount(tableName string, numDeletedObjects int) { +} + +// ObjectCount implements Metrics. +func (*NopMetrics) ObjectCount(tableName string, numObjects int) { +} + +// Revision implements Metrics. +func (*NopMetrics) Revision(tableName string, revision uint64) { +} + +// WriteTxnDuration implements Metrics. +func (*NopMetrics) WriteTxnDuration(goPackage string, s []string, acquire time.Duration) { +} + +// WriteTxnTableAcquisition implements Metrics. +func (*NopMetrics) WriteTxnTableAcquisition(tableName string, acquire time.Duration) { +} + +// WriteTxnTotalAcquisition implements Metrics. +func (*NopMetrics) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) { +} + +var _ Metrics = &NopMetrics{} From 0c84efc15e5649ef49d10daa24bc6ff95421934c Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Mon, 18 Mar 2024 09:39:45 +0100 Subject: [PATCH 2/5] Improve benchmarks - Do not use require.Equal etc. 
in tight loop as it's slow - Use a constant data set size to avoid using way way too much memory - Use nop metrics - Add RandomReplace benchmark and make RandomInsert always insert new objects. Signed-off-by: Jussi Maki --- benchmarks_test.go | 375 ++++++++++++++++++++++++++++++--------------- 1 file changed, 251 insertions(+), 124 deletions(-) diff --git a/benchmarks_test.go b/benchmarks_test.go index 8da59c5..357ab36 100644 --- a/benchmarks_test.go +++ b/benchmarks_test.go @@ -19,25 +19,32 @@ import ( "github.com/cilium/statedb/index" ) +// Number of objects to insert in tests that do repeated inserts. +const numObjectsToInsert = 1000 + func BenchmarkDB_WriteTxn_1(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) for i := 0; i < b.N; i++ { txn := db.WriteTxn(table) _, _, err := table.Insert(txn, testObject{ID: 123, Tags: nil}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } txn.Commit() } b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_WriteTxn_10(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) n := b.N for n > 0 { txn := db.WriteTxn(table) for j := 0; j < 10; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: nil}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() n -= 10 @@ -45,20 +52,24 @@ func BenchmarkDB_WriteTxn_10(b *testing.B) { txn := db.WriteTxn(table) for j := 0; j < n; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: nil}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_WriteTxn_100(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) n := b.N for n > 0 { txn := db.WriteTxn(table) for j := 0; j < 100; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: nil}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() n -= 100 @@ -66,21 +77,25 @@ func BenchmarkDB_WriteTxn_100(b *testing.B) { txn := db.WriteTxn(table) for j := 0; j < n; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: nil}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_WriteTxn_100_SecondaryIndex(b *testing.B) { - db, table, _ := newTestDB(b, tagsIndex) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) n := b.N tags := []string{"test"} for n > 0 { txn := db.WriteTxn(table) for j := 0; j < 100; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: tags}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() n -= 100 @@ -88,168 +103,264 @@ func BenchmarkDB_WriteTxn_100_SecondaryIndex(b *testing.B) { txn := db.WriteTxn(table) for j := 0; j < n; j++ { _, _, err := table.Insert(txn, testObject{ID: uint64(j), Tags: tags}) - require.NoError(b, err) + if err != nil { + b.Fatalf("Insert error: %s", err) + } } txn.Commit() b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_RandomInsert(b *testing.B) { - db, table, _ := newTestDB(b) - + db, table := newTestDBWithMetrics(b, &NopMetrics{}) ids := []uint64{} - for i := 0; i < b.N; i++ { + for i := 0; i < numObjectsToInsert; i++ { 
ids = append(ids, uint64(i))
 	}
-	rand.Shuffle(b.N, func(i, j int) {
+	rand.Shuffle(numObjectsToInsert, func(i, j int) {
 		ids[i], ids[j] = ids[j], ids[i]
 	})
 	b.ResetTimer()
+
+	for j := 0; j < b.N; j++ {
+		txn := db.WriteTxn(table)
+		for _, id := range ids {
+			_, _, err := table.Insert(txn, testObject{ID: id, Tags: nil})
+			if err != nil {
+				b.Fatalf("Insert error: %s", err)
+			}
+		}
+		txn.Abort()
+	}
+	b.StopTimer()
+
+	b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec")
+}
+
+// BenchmarkDB_RandomReplace is like BenchmarkDB_RandomInsert, but instead of
+// always inserting a new value this test replaces an existing value.
+// This mainly shows the cost of the revision index delete and insert.
+//
+// This also uses a secondary index to make this more realistic.
+func BenchmarkDB_RandomReplace(b *testing.B) {
+	db, table := newTestDBWithMetrics(b, &NopMetrics{}, tagsIndex)
+	ids := []uint64{}
 	txn := db.WriteTxn(table)
-	for _, id := range ids {
-		_, _, err := table.Insert(txn, testObject{ID: id, Tags: nil})
-		require.NoError(b, err)
+	for i := 0; i < numObjectsToInsert; i++ {
+		tag := "odd"
+		if i%2 == 0 {
+			tag = "even"
+		}
+		table.Insert(txn, testObject{ID: uint64(i), Tags: []string{tag}})
+		ids = append(ids, uint64(i))
 	}
 	txn.Commit()
+	rand.Shuffle(numObjectsToInsert, func(i, j int) {
+		ids[i], ids[j] = ids[j], ids[i]
+	})
 	b.ResetTimer()
+
+	for j := 0; j < b.N; j++ {
+		txn := db.WriteTxn(table)
+		for _, id := range ids {
+			tag := "odd"
+			if id%2 == 0 {
+				tag = "even"
+			}
+			_, _, err := table.Insert(txn, testObject{ID: id, Tags: []string{tag}})
+			if err != nil {
+				b.Fatalf("Insert error: %s", err)
+			}
+		}
+		txn.Abort()
+	}
 	b.StopTimer()
 
-	iter, _ := table.All(db.ReadTxn())
-	require.Len(b, Collect(iter), b.N)
-	b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec")
+	b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec")
 }
 
 func BenchmarkDB_SequentialInsert(b *testing.B) {
-	db, table, _ := newTestDB(b)
-
+	db, table := newTestDBWithMetrics(b, &NopMetrics{})
 	b.ResetTimer()
-	txn := db.WriteTxn(table)
-	for id := uint64(0); id < uint64(b.N); id++ {
-		_, _, err := table.Insert(txn, testObject{ID: id, Tags: nil})
-		require.NoError(b, err)
+
+	for j := 0; j < b.N; j++ {
+		txn := db.WriteTxn(table)
+		for id := uint64(0); id < uint64(numObjectsToInsert); id++ {
+			_, _, err := table.Insert(txn, testObject{ID: id, Tags: nil})
+			require.NoError(b, err)
+		}
+		txn.Commit()
 	}
-	txn.Commit()
 	b.StopTimer()
 
-	iter, _ := table.All(db.ReadTxn())
-	require.Len(b, Collect(iter), b.N)
-	b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec")
+	require.EqualValues(b, table.NumObjects(db.ReadTxn()), numObjectsToInsert)
+	b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec")
 }
 
 func BenchmarkDB_Baseline_SingleRadix_Insert(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		tree := iradix.New[uint64]()
+		txn := tree.Txn()
+		for j := uint64(0); j < numObjectsToInsert; j++ {
+			txn.Insert(index.Uint64(j), j)
+		}
+		tree = txn.Commit()
+		require.Equal(b, tree.Len(), numObjectsToInsert)
+	}
+	b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec")
+}
+
+func BenchmarkDB_Baseline_SingleRadix_TrackMutate_Insert(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		tree := iradix.New[uint64]()
+		txn := tree.Txn()
+		txn.TrackMutate(true) // Enable the watch channels
+		for j := uint64(0); j < numObjectsToInsert; j++ {
+			txn.Insert(index.Uint64(j), j)
+		}
+		tree = txn.Commit() // Commit and
notify + require.Equal(b, tree.Len(), numObjectsToInsert) + } + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") +} +func BenchmarkDB_Baseline_SingleRadix_Lookup(b *testing.B) { tree := iradix.New[uint64]() - txn := tree.Txn() - for i := uint64(0); i < uint64(b.N); i++ { - txn.Insert(index.Uint64(i), i) + for j := uint64(0); j < numObjectsToInsert; j++ { + tree, _, _ = tree.Insert(index.Uint64(j), j) } - txn.Commit() - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := uint64(0); j < numObjectsToInsert; j++ { + v, ok := tree.Get(index.Uint64(j)) + if v != j || !ok { + b.Fatalf("impossible: %d != %d || %v", v, j, ok) + } + } + + } + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_Baseline_Hashmap_Insert(b *testing.B) { - m := map[uint64]uint64{} - for i := uint64(0); i < uint64(b.N); i++ { - m[i] = i + for i := 0; i < b.N; i++ { + m := map[uint64]uint64{} + for j := uint64(0); j < numObjectsToInsert; j++ { + m[j] = j + } + if len(m) != numObjectsToInsert { + b.Fatalf("%d != %d", len(m), numObjectsToInsert) + } } - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_Baseline_Hashmap_Lookup(b *testing.B) { m := map[uint64]uint64{} - for i := uint64(0); i < uint64(b.N); i++ { - m[i] = i + for j := uint64(0); j < numObjectsToInsert; j++ { + m[j] = j } b.ResetTimer() - for i := uint64(0); i < uint64(b.N); i++ { - require.Equal(b, m[i], i) + for i := 0; i < b.N; i++ { + for j := uint64(0); j < numObjectsToInsert; j++ { + if m[j] != j { + b.Fatalf("impossible: %d != %d", m[j], j) + } + } } - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_DeleteTracker_Baseline(b *testing.B) { - db, table, _ := newTestDB(b) - - // Create b.N objects - txn := db.WriteTxn(table) - for i := 0; i < b.N; i++ { - _, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: nil}) - require.NoError(b, err) - } - txn.Commit() + db, table := newTestDBWithMetrics(b, &NopMetrics{}) b.ResetTimer() + for n := 0; n < b.N; n++ { + txn := db.WriteTxn(table) + for i := uint64(0); i < numObjectsToInsert; i++ { + _, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: nil}) + if err != nil { + b.Fatalf("Insert: %s", err) + } + } + txn.Commit() - // Start the timer and delete all objects to time - // the baseline without deletion tracking. - txn = db.WriteTxn(table) - table.DeleteAll(txn) - txn.Commit() - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + // Delete all objects to time the baseline without deletion tracking. 
+ txn = db.WriteTxn(table) + table.DeleteAll(txn) + txn.Commit() + } + b.ReportMetric(float64(b.N*numObjectsToInsert)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_DeleteTracker(b *testing.B) { - db, table, _ := newTestDB(b) - - // Start tracking deletions from the start - - // Create b.N objects - txn := db.WriteTxn(table) - dt, err := table.DeleteTracker(txn, "test") - require.NoError(b, err) - defer dt.Close() - for i := 0; i < b.N; i++ { - _, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: nil}) - require.NoError(b, err) - } - txn.Commit() + db, table := newTestDBWithMetrics(b, &NopMetrics{}) b.ResetTimer() + for n := 0; n < b.N; n++ { + // Create objects + txn := db.WriteTxn(table) + dt, err := table.DeleteTracker(txn, "test") + require.NoError(b, err) + defer dt.Close() + for i := 0; i < numObjectsToInsert; i++ { + _, _, err := table.Insert(txn, testObject{ID: uint64(i), Tags: nil}) + if err != nil { + b.Fatalf("Insert: %s", err) + } + } + txn.Commit() - // Start the timer and delete all objects to time the cost for - // deletion tracking. - txn = db.WriteTxn(table) - table.DeleteAll(txn) - txn.Commit() + // Delete all objects to time the cost for deletion tracking. + txn = db.WriteTxn(table) + table.DeleteAll(txn) + txn.Commit() - nDeleted := 0 - dt.Iterate( - db.ReadTxn(), - func(obj testObject, deleted bool, _ Revision) { - nDeleted++ - }) - require.EqualValues(b, nDeleted, b.N) + // Iterate over the deleted objects + nDeleted := 0 + dt.Iterate( + db.ReadTxn(), + func(obj testObject, deleted bool, _ Revision) { + nDeleted++ + }) + require.EqualValues(b, nDeleted, numObjectsToInsert) + dt.Close() + } b.StopTimer() - - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") eventuallyGraveyardIsEmpty(b, db) + b.ReportMetric(float64(b.N*numObjectsToInsert)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_RandomLookup(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) wtxn := db.WriteTxn(table) - ids := []uint64{} - for i := 0; i < b.N; i++ { - ids = append(ids, uint64(i)) + queries := []Query[testObject]{} + for i := 0; i < numObjectsToInsert; i++ { + queries = append(queries, idIndex.Query(uint64(i))) _, _, err := table.Insert(wtxn, testObject{ID: uint64(i), Tags: nil}) require.NoError(b, err) } wtxn.Commit() - rand.Shuffle(b.N, func(i, j int) { - ids[i], ids[j] = ids[j], ids[i] + rand.Shuffle(numObjectsToInsert, func(i, j int) { + queries[i], queries[j] = queries[j], queries[i] }) b.ResetTimer() - txn := db.ReadTxn() - for _, id := range ids { - _, _, ok := table.First(txn, idIndex.Query(id)) - require.True(b, ok) + for j := 0; j < b.N; j++ { + txn := db.ReadTxn() + for _, q := range queries { + _, _, ok := table.First(txn, q) + if !ok { + b.Fatal("object not found") + } + } } - b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec") + b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec") } func BenchmarkDB_SequentialLookup(b *testing.B) { - db, table, _ := newTestDB(b) + db, table := newTestDBWithMetrics(b, &NopMetrics{}) wtxn := db.WriteTxn(table) ids := []uint64{} - for i := 0; i < b.N; i++ { + for i := 0; i < numObjectsToInsert; i++ { ids = append(ids, uint64(i)) _, _, err := table.Insert(wtxn, testObject{ID: uint64(i), Tags: nil}) require.NoError(b, err) @@ -258,52 +369,68 @@ func BenchmarkDB_SequentialLookup(b *testing.B) { b.ResetTimer() txn := db.ReadTxn() - for _, id := range ids { - obj, _, ok := table.First(txn, idIndex.Query(id)) - 
require.True(b, ok)
-		require.Equal(b, obj.ID, id)
+	for n := 0; n < b.N; n++ {
+		for _, id := range ids {
+			obj, _, ok := table.First(txn, idIndex.Query(id))
+			if !ok {
+				b.Fatalf("Object not found")
+			}
+			if obj.ID != id {
+				b.Fatalf("expected ID %d, got %d", id, obj.ID)
+			}
+		}
 	}
-	b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec")
+	b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec")
 }
 
 func BenchmarkDB_FullIteration_All(b *testing.B) {
-	db, table, _ := newTestDB(b)
+	db, table := newTestDBWithMetrics(b, &NopMetrics{})
 
 	wtxn := db.WriteTxn(table)
-	for i := 0; i < b.N; i++ {
+	for i := 0; i < numObjectsToInsert; i++ {
 		_, _, err := table.Insert(wtxn, testObject{ID: uint64(i), Tags: nil})
 		require.NoError(b, err)
 	}
 	wtxn.Commit()
 
 	b.ResetTimer()
-	txn := db.ReadTxn()
-	iter, _ := table.All(txn)
-	i := uint64(0)
-	for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
-		require.Equal(b, obj.ID, i)
-		i++
+	for j := 0; j < b.N; j++ {
+		txn := db.ReadTxn()
+		iter, _ := table.All(txn)
+		i := uint64(0)
+		for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
+			if obj.ID != i {
+				b.Fatalf("expected ID %d, got %d", i, obj.ID)
+			}
+			i++
+		}
+		require.EqualValues(b, i, numObjectsToInsert)
 	}
-	b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec")
+	b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec")
 }
 
 func BenchmarkDB_FullIteration_Get(b *testing.B) {
-	db, table, _ := newTestDB(b, tagsIndex)
+	db, table := newTestDBWithMetrics(b, &NopMetrics{}, tagsIndex)
 
 	wtxn := db.WriteTxn(table)
-	for i := 0; i < b.N; i++ {
+	for i := 0; i < numObjectsToInsert; i++ {
 		_, _, err := table.Insert(wtxn, testObject{ID: uint64(i), Tags: []string{"foo"}})
 		require.NoError(b, err)
 	}
 	wtxn.Commit()
 
 	b.ResetTimer()
-	txn := db.ReadTxn()
-	iter, _ := table.Get(txn, tagsIndex.Query("foo"))
-	i := uint64(0)
-	for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
-		require.Equal(b, obj.ID, i)
-		i++
+	for j := 0; j < b.N; j++ {
+		txn := db.ReadTxn()
+		iter, _ := table.Get(txn, tagsIndex.Query("foo"))
+		i := uint64(0)
+		for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
+			if obj.ID != i {
+				b.Fatalf("expected ID %d, got %d", i, obj.ID)
+			}
+			i++
+		}
+		require.EqualValues(b, i, numObjectsToInsert)
 	}
-	b.ReportMetric(float64(b.N)/b.Elapsed().Seconds(), "objects/sec")
+	b.ReportMetric(float64(numObjectsToInsert*b.N)/b.Elapsed().Seconds(), "objects/sec")
 }
 
 type testObject2 testObject

From 308c157b566ee3bbc1ba2b0885b8056ce3729461 Mon Sep 17 00:00:00 2001
From: Jussi Maki
Date: Sun, 17 Mar 2024 15:41:15 +0100
Subject: [PATCH 3/5] statedb: Use table positions instead of names in txn
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

WriteTxn() is fairly slow, in part because of the cost of the hash-map
lookups for the tables being modified. Since the set of tables is static
at runtime, we can instead look tables up by an integer position. This
can later be extended to the indexTxn as well.
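The idea in isolation: assign each table its position once, at registration
time, and afterwards index into a slice instead of hashing a name. A minimal,
self-contained sketch; registry and tableMeta here are illustrative stand-ins
for statedb's dbRoot and TableMeta, not the real types:

package main

import "fmt"

type tableMeta struct {
	name string
	pos  int // assigned once at registration, fixed for the process lifetime
}

type registry struct {
	entries []string // indexed by tableMeta.pos; a stand-in for tableEntry
}

func (r *registry) register(name string) *tableMeta {
	m := &tableMeta{name: name, pos: len(r.entries)}
	r.entries = append(r.entries, "entry for "+name)
	return m
}

// lookup is a bounds-checked array access: no hashing, no string comparison.
func (r *registry) lookup(m *tableMeta) string {
	return r.entries[m.pos]
}

func main() {
	r := &registry{}
	t := r.register("objects")
	fmt.Println(r.lookup(t))
}

Because the position never changes after registration, the hot path of every
read and write transaction avoids map lookups entirely.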
Before:

goos: linux
goarch: amd64
pkg: github.com/cilium/statedb
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkDB_WriteTxn_1-8                    132523      9268 ns/op    107899 objects/sec
BenchmarkDB_WriteTxn_10-8                   367114      3044 ns/op    328531 objects/sec
BenchmarkDB_WriteTxn_100-8                  430207      2459 ns/op    406751 objects/sec
BenchmarkDB_WriteTxn_100_SecondaryIndex-8   503071      2358 ns/op    424111 objects/sec
BenchmarkDB_RandomInsert-8                     945   1288636 ns/op    776014 objects/sec
BenchmarkDB_RandomReplace-8                    226   5256999 ns/op    190223 objects/sec
BenchmarkDB_SequentialInsert-8                 370   3580799 ns/op    279267 objects/sec
...
BenchmarkDB_DeleteTracker_Baseline-8           356   3314962 ns/op    301663 objects/sec
BenchmarkDB_DeleteTracker-8                    181   6609844 ns/op    151290 objects/sec
BenchmarkDB_RandomLookup-8                    3289    354713 ns/op   2819181 objects/sec
BenchmarkDB_SequentialLookup-8                3519    334955 ns/op   2985479 objects/sec
BenchmarkDB_FullIteration_All-8              88718     12817 ns/op  78022832 objects/sec
BenchmarkDB_FullIteration_Get-8              71965     15974 ns/op  62599999 objects/sec
BenchmarkDB_PropagationDelay-8              188659      6398 ns/op    55.00 50th_µs    74.00 90th_µs    277.0 99th_µs
PASS
ok      github.com/cilium/statedb       33.132s

After:

goos: linux
goarch: amd64
pkg: github.com/cilium/statedb
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkDB_WriteTxn_1-8                    160879      7232 ns/op    138271 objects/sec
BenchmarkDB_WriteTxn_10-8                   442489      2597 ns/op    385104 objects/sec
BenchmarkDB_WriteTxn_100-8                  511983      2201 ns/op    454271 objects/sec
BenchmarkDB_WriteTxn_100_SecondaryIndex-8   511678      2205 ns/op    453489 objects/sec
BenchmarkDB_RandomInsert-8                    1045   1134981 ns/op    881072 objects/sec
BenchmarkDB_RandomReplace-8                    246   4879081 ns/op    204957 objects/sec
BenchmarkDB_SequentialInsert-8                 387   3108060 ns/op    321744 objects/sec
...
BenchmarkDB_DeleteTracker_Baseline-8           374   3167598 ns/op    315697 objects/sec
BenchmarkDB_DeleteTracker-8                    182   6409534 ns/op    156018 objects/sec
BenchmarkDB_RandomLookup-8                    3505    317084 ns/op   3153747 objects/sec
BenchmarkDB_SequentialLookup-8                3951    293740 ns/op   3404371 objects/sec
BenchmarkDB_FullIteration_All-8              98962     12085 ns/op  82749863 objects/sec
BenchmarkDB_FullIteration_Get-8              81453     14711 ns/op  67978410 objects/sec
BenchmarkDB_PropagationDelay-8              206851      5742 ns/op    50.00 50th_µs    64.00 90th_µs    261.0 99th_µs
PASS
ok      github.com/cilium/statedb       31.966s

Signed-off-by: Jussi Maki
---
 db.go            |  58 +++++++++------
 deletetracker.go |   4 +-
 graveyard.go     |  18 +++--
 metrics.go       |   6 +-
 table.go         |  23 +++++--
 txn.go           | 169 ++++++++++++++++++++---------------------
 types.go         |   4 +-
 7 files changed, 136 insertions(+), 146 deletions(-)

diff --git a/db.go b/db.go
index ec6a6b5..1bbee01 100644
--- a/db.go
+++ b/db.go
@@ -9,6 +9,7 @@ import (
 	"net/http"
 	"reflect"
 	"runtime"
+	"slices"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -87,29 +88,29 @@
 // the lowest revision of all delete trackers.
type DB struct { mu sync.Mutex // protects 'tables' and sequences modifications to the root tree - tables map[TableName]TableMeta ctx context.Context cancel context.CancelFunc - root atomic.Pointer[iradix.Tree[tableEntry]] + root atomic.Pointer[dbRoot] gcTrigger chan struct{} // trigger for graveyard garbage collection gcExited chan struct{} gcRateLimitInterval time.Duration metrics Metrics } +type dbRoot []tableEntry + func NewDB(tables []TableMeta, metrics Metrics) (*DB, error) { - txn := iradix.New[tableEntry]().Txn() db := &DB{ - tables: make(map[TableName]TableMeta), metrics: metrics, gcRateLimitInterval: defaultGCRateLimitInterval, } + root := make(dbRoot, 0, len(tables)) for _, t := range tables { - if err := db.registerTable(t, txn); err != nil { + if err := db.registerTable(t, &root); err != nil { return nil, err } } - db.root.Store(txn.CommitOnly()) + db.root.Store(&root) return db, nil } @@ -128,25 +129,31 @@ func (db *DB) RegisterTable(table TableMeta, tables ...TableMeta) error { db.mu.Lock() defer db.mu.Unlock() - txn := db.root.Load().Txn() - if err := db.registerTable(table, txn); err != nil { + root := slices.Clone(*db.root.Load()) + + if err := db.registerTable(table, &root); err != nil { return err } for _, t := range tables { - if err := db.registerTable(t, txn); err != nil { + if err := db.registerTable(t, &root); err != nil { return err } } - db.root.Store(txn.CommitOnly()) + db.root.Store(&root) return nil } -func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error { +func (db *DB) registerTable(table TableMeta, root *dbRoot) error { name := table.Name() - if _, ok := db.tables[name]; ok { - return tableError(name, ErrDuplicateTable) + for _, t := range *root { + if t.meta.Name() == name { + return tableError(name, ErrDuplicateTable) + } } - db.tables[name] = table + + pos := len(*root) + table.setTablePos(pos) + var entry tableEntry entry.meta = table entry.deleteTrackers = iradix.New[deleteTracker]() @@ -159,7 +166,8 @@ func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error indexTxn.Insert([]byte(index), indexEntry{iradix.New[object](), indexer.unique}) } entry.indexes = indexTxn.CommitOnly() - txn.Insert(table.tableKey(), entry) + + *root = append(*root, entry) return nil } @@ -169,8 +177,8 @@ func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error // ReadTxn is not thread-safe! 
func (db *DB) ReadTxn() ReadTxn { return &txn{ - db: db, - rootReadTxn: db.root.Load().Txn(), + db: db, + root: *db.root.Load(), } } @@ -192,15 +200,12 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { smus.Lock() acquiredAt := time.Now() - rootReadTxn := db.root.Load().Txn() - tableEntries := make(map[TableName]*tableEntry, len(tables)) + root := *db.root.Load() + tableEntries := make([]*tableEntry, len(root)) var tableNames []string for _, table := range allTables { - tableEntry, ok := rootReadTxn.Get(table.tableKey()) - if !ok { - panic("BUG: Table '" + table.Name() + "' not found") - } - tableEntries[table.Name()] = &tableEntry + tableEntry := root[table.tablePos()] + tableEntries[table.tablePos()] = &tableEntry tableNames = append(tableNames, table.Name()) db.metrics.WriteTxnTableAcquisition( @@ -217,7 +222,7 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { txn := &txn{ db: db, - rootReadTxn: rootReadTxn, + root: root, modifiedTables: tableEntries, writeTxns: make(map[tableIndex]indexTxn), smus: smus, @@ -278,6 +283,7 @@ var ciliumPackagePrefix = func() string { func callerPackage() string { var callerPkg string pc, _, _, ok := runtime.Caller(2) + if ok { f := runtime.FuncForPC(pc) if f != nil { @@ -288,7 +294,9 @@ func callerPackage() string { callerPkg = "unknown" } } else { + callerPkg = "unknown" } + return callerPkg } diff --git a/deletetracker.go b/deletetracker.go index 5ccd371..8f87914 100644 --- a/deletetracker.go +++ b/deletetracker.go @@ -36,7 +36,7 @@ func (dt *DeleteTracker[Obj]) getRevision() uint64 { // 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is // called! func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] { - indexTxn := txn.getTxn().mustIndexReadTxn(dt.table.Name(), GraveyardRevisionIndex) + indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndex) iter := indexTxn.Root().Iterator() iter.SeekLowerBound(index.Uint64(minRevision)) return &iterator[Obj]{iter} @@ -57,7 +57,7 @@ func (dt *DeleteTracker[Obj]) Close() { // Remove the delete tracker from the table. txn := dt.db.WriteTxn(dt.table).getTxn() db := txn.db - table := txn.modifiedTables[dt.table.Name()] + table := txn.modifiedTables[dt.table.tablePos()] if table == nil { panic("BUG: Table missing from write transaction") } diff --git a/graveyard.go b/graveyard.go index 2f7c5f6..f18a116 100644 --- a/graveyard.go +++ b/graveyard.go @@ -39,9 +39,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Do a lockless read transaction to find potential dead objects. txn := db.ReadTxn().getTxn() - tableIter := txn.rootReadTxn.Root().Iterator() - for nameKey, table, ok := tableIter.Next(); ok; nameKey, table, ok = tableIter.Next() { - tableName := string(nameKey) + for _, table := range txn.root { + tableName := table.meta.Name() start := time.Now() // Find the low watermark @@ -64,7 +63,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Find objects to be deleted by iterating over the graveyard revision index up // to the low watermark. 
- indexTree := txn.mustIndexReadTxn(tableName, GraveyardRevisionIndex) + indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndex) objIter := indexTree.Root().Iterator() for key, obj, ok := objIter.Next(); ok; key, obj, ok = objIter.Next() { @@ -93,12 +92,12 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat tableName := meta.Name() start := time.Now() for _, key := range deadObjs { - oldObj, existed := txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Delete(key) + oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete(key) if existed { // The dead object still existed (and wasn't replaced by a create->delete), // delete it from the primary index. key = meta.primary().fromObject(oldObj).First() - txn.mustIndexWriteTxn(tableName, GraveyardIndex).Delete(key) + txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(key) } } cleaningTimes[tableName] = time.Since(start) @@ -114,8 +113,8 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Update object count metrics. txn = db.ReadTxn().getTxn() - tableIter = txn.rootReadTxn.Root().Iterator() - for name, table, ok := tableIter.Next(); ok; name, table, ok = tableIter.Next() { + for _, table := range txn.root { + name := table.meta.Name() db.metrics.GraveyardObjectCount(string(name), table.numDeletedObjects()) db.metrics.ObjectCount(string(name), table.numObjects()) } @@ -126,8 +125,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Used in tests. func (db *DB) graveyardIsEmpty() bool { txn := db.ReadTxn().getTxn() - tableIter := txn.rootReadTxn.Root().Iterator() - for _, table, ok := tableIter.Next(); ok; _, table, ok = tableIter.Next() { + for _, table := range txn.root { indexEntry, ok := table.indexes.Get([]byte(GraveyardIndex)) if !ok { panic("BUG: GraveyardIndex not found from table") diff --git a/metrics.go b/metrics.go index 0af0d29..3613cf2 100644 --- a/metrics.go +++ b/metrics.go @@ -10,7 +10,7 @@ import ( type Metrics interface { WriteTxnTableAcquisition(tableName string, acquire time.Duration) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) - WriteTxnDuration(goPackage string, s []string, acquire time.Duration) + WriteTxnDuration(goPackage string, acquire time.Duration) GraveyardLowWatermark(tableName string, lowWatermark Revision) GraveyardCleaningDuration(tableName string, duration time.Duration) @@ -121,7 +121,7 @@ func (m *ExpVarMetrics) ObjectCount(name string, numObjects int) { m.ObjectCountVar.Set(name, &intVar) } -func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, s []string, acquire time.Duration) { +func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) { m.WriteTxnDurationVar.AddFloat(goPackage, acquire.Seconds()) } @@ -162,7 +162,7 @@ func (*NopMetrics) Revision(tableName string, revision uint64) { } // WriteTxnDuration implements Metrics. -func (*NopMetrics) WriteTxnDuration(goPackage string, s []string, acquire time.Duration) { +func (*NopMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) { } // WriteTxnTableAcquisition implements Metrics. 
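One effect of dropping the []string parameter from WriteTxnDuration is that
Abort() and Commit() no longer have to materialize a table-name slice (via
maps.Keys) just to emit a metric. For downstream users, a custom metrics
implementation can embed NopMetrics so that only the interesting methods need
overriding. A sketch, assuming the Metrics interface and NopMetrics are
exported from the statedb package as this series shows:

package example

import (
	"sync/atomic"
	"time"

	"github.com/cilium/statedb"
)

// countingMetrics counts finished write transactions; every other Metrics
// method falls through to the embedded no-op implementation.
type countingMetrics struct {
	statedb.NopMetrics
	writeTxns atomic.Int64
}

// WriteTxnDuration uses the post-patch signature (no table-name slice).
func (m *countingMetrics) WriteTxnDuration(goPackage string, d time.Duration) {
	m.writeTxns.Add(1)
}

// Compile-time check that the embedding satisfies the full interface.
var _ statedb.Metrics = &countingMetrics{}

Embedding keeps such a type source-compatible when further Metrics methods
are added or changed, since only the overridden methods need updating.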
diff --git a/table.go b/table.go index 121f138..fa4c11c 100644 --- a/table.go +++ b/table.go @@ -88,6 +88,7 @@ func MustNewTable[Obj any]( } type genTable[Obj any] struct { + pos int table TableName smu internal.SortableMutex primaryIndexer Indexer[Obj] @@ -95,6 +96,14 @@ type genTable[Obj any] struct { secondaryAnyIndexers map[string]anyIndexer } +func (t *genTable[Obj]) setTablePos(pos int) { + t.pos = pos +} + +func (t *genTable[Obj]) tablePos() int { + return t.pos +} + func (t *genTable[Obj]) tableKey() []byte { return []byte(t.table) } @@ -120,11 +129,11 @@ func (t *genTable[Obj]) ToTable() Table[Obj] { } func (t *genTable[Obj]) Revision(txn ReadTxn) Revision { - return txn.getTxn().GetRevision(t.table) + return txn.getTxn().getRevision(t) } func (t *genTable[Obj]) NumObjects(txn ReadTxn) int { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, t.primaryAnyIndexer.name) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name) return indexTxn.entry.tree.Len() } @@ -134,7 +143,7 @@ func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint } func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) iter := indexTxn.Root().Iterator() watch = iter.SeekPrefixWatch(q.key) @@ -172,7 +181,7 @@ func (t *genTable[Obj]) Last(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint6 } func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) iter := indexTxn.Root().ReverseIterator() watch = iter.SeekPrefixWatch(q.key) @@ -205,7 +214,7 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) root := indexTxn.Root() // Since LowerBound query may be invalidated by changes in another branch @@ -218,7 +227,7 @@ func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <- } func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, t.primaryAnyIndexer.name) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name) root := indexTxn.Root() // Grab the watch channel for the root node watchCh, _, _ := root.GetWatch(nil) @@ -226,7 +235,7 @@ func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { } func (t *genTable[Obj]) Get(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t.table, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) iter := indexTxn.Root().Iterator() watchCh := iter.SeekPrefixWatch(q.key) diff --git a/txn.go b/txn.go index 8f06ef3..98defd0 100644 --- a/txn.go +++ b/txn.go @@ -14,7 +14,6 @@ import ( "time" iradix "github.com/hashicorp/go-immutable-radix/v2" - "golang.org/x/exp/maps" "github.com/cilium/statedb/index" "github.com/cilium/statedb/internal" @@ -22,17 +21,17 @@ import ( type txn struct { db *DB - rootReadTxn *iradix.Txn[tableEntry] // read transaction onto the tree of tables - writeTxns map[tableIndex]indexTxn // opened per-index write transactions - 
modifiedTables map[TableName]*tableEntry // table entries being modified - smus internal.SortableMutexes // the (sorted) table locks - acquiredAt time.Time // the time at which the transaction acquired the locks - packageName string // name of the package that created the transaction + root dbRoot + writeTxns map[tableIndex]indexTxn // opened per-index write transactions + modifiedTables []*tableEntry // table entries being modified + smus internal.SortableMutexes // the (sorted) table locks + acquiredAt time.Time // the time at which the transaction acquired the locks + packageName string // name of the package that created the transaction } type tableIndex struct { - table TableName - index IndexName + tablePos int + index IndexName } type indexTxn struct { @@ -58,33 +57,28 @@ func (txn *txn) getTxn() *txn { // has been Aborted or Committed. This is a safeguard against forgetting to // Abort/Commit which would cause the table to be locked forever. func txnFinalizer(txn *txn) { - if txn.writeTxns != nil { - panic(fmt.Sprintf("WriteTxn from package %q against tables %v was never Abort()'d or Commit()'d", txn.packageName, maps.Keys(txn.modifiedTables))) + if txn.db != nil { + panic(fmt.Sprintf("WriteTxn acquired by package %q was never Abort()'d or Commit()'d", txn.packageName)) } } -func (txn *txn) GetRevision(name TableName) Revision { - if table, ok := txn.modifiedTables[name]; ok { - // This is a write transaction preparing to modify the table with a - // new revision. - return table.revision - } - - // This is either a read transaction, or a write transaction to tables - // other than this table. Look up the revision from the index. - table, ok := txn.rootReadTxn.Get([]byte(name)) - if !ok { - panic("BUG: Table " + name + " not found") +func (txn *txn) getRevision(meta TableMeta) Revision { + if txn.modifiedTables != nil { + entry := txn.modifiedTables[meta.tablePos()] + if entry != nil { + return entry.revision + } } - return table.revision + return txn.root[meta.tablePos()].revision } // indexReadTxn returns a transaction to read from the specific index. // If the table or index is not found this returns nil & error. -func (txn *txn) indexReadTxn(name TableName, index IndexName) (indexTxn, error) { +func (txn *txn) indexReadTxn(meta TableMeta, index IndexName) (indexTxn, error) { if txn.writeTxns != nil { - if _, ok := txn.modifiedTables[name]; ok { - itxn, err := txn.indexWriteTxn(name, index) + entry := txn.modifiedTables[meta.tablePos()] + if entry != nil { + itxn, err := txn.indexWriteTxn(meta, index) if err == nil { return itxn.Clone(), nil } @@ -92,13 +86,10 @@ func (txn *txn) indexReadTxn(name TableName, index IndexName) (indexTxn, error) } } - table, ok := txn.rootReadTxn.Get([]byte(name)) - if !ok { - return indexTxn{}, fmt.Errorf("table %q not found", name) - } + table := txn.root[meta.tablePos()] indexEntry, ok := table.indexes.Get([]byte(index)) if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, name) + return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, meta.Name()) } return indexTxn{ @@ -108,29 +99,26 @@ func (txn *txn) indexReadTxn(name TableName, index IndexName) (indexTxn, error) // indexWriteTxn returns a transaction to read/write to a specific index. // The created transaction is memoized and used for subsequent reads and/or writes. 
-func (txn *txn) indexWriteTxn(name TableName, index IndexName) (indexTxn, error) { - if indexTreeTxn, ok := txn.writeTxns[tableIndex{name, index}]; ok { +func (txn *txn) indexWriteTxn(meta TableMeta, index IndexName) (indexTxn, error) { + if indexTreeTxn, ok := txn.writeTxns[tableIndex{meta.tablePos(), index}]; ok { return indexTreeTxn, nil } - table, ok := txn.modifiedTables[name] - if !ok { - return indexTxn{}, fmt.Errorf("table %q not found", name) - } + table := txn.modifiedTables[meta.tablePos()] indexEntry, ok := table.indexes.Get([]byte(index)) if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, name) + return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, meta.Name()) } itxn := indexEntry.tree.Txn() itxn.TrackMutate(true) indexWriteTxn := indexTxn{itxn, indexEntry} - txn.writeTxns[tableIndex{name, index}] = indexWriteTxn + txn.writeTxns[tableIndex{meta.tablePos(), index}] = indexWriteTxn return indexWriteTxn, nil } // mustIndexReadTxn returns a transaction to read from the specific index. // Panics if table or index are not found. -func (txn *txn) mustIndexReadTxn(name TableName, index IndexName) indexTxn { - indexTxn, err := txn.indexReadTxn(name, index) +func (txn *txn) mustIndexReadTxn(meta TableMeta, index IndexName) indexTxn { + indexTxn, err := txn.indexReadTxn(meta, index) if err != nil { panic(err) } @@ -139,8 +127,8 @@ func (txn *txn) mustIndexReadTxn(name TableName, index IndexName) indexTxn { // mustIndexReadTxn returns a transaction to read or write from the specific index. // Panics if table or index not found. -func (txn *txn) mustIndexWriteTxn(name TableName, index IndexName) indexTxn { - indexTxn, err := txn.indexWriteTxn(name, index) +func (txn *txn) mustIndexWriteTxn(meta TableMeta, index IndexName) indexTxn { + indexTxn, err := txn.indexWriteTxn(meta, index) if err != nil { panic(err) } @@ -148,14 +136,14 @@ func (txn *txn) mustIndexWriteTxn(name TableName, index IndexName) indexTxn { } func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object, bool, error) { - if txn.rootReadTxn == nil { + if txn.db == nil { return object{}, false, ErrTransactionClosed } // Look up table and allocate a new revision. tableName := meta.Name() - table, ok := txn.modifiedTables[tableName] - if !ok { + table := txn.modifiedTables[meta.tablePos()] + if table == nil { return object{}, false, tableError(tableName, ErrTableNotLockedForWriting) } oldRevision := table.revision @@ -169,7 +157,7 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object // Update the primary index first idKey := meta.primary().fromObject(obj).First() - idIndexTxn := txn.mustIndexWriteTxn(tableName, meta.primary().name) + idIndexTxn := txn.mustIndexWriteTxn(meta, meta.primary().name) oldObj, oldExists := idIndexTxn.Insert(idKey, obj) // Sanity check: is the same object being inserted back and thus the @@ -206,27 +194,26 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index - revIndexTxn := txn.mustIndexWriteTxn(tableName, RevisionIndex) + revIndexTxn := txn.mustIndexWriteTxn(meta, RevisionIndex) if oldExists { _, ok := revIndexTxn.Delete([]byte(index.Uint64(oldObj.revision))) if !ok { panic("BUG: Old revision index entry not found") } - } revIndexTxn.Insert([]byte(index.Uint64(revision)), obj) // If it's new, possibly remove an older deleted object with the same // primary key from the graveyard. 
- if !oldExists && txn.hasDeleteTrackers(tableName) { - if old, existed := txn.mustIndexWriteTxn(tableName, GraveyardIndex).Delete(idKey); existed { - txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Delete([]byte(index.Uint64(old.revision))) + if !oldExists && txn.hasDeleteTrackers(meta) { + if old, existed := txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(idKey); existed { + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete([]byte(index.Uint64(old.revision))) } } // Then update secondary indexes for idx, indexer := range meta.secondary() { - indexTxn := txn.mustIndexWriteTxn(tableName, idx) + indexTxn := txn.mustIndexWriteTxn(meta, idx) newKeys := indexer.fromObject(obj) if oldExists { @@ -255,29 +242,19 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object return oldObj, oldExists, nil } -func (txn *txn) hasDeleteTrackers(name TableName) bool { - // Table is being modified, return the entry we're mutating, - // so we can read the latest changes. - table, ok := txn.modifiedTables[name] - if !ok { - // Table is not being modified, look it up from the root. - if t, ok := txn.rootReadTxn.Get([]byte(name)); ok { - table = &t - } else { - panic(fmt.Sprintf("BUG: table %q not found", name)) - } +func (txn *txn) hasDeleteTrackers(meta TableMeta) bool { + table := txn.modifiedTables[meta.tablePos()] + if table != nil { + return table.deleteTrackers.Len() > 0 } - return table.deleteTrackers.Len() > 0 + return txn.root[meta.tablePos()].deleteTrackers.Len() > 0 } func (txn *txn) addDeleteTracker(meta TableMeta, trackerName string, dt deleteTracker) error { - if txn.rootReadTxn == nil { + if txn.db == nil { return ErrTransactionClosed } - table, ok := txn.modifiedTables[meta.Name()] - if !ok { - return tableError(meta.Name(), ErrTableNotLockedForWriting) - } + table := txn.modifiedTables[meta.tablePos()] table.deleteTrackers, _, _ = table.deleteTrackers.Insert([]byte(trackerName), dt) txn.db.metrics.DeleteTrackerCount(meta.Name(), table.deleteTrackers.Len()) return nil @@ -285,14 +262,14 @@ func (txn *txn) addDeleteTracker(meta TableMeta, trackerName string, dt deleteTr } func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object, bool, error) { - if txn.rootReadTxn == nil { + if txn.db == nil { return object{}, false, ErrTransactionClosed } // Look up table and allocate a new revision. tableName := meta.Name() - table, ok := txn.modifiedTables[tableName] - if !ok { + table := txn.modifiedTables[meta.tablePos()] + if table == nil { return object{}, false, tableError(tableName, ErrTableNotLockedForWriting) } oldRevision := table.revision @@ -303,7 +280,7 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object // We assume that "data" has only enough defined fields to // compute the primary key. idKey := meta.primary().fromObject(object{data: data}).First() - idIndexTree := txn.mustIndexWriteTxn(tableName, meta.primary().name) + idIndexTree := txn.mustIndexWriteTxn(meta, meta.primary().name) obj, existed := idIndexTree.Delete(idKey) if !existed { return object{}, false, nil @@ -320,7 +297,7 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index. 
- indexTree := txn.mustIndexWriteTxn(tableName, RevisionIndex) + indexTree := txn.mustIndexWriteTxn(meta, RevisionIndex) if _, ok := indexTree.Delete(index.Uint64(obj.revision)); !ok { panic("BUG: Object to be deleted not found from revision index") } @@ -331,18 +308,18 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object if !indexer.unique { key = encodeNonUniqueKey(idKey, key) } - txn.mustIndexWriteTxn(tableName, idx).Delete(key) + txn.mustIndexWriteTxn(meta, idx).Delete(key) }) } // And finally insert the object into the graveyard. - if txn.hasDeleteTrackers(tableName) { - graveyardIndex := txn.mustIndexWriteTxn(tableName, GraveyardIndex) + if txn.hasDeleteTrackers(meta) { + graveyardIndex := txn.mustIndexWriteTxn(meta, GraveyardIndex) obj.revision = revision if _, existed := graveyardIndex.Insert(idKey, obj); existed { panic("BUG: Double deletion! Deleted object already existed in graveyard") } - txn.mustIndexWriteTxn(tableName, GraveyardRevisionIndex).Insert(index.Uint64(revision), obj) + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Insert(index.Uint64(revision), obj) } return obj, true, nil @@ -399,7 +376,6 @@ func (txn *txn) Abort() { txn.smus.Unlock() txn.db.metrics.WriteTxnDuration( txn.packageName, - maps.Keys(txn.modifiedTables), time.Since(txn.acquiredAt)) *txn = zeroTxn @@ -438,18 +414,16 @@ func (txn *txn) Commit() { // We don't notify yet (CommitOnly) as the root needs to be updated // first as otherwise readers would wake up too early. for tableIndex, subTxn := range txn.writeTxns { - table, ok := txn.modifiedTables[tableIndex.table] - if !ok { - panic("BUG: Table " + tableIndex.table + " in writeTxns, but not in modifiedTables") - } + table := txn.modifiedTables[tableIndex.tablePos] subTxn.entry.tree = subTxn.CommitOnly() table.indexes, _, _ = table.indexes.Insert([]byte(tableIndex.index), subTxn.entry) // Update metrics - db.metrics.GraveyardObjectCount(tableIndex.table, table.numDeletedObjects()) - db.metrics.ObjectCount(tableIndex.table, table.numObjects()) - db.metrics.Revision(tableIndex.table, table.revision) + name := table.meta.Name() + db.metrics.GraveyardObjectCount(name, table.numDeletedObjects()) + db.metrics.ObjectCount(name, table.numObjects()) + db.metrics.Revision(name, table.revision) } // Acquire the lock on the root tree to sequence the updates to it. We can acquire @@ -460,17 +434,18 @@ func (txn *txn) Commit() { // Since the root may have changed since the pointer was last read in WriteTxn(), // load it again and modify the latest version that we now have immobilised by // the root lock. - rootTxn := db.root.Load().Txn() + root := *db.root.Load() // Insert the modified tables into the root tree of tables. - for name, table := range txn.modifiedTables { - rootTxn.Insert([]byte(name), *table) + for pos, table := range txn.modifiedTables { + if table != nil { + root[pos] = *table + } } // Commit the transaction to build the new root tree and then // atomically store it. - newRoot := rootTxn.CommitOnly() - db.root.Store(newRoot) + db.root.Store(&root) db.mu.Unlock() // With the root pointer updated, we can now release the tables for the next write transaction. @@ -481,11 +456,9 @@ func (txn *txn) Commit() { for _, subTxn := range txn.writeTxns { subTxn.Notify() } - rootTxn.Notify() txn.db.metrics.WriteTxnDuration( txn.packageName, - maps.Keys(txn.modifiedTables), time.Since(txn.acquiredAt)) // Zero out the transaction to make it inert. 
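The Commit() path above is the write half of a copy-on-write scheme: readers
atomically load the root and never take a lock, while writers (already
serialized by db.mu and the table locks) clone the root slice, overwrite the
positions they modified, and atomically publish the clone. A minimal sketch of
the pattern; store and entry are illustrative names, not statedb's types:

package example

import (
	"slices"
	"sync"
	"sync/atomic"
)

type entry struct{ revision uint64 }

type store struct {
	mu   sync.Mutex              // serializes writers; readers never lock
	root atomic.Pointer[[]entry] // current immutable snapshot
}

func newStore(numTables int) *store {
	s := &store{}
	r := make([]entry, numTables)
	s.root.Store(&r)
	return s
}

// read returns a consistent snapshot without taking any locks.
func (s *store) read() []entry { return *s.root.Load() }

// commit clones the root, patches one position, and publishes the clone.
// Readers still holding the old pointer keep seeing it unchanged.
func (s *store) commit(pos int, e entry) {
	s.mu.Lock()
	defer s.mu.Unlock()
	newRoot := slices.Clone(*s.root.Load())
	newRoot[pos] = e
	s.root.Store(&newRoot)
}

This ordering is also why notification happens only after the atomic store:
as the comment above notes, waking watchers before the new root is visible
would let readers observe stale data.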
@@ -497,18 +470,18 @@ func (txn *txn) WriteJSON(w io.Writer) error {
 	buf := bufio.NewWriter(w)
 	buf.WriteString("{\n")
 	first := true
-	for _, table := range txn.db.tables {
+	for _, table := range txn.root {
 		if !first {
 			buf.WriteString(",\n")
 		} else {
 			first = false
 		}
 
-		indexTxn := txn.getTxn().mustIndexReadTxn(table.Name(), table.primary().name)
+		indexTxn := txn.getTxn().mustIndexReadTxn(table.meta, table.meta.primary().name)
 		root := indexTxn.Root()
 		iter := root.Iterator()
 
-		buf.WriteString("  \"" + table.Name() + "\": [\n")
+		buf.WriteString("  \"" + table.meta.Name() + "\": [\n")
 
 		_, obj, ok := iter.Next()
 		for ok {
diff --git a/types.go b/types.go
index 9fde16a..466a28b 100644
--- a/types.go
+++ b/types.go
@@ -149,7 +149,9 @@ type RWTable[Obj any] interface {
 // TableMeta provides information about the table that is independent of
 // the object type (the 'Obj' constraint).
 type TableMeta interface {
-	Name() TableName // The name of the table
+	Name() TableName // The name of the table
+	tablePos() int
+	setTablePos(int)
 	tableKey() []byte // The radix key for the table in the root tree
 	primary() anyIndexer // The untyped primary indexer for the table
 	secondary() map[string]anyIndexer // Secondary indexers (if any)

From 1858d6b053e44a2126c98e16e4c3322be572e949 Mon Sep 17 00:00:00 2001
From: Jussi Maki
Date: Sun, 17 Mar 2024 16:20:25 +0100
Subject: [PATCH 4/5] Use arrays for index tree lookup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Similarly to tables, there are few enough indexes per table that an array
is sufficient to store them; nothing more complex is required.

Before:

goos: linux
goarch: amd64
pkg: github.com/cilium/statedb
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkDB_WriteTxn_1-8                    160879      7232 ns/op    138271 objects/sec
BenchmarkDB_WriteTxn_10-8                   442489      2597 ns/op    385104 objects/sec
BenchmarkDB_WriteTxn_100-8                  511983      2201 ns/op    454271 objects/sec
BenchmarkDB_WriteTxn_100_SecondaryIndex-8   511678      2205 ns/op    453489 objects/sec
BenchmarkDB_RandomInsert-8                    1045   1134981 ns/op    881072 objects/sec
BenchmarkDB_RandomReplace-8                    246   4879081 ns/op    204957 objects/sec
BenchmarkDB_SequentialInsert-8                 387   3108060 ns/op    321744 objects/sec
...
BenchmarkDB_DeleteTracker_Baseline-8           374   3167598 ns/op    315697 objects/sec
BenchmarkDB_DeleteTracker-8                    182   6409534 ns/op    156018 objects/sec
BenchmarkDB_RandomLookup-8                    3505    317084 ns/op   3153747 objects/sec
BenchmarkDB_SequentialLookup-8                3951    293740 ns/op   3404371 objects/sec
BenchmarkDB_FullIteration_All-8              98962     12085 ns/op  82749863 objects/sec
BenchmarkDB_FullIteration_Get-8              81453     14711 ns/op  67978410 objects/sec
BenchmarkDB_PropagationDelay-8              206851      5742 ns/op    50.00 50th_µs    64.00 90th_µs    261.0 99th_µs
PASS
ok      github.com/cilium/statedb       31.966s

After:

goos: linux
goarch: amd64
pkg: github.com/cilium/statedb
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkDB_WriteTxn_1-8                    240938      4752 ns/op    210459 objects/sec
BenchmarkDB_WriteTxn_10-8                   502363      2551 ns/op    392063 objects/sec
BenchmarkDB_WriteTxn_100-8                  457850      2279 ns/op    438872 objects/sec
BenchmarkDB_WriteTxn_100_SecondaryIndex-8   526416      2222 ns/op    450100 objects/sec
BenchmarkDB_RandomInsert-8                    1012   1181665 ns/op    846264 objects/sec
BenchmarkDB_RandomReplace-8                    216   5048896 ns/op    198063 objects/sec
BenchmarkDB_SequentialInsert-8                 398   2996997 ns/op    333667 objects/sec
...
BenchmarkDB_DeleteTracker_Baseline-8           390   3036951 ns/op    329278 objects/sec
BenchmarkDB_DeleteTracker-8                    141   8194663 ns/op    122031 objects/sec
BenchmarkDB_RandomLookup-8                    8846    134745 ns/op   7421428 objects/sec
BenchmarkDB_SequentialLookup-8                8425    123284 ns/op   8111372 objects/sec
BenchmarkDB_FullIteration_All-8             103279     10996 ns/op  90941891 objects/sec
BenchmarkDB_FullIteration_Get-8              84451     13637 ns/op  73328686 objects/sec
BenchmarkDB_PropagationDelay-8              235146      5342 ns/op    48.00 50th_µs    57.00 90th_µs    215.0 99th_µs
PASS
ok      github.com/cilium/statedb       31.480s

Signed-off-by: Jussi Maki
---
 db.go            |  20 +------
 db_test.go       |   9 ++-
 deletetracker.go |   2 +-
 graveyard.go     |  11 ++--
 table.go         | 105 ++++++++++++++++++++++++----------
 txn.go           | 145 +++++++++++++++++++++++++----------------------
 types.go         |  37 +++++++-----
 7 files changed, 189 insertions(+), 140 deletions(-)

diff --git a/db.go b/db.go
index 1bbee01..372c1f8 100644
--- a/db.go
+++ b/db.go
@@ -15,8 +15,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	iradix "github.com/hashicorp/go-immutable-radix/v2"
-
 	"github.com/cilium/hive/cell"
 	"github.com/cilium/statedb/internal"
 )
@@ -153,21 +151,7 @@ func (db *DB) registerTable(table TableMeta, root *dbRoot) error {
 
 	pos := len(*root)
 	table.setTablePos(pos)
-
-	var entry tableEntry
-	entry.meta = table
-	entry.deleteTrackers = iradix.New[deleteTracker]()
-	indexTxn := iradix.New[indexEntry]().Txn()
-	indexTxn.Insert([]byte(table.primary().name), indexEntry{iradix.New[object](), true})
-	indexTxn.Insert([]byte(RevisionIndex), indexEntry{iradix.New[object](), true})
-	indexTxn.Insert([]byte(GraveyardIndex), indexEntry{iradix.New[object](), true})
-	indexTxn.Insert([]byte(GraveyardRevisionIndex), indexEntry{iradix.New[object](), true})
-	for index, indexer := range table.secondary() {
-		indexTxn.Insert([]byte(index), indexEntry{iradix.New[object](), indexer.unique})
-	}
-	entry.indexes = indexTxn.CommitOnly()
-
-	*root = append(*root, entry)
+	*root = append(*root, table.tableEntry())
 	return nil
 }
 
@@ -205,6 +189,7 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
 	var tableNames []string
 	for _, table := range allTables {
 		tableEntry := root[table.tablePos()]
+		tableEntry.indexes = slices.Clone(tableEntry.indexes)
 		tableEntries[table.tablePos()] = &tableEntry
 		tableNames = append(tableNames, table.Name())
 
diff --git a/db_test.go b/db_test.go
index 01143e5..143d337 100644
--- a/db_test.go
+++ b/db_test.go
@@ -424,7 +424,7 @@ func TestDB_All(t *testing.T) {
 
 	select {
 	case <-watch:
-		t.Fatalf("expected All() watch channel to not close before changes")
+		t.Fatalf("expected All() watch channel to not close before delete")
 	default:
 	}
 
@@ -434,10 +434,15 @@
 		txn.Commit()
 	}
 
+	// Prior read transaction not affected by delete.
+	iter, _ = table.All(txn)
+	objs = Collect(iter)
+	require.Len(t, objs, 3)
+
 	select {
 	case <-watch:
 	case <-time.After(time.Second):
-		t.Fatalf("expceted All() watch channel to close after changes")
+		t.Fatalf("expected All() watch channel to close after delete")
 	}
 }
 
diff --git a/deletetracker.go b/deletetracker.go
index 8f87914..096902c 100644
--- a/deletetracker.go
+++ b/deletetracker.go
@@ -36,7 +36,7 @@ func (dt *DeleteTracker[Obj]) getRevision() uint64 {
 // 'minRevision'.
The deleted objects are not garbage-collected unless 'Mark' is // called! func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] { - indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndex) + indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndexPos) iter := indexTxn.Root().Iterator() iter.SeekLowerBound(index.Uint64(minRevision)) return &iterator[Obj]{iter} diff --git a/graveyard.go b/graveyard.go index f18a116..0b95fb7 100644 --- a/graveyard.go +++ b/graveyard.go @@ -63,7 +63,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat // Find objects to be deleted by iterating over the graveyard revision index up // to the low watermark. - indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndex) + indexTree := txn.mustIndexReadTxn(table.meta, GraveyardRevisionIndexPos) objIter := indexTree.Root().Iterator() for key, obj, ok := objIter.Next(); ok; key, obj, ok = objIter.Next() { @@ -92,12 +92,12 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat tableName := meta.Name() start := time.Now() for _, key := range deadObjs { - oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete(key) + oldObj, existed := txn.mustIndexWriteTxn(meta, GraveyardRevisionIndexPos).Delete(key) if existed { // The dead object still existed (and wasn't replaced by a create->delete), // delete it from the primary index. key = meta.primary().fromObject(oldObj).First() - txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(key) + txn.mustIndexWriteTxn(meta, GraveyardIndexPos).Delete(key) } } cleaningTimes[tableName] = time.Since(start) @@ -126,10 +126,7 @@ func graveyardWorker(db *DB, ctx context.Context, gcRateLimitInterval time.Durat func (db *DB) graveyardIsEmpty() bool { txn := db.ReadTxn().getTxn() for _, table := range txn.root { - indexEntry, ok := table.indexes.Get([]byte(GraveyardIndex)) - if !ok { - panic("BUG: GraveyardIndex not found from table") - } + indexEntry := table.indexes[table.meta.indexPos(GraveyardIndex)] if indexEntry.tree.Len() != 0 { return false } diff --git a/table.go b/table.go index fa4c11c..9dcbcac 100644 --- a/table.go +++ b/table.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "github.com/cilium/statedb/internal" + iradix "github.com/hashicorp/go-immutable-radix/v2" "github.com/cilium/statedb/index" ) @@ -46,10 +47,23 @@ func NewTable[Obj any]( primaryAnyIndexer: toAnyIndexer(primaryIndexer), primaryIndexer: primaryIndexer, secondaryAnyIndexers: make(map[string]anyIndexer, len(secondaryIndexers)), + indexPositions: make(map[string]int), } + table.indexPositions[primaryIndexer.indexName()] = PrimaryIndexPos + + // Internal indexes + table.indexPositions[RevisionIndex] = RevisionIndexPos + table.indexPositions[GraveyardIndex] = GraveyardIndexPos + table.indexPositions[GraveyardRevisionIndex] = GraveyardRevisionIndexPos + + indexPos := SecondaryIndexStartPos for _, indexer := range secondaryIndexers { - table.secondaryAnyIndexers[indexer.indexName()] = toAnyIndexer(indexer) + anyIndexer := toAnyIndexer(indexer) + anyIndexer.pos = indexPos + table.secondaryAnyIndexers[indexer.indexName()] = anyIndexer + table.indexPositions[indexer.indexName()] = indexPos + indexPos++ } // Primary index must always be unique @@ -94,6 +108,23 @@ type genTable[Obj any] struct { primaryIndexer Indexer[Obj] primaryAnyIndexer anyIndexer secondaryAnyIndexers map[string]anyIndexer + indexPositions map[string]int +} + +func (t 
*genTable[Obj]) tableEntry() tableEntry { + var entry tableEntry + entry.meta = t + entry.deleteTrackers = iradix.New[deleteTracker]() + entry.indexes = make([]indexEntry, len(t.indexPositions)) + entry.indexes[t.indexPositions[t.primaryIndexer.indexName()]] = indexEntry{iradix.New[object](), nil, true} + + for index, indexer := range t.secondaryAnyIndexers { + entry.indexes[t.indexPositions[index]] = indexEntry{iradix.New[object](), nil, indexer.unique} + } + entry.indexes[t.indexPositions[RevisionIndex]] = indexEntry{iradix.New[object](), nil, true} + entry.indexes[t.indexPositions[GraveyardIndex]] = indexEntry{iradix.New[object](), nil, true} + entry.indexes[t.indexPositions[GraveyardRevisionIndex]] = indexEntry{iradix.New[object](), nil, true} + return entry } func (t *genTable[Obj]) setTablePos(pos int) { @@ -108,6 +139,13 @@ func (t *genTable[Obj]) tableKey() []byte { return []byte(t.table) } +func (t *genTable[Obj]) indexPos(name string) int { + if t.primaryAnyIndexer.name == name { + return PrimaryIndexPos + } + return t.indexPositions[name] +} + func (t *genTable[Obj]) PrimaryIndexer() Indexer[Obj] { return t.primaryIndexer } @@ -133,8 +171,8 @@ func (t *genTable[Obj]) Revision(txn ReadTxn) Revision { } func (t *genTable[Obj]) NumObjects(txn ReadTxn) int { - indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name) - return indexTxn.entry.tree.Len() + table := &txn.getTxn().root[t.tablePos()] + return table.indexes[PrimaryIndexPos].tree.Len() } func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, ok bool) { @@ -143,11 +181,22 @@ func (t *genTable[Obj]) First(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint } func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) + var iobj object + if indexTxn.unique { + // On a unique index we can do a direct get rather than a prefix search. + watch, iobj, ok = indexTxn.Root().GetWatch(q.key) + if !ok { + return + } + obj = iobj.data.(Obj) + revision = iobj.revision + return + } + + // For a non-unique index we need to do a prefix search. iter := indexTxn.Root().Iterator() watch = iter.SeekPrefixWatch(q.key) - - var iobj object for { var key []byte key, iobj, ok = iter.Next() @@ -156,14 +205,8 @@ func (t *genTable[Obj]) FirstWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } // Check that we have a full match on the key - var match bool - if indexTxn.entry.unique { - match = len(key) == len(q.key) - } else { - _, secondary := decodeNonUniqueKey(key) - match = len(secondary) == len(q.key) - } - if match { + _, secondary := decodeNonUniqueKey(key) + if len(secondary) == len(q.key) { break } } @@ -181,11 +224,21 @@ func (t *genTable[Obj]) Last(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint6 } func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision uint64, watch <-chan struct{}, ok bool) { - indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) + var iobj object + if indexTxn.unique { + // On a unique index we can do a direct get rather than a prefix search. 
+ watch, iobj, ok = indexTxn.Root().GetWatch(q.key) + if !ok { + return + } + obj = iobj.data.(Obj) + revision = iobj.revision + return + } + iter := indexTxn.Root().ReverseIterator() watch = iter.SeekPrefixWatch(q.key) - - var iobj object for { var key []byte key, iobj, ok = iter.Previous() @@ -194,14 +247,8 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } // Check that we have a full match on the key - var match bool - if indexTxn.entry.unique { - match = len(key) == len(q.key) - } else { - _, secondary := decodeNonUniqueKey(key) - match = len(secondary) == len(q.key) - } - if match { + _, secondary := decodeNonUniqueKey(key) + if len(secondary) == len(q.key) { break } } @@ -214,7 +261,7 @@ func (t *genTable[Obj]) LastWatch(txn ReadTxn, q Query[Obj]) (obj Obj, revision } func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) root := indexTxn.Root() // Since LowerBound query may be invalidated by changes in another branch @@ -227,7 +274,7 @@ func (t *genTable[Obj]) LowerBound(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <- } func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t, t.primaryAnyIndexer.name) + indexTxn := txn.getTxn().mustIndexReadTxn(t, PrimaryIndexPos) root := indexTxn.Root() // Grab the watch channel for the root node watchCh, _, _ := root.GetWatch(nil) @@ -235,11 +282,11 @@ func (t *genTable[Obj]) All(txn ReadTxn) (Iterator[Obj], <-chan struct{}) { } func (t *genTable[Obj]) Get(txn ReadTxn, q Query[Obj]) (Iterator[Obj], <-chan struct{}) { - indexTxn := txn.getTxn().mustIndexReadTxn(t, q.index) + indexTxn := txn.getTxn().mustIndexReadTxn(t, t.indexPos(q.index)) iter := indexTxn.Root().Iterator() watchCh := iter.SeekPrefixWatch(q.key) - if indexTxn.entry.unique { + if indexTxn.unique { return &uniqueIterator[Obj]{iter, q.key}, watchCh } return &nonUniqueIterator[Obj]{iter, q.key}, watchCh diff --git a/txn.go b/txn.go index 98defd0..a44ee1a 100644 --- a/txn.go +++ b/txn.go @@ -11,6 +11,7 @@ import ( "io" "reflect" "runtime" + "slices" "time" iradix "github.com/hashicorp/go-immutable-radix/v2" @@ -22,7 +23,6 @@ import ( type txn struct { db *DB root dbRoot - writeTxns map[tableIndex]indexTxn // opened per-index write transactions modifiedTables []*tableEntry // table entries being modified smus internal.SortableMutexes // the (sorted) table locks acquiredAt time.Time // the time at which the transaction acquired the locks @@ -34,16 +34,19 @@ type tableIndex struct { index IndexName } -type indexTxn struct { - *iradix.Txn[object] - entry indexEntry +// rooter is implemented by both iradix.Txn and iradix.Tree. +type rooter interface { + Root() *iradix.Node[object] } -func (i indexTxn) Clone() indexTxn { - return indexTxn{ - i.Txn.Clone(), - i.entry, - } +type indexReadTxn struct { + rooter + unique bool +} + +type indexTxn struct { + *iradix.Txn[object] + unique bool } var zeroTxn = txn{} @@ -74,51 +77,42 @@ func (txn *txn) getRevision(meta TableMeta) Revision { // indexReadTxn returns a transaction to read from the specific index. // If the table or index is not found this returns nil & error. 
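
As an aside, the unique-index fast path used in FirstWatch and LastWatch above can be shown in isolation. The following is a self-contained sketch against go-immutable-radix/v2, not statedb code; the key layout ("tag=a/id=1") and function names are made up for illustration:

package main

import (
	"fmt"

	iradix "github.com/hashicorp/go-immutable-radix/v2"
)

// lookupUnique: on a unique index the query key is the full radix key,
// so a single Get suffices and no iteration is needed.
func lookupUnique(tree *iradix.Tree[string], key []byte) (string, bool) {
	return tree.Root().Get(key)
}

// lookupNonUnique: on a non-unique index several objects share the queried
// prefix (the primary key is appended to disambiguate), so matches are
// collected with a prefix scan. The real code additionally decodes each key
// and compares secondary key lengths to accept only exact matches.
func lookupNonUnique(tree *iradix.Tree[string], prefix []byte) []string {
	var out []string
	iter := tree.Root().Iterator()
	iter.SeekPrefix(prefix)
	for _, v, ok := iter.Next(); ok; _, v, ok = iter.Next() {
		out = append(out, v)
	}
	return out
}

func main() {
	tree := iradix.New[string]()
	tree, _, _ = tree.Insert([]byte("tag=a/id=1"), "obj1")
	tree, _, _ = tree.Insert([]byte("tag=a/id=2"), "obj2")
	fmt.Println(lookupUnique(tree, []byte("tag=a/id=1")))
	fmt.Println(lookupNonUnique(tree, []byte("tag=a/")))
}

The real code also grabs watch channels (GetWatch, SeekPrefixWatch) so readers can block until the queried range changes.
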
-func (txn *txn) indexReadTxn(meta TableMeta, index IndexName) (indexTxn, error) { - if txn.writeTxns != nil { +func (txn *txn) indexReadTxn(meta TableMeta, indexPos int) (indexReadTxn, error) { + if txn.modifiedTables != nil { entry := txn.modifiedTables[meta.tablePos()] if entry != nil { - itxn, err := txn.indexWriteTxn(meta, index) - if err == nil { - return itxn.Clone(), nil + itxn, err := txn.indexWriteTxn(meta, indexPos) + if err != nil { + return indexReadTxn{}, err } - return indexTxn{}, err + // Since iradix reuses nodes when mutating we need to return a clone + // so that iterators don't become invalid. + return indexReadTxn{itxn.Txn.Clone(), itxn.unique}, nil } } - - table := txn.root[meta.tablePos()] - indexEntry, ok := table.indexes.Get([]byte(index)) - if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, meta.Name()) - } - - return indexTxn{ - indexEntry.tree.Txn(), - indexEntry}, nil + indexEntry := txn.root[meta.tablePos()].indexes[indexPos] + return indexReadTxn{indexEntry.tree, indexEntry.unique}, nil } // indexWriteTxn returns a transaction to read/write to a specific index. // The created transaction is memoized and used for subsequent reads and/or writes. -func (txn *txn) indexWriteTxn(meta TableMeta, index IndexName) (indexTxn, error) { - if indexTreeTxn, ok := txn.writeTxns[tableIndex{meta.tablePos(), index}]; ok { - return indexTreeTxn, nil - } +func (txn *txn) indexWriteTxn(meta TableMeta, indexPos int) (indexTxn, error) { table := txn.modifiedTables[meta.tablePos()] - indexEntry, ok := table.indexes.Get([]byte(index)) - if !ok { - return indexTxn{}, fmt.Errorf("index %q not found from table %q", index, meta.Name()) - } - itxn := indexEntry.tree.Txn() - itxn.TrackMutate(true) - indexWriteTxn := indexTxn{itxn, indexEntry} - txn.writeTxns[tableIndex{meta.tablePos(), index}] = indexWriteTxn - return indexWriteTxn, nil + if table == nil { + return indexTxn{}, tableError(meta.Name(), ErrTableNotLockedForWriting) + } + indexEntry := &table.indexes[indexPos] + if indexEntry.txn == nil { + indexEntry.txn = indexEntry.tree.Txn() + indexEntry.txn.TrackMutate(true) + } + return indexTxn{indexEntry.txn, indexEntry.unique}, nil } // mustIndexReadTxn returns a transaction to read from the specific index. // Panics if table or index are not found. -func (txn *txn) mustIndexReadTxn(meta TableMeta, index IndexName) indexTxn { - indexTxn, err := txn.indexReadTxn(meta, index) +func (txn *txn) mustIndexReadTxn(meta TableMeta, indexPos int) indexReadTxn { + indexTxn, err := txn.indexReadTxn(meta, indexPos) if err != nil { panic(err) } @@ -127,8 +121,8 @@ func (txn *txn) mustIndexReadTxn(meta TableMeta, index IndexName) indexTxn { // mustIndexReadTxn returns a transaction to read or write from the specific index. // Panics if table or index not found. 
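
The memoization that replaces the writeTxns map boils down to the following pattern. This is a sketch with assumed types, not the statedb structs: the per-index write transaction is created lazily on first write, cached on the index entry, and reused for subsequent writes until commit.

import iradix "github.com/hashicorp/go-immutable-radix/v2"

// cachedIndex is a hypothetical stand-in for indexEntry.
type cachedIndex struct {
	tree *iradix.Tree[any] // last committed, immutable tree
	txn  *iradix.Txn[any]  // nil until the first write in this transaction
}

func (e *cachedIndex) writeTxn() *iradix.Txn[any] {
	if e.txn == nil {
		e.txn = e.tree.Txn()
		// Track mutated nodes so their watch channels can be notified on commit.
		e.txn.TrackMutate(true)
	}
	return e.txn
}
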
-func (txn *txn) mustIndexWriteTxn(meta TableMeta, index IndexName) indexTxn { - indexTxn, err := txn.indexWriteTxn(meta, index) +func (txn *txn) mustIndexWriteTxn(meta TableMeta, indexPos int) indexTxn { + indexTxn, err := txn.indexWriteTxn(meta, indexPos) if err != nil { panic(err) } @@ -157,7 +151,7 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object // Update the primary index first idKey := meta.primary().fromObject(obj).First() - idIndexTxn := txn.mustIndexWriteTxn(meta, meta.primary().name) + idIndexTxn := txn.mustIndexWriteTxn(meta, PrimaryIndexPos) oldObj, oldExists := idIndexTxn.Insert(idKey, obj) // Sanity check: is the same object being inserted back and thus the @@ -194,26 +188,30 @@ func (txn *txn) Insert(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index - revIndexTxn := txn.mustIndexWriteTxn(meta, RevisionIndex) + revIndexTxn := txn.mustIndexWriteTxn(meta, RevisionIndexPos) if oldExists { - _, ok := revIndexTxn.Delete([]byte(index.Uint64(oldObj.revision))) + var revKey [8]byte + binary.BigEndian.PutUint64(revKey[:], oldObj.revision) + _, ok := revIndexTxn.Delete(revKey[:]) if !ok { panic("BUG: Old revision index entry not found") } } - revIndexTxn.Insert([]byte(index.Uint64(revision)), obj) + var revKey [8]byte + binary.BigEndian.PutUint64(revKey[:], obj.revision) + revIndexTxn.Insert(revKey[:], obj) // If it's new, possibly remove an older deleted object with the same // primary key from the graveyard. if !oldExists && txn.hasDeleteTrackers(meta) { - if old, existed := txn.mustIndexWriteTxn(meta, GraveyardIndex).Delete(idKey); existed { - txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Delete([]byte(index.Uint64(old.revision))) + if old, existed := txn.mustIndexWriteTxn(meta, GraveyardIndexPos).Delete(idKey); existed { + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndexPos).Delete([]byte(index.Uint64(old.revision))) } } // Then update secondary indexes - for idx, indexer := range meta.secondary() { - indexTxn := txn.mustIndexWriteTxn(meta, idx) + for _, indexer := range meta.secondary() { + indexTxn := txn.mustIndexWriteTxn(meta, indexer.pos) newKeys := indexer.fromObject(obj) if oldExists { @@ -280,7 +278,7 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object // We assume that "data" has only enough defined fields to // compute the primary key. idKey := meta.primary().fromObject(object{data: data}).First() - idIndexTree := txn.mustIndexWriteTxn(meta, meta.primary().name) + idIndexTree := txn.mustIndexWriteTxn(meta, PrimaryIndexPos) obj, existed := idIndexTree.Delete(idKey) if !existed { return object{}, false, nil @@ -297,29 +295,31 @@ func (txn *txn) Delete(meta TableMeta, guardRevision Revision, data any) (object } // Update revision index. - indexTree := txn.mustIndexWriteTxn(meta, RevisionIndex) - if _, ok := indexTree.Delete(index.Uint64(obj.revision)); !ok { + indexTree := txn.mustIndexWriteTxn(meta, RevisionIndexPos) + var revKey [8]byte + binary.BigEndian.PutUint64(revKey[:], obj.revision) + if _, ok := indexTree.Delete(index.Key(revKey[:])); !ok { panic("BUG: Object to be deleted not found from revision index") } // Then update secondary indexes. 
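
The revKey encoding above depends on a property worth making explicit: fixed-width big-endian keys compare bytewise in the same order as the numbers they encode, which is what allows the revision index to be iterated from a lower bound. A standalone check (illustrative only, mirroring the revKey construction in the hunks above):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// revisionKey encodes a revision as a fixed 8-byte big-endian key.
func revisionKey(rev uint64) []byte {
	var key [8]byte
	binary.BigEndian.PutUint64(key[:], rev)
	return key[:]
}

func main() {
	// Bytewise order matches numeric order, so a radix lower-bound seek over
	// these keys visits objects in ascending revision order.
	fmt.Println(bytes.Compare(revisionKey(1), revisionKey(2)))   // -1
	fmt.Println(bytes.Compare(revisionKey(256), revisionKey(2))) // 1
}
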
- for idx, indexer := range meta.secondary() { + for _, indexer := range meta.secondary() { indexer.fromObject(obj).Foreach(func(key index.Key) { if !indexer.unique { key = encodeNonUniqueKey(idKey, key) } - txn.mustIndexWriteTxn(meta, idx).Delete(key) + txn.mustIndexWriteTxn(meta, indexer.pos).Delete(key) }) } // And finally insert the object into the graveyard. if txn.hasDeleteTrackers(meta) { - graveyardIndex := txn.mustIndexWriteTxn(meta, GraveyardIndex) + graveyardIndex := txn.mustIndexWriteTxn(meta, GraveyardIndexPos) obj.revision = revision if _, existed := graveyardIndex.Insert(idKey, obj); existed { panic("BUG: Double deletion! Deleted object already existed in graveyard") } - txn.mustIndexWriteTxn(meta, GraveyardRevisionIndex).Insert(index.Uint64(revision), obj) + txn.mustIndexWriteTxn(meta, GraveyardRevisionIndexPos).Insert(index.Uint64(revision), obj) } return obj, true, nil @@ -369,7 +369,7 @@ func (txn *txn) Abort() { // // txn.Commit() // - if txn.writeTxns == nil { + if txn.db == nil { return } @@ -402,9 +402,9 @@ func (txn *txn) Commit() { // a reader can acquire an immutable snapshot of all data in the database with a // simpler atomic pointer load. - // If writeTxns is nil, this transaction has already been committed or aborted, and + // If db is nil, this transaction has already been committed or aborted, and // thus there is nothing to do. - if txn.writeTxns == nil { + if txn.db == nil { return } @@ -413,11 +413,19 @@ func (txn *txn) Commit() { // Commit each individual changed index to each table. // We don't notify yet (CommitOnly) as the root needs to be updated // first as otherwise readers would wake up too early. - for tableIndex, subTxn := range txn.writeTxns { - table := txn.modifiedTables[tableIndex.tablePos] - subTxn.entry.tree = subTxn.CommitOnly() - table.indexes, _, _ = - table.indexes.Insert([]byte(tableIndex.index), subTxn.entry) + txnToNotify := []*iradix.Txn[object]{} + for _, table := range txn.modifiedTables { + if table == nil { + continue + } + for i := range table.indexes { + txn := table.indexes[i].txn + if txn != nil { + table.indexes[i].tree = txn.CommitOnly() + table.indexes[i].txn = nil + txnToNotify = append(txnToNotify, txn) + } + } // Update metrics name := table.meta.Name() @@ -435,6 +443,7 @@ func (txn *txn) Commit() { // load it again and modify the latest version that we now have immobilised by // the root lock. root := *db.root.Load() + root = slices.Clone(root) // Insert the modified tables into the root tree of tables. for pos, table := range txn.modifiedTables { @@ -453,8 +462,8 @@ func (txn *txn) Commit() { // Now that new root is committed, we can notify readers by closing the watch channels of // mutated radix tree nodes in all changed indexes and on the root itself. - for _, subTxn := range txn.writeTxns { - subTxn.Notify() + for _, txn := range txnToNotify { + txn.Notify() } txn.db.metrics.WriteTxnDuration( @@ -477,7 +486,7 @@ func (txn *txn) WriteJSON(w io.Writer) error { first = false } - indexTxn := txn.getTxn().mustIndexReadTxn(table.meta, table.meta.primary().name) + indexTxn := txn.getTxn().mustIndexReadTxn(table.meta, PrimaryIndexPos) root := indexTxn.Root() iter := root.Iterator() diff --git a/types.go b/types.go index 466a28b..2225015 100644 --- a/types.go +++ b/types.go @@ -150,8 +150,10 @@ type RWTable[Obj any] interface { // the object type (the 'Obj' constraint). 
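
The Commit path above is a two-phase commit over the per-index transactions: CommitOnly first produces the new immutable trees without waking watchers, the new root is then published, and only afterwards Notify closes the watch channels of mutated nodes. A condensed model of that ordering follows (assumed types; the real code also updates metrics, releases locks and handles the graveyard):

import (
	"sync/atomic"

	iradix "github.com/hashicorp/go-immutable-radix/v2"
)

func commitSketch(root *atomic.Pointer[[]*iradix.Tree[any]], txns []*iradix.Txn[any]) {
	// Phase 1: build the new trees, deferring watcher notification.
	trees := make([]*iradix.Tree[any], 0, len(txns))
	for _, txn := range txns {
		trees = append(trees, txn.CommitOnly())
	}
	// Phase 2: publish the new root, then wake readers. This order guarantees
	// that a woken reader always observes the committed data.
	root.Store(&trees)
	for _, txn := range txns {
		txn.Notify()
	}
}
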
type TableMeta interface { Name() TableName // The name of the table + tableEntry() tableEntry tablePos() int setTablePos(int) + indexPos(string) int tableKey() []byte // The radix key for the table in the root tree primary() anyIndexer // The untyped primary indexer for the table secondary() map[string]anyIndexer // Secondary indexers (if any) @@ -279,10 +281,17 @@ type TableWritable interface { // const ( - reservedIndexPrefix = "__" - RevisionIndex = "__revision__" - GraveyardIndex = "__graveyard__" - GraveyardRevisionIndex = "__graveyard_revision__" + PrimaryIndexPos = 0 + + reservedIndexPrefix = "__" + RevisionIndex = "__revision__" + RevisionIndexPos = 1 + GraveyardIndex = "__graveyard__" + GraveyardIndexPos = 2 + GraveyardRevisionIndex = "__graveyard_revision__" + GraveyardRevisionIndexPos = 3 + + SecondaryIndexStartPos = 4 ) // object is the format in which data is stored in the tables. @@ -305,6 +314,9 @@ type anyIndexer struct { // values returned by fromObject. If false the primary // key of the object will be appended to the key. unique bool + + // pos is the position of the index in [tableEntry.indexes] + pos int } type deleteTracker interface { @@ -314,28 +326,23 @@ type deleteTracker interface { type indexEntry struct { tree *iradix.Tree[object] + txn *iradix.Txn[object] unique bool } type tableEntry struct { meta TableMeta - indexes *iradix.Tree[indexEntry] + indexes []indexEntry deleteTrackers *iradix.Tree[deleteTracker] revision uint64 } func (t *tableEntry) numObjects() int { - indexEntry, ok := t.indexes.Get([]byte(RevisionIndex)) - if ok { - return indexEntry.tree.Len() - } - return 0 + indexEntry := t.indexes[t.meta.indexPos(RevisionIndex)] + return indexEntry.tree.Len() } func (t *tableEntry) numDeletedObjects() int { - indexEntry, ok := t.indexes.Get([]byte(GraveyardIndex)) - if ok { - return indexEntry.tree.Len() - } - return 0 + indexEntry := t.indexes[t.meta.indexPos(GraveyardIndex)] + return indexEntry.tree.Len() } From 7c3e0bbfdb8fabe9064607f5125cb51ae665eab7 Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Mon, 25 Mar 2024 13:24:15 +0100 Subject: [PATCH 5/5] Drop callerPackage and introduce NewHandle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop callerPackage as it was too costly and replace it with a named handle that we can use in metrics etc. The named handle can then be provided automatically with hive's module provider for per-module transaction cost accounting. Before: goos: linux goarch: amd64 pkg: github.com/cilium/statedb cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz BenchmarkDB_WriteTxn_1-8 240938 4752 ns/op 210459 objects/sec BenchmarkDB_WriteTxn_10-8 502363 2551 ns/op 392063 objects/sec BenchmarkDB_WriteTxn_100-8 457850 2279 ns/op 438872 objects/sec BenchmarkDB_WriteTxn_100_SecondaryIndex-8 526416 2222 ns/op 450100 objects/sec BenchmarkDB_RandomInsert-8 1012 1181665 ns/op 846264 objects/sec BenchmarkDB_RandomReplace-8 216 5048896 ns/op 198063 objects/sec BenchmarkDB_SequentialInsert-8 398 2996997 ns/op 333667 objects/sec ... 
BenchmarkDB_DeleteTracker_Baseline-8 390 3036951 ns/op 329278 objects/sec BenchmarkDB_DeleteTracker-8 141 8194663 ns/op 122031 objects/sec BenchmarkDB_RandomLookup-8 8846 134745 ns/op 7421428 objects/sec BenchmarkDB_SequentialLookup-8 8425 123284 ns/op 8111372 objects/sec BenchmarkDB_FullIteration_All-8 103279 10996 ns/op 90941891 objects/sec BenchmarkDB_FullIteration_Get-8 84451 13637 ns/op 73328686 objects/sec BenchmarkDB_PropagationDelay-8 235146 5342 ns/op 48.00 50th_µs 57.00 90th_µs 215.0 99th_µs PASS ok github.com/cilium/statedb 31.480s After: goos: linux goarch: amd64 pkg: github.com/cilium/statedb cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz BenchmarkDB_WriteTxn_1-8 310290 3885 ns/op 257388 objects/sec BenchmarkDB_WriteTxn_10-8 523450 2441 ns/op 409679 objects/sec BenchmarkDB_WriteTxn_100-8 538578 2219 ns/op 450628 objects/sec BenchmarkDB_WriteTxn_100_SecondaryIndex-8 515170 2156 ns/op 463816 objects/sec BenchmarkDB_RandomInsert-8 1110 1081693 ns/op 924477 objects/sec BenchmarkDB_RandomReplace-8 237 5034048 ns/op 198647 objects/sec BenchmarkDB_SequentialInsert-8 380 3048134 ns/op 328070 objects/sec ... BenchmarkDB_DeleteTracker_Baseline-8 396 3066078 ns/op 326150 objects/sec BenchmarkDB_DeleteTracker-8 169 7019558 ns/op 142459 objects/sec BenchmarkDB_RandomLookup-8 8839 137467 ns/op 7274474 objects/sec BenchmarkDB_SequentialLookup-8 8958 124483 ns/op 8033258 objects/sec BenchmarkDB_FullIteration_All-8 97218 11356 ns/op 88057271 objects/sec BenchmarkDB_FullIteration_Get-8 78102 14373 ns/op 69577187 objects/sec BenchmarkDB_PropagationDelay-8 245020 4727 ns/op 42.00 50th_µs 48.00 90th_µs 210.0 99th_µs PASS ok github.com/cilium/statedb 31.520s Signed-off-by: Jussi Maki --- db.go | 141 ++++++++++++++++++++++++++--------------------------- db_test.go | 16 ++---- metrics.go | 24 ++++----- table.go | 5 +- txn.go | 11 +++-- 5 files changed, 95 insertions(+), 102 deletions(-) diff --git a/db.go b/db.go index 372c1f8..6878e81 100644 --- a/db.go +++ b/db.go @@ -7,10 +7,8 @@ import ( "context" "errors" "net/http" - "reflect" "runtime" "slices" - "strings" "sync" "sync/atomic" "time" @@ -93,6 +91,7 @@ type DB struct { gcExited chan struct{} gcRateLimitInterval time.Duration metrics Metrics + defaultHandle Handle } type dbRoot []tableEntry @@ -102,6 +101,7 @@ func NewDB(tables []TableMeta, metrics Metrics) (*DB, error) { metrics: metrics, gcRateLimitInterval: defaultGCRateLimitInterval, } + db.defaultHandle = Handle{db, "DB"} root := make(dbRoot, 0, len(tables)) for _, t := range tables { if err := db.registerTable(t, &root); err != nil { @@ -158,12 +158,9 @@ func (db *DB) registerTable(table TableMeta, root *dbRoot) error { // ReadTxn constructs a new read transaction for performing reads against // a snapshot of the database. // -// ReadTxn is not thread-safe! +// The returned ReadTxn is not thread-safe. func (db *DB) ReadTxn() ReadTxn { - return &txn{ - db: db, - root: *db.root.Load(), - } + return db.defaultHandle.ReadTxn() } // WriteTxn constructs a new write transaction against the given set of tables. @@ -171,50 +168,9 @@ func (db *DB) ReadTxn() ReadTxn { // The modifications performed in the write transaction are not visible outside // it until Commit() is called. To discard the changes call Abort(). // -// WriteTxn is not thread-safe! +// The returned WriteTxn is not thread-safe. 
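
Usage-wise the replacement API reads as follows. This is a sketch written against the signatures introduced in this patch and assumed to live inside the statedb package; the handle name "my-module" and the reuse of testObject and idIndex from the tests are illustrative:

func exampleHandleUsage(db *DB, table RWTable[testObject]) {
	// Metrics for transactions made through this handle are keyed by its name.
	h := db.NewHandle("my-module")

	wtxn := h.WriteTxn(table)
	if _, _, err := table.Insert(wtxn, testObject{ID: 1}); err != nil {
		wtxn.Abort()
		return
	}
	wtxn.Commit()

	// Reads go through the same handle and see an immutable snapshot.
	obj, rev, ok := table.First(h.ReadTxn(), idIndex.Query(1))
	_, _, _ = obj, rev, ok
}
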
func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { - callerPkg := callerPackage() - - allTables := append(tables, table) - smus := internal.SortableMutexes{} - for _, table := range allTables { - smus = append(smus, table.sortableMutex()) - } - lockAt := time.Now() - smus.Lock() - acquiredAt := time.Now() - - root := *db.root.Load() - tableEntries := make([]*tableEntry, len(root)) - var tableNames []string - for _, table := range allTables { - tableEntry := root[table.tablePos()] - tableEntry.indexes = slices.Clone(tableEntry.indexes) - tableEntries[table.tablePos()] = &tableEntry - tableNames = append(tableNames, table.Name()) - - db.metrics.WriteTxnTableAcquisition( - table.Name(), - table.sortableMutex().AcquireDuration(), - ) - } - - db.metrics.WriteTxnTotalAcquisition( - callerPkg, - tableNames, - acquiredAt.Sub(lockAt), - ) - - txn := &txn{ - db: db, - root: root, - modifiedTables: tableEntries, - smus: smus, - acquiredAt: acquiredAt, - packageName: callerPkg, - } - runtime.SetFinalizer(txn, txnFinalizer) - return txn + return db.defaultHandle.WriteTxn(table, tables...) } func (db *DB) Start(cell.HookContext) error { @@ -255,32 +211,73 @@ func (db *DB) setGCRateLimitInterval(interval time.Duration) { db.gcRateLimitInterval = interval } -var ciliumPackagePrefix = func() string { - sentinel := func() {} - name := runtime.FuncForPC(reflect.ValueOf(sentinel).Pointer()).Name() - if idx := strings.LastIndex(name, "/"); idx >= 0 { - return name[:idx+1] +// NewHandle returns a named handle to the DB. The handle has the same ReadTxn and +// WriteTxn methods as DB, but annotated with the given name for more accurate +// cost accounting in e.g. metrics. +func (db *DB) NewHandle(name string) Handle { + return Handle{db, name} +} + +// Handle is a named handle to the database for constructing read or write +// transactions. +type Handle struct { + db *DB + name string +} + +func (h Handle) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { + db := h.db + allTables := append(tables, table) + smus := internal.SortableMutexes{} + for _, table := range allTables { + smus = append(smus, table.sortableMutex()) } - return "" -}() + lockAt := time.Now() + smus.Lock() + acquiredAt := time.Now() -func callerPackage() string { - var callerPkg string - pc, _, _, ok := runtime.Caller(2) + root := *db.root.Load() + tableEntries := make([]*tableEntry, len(root)) + var tableNames []string + for _, table := range allTables { + tableEntry := root[table.tablePos()] + tableEntry.indexes = slices.Clone(tableEntry.indexes) + tableEntries[table.tablePos()] = &tableEntry + tableNames = append(tableNames, table.Name()) - if ok { - f := runtime.FuncForPC(pc) - if f != nil { - callerPkg = f.Name() - callerPkg, _ = strings.CutPrefix(callerPkg, ciliumPackagePrefix) - callerPkg = strings.SplitN(callerPkg, ".", 2)[0] - } else { - callerPkg = "unknown" - } - } else { + db.metrics.WriteTxnTableAcquisition( + h.name, + table.Name(), + table.sortableMutex().AcquireDuration(), + ) + } - callerPkg = "unknown" + db.metrics.WriteTxnTotalAcquisition( + h.name, + tableNames, + acquiredAt.Sub(lockAt), + ) + + txn := &txn{ + db: db, + root: root, + modifiedTables: tableEntries, + smus: smus, + acquiredAt: acquiredAt, + tableNames: tableNames, + handle: h.name, } + runtime.SetFinalizer(txn, txnFinalizer) + return txn +} - return callerPkg +// ReadTxn constructs a new read transaction for performing reads against +// a snapshot of the database. +// +// The returned ReadTxn is not thread-safe. 
+func (h Handle) ReadTxn() ReadTxn { + return &txn{ + db: h.db, + root: *h.db.root.Load(), + } } diff --git a/db_test.go b/db_test.go index 143d337..7e8c4ee 100644 --- a/db_test.go +++ b/db_test.go @@ -607,7 +607,8 @@ func TestDB_GetFirstLast(t *testing.T) { func TestDB_CommitAbort(t *testing.T) { t.Parallel() - db, table, metrics := newTestDB(t, tagsIndex) + dbX, table, metrics := newTestDB(t, tagsIndex) + db := dbX.NewHandle("test-handle") txn := db.WriteTxn(table) _, _, err := table.Insert(txn, testObject{ID: 123, Tags: nil}) @@ -616,8 +617,8 @@ func TestDB_CommitAbort(t *testing.T) { assert.EqualValues(t, table.Revision(db.ReadTxn()), expvarInt(metrics.RevisionVar.Get("test")), "Revision") assert.EqualValues(t, 1, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount") - assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("statedb")), 0.0, "WriteTxnAcquisition") - assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("statedb")), 0.0, "WriteTxnDuration") + assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("test-handle/test")), 0.0, "WriteTxnAcquisition") + assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("test-handle/test")), 0.0, "WriteTxnDuration") obj, rev, ok := table.First(db.ReadTxn(), idIndex.Query(123)) require.True(t, ok, "expected First(1) to return result") @@ -779,15 +780,6 @@ func TestWriteJSON(t *testing.T) { txn.Commit() } -func Test_callerPackage(t *testing.T) { - t.Parallel() - - pkg := func() string { - return callerPackage() - }() - require.Equal(t, "statedb", pkg) -} - func Test_nonUniqueKey(t *testing.T) { // empty keys key := encodeNonUniqueKey(nil, nil) diff --git a/metrics.go b/metrics.go index 3613cf2..e723502 100644 --- a/metrics.go +++ b/metrics.go @@ -8,9 +8,9 @@ import ( ) type Metrics interface { - WriteTxnTableAcquisition(tableName string, acquire time.Duration) - WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) - WriteTxnDuration(goPackage string, acquire time.Duration) + WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) + WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) + WriteTxnDuration(handle string, tables []string, acquire time.Duration) GraveyardLowWatermark(tableName string, lowWatermark Revision) GraveyardCleaningDuration(tableName string, duration time.Duration) @@ -121,16 +121,16 @@ func (m *ExpVarMetrics) ObjectCount(name string, numObjects int) { m.ObjectCountVar.Set(name, &intVar) } -func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) { - m.WriteTxnDurationVar.AddFloat(goPackage, acquire.Seconds()) +func (m *ExpVarMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration) { + m.WriteTxnDurationVar.AddFloat(handle+"/"+strings.Join(tables, "+"), acquire.Seconds()) } -func (m *ExpVarMetrics) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) { - m.WriteTxnAcquisitionVar.AddFloat(goPackage, acquire.Seconds()) +func (m *ExpVarMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) { + m.WriteTxnAcquisitionVar.AddFloat(handle+"/"+strings.Join(tables, "+"), acquire.Seconds()) } -func (m *ExpVarMetrics) WriteTxnTableAcquisition(name string, acquire time.Duration) { - m.LockContentionVar.AddFloat(name, acquire.Seconds()) +func (m *ExpVarMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) { + m.LockContentionVar.AddFloat(handle+"/"+tableName, 
acquire.Seconds()) } var _ Metrics = &ExpVarMetrics{} @@ -162,15 +162,15 @@ func (*NopMetrics) Revision(tableName string, revision uint64) { } // WriteTxnDuration implements Metrics. -func (*NopMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) { +func (*NopMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration) { } // WriteTxnTableAcquisition implements Metrics. -func (*NopMetrics) WriteTxnTableAcquisition(tableName string, acquire time.Duration) { +func (*NopMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) { } // WriteTxnTotalAcquisition implements Metrics. -func (*NopMetrics) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) { +func (*NopMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) { } var _ Metrics = &NopMetrics{} diff --git a/table.go b/table.go index 9dcbcac..39264fe 100644 --- a/table.go +++ b/table.go @@ -59,10 +59,11 @@ func NewTable[Obj any]( indexPos := SecondaryIndexStartPos for _, indexer := range secondaryIndexers { + name := indexer.indexName() anyIndexer := toAnyIndexer(indexer) anyIndexer.pos = indexPos - table.secondaryAnyIndexers[indexer.indexName()] = anyIndexer - table.indexPositions[indexer.indexName()] = indexPos + table.secondaryAnyIndexers[name] = anyIndexer + table.indexPositions[name] = indexPos indexPos++ } diff --git a/txn.go b/txn.go index a44ee1a..23cf151 100644 --- a/txn.go +++ b/txn.go @@ -22,11 +22,12 @@ import ( type txn struct { db *DB + handle string root dbRoot modifiedTables []*tableEntry // table entries being modified smus internal.SortableMutexes // the (sorted) table locks acquiredAt time.Time // the time at which the transaction acquired the locks - packageName string // name of the package that created the transaction + tableNames []string } type tableIndex struct { @@ -61,7 +62,7 @@ func (txn *txn) getTxn() *txn { // Abort/Commit which would cause the table to be locked forever. func txnFinalizer(txn *txn) { if txn.db != nil { - panic(fmt.Sprintf("WriteTxn acquired by package %q was never Abort()'d or Commit()'d", txn.packageName)) + panic(fmt.Sprintf("WriteTxn from handle %s against tables %v was never Abort()'d or Commit()'d", txn.handle, txn.tableNames)) } } @@ -375,7 +376,8 @@ func (txn *txn) Abort() { txn.smus.Unlock() txn.db.metrics.WriteTxnDuration( - txn.packageName, + txn.handle, + txn.tableNames, time.Since(txn.acquiredAt)) *txn = zeroTxn @@ -467,7 +469,8 @@ func (txn *txn) Commit() { } txn.db.metrics.WriteTxnDuration( - txn.packageName, + txn.handle, + txn.tableNames, time.Since(txn.acquiredAt)) // Zero out the transaction to make it inert.
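
Taken together, the series makes a metrics-free, handle-scoped setup straightforward. A closing sketch, again assuming statedb package internals; the function and handle names are made up:

// newBenchDB wires a DB with NopMetrics, avoiding the expvar overhead
// measured in the benchmarks above.
func newBenchDB(table TableMeta) (*DB, Handle, error) {
	db, err := NewDB([]TableMeta{table}, &NopMetrics{})
	if err != nil {
		return nil, Handle{}, err
	}
	// In real use the DB is started and stopped through lifecycle hooks
	// (the tests wire this up via hive).
	return db, db.NewHandle("bench"), nil
}
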