Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various optimizations #2

Merged
merged 5 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
375 changes: 251 additions & 124 deletions benchmarks_test.go

Large diffs are not rendered by default.

197 changes: 93 additions & 104 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@ import (
"context"
"errors"
"net/http"
"reflect"
"runtime"
"strings"
"slices"
"sync"
"sync/atomic"
"time"

iradix "github.com/hashicorp/go-immutable-radix/v2"

"github.com/cilium/hive/cell"
"github.com/cilium/statedb/internal"
)
Expand Down Expand Up @@ -87,29 +84,31 @@ import (
// the lowest revision of all delete trackers.
type DB struct {
mu sync.Mutex // protects 'tables' and sequences modifications to the root tree
tables map[TableName]TableMeta
ctx context.Context
cancel context.CancelFunc
root atomic.Pointer[iradix.Tree[tableEntry]]
root atomic.Pointer[dbRoot]
gcTrigger chan struct{} // trigger for graveyard garbage collection
gcExited chan struct{}
gcRateLimitInterval time.Duration
metrics Metrics
defaultHandle Handle
}

// dbRoot is the root of the database: one tableEntry per registered table.
// registerTable appends entries and hands each table its slice index via
// setTablePos, so a table's entry is found at root[table.tablePos()].
type dbRoot []tableEntry

func NewDB(tables []TableMeta, metrics Metrics) (*DB, error) {
txn := iradix.New[tableEntry]().Txn()
db := &DB{
tables: make(map[TableName]TableMeta),
metrics: metrics,
gcRateLimitInterval: defaultGCRateLimitInterval,
}
db.defaultHandle = Handle{db, "DB"}
root := make(dbRoot, 0, len(tables))
for _, t := range tables {
if err := db.registerTable(t, txn); err != nil {
if err := db.registerTable(t, &root); err != nil {
return nil, err
}
}
db.root.Store(txn.CommitOnly())
db.root.Store(&root)

return db, nil
}
Expand All @@ -128,104 +127,50 @@ func (db *DB) RegisterTable(table TableMeta, tables ...TableMeta) error {
db.mu.Lock()
defer db.mu.Unlock()

txn := db.root.Load().Txn()
if err := db.registerTable(table, txn); err != nil {
root := slices.Clone(*db.root.Load())

if err := db.registerTable(table, &root); err != nil {
return err
}
for _, t := range tables {
if err := db.registerTable(t, txn); err != nil {
if err := db.registerTable(t, &root); err != nil {
return err
}
}
db.root.Store(txn.CommitOnly())
db.root.Store(&root)
return nil
}

func (db *DB) registerTable(table TableMeta, txn *iradix.Txn[tableEntry]) error {
func (db *DB) registerTable(table TableMeta, root *dbRoot) error {
name := table.Name()
if _, ok := db.tables[name]; ok {
return tableError(name, ErrDuplicateTable)
}
db.tables[name] = table
var entry tableEntry
entry.meta = table
entry.deleteTrackers = iradix.New[deleteTracker]()
indexTxn := iradix.New[indexEntry]().Txn()
indexTxn.Insert([]byte(table.primary().name), indexEntry{iradix.New[object](), true})
indexTxn.Insert([]byte(RevisionIndex), indexEntry{iradix.New[object](), true})
indexTxn.Insert([]byte(GraveyardIndex), indexEntry{iradix.New[object](), true})
indexTxn.Insert([]byte(GraveyardRevisionIndex), indexEntry{iradix.New[object](), true})
for index, indexer := range table.secondary() {
indexTxn.Insert([]byte(index), indexEntry{iradix.New[object](), indexer.unique})
for _, t := range *root {
if t.meta.Name() == name {
return tableError(name, ErrDuplicateTable)
}
}
entry.indexes = indexTxn.CommitOnly()
txn.Insert(table.tableKey(), entry)

pos := len(*root)
table.setTablePos(pos)
*root = append(*root, table.tableEntry())
return nil
}

// ReadTxn constructs a new read transaction for performing reads against
// a snapshot of the database.
//
// ReadTxn is not thread-safe!
// The returned ReadTxn is not thread-safe.
func (db *DB) ReadTxn() ReadTxn {
return &txn{
db: db,
rootReadTxn: db.root.Load().Txn(),
}
return db.defaultHandle.ReadTxn()
}

// WriteTxn constructs a new write transaction against the given set of tables.
// Each table is locked, which may block until the table locks are acquired.
// The modifications performed in the write transaction are not visible outside
// it until Commit() is called. To discard the changes call Abort().
//
// WriteTxn is not thread-safe!
// The returned WriteTxn is not thread-safe.
func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
callerPkg := callerPackage()

allTables := append(tables, table)
smus := internal.SortableMutexes{}
for _, table := range allTables {
smus = append(smus, table.sortableMutex())
}
lockAt := time.Now()
smus.Lock()
acquiredAt := time.Now()

rootReadTxn := db.root.Load().Txn()
tableEntries := make(map[TableName]*tableEntry, len(tables))
var tableNames []string
for _, table := range allTables {
tableEntry, ok := rootReadTxn.Get(table.tableKey())
if !ok {
panic("BUG: Table '" + table.Name() + "' not found")
}
tableEntries[table.Name()] = &tableEntry
tableNames = append(tableNames, table.Name())

db.metrics.WriteTxnTableAcquisition(
table.Name(),
table.sortableMutex().AcquireDuration(),
)
}

db.metrics.WriteTxnTotalAcquisition(
callerPkg,
tableNames,
acquiredAt.Sub(lockAt),
)

txn := &txn{
db: db,
rootReadTxn: rootReadTxn,
modifiedTables: tableEntries,
writeTxns: make(map[tableIndex]indexTxn),
smus: smus,
acquiredAt: acquiredAt,
packageName: callerPkg,
}
runtime.SetFinalizer(txn, txnFinalizer)
return txn
return db.defaultHandle.WriteTxn(table, tables...)
}

func (db *DB) Start(cell.HookContext) error {
Expand Down Expand Up @@ -266,29 +211,73 @@ func (db *DB) setGCRateLimitInterval(interval time.Duration) {
db.gcRateLimitInterval = interval
}

var ciliumPackagePrefix = func() string {
sentinel := func() {}
name := runtime.FuncForPC(reflect.ValueOf(sentinel).Pointer()).Name()
if idx := strings.LastIndex(name, "/"); idx >= 0 {
return name[:idx+1]
// NewHandle creates a Handle bound to this DB and labelled with 'name'.
// A Handle offers the same ReadTxn and WriteTxn constructors as DB; write
// transactions opened through it report 'name' to the metrics, so that costs
// can be attributed to whoever owns the handle.
func (db *DB) NewHandle(name string) Handle {
	h := Handle{db: db, name: name}
	return h
}

// Handle is a named handle to the database for constructing read or write
// transactions.
type Handle struct {
// db is the database that transactions created via this handle operate on.
db *DB
// name labels the handle; WriteTxn passes it to the write transaction
// metrics and records it on the transaction it creates.
name string
}

func (h Handle) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
db := h.db
allTables := append(tables, table)
smus := internal.SortableMutexes{}
for _, table := range allTables {
smus = append(smus, table.sortableMutex())
}
return ""
}()
lockAt := time.Now()
smus.Lock()
acquiredAt := time.Now()

func callerPackage() string {
var callerPkg string
pc, _, _, ok := runtime.Caller(2)
if ok {
f := runtime.FuncForPC(pc)
if f != nil {
callerPkg = f.Name()
callerPkg, _ = strings.CutPrefix(callerPkg, ciliumPackagePrefix)
callerPkg = strings.SplitN(callerPkg, ".", 2)[0]
} else {
callerPkg = "unknown"
}
} else {
callerPkg = "unknown"
root := *db.root.Load()
tableEntries := make([]*tableEntry, len(root))
var tableNames []string
for _, table := range allTables {
tableEntry := root[table.tablePos()]
tableEntry.indexes = slices.Clone(tableEntry.indexes)
tableEntries[table.tablePos()] = &tableEntry
tableNames = append(tableNames, table.Name())

db.metrics.WriteTxnTableAcquisition(
h.name,
table.Name(),
table.sortableMutex().AcquireDuration(),
)
}

db.metrics.WriteTxnTotalAcquisition(
h.name,
tableNames,
acquiredAt.Sub(lockAt),
)

txn := &txn{
db: db,
root: root,
modifiedTables: tableEntries,
smus: smus,
acquiredAt: acquiredAt,
tableNames: tableNames,
handle: h.name,
}
runtime.SetFinalizer(txn, txnFinalizer)
return txn
}

// ReadTxn constructs a new read transaction for performing reads against
// a snapshot of the database.
//
// The returned ReadTxn is not thread-safe.
func (h Handle) ReadTxn() ReadTxn {
return &txn{
db: h.db,
root: *h.db.root.Load(),
}
return callerPkg
}
37 changes: 19 additions & 18 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ var logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
}))

// newTestDB creates fresh metrics via NewExpVarMetrics, builds the test
// database and table with newTestDBWithMetrics, and returns all three so that
// tests can inspect the metrics afterwards.
func newTestDB(t testing.TB, secondaryIndexers ...Indexer[testObject]) (*DB, RWTable[testObject], *ExpVarMetrics) {
	m := NewExpVarMetrics(false)
	d, tbl := newTestDBWithMetrics(t, m, secondaryIndexers...)
	return d, tbl, m
}

func newTestDBWithMetrics(t testing.TB, metrics Metrics, secondaryIndexers ...Indexer[testObject]) (*DB, RWTable[testObject]) {
var (
db *DB
)
Expand All @@ -79,8 +85,6 @@ func newTestDB(t testing.TB, secondaryIndexers ...Indexer[testObject]) (*DB, RWT
)
require.NoError(t, err, "NewTable[testObject]")

metrics := NewExpVarMetrics(false)

h := hive.NewWithOptions(
hive.Options{Logger: logger},

Expand All @@ -100,7 +104,7 @@ func newTestDB(t testing.TB, secondaryIndexers ...Indexer[testObject]) (*DB, RWT
t.Cleanup(func() {
assert.NoError(t, h.Stop(context.TODO()))
})
return db, table, metrics
return db, table
}

func TestDB_Insert_SamePointer(t *testing.T) {
Expand Down Expand Up @@ -136,7 +140,7 @@ func TestDB_Insert_SamePointer(t *testing.T) {
func TestDB_LowerBound_ByRevision(t *testing.T) {
t.Parallel()

db, table, _ := newTestDB(t, tagsIndex)
db, table := newTestDBWithMetrics(t, &NopMetrics{}, tagsIndex)

{
txn := db.WriteTxn(table)
Expand Down Expand Up @@ -420,7 +424,7 @@ func TestDB_All(t *testing.T) {

select {
case <-watch:
t.Fatalf("expected All() watch channel to not close before changes")
t.Fatalf("expected All() watch channel to not close before delete")
default:
}

Expand All @@ -430,10 +434,15 @@ func TestDB_All(t *testing.T) {
txn.Commit()
}

// Prior read transaction not affected by delete.
iter, _ = table.All(txn)
objs = Collect(iter)
require.Len(t, objs, 3)

select {
case <-watch:
case <-time.After(time.Second):
t.Fatalf("expceted All() watch channel to close after changes")
t.Fatalf("expected All() watch channel to close after delete")
}
}

Expand Down Expand Up @@ -598,7 +607,8 @@ func TestDB_GetFirstLast(t *testing.T) {
func TestDB_CommitAbort(t *testing.T) {
t.Parallel()

db, table, metrics := newTestDB(t, tagsIndex)
dbX, table, metrics := newTestDB(t, tagsIndex)
db := dbX.NewHandle("test-handle")

txn := db.WriteTxn(table)
_, _, err := table.Insert(txn, testObject{ID: 123, Tags: nil})
Expand All @@ -607,8 +617,8 @@ func TestDB_CommitAbort(t *testing.T) {

assert.EqualValues(t, table.Revision(db.ReadTxn()), expvarInt(metrics.RevisionVar.Get("test")), "Revision")
assert.EqualValues(t, 1, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount")
assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("statedb")), 0.0, "WriteTxnAcquisition")
assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("statedb")), 0.0, "WriteTxnDuration")
assert.Greater(t, expvarFloat(metrics.WriteTxnAcquisitionVar.Get("test-handle/test")), 0.0, "WriteTxnAcquisition")
assert.Greater(t, expvarFloat(metrics.WriteTxnDurationVar.Get("test-handle/test")), 0.0, "WriteTxnDuration")

obj, rev, ok := table.First(db.ReadTxn(), idIndex.Query(123))
require.True(t, ok, "expected First(1) to return result")
Expand Down Expand Up @@ -770,15 +780,6 @@ func TestWriteJSON(t *testing.T) {
txn.Commit()
}

func Test_callerPackage(t *testing.T) {
t.Parallel()

pkg := func() string {
return callerPackage()
}()
require.Equal(t, "statedb", pkg)
}

func Test_nonUniqueKey(t *testing.T) {
// empty keys
key := encodeNonUniqueKey(nil, nil)
Expand Down
4 changes: 2 additions & 2 deletions deletetracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (dt *DeleteTracker[Obj]) getRevision() uint64 {
// 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is
// called!
func (dt *DeleteTracker[Obj]) Deleted(txn ReadTxn, minRevision Revision) Iterator[Obj] {
indexTxn := txn.getTxn().mustIndexReadTxn(dt.table.Name(), GraveyardRevisionIndex)
indexTxn := txn.getTxn().mustIndexReadTxn(dt.table, GraveyardRevisionIndexPos)
iter := indexTxn.Root().Iterator()
iter.SeekLowerBound(index.Uint64(minRevision))
return &iterator[Obj]{iter}
Expand All @@ -57,7 +57,7 @@ func (dt *DeleteTracker[Obj]) Close() {
// Remove the delete tracker from the table.
txn := dt.db.WriteTxn(dt.table).getTxn()
db := txn.db
table := txn.modifiedTables[dt.table.Name()]
table := txn.modifiedTables[dt.table.tablePos()]
if table == nil {
panic("BUG: Table missing from write transaction")
}
Expand Down
Loading
Loading