diff --git a/db.go b/db.go index 372c1f8..6878e81 100644 --- a/db.go +++ b/db.go @@ -7,10 +7,8 @@ import ( "context" "errors" "net/http" - "reflect" "runtime" "slices" - "strings" "sync" "sync/atomic" "time" @@ -93,6 +91,7 @@ type DB struct { gcExited chan struct{} gcRateLimitInterval time.Duration metrics Metrics + defaultHandle Handle } type dbRoot []tableEntry @@ -102,6 +101,7 @@ func NewDB(tables []TableMeta, metrics Metrics) (*DB, error) { metrics: metrics, gcRateLimitInterval: defaultGCRateLimitInterval, } + db.defaultHandle = Handle{db, "DB"} root := make(dbRoot, 0, len(tables)) for _, t := range tables { if err := db.registerTable(t, &root); err != nil { @@ -158,12 +158,9 @@ func (db *DB) registerTable(table TableMeta, root *dbRoot) error { // ReadTxn constructs a new read transaction for performing reads against // a snapshot of the database. // -// ReadTxn is not thread-safe! +// The returned ReadTxn is not thread-safe. func (db *DB) ReadTxn() ReadTxn { - return &txn{ - db: db, - root: *db.root.Load(), - } + return db.defaultHandle.ReadTxn() } // WriteTxn constructs a new write transaction against the given set of tables. @@ -171,50 +168,9 @@ func (db *DB) ReadTxn() ReadTxn { // The modifications performed in the write transaction are not visible outside // it until Commit() is called. To discard the changes call Abort(). // -// WriteTxn is not thread-safe! +// The returned WriteTxn is not thread-safe. func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { - callerPkg := callerPackage() - - allTables := append(tables, table) - smus := internal.SortableMutexes{} - for _, table := range allTables { - smus = append(smus, table.sortableMutex()) - } - lockAt := time.Now() - smus.Lock() - acquiredAt := time.Now() - - root := *db.root.Load() - tableEntries := make([]*tableEntry, len(root)) - var tableNames []string - for _, table := range allTables { - tableEntry := root[table.tablePos()] - tableEntry.indexes = slices.Clone(tableEntry.indexes) - tableEntries[table.tablePos()] = &tableEntry - tableNames = append(tableNames, table.Name()) - - db.metrics.WriteTxnTableAcquisition( - table.Name(), - table.sortableMutex().AcquireDuration(), - ) - } - - db.metrics.WriteTxnTotalAcquisition( - callerPkg, - tableNames, - acquiredAt.Sub(lockAt), - ) - - txn := &txn{ - db: db, - root: root, - modifiedTables: tableEntries, - smus: smus, - acquiredAt: acquiredAt, - packageName: callerPkg, - } - runtime.SetFinalizer(txn, txnFinalizer) - return txn + return db.defaultHandle.WriteTxn(table, tables...) } func (db *DB) Start(cell.HookContext) error { @@ -255,32 +211,73 @@ func (db *DB) setGCRateLimitInterval(interval time.Duration) { db.gcRateLimitInterval = interval } -var ciliumPackagePrefix = func() string { - sentinel := func() {} - name := runtime.FuncForPC(reflect.ValueOf(sentinel).Pointer()).Name() - if idx := strings.LastIndex(name, "/"); idx >= 0 { - return name[:idx+1] +// NewHandle returns a named handle to the DB. The handle has the same ReadTxn and +// WriteTxn methods as DB, but annotated with the given name for more accurate +// cost accounting in e.g. metrics. +func (db *DB) NewHandle(name string) Handle { + return Handle{db, name} +} + +// Handle is a named handle to the database for constructing read or write +// transactions. +type Handle struct { + db *DB + name string +} + +func (h Handle) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn { + db := h.db + allTables := append(tables, table) + smus := internal.SortableMutexes{} + for _, table := range allTables { + smus = append(smus, table.sortableMutex()) } - return "" -}() + lockAt := time.Now() + smus.Lock() + acquiredAt := time.Now() -func callerPackage() string { - var callerPkg string - pc, _, _, ok := runtime.Caller(2) + root := *db.root.Load() + tableEntries := make([]*tableEntry, len(root)) + var tableNames []string + for _, table := range allTables { + tableEntry := root[table.tablePos()] + tableEntry.indexes = slices.Clone(tableEntry.indexes) + tableEntries[table.tablePos()] = &tableEntry + tableNames = append(tableNames, table.Name()) - if ok { - f := runtime.FuncForPC(pc) - if f != nil { - callerPkg = f.Name() - callerPkg, _ = strings.CutPrefix(callerPkg, ciliumPackagePrefix) - callerPkg = strings.SplitN(callerPkg, ".", 2)[0] - } else { - callerPkg = "unknown" - } - } else { + db.metrics.WriteTxnTableAcquisition( + h.name, + table.Name(), + table.sortableMutex().AcquireDuration(), + ) + } - callerPkg = "unknown" + db.metrics.WriteTxnTotalAcquisition( + h.name, + tableNames, + acquiredAt.Sub(lockAt), + ) + + txn := &txn{ + db: db, + root: root, + modifiedTables: tableEntries, + smus: smus, + acquiredAt: acquiredAt, + tableNames: tableNames, + handle: h.name, } + runtime.SetFinalizer(txn, txnFinalizer) + return txn +} - return callerPkg +// ReadTxn constructs a new read transaction for performing reads against +// a snapshot of the database. +// +// The returned ReadTxn is not thread-safe. +func (h Handle) ReadTxn() ReadTxn { + return &txn{ + db: h.db, + root: *h.db.root.Load(), + } } diff --git a/db_test.go b/db_test.go index 143d337..7e8c4ee 100644 --- a/db_test.go +++ b/db_test.go @@ -607,7 +607,8 @@ func TestDB_GetFirstLast(t *testing.T) { func TestDB_CommitAbort(t *testing.T) { t.Parallel() - db, table, metrics := newTestDB(t, tagsIndex) + dbX, table, metrics := newTestDB(t, tagsIndex) + db := dbX.NewHandle("test-handle") txn := db.WriteTxn(table) _, _, err := table.Insert(txn, testObject{ID: 123, Tags: nil}) @@ -616,8 +617,8 @@ func TestDB_CommitAbort(t *testing.T) { assert.EqualValues(t, table.Revision(db.ReadTxn()), expvarInt(metrics.RevisionVar.Get("test")), "Revision") assert.EqualValues(t, 1, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount") - assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("statedb")), 0.0, "WriteTxnAcquisition") - assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("statedb")), 0.0, "WriteTxnDuration") + assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("test-handle/test")), 0.0, "WriteTxnAcquisition") + assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("test-handle/test")), 0.0, "WriteTxnDuration") obj, rev, ok := table.First(db.ReadTxn(), idIndex.Query(123)) require.True(t, ok, "expected First(1) to return result") @@ -779,15 +780,6 @@ func TestWriteJSON(t *testing.T) { txn.Commit() } -func Test_callerPackage(t *testing.T) { - t.Parallel() - - pkg := func() string { - return callerPackage() - }() - require.Equal(t, "statedb", pkg) -} - func Test_nonUniqueKey(t *testing.T) { // empty keys key := encodeNonUniqueKey(nil, nil) diff --git a/metrics.go b/metrics.go index 3613cf2..e723502 100644 --- a/metrics.go +++ b/metrics.go @@ -8,9 +8,9 @@ import ( ) type Metrics interface { - WriteTxnTableAcquisition(tableName string, acquire time.Duration) - WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) - WriteTxnDuration(goPackage string, acquire time.Duration) + WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) + WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) + WriteTxnDuration(handle string, tables []string, acquire time.Duration) GraveyardLowWatermark(tableName string, lowWatermark Revision) GraveyardCleaningDuration(tableName string, duration time.Duration) @@ -121,16 +121,16 @@ func (m *ExpVarMetrics) ObjectCount(name string, numObjects int) { m.ObjectCountVar.Set(name, &intVar) } -func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) { - m.WriteTxnDurationVar.AddFloat(goPackage, acquire.Seconds()) +func (m *ExpVarMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration) { + m.WriteTxnDurationVar.AddFloat(handle+"/"+strings.Join(tables, "+"), acquire.Seconds()) } -func (m *ExpVarMetrics) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) { - m.WriteTxnAcquisitionVar.AddFloat(goPackage, acquire.Seconds()) +func (m *ExpVarMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) { + m.WriteTxnAcquisitionVar.AddFloat(handle+"/"+strings.Join(tables, "+"), acquire.Seconds()) } -func (m *ExpVarMetrics) WriteTxnTableAcquisition(name string, acquire time.Duration) { - m.LockContentionVar.AddFloat(name, acquire.Seconds()) +func (m *ExpVarMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) { + m.LockContentionVar.AddFloat(handle+"/"+tableName, acquire.Seconds()) } var _ Metrics = &ExpVarMetrics{} @@ -162,15 +162,15 @@ func (*NopMetrics) Revision(tableName string, revision uint64) { } // WriteTxnDuration implements Metrics. -func (*NopMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) { +func (*NopMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration) { } // WriteTxnTableAcquisition implements Metrics. -func (*NopMetrics) WriteTxnTableAcquisition(tableName string, acquire time.Duration) { +func (*NopMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) { } // WriteTxnTotalAcquisition implements Metrics. -func (*NopMetrics) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) { +func (*NopMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) { } var _ Metrics = &NopMetrics{} diff --git a/table.go b/table.go index 9dcbcac..39264fe 100644 --- a/table.go +++ b/table.go @@ -59,10 +59,11 @@ func NewTable[Obj any]( indexPos := SecondaryIndexStartPos for _, indexer := range secondaryIndexers { + name := indexer.indexName() anyIndexer := toAnyIndexer(indexer) anyIndexer.pos = indexPos - table.secondaryAnyIndexers[indexer.indexName()] = anyIndexer - table.indexPositions[indexer.indexName()] = indexPos + table.secondaryAnyIndexers[name] = anyIndexer + table.indexPositions[name] = indexPos indexPos++ } diff --git a/txn.go b/txn.go index a44ee1a..23cf151 100644 --- a/txn.go +++ b/txn.go @@ -22,11 +22,12 @@ import ( type txn struct { db *DB + handle string root dbRoot modifiedTables []*tableEntry // table entries being modified smus internal.SortableMutexes // the (sorted) table locks acquiredAt time.Time // the time at which the transaction acquired the locks - packageName string // name of the package that created the transaction + tableNames []string } type tableIndex struct { @@ -61,7 +62,7 @@ func (txn *txn) getTxn() *txn { // Abort/Commit which would cause the table to be locked forever. func txnFinalizer(txn *txn) { if txn.db != nil { - panic(fmt.Sprintf("WriteTxn acquired by package %q was never Abort()'d or Commit()'d", txn.packageName)) + panic(fmt.Sprintf("WriteTxn from handle %s against tables %v was never Abort()'d or Commit()'d", txn.handle, txn.tableNames)) } } @@ -375,7 +376,8 @@ func (txn *txn) Abort() { txn.smus.Unlock() txn.db.metrics.WriteTxnDuration( - txn.packageName, + txn.handle, + txn.tableNames, time.Since(txn.acquiredAt)) *txn = zeroTxn @@ -467,7 +469,8 @@ func (txn *txn) Commit() { } txn.db.metrics.WriteTxnDuration( - txn.packageName, + txn.handle, + txn.tableNames, time.Since(txn.acquiredAt)) // Zero out the transaction to make it inert.