Skip to content

Commit

Permalink
Drop callerPackage and introduce NewHandle
Browse files Browse the repository at this point in the history
Drop callerPackage as it was too costly and replace it with a named handle
that we can use in metrics etc.

The named handle can then be provided automatically with hive's module provider
for per-module transaction cost accounting.

Before:

goos: linux
goarch: amd64
pkg: github.com/cilium/statedb
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkDB_WriteTxn_1-8                                          240938              4752 ns/op            210459 objects/sec
BenchmarkDB_WriteTxn_10-8                                         502363              2551 ns/op            392063 objects/sec
BenchmarkDB_WriteTxn_100-8                                        457850              2279 ns/op            438872 objects/sec
BenchmarkDB_WriteTxn_100_SecondaryIndex-8                         526416              2222 ns/op            450100 objects/sec
BenchmarkDB_RandomInsert-8                                          1012           1181665 ns/op            846264 objects/sec
BenchmarkDB_RandomReplace-8                                          216           5048896 ns/op            198063 objects/sec
BenchmarkDB_SequentialInsert-8                                       398           2996997 ns/op            333667 objects/sec
...
BenchmarkDB_DeleteTracker_Baseline-8                                 390           3036951 ns/op            329278 objects/sec
BenchmarkDB_DeleteTracker-8                                          141           8194663 ns/op            122031 objects/sec
BenchmarkDB_RandomLookup-8                                          8846            134745 ns/op           7421428 objects/sec
BenchmarkDB_SequentialLookup-8                                      8425            123284 ns/op           8111372 objects/sec
BenchmarkDB_FullIteration_All-8                                   103279             10996 ns/op          90941891 objects/sec
BenchmarkDB_FullIteration_Get-8                                    84451             13637 ns/op          73328686 objects/sec
BenchmarkDB_PropagationDelay-8                                    235146              5342 ns/op
48.00 50th_µs           57.00 90th_µs          215.0 99th_µs
PASS
ok      github.com/cilium/statedb       31.480s

After:

goos: linux
goarch: amd64
pkg: github.com/cilium/statedb
cpu: Intel(R) Core(TM) i5-10210U CPU @ 1.60GHz
BenchmarkDB_WriteTxn_1-8                                          310290              3885 ns/op            257388 objects/sec
BenchmarkDB_WriteTxn_10-8                                         523450              2441 ns/op            409679 objects/sec
BenchmarkDB_WriteTxn_100-8                                        538578              2219 ns/op            450628 objects/sec
BenchmarkDB_WriteTxn_100_SecondaryIndex-8                         515170              2156 ns/op            463816 objects/sec
BenchmarkDB_RandomInsert-8                                          1110           1081693 ns/op            924477 objects/sec
BenchmarkDB_RandomReplace-8                                          237           5034048 ns/op            198647 objects/sec
BenchmarkDB_SequentialInsert-8                                       380           3048134 ns/op            328070 objects/sec
...
BenchmarkDB_DeleteTracker_Baseline-8                                 396           3066078 ns/op            326150 objects/sec
BenchmarkDB_DeleteTracker-8                                          169           7019558 ns/op            142459 objects/sec
BenchmarkDB_RandomLookup-8                                          8839            137467 ns/op           7274474 objects/sec
BenchmarkDB_SequentialLookup-8                                      8958            124483 ns/op           8033258 objects/sec
BenchmarkDB_FullIteration_All-8                                    97218             11356 ns/op          88057271 objects/sec
BenchmarkDB_FullIteration_Get-8                                    78102             14373 ns/op          69577187 objects/sec
BenchmarkDB_PropagationDelay-8                                    245020              4727 ns/op
42.00 50th_µs           48.00 90th_µs          210.0 99th_µs
PASS
ok      github.com/cilium/statedb       31.520s

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Mar 25, 2024
1 parent 1858d6b commit 7c3e0bb
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 102 deletions.
141 changes: 69 additions & 72 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ import (
"context"
"errors"
"net/http"
"reflect"
"runtime"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -93,6 +91,7 @@ type DB struct {
gcExited chan struct{}
gcRateLimitInterval time.Duration
metrics Metrics
defaultHandle Handle
}

type dbRoot []tableEntry
Expand All @@ -102,6 +101,7 @@ func NewDB(tables []TableMeta, metrics Metrics) (*DB, error) {
metrics: metrics,
gcRateLimitInterval: defaultGCRateLimitInterval,
}
db.defaultHandle = Handle{db, "DB"}
root := make(dbRoot, 0, len(tables))
for _, t := range tables {
if err := db.registerTable(t, &root); err != nil {
Expand Down Expand Up @@ -158,63 +158,19 @@ func (db *DB) registerTable(table TableMeta, root *dbRoot) error {
// ReadTxn constructs a new read transaction for performing reads against
// a snapshot of the database.
//
// ReadTxn is not thread-safe!
// The returned ReadTxn is not thread-safe.
func (db *DB) ReadTxn() ReadTxn {
return &txn{
db: db,
root: *db.root.Load(),
}
return db.defaultHandle.ReadTxn()
}

// WriteTxn constructs a new write transaction against the given set of tables.
// Each table is locked, which may block until the table locks are acquired.
// The modifications performed in the write transaction are not visible outside
// it until Commit() is called. To discard the changes call Abort().
//
// WriteTxn is not thread-safe!
// The returned WriteTxn is not thread-safe.
func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
callerPkg := callerPackage()

allTables := append(tables, table)
smus := internal.SortableMutexes{}
for _, table := range allTables {
smus = append(smus, table.sortableMutex())
}
lockAt := time.Now()
smus.Lock()
acquiredAt := time.Now()

root := *db.root.Load()
tableEntries := make([]*tableEntry, len(root))
var tableNames []string
for _, table := range allTables {
tableEntry := root[table.tablePos()]
tableEntry.indexes = slices.Clone(tableEntry.indexes)
tableEntries[table.tablePos()] = &tableEntry
tableNames = append(tableNames, table.Name())

db.metrics.WriteTxnTableAcquisition(
table.Name(),
table.sortableMutex().AcquireDuration(),
)
}

db.metrics.WriteTxnTotalAcquisition(
callerPkg,
tableNames,
acquiredAt.Sub(lockAt),
)

txn := &txn{
db: db,
root: root,
modifiedTables: tableEntries,
smus: smus,
acquiredAt: acquiredAt,
packageName: callerPkg,
}
runtime.SetFinalizer(txn, txnFinalizer)
return txn
return db.defaultHandle.WriteTxn(table, tables...)
}

func (db *DB) Start(cell.HookContext) error {
Expand Down Expand Up @@ -255,32 +211,73 @@ func (db *DB) setGCRateLimitInterval(interval time.Duration) {
db.gcRateLimitInterval = interval
}

var ciliumPackagePrefix = func() string {
sentinel := func() {}
name := runtime.FuncForPC(reflect.ValueOf(sentinel).Pointer()).Name()
if idx := strings.LastIndex(name, "/"); idx >= 0 {
return name[:idx+1]
// NewHandle returns a named handle to the DB. The handle has the same ReadTxn and
// WriteTxn methods as DB, but annotated with the given name for more accurate
// cost accounting in e.g. metrics.
func (db *DB) NewHandle(name string) Handle {
return Handle{db, name}
}

// Handle is a named handle to the database for constructing read or write
// transactions.
type Handle struct {
db *DB
name string
}

func (h Handle) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
db := h.db
allTables := append(tables, table)
smus := internal.SortableMutexes{}
for _, table := range allTables {
smus = append(smus, table.sortableMutex())
}
return ""
}()
lockAt := time.Now()
smus.Lock()
acquiredAt := time.Now()

func callerPackage() string {
var callerPkg string
pc, _, _, ok := runtime.Caller(2)
root := *db.root.Load()
tableEntries := make([]*tableEntry, len(root))
var tableNames []string
for _, table := range allTables {
tableEntry := root[table.tablePos()]
tableEntry.indexes = slices.Clone(tableEntry.indexes)
tableEntries[table.tablePos()] = &tableEntry
tableNames = append(tableNames, table.Name())

if ok {
f := runtime.FuncForPC(pc)
if f != nil {
callerPkg = f.Name()
callerPkg, _ = strings.CutPrefix(callerPkg, ciliumPackagePrefix)
callerPkg = strings.SplitN(callerPkg, ".", 2)[0]
} else {
callerPkg = "unknown"
}
} else {
db.metrics.WriteTxnTableAcquisition(
h.name,
table.Name(),
table.sortableMutex().AcquireDuration(),
)
}

callerPkg = "unknown"
db.metrics.WriteTxnTotalAcquisition(
h.name,
tableNames,
acquiredAt.Sub(lockAt),
)

txn := &txn{
db: db,
root: root,
modifiedTables: tableEntries,
smus: smus,
acquiredAt: acquiredAt,
tableNames: tableNames,
handle: h.name,
}
runtime.SetFinalizer(txn, txnFinalizer)
return txn
}

return callerPkg
// ReadTxn constructs a new read transaction for performing reads against
// a snapshot of the database.
//
// The returned ReadTxn is not thread-safe.
func (h Handle) ReadTxn() ReadTxn {
return &txn{
db: h.db,
root: *h.db.root.Load(),
}
}
16 changes: 4 additions & 12 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,8 @@ func TestDB_GetFirstLast(t *testing.T) {
func TestDB_CommitAbort(t *testing.T) {
t.Parallel()

db, table, metrics := newTestDB(t, tagsIndex)
dbX, table, metrics := newTestDB(t, tagsIndex)
db := dbX.NewHandle("test-handle")

txn := db.WriteTxn(table)
_, _, err := table.Insert(txn, testObject{ID: 123, Tags: nil})
Expand All @@ -616,8 +617,8 @@ func TestDB_CommitAbort(t *testing.T) {

assert.EqualValues(t, table.Revision(db.ReadTxn()), expvarInt(metrics.RevisionVar.Get("test")), "Revision")
assert.EqualValues(t, 1, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("statedb")), 0.0, "WriteTxnAcquisition")
assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("statedb")), 0.0, "WriteTxnDuration")
assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("test-handle/test")), 0.0, "WriteTxnAcquisition")
assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("test-handle/test")), 0.0, "WriteTxnDuration")

obj, rev, ok := table.First(db.ReadTxn(), idIndex.Query(123))
require.True(t, ok, "expected First(1) to return result")
Expand Down Expand Up @@ -779,15 +780,6 @@ func TestWriteJSON(t *testing.T) {
txn.Commit()
}

func Test_callerPackage(t *testing.T) {
t.Parallel()

pkg := func() string {
return callerPackage()
}()
require.Equal(t, "statedb", pkg)
}

func Test_nonUniqueKey(t *testing.T) {
// empty keys
key := encodeNonUniqueKey(nil, nil)
Expand Down
24 changes: 12 additions & 12 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
)

type Metrics interface {
WriteTxnTableAcquisition(tableName string, acquire time.Duration)
WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration)
WriteTxnDuration(goPackage string, acquire time.Duration)
WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration)
WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration)
WriteTxnDuration(handle string, tables []string, acquire time.Duration)

GraveyardLowWatermark(tableName string, lowWatermark Revision)
GraveyardCleaningDuration(tableName string, duration time.Duration)
Expand Down Expand Up @@ -121,16 +121,16 @@ func (m *ExpVarMetrics) ObjectCount(name string, numObjects int) {
m.ObjectCountVar.Set(name, &intVar)
}

func (m *ExpVarMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) {
m.WriteTxnDurationVar.AddFloat(goPackage, acquire.Seconds())
func (m *ExpVarMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration) {
m.WriteTxnDurationVar.AddFloat(handle+"/"+strings.Join(tables, "+"), acquire.Seconds())
}

func (m *ExpVarMetrics) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) {
m.WriteTxnAcquisitionVar.AddFloat(goPackage, acquire.Seconds())
func (m *ExpVarMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) {
m.WriteTxnAcquisitionVar.AddFloat(handle+"/"+strings.Join(tables, "+"), acquire.Seconds())
}

func (m *ExpVarMetrics) WriteTxnTableAcquisition(name string, acquire time.Duration) {
m.LockContentionVar.AddFloat(name, acquire.Seconds())
func (m *ExpVarMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) {
m.LockContentionVar.AddFloat(handle+"/"+tableName, acquire.Seconds())
}

var _ Metrics = &ExpVarMetrics{}
Expand Down Expand Up @@ -162,15 +162,15 @@ func (*NopMetrics) Revision(tableName string, revision uint64) {
}

// WriteTxnDuration implements Metrics.
func (*NopMetrics) WriteTxnDuration(goPackage string, acquire time.Duration) {
func (*NopMetrics) WriteTxnDuration(handle string, tables []string, acquire time.Duration) {
}

// WriteTxnTableAcquisition implements Metrics.
func (*NopMetrics) WriteTxnTableAcquisition(tableName string, acquire time.Duration) {
func (*NopMetrics) WriteTxnTableAcquisition(handle string, tableName string, acquire time.Duration) {
}

// WriteTxnTotalAcquisition implements Metrics.
func (*NopMetrics) WriteTxnTotalAcquisition(goPackage string, tables []string, acquire time.Duration) {
func (*NopMetrics) WriteTxnTotalAcquisition(handle string, tables []string, acquire time.Duration) {
}

var _ Metrics = &NopMetrics{}
5 changes: 3 additions & 2 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ func NewTable[Obj any](

indexPos := SecondaryIndexStartPos
for _, indexer := range secondaryIndexers {
name := indexer.indexName()
anyIndexer := toAnyIndexer(indexer)
anyIndexer.pos = indexPos
table.secondaryAnyIndexers[indexer.indexName()] = anyIndexer
table.indexPositions[indexer.indexName()] = indexPos
table.secondaryAnyIndexers[name] = anyIndexer
table.indexPositions[name] = indexPos
indexPos++
}

Expand Down
11 changes: 7 additions & 4 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (

type txn struct {
db *DB
handle string
root dbRoot
modifiedTables []*tableEntry // table entries being modified
smus internal.SortableMutexes // the (sorted) table locks
acquiredAt time.Time // the time at which the transaction acquired the locks
packageName string // name of the package that created the transaction
tableNames []string
}

type tableIndex struct {
Expand Down Expand Up @@ -61,7 +62,7 @@ func (txn *txn) getTxn() *txn {
// Abort/Commit which would cause the table to be locked forever.
func txnFinalizer(txn *txn) {
if txn.db != nil {
panic(fmt.Sprintf("WriteTxn acquired by package %q was never Abort()'d or Commit()'d", txn.packageName))
panic(fmt.Sprintf("WriteTxn from handle %s against tables %v was never Abort()'d or Commit()'d", txn.handle, txn.tableNames))
}
}

Expand Down Expand Up @@ -375,7 +376,8 @@ func (txn *txn) Abort() {

txn.smus.Unlock()
txn.db.metrics.WriteTxnDuration(
txn.packageName,
txn.handle,
txn.tableNames,
time.Since(txn.acquiredAt))

*txn = zeroTxn
Expand Down Expand Up @@ -467,7 +469,8 @@ func (txn *txn) Commit() {
}

txn.db.metrics.WriteTxnDuration(
txn.packageName,
txn.handle,
txn.tableNames,
time.Since(txn.acquiredAt))

// Zero out the transaction to make it inert.
Expand Down

0 comments on commit 7c3e0bb

Please sign in to comment.