Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify DB and Handle types into just DB #51

Merged
merged 2 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 62 additions & 73 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ import (
// 6. Periodically garbage collect the graveyard by finding
// the lowest revision of all delete trackers.
type DB struct {
handleName string
*dbState
}

// dbState is the underlying state of the database shared by all [DB] handles.
type dbState struct {
mu sync.Mutex // protects 'tables' and sequences modifications to the root tree
ctx context.Context
cancel context.CancelFunc
Expand All @@ -90,7 +96,6 @@ type DB struct {
gcExited chan struct{}
gcRateLimitInterval time.Duration
metrics Metrics
defaultHandle Handle
}

type dbRoot []tableEntry
Expand Down Expand Up @@ -121,10 +126,12 @@ func New(options ...Option) *DB {
}

db := &DB{
metrics: opts.metrics,
gcRateLimitInterval: defaultGCRateLimitInterval,
dbState: &dbState{
metrics: opts.metrics,
gcRateLimitInterval: defaultGCRateLimitInterval,
},
}
db.defaultHandle = Handle{db, "DB"}
db.handleName = "DB"
root := dbRoot{}
db.root.Store(&root)
return db
Expand Down Expand Up @@ -177,7 +184,10 @@ func (db *DB) registerTable(table TableMeta, root *dbRoot) error {
//
// The returned ReadTxn is not thread-safe.
func (db *DB) ReadTxn() ReadTxn {
return db.defaultHandle.ReadTxn()
return &txn{
db: db,
root: *db.root.Load(),
}
}

// WriteTxn constructs a new write transaction against the given set of tables.
Expand All @@ -187,64 +197,6 @@ func (db *DB) ReadTxn() ReadTxn {
//
// The returned WriteTxn is not thread-safe.
func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
return db.defaultHandle.WriteTxn(table, tables...)
}

// Start the background workers for the database.
//
// This starts the graveyard worker that deals with garbage collecting
// deleted objects that are no longer necessary for Changes().
func (db *DB) Start() error {
db.gcTrigger = make(chan struct{}, 1)
db.gcExited = make(chan struct{})
db.ctx, db.cancel = context.WithCancel(context.Background())
go graveyardWorker(db, db.ctx, db.gcRateLimitInterval)
return nil
}

// Stop the background workers.
func (db *DB) Stop() error {
db.cancel()
<-db.gcExited
return nil
}

// ServeHTTP is an HTTP handler for dumping StateDB as JSON.
//
// Example usage:
//
// var db *statedb.DB
//
// http.Handle("/db", db)
// http.ListenAndServe(":8080", nil)
func (db *DB) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
db.ReadTxn().WriteJSON(w)
}

// setGCRateLimitInterval can set the graveyard GC interval before DB is started.
// Used by tests.
func (db *DB) setGCRateLimitInterval(interval time.Duration) {
db.gcRateLimitInterval = interval
}

// NewHandle returns a named handle to the DB. The handle has the same ReadTxn and
// WriteTxn methods as DB, but annotated with the given name for more accurate
// cost accounting in e.g. metrics.
func (db *DB) NewHandle(name string) Handle {
return Handle{db, name}
}

// Handle is a named handle to the database for constructing read or write
// transactions.
type Handle struct {
db *DB
name string
}

func (h Handle) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
db := h.db
allTables := append(tables, table)
smus := internal.SortableMutexes{}
for _, table := range allTables {
Expand All @@ -267,7 +219,7 @@ func (h Handle) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
tableNames = append(tableNames, table.Name())

db.metrics.WriteTxnTableAcquisition(
h.name,
db.handleName,
table.Name(),
table.sortableMutex().AcquireDuration(),
)
Expand All @@ -277,7 +229,7 @@ func (h Handle) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
sort.Strings(tableNames)

db.metrics.WriteTxnTotalAcquisition(
h.name,
db.handleName,
tableNames,
acquiredAt.Sub(lockAt),
)
Expand All @@ -289,19 +241,56 @@ func (h Handle) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
smus: smus,
acquiredAt: acquiredAt,
tableNames: tableNames,
handle: h.name,
handle: db.handleName,
}
runtime.SetFinalizer(txn, txnFinalizer)
return txn
}

// ReadTxn constructs a new read transaction for performing reads against
// a snapshot of the database.
// Start the background workers for the database.
//
// The returned ReadTxn is not thread-safe.
func (h Handle) ReadTxn() ReadTxn {
return &txn{
db: h.db,
root: *h.db.root.Load(),
// This starts the graveyard worker that deals with garbage collecting
// deleted objects that are no longer necessary for Changes().
func (db *DB) Start() error {
db.gcTrigger = make(chan struct{}, 1)
db.gcExited = make(chan struct{})
db.ctx, db.cancel = context.WithCancel(context.Background())
go graveyardWorker(db, db.ctx, db.gcRateLimitInterval)
return nil
}

// Stop the background workers.
func (db *DB) Stop() error {
db.cancel()
<-db.gcExited
return nil
}

// ServeHTTP is an HTTP handler for dumping StateDB as JSON.
//
// Example usage:
//
// var db *statedb.DB
//
// http.Handle("/db", db)
// http.ListenAndServe(":8080", nil)
func (db *DB) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
db.ReadTxn().WriteJSON(w)
}

// setGCRateLimitInterval can set the graveyard GC interval before DB is started.
// Used by tests.
func (db *DB) setGCRateLimitInterval(interval time.Duration) {
db.gcRateLimitInterval = interval
}

// NewHandle returns a new named handle to the DB. The given name is used to annotate
// metrics.
func (db *DB) NewHandle(name string) *DB {
return &DB{
handleName: name,
dbState: db.dbState,
}
}
11 changes: 10 additions & 1 deletion reconciler/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,14 @@ func main() {
cmd.Execute()
}

var Hive = hive.New(
var Hive = hive.NewWithOptions(
hive.Options{
// Create a named DB handle for each module.
ModuleDecorator: func(db *statedb.DB, id cell.ModuleID) *statedb.DB {
return db.NewHandle(string(id))
},
},

statedb.Cell,
job.Cell,

Expand Down Expand Up @@ -162,6 +169,8 @@ func registerHTTPServer(

mux := http.NewServeMux()

// To dump the metrics:
// curl -s http://localhost:8080/expvar
mux.Handle("/expvar", expvar.Handler())

// For dumping the database:
Expand Down
2 changes: 1 addition & 1 deletion reconciler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func (m *ExpVarMetrics) PruneDuration(moduleID cell.FullModuleID, duration time.

func (m *ExpVarMetrics) PruneError(moduleID cell.FullModuleID, err error) {
m.PruneCountVar.Add(moduleID.String(), 1)
m.PruneTotalErrorsVar.Add(moduleID.String(), 1)

var intVar expvar.Int
if err != nil {
m.PruneTotalErrorsVar.Add(moduleID.String(), 1)
intVar.Set(1)
}
m.PruneCurrentErrorsVar.Set(moduleID.String(), &intVar)
Expand Down
Loading