Skip to content

Commit

Permalink
sql: reduce syncv2 conn pool usage and add pool utilization metrics
Browse files Browse the repository at this point in the history
Database.WithConnection is always used by syncv2 for any probe/sync
operations against peers, but the actual connection it acquires from
the pool is not actually used sometimes, when all the necessary
information can be retrieved from the FPTree. Also, while holding the
connection for the duration of sync/probe works best in absolute
majority of cases, in case of a very slow peer it may cause unwanted
extra utilization of the connection pool.

Another problem is that there's currently no metrics for the actual
database connection pool utilization, and connection pool wait latency
metrics for the main state database pool, api pool and the local
database are not collected separately.

This change makes `Database.WithConnection` acquire the connection
from the pool lazily, that is, upon the first query, and release it
back if there are no queries executed for certain amount of
time (defaults to 10ms), which will prevent slow syncv2 peers from
holding connections for too long. In the latter case, the connection
is re-acquired from the pool upon the following query.

Also, this change adds connection pool utilization metric, and
separates the connection wait latency metrics and pool utilization
metrics for th main state database pool, api pool and the local
database.
  • Loading branch information
ivan4th committed Jan 8, 2025
1 parent cdee34f commit 8496629
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 39 deletions.
8 changes: 5 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type BaseConfig struct {
DatabaseQueryCache bool `mapstructure:"db-query-cache"`
DatabaseQueryCacheSizes DatabaseQueryCacheSizes `mapstructure:"db-query-cache-sizes"`
DatabaseSchemaAllowDrift bool `mapstructure:"db-allow-schema-drift"`
DatabaseConnIdleTimeout time.Duration `mapstructure:"db-conn-idle-timeout"`

PruneActivesetsFrom types.EpochID `mapstructure:"prune-activesets-from"`

Expand Down Expand Up @@ -240,9 +241,10 @@ func defaultBaseConfig() BaseConfig {
ATXBlob: 10000,
ActiveSetBlob: 200,
},
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
PostValidDelay: 12 * time.Hour,
DatabaseConnIdleTimeout: 10 * time.Millisecond,
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
PostValidDelay: 12 * time.Hour,

PprofHTTPServerListener: "localhost:6060",
}
Expand Down
17 changes: 9 additions & 8 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ func MainnetConfig() Config {

return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
MetricsPort: 1010,
DatabaseConnections: 16,
DatabasePruneInterval: 30 * time.Minute,
DatabaseVacuumState: 21,
PruneActivesetsFrom: 12, // starting from epoch 13 activesets below 12 will be pruned
NetworkHRP: "sm",
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
MetricsPort: 1010,
DatabaseConnections: 16,
DatabasePruneInterval: 30 * time.Minute,
DatabaseVacuumState: 21,
DatabaseConnIdleTimeout: 10 * time.Millisecond,
PruneActivesetsFrom: 12, // starting from epoch 13 activesets below 12 will be pruned
NetworkHRP: "sm",

LayerDuration: 5 * time.Minute,
LayerAvgSize: 50,
Expand Down
1 change: 1 addition & 0 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func testnet() config.Config {
DatabaseConnections: 16,
DatabaseSizeMeteringInterval: 10 * time.Minute,
DatabasePruneInterval: 30 * time.Minute,
DatabaseConnIdleTimeout: 10 * time.Millisecond,
NetworkHRP: "stest",

LayerDuration: 5 * time.Minute,
Expand Down
6 changes: 6 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2019,6 +2019,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
atxs.CacheKindATXBlob: app.Config.DatabaseQueryCacheSizes.ATXBlob,
activesets.CacheKindActiveSetBlob: app.Config.DatabaseQueryCacheSizes.ActiveSetBlob,
}),
sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
sql.WithDBName("state"),
}
sqlDB, err := statesql.Open("file:"+filepath.Join(dbPath, dbFile), dbopts...)
if err != nil {
Expand All @@ -2033,6 +2035,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
sql.WithConnections(app.Config.API.DatabaseConnections),
sql.WithNoCheckSchemaDrift(), // already checked above
sql.WithMigrationsDisabled(),
sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
sql.WithDBName("state-api"),
)
if err != nil {
return fmt.Errorf("open sqlite db: %w", err)
Expand Down Expand Up @@ -2081,6 +2085,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
sql.WithDatabaseSchema(lSchema),
sql.WithConnections(app.Config.DatabaseConnections),
sql.WithAllowSchemaDrift(app.Config.DatabaseSchemaAllowDrift),
sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
sql.WithDBName("local"),
)
if err != nil {
return fmt.Errorf("open sqlite db: %w", err)
Expand Down
153 changes: 133 additions & 20 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/go-llsqlite/crawshaw/sqlitex"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/common/types"
)
Expand Down Expand Up @@ -70,6 +71,7 @@ func defaultConf() *conf {
schema: &Schema{},
checkSchemaDrift: true,
handleIncompleteMigrations: true,
connIdleTimeout: 10 * time.Millisecond,
}
}

Expand All @@ -91,6 +93,8 @@ type conf struct {
handleIncompleteMigrations bool
exclusive bool
readOnly bool
dbName string
connIdleTimeout time.Duration
}

// WithConnections overwrites number of pooled connections.
Expand Down Expand Up @@ -194,6 +198,23 @@ func WithTemp() Opt {
}
}

// WithDBName sets the name of the database which is used for metrics.
func WithDBName(name string) Opt {
return func(c *conf) {
c.dbName = name
}
}

// WithConnIdleTimeout sets idle timeout for connections from the pool
// which are acquired upon first statement executed against a Connection
// passed to the callback of Database.WithConnection. After the timeout,
// the connection is released back to the pool until the next statement.
func WithConnIdleTimeout(timeout time.Duration) Opt {
return func(c *conf) {
c.connIdleTimeout = timeout
}
}

func withDisableIncompleteMigrationHandling() Opt {
return func(c *conf) {
c.handleIncompleteMigrations = false
Expand Down Expand Up @@ -307,7 +328,7 @@ func openDB(config *conf) (db *sqliteDatabase, err error) {
return nil, fmt.Errorf("create db %s: %w", config.uri, err)
}
}
db = &sqliteDatabase{pool: pool}
db = &sqliteDatabase{pool: pool, connIdleTimeout: config.connIdleTimeout}
defer func() {
// If something goes wrong, close the database even in case of a
// panic. This is important for tests that verify incomplete migration.
Expand All @@ -323,6 +344,10 @@ func openDB(config *conf) (db *sqliteDatabase, err error) {
db.Close()
return nil, err
}
if config.dbName != "" {
actualDB.connWaitLatency = ConnWaitLatency.WithLabelValues(config.dbName)
actualDB.poolUsage = PoolUsage.WithLabelValues(config.dbName)
}
return actualDB, nil
}

Expand Down Expand Up @@ -654,19 +679,32 @@ type sqliteDatabase struct {

interceptMtx sync.Mutex
interceptors map[string]Interceptor

connWaitLatency prometheus.Observer
poolUsage prometheus.Gauge

connIdleTimeout time.Duration
}

var _ Database = &sqliteDatabase{}

func (db *sqliteDatabase) getConn(ctx context.Context) *sqlite.Conn {
start := time.Now()
conn := db.pool.Get(ctx)
if conn != nil {
connWaitLatency.Observe(time.Since(start).Seconds())
if conn != nil && db.connWaitLatency != nil {
db.connWaitLatency.Observe(time.Since(start).Seconds())
db.poolUsage.Inc()
}
return conn
}

func (db *sqliteDatabase) putConn(conn *sqlite.Conn) {
db.pool.Put(conn)
if db.poolUsage != nil {
db.poolUsage.Dec()
}
}

func (db *sqliteDatabase) getTx(ctx context.Context, initstmt string) (*sqliteTx, error) {
if db.closed {
return nil, ErrClosed
Expand All @@ -680,7 +718,7 @@ func (db *sqliteDatabase) getTx(ctx context.Context, initstmt string) (*sqliteTx
tx := &sqliteTx{queryCache: db.queryCache, db: db, conn: conn, freeConn: cancel}
if err := tx.begin(initstmt); err != nil {
cancel()
db.pool.Put(conn)
db.putConn(conn)

Check warning on line 721 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L721

Added line #L721 was not covered by tests
return nil, err
}
return tx, nil
Expand All @@ -707,7 +745,7 @@ func (db *sqliteDatabase) startExclusive() error {
if conn == nil {
return ErrNoConnection
}
defer db.pool.Put(conn)
defer db.putConn(conn)
// We don't need to wait for long if the database is busy
conn.SetBusyTimeout(1 * time.Millisecond)
// From SQLite docs:
Expand Down Expand Up @@ -787,7 +825,7 @@ func (db *sqliteDatabase) Exec(query string, encoder Encoder, decoder Decoder) (
if conn == nil {
return 0, ErrNoConnection
}
defer db.pool.Put(conn)
defer db.putConn(conn)
if db.latency != nil {
start := time.Now()
defer func() {
Expand All @@ -812,18 +850,17 @@ func (db *sqliteDatabase) Close() error {
}

// WithConnection implements Database.
func (db *sqliteDatabase) WithConnection(ctx context.Context, exec func(Executor) error) error {
func (db *sqliteDatabase) WithConnection(ctx context.Context, toCall func(Executor) error) error {
if db.closed {
return ErrClosed
}
conCtx, cancel := context.WithCancel(ctx)
defer cancel()
conn := db.getConn(conCtx)
if conn == nil {
return ErrNoConnection
}
defer db.pool.Put(conn)
return exec(&sqliteConn{queryCache: db.queryCache, db: db, conn: conn})
c := newLazyConn(conCtx, db)
defer func() {
cancel()
c.release()
}()
return toCall(c)
}

// Intercept adds an interceptor function to the database. The interceptor functions
Expand Down Expand Up @@ -1120,7 +1157,7 @@ func (tx *sqliteTx) Commit() error {

// Release transaction. Every transaction that was created must be released.
func (tx *sqliteTx) Release() error {
defer tx.db.pool.Put(tx.conn)
defer tx.db.putConn(tx.conn)
if tx.committed {
tx.freeConn()
return nil
Expand All @@ -1147,13 +1184,83 @@ func (tx *sqliteTx) Exec(query string, encoder Encoder, decoder Decoder) (int, e
return exec(tx.conn, query, encoder, decoder)
}

type sqliteConn struct {
// lazyConn is a connection that is acquired lazily from the pool, that is, upon the first
// query, and released after a certain period of inactivity.
type lazyConn struct {
*queryCache
db *sqliteDatabase
conn *sqlite.Conn
db *sqliteDatabase
getConn func() *sqlite.Conn
eg errgroup.Group
conn *sqlite.Conn
timer *time.Timer
doneCh chan struct{}
connMtx sync.Mutex
}

func newLazyConn(ctx context.Context, db *sqliteDatabase) *lazyConn {
return &lazyConn{
queryCache: db.queryCache,
db: db,
getConn: func() *sqlite.Conn {
return db.getConn(ctx)
},
}
}

func (c *lazyConn) ensureConn() *sqlite.Conn {
if c.conn != nil {
return c.conn
}

c.conn = c.getConn()
if c.timer == nil {
c.timer = time.NewTimer(c.db.connIdleTimeout)
c.eg.Go(func() error {
for {
select {
case <-c.timer.C:
// Although TryLock docs say that it's not recommended to use it
// in most cases, this use case is justified.
// If the mutex is already locked here, this means that an SQL
// statement is being executed on the connection, after which
// the idle timer will be restarted, or the connection is currently
// being released.
if c.connMtx.TryLock() {
c.releaseConn()
c.connMtx.Unlock()
}
case <-c.doneCh:
return nil
}
}
})
c.doneCh = make(chan struct{})
} else {
c.timer.Reset(c.db.connIdleTimeout)
}
return c.conn
}

func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
func (c *lazyConn) releaseConn() {
if c.conn != nil {
c.timer.Stop()
c.db.putConn(c.conn)
c.conn = nil
}
}

func (c *lazyConn) release() {
// Lock the mutex so that we don't get concurrent releaseConn() from the timer handler.
c.connMtx.Lock()
defer c.connMtx.Unlock()
c.releaseConn()
if c.doneCh != nil {
close(c.doneCh)
c.eg.Wait()
}
}

func (c *lazyConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
if err := c.db.runInterceptors(query); err != nil {
return 0, fmt.Errorf("running query interceptors: %w", err)
}
Expand All @@ -1165,7 +1272,13 @@ func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int,
c.db.latency.WithLabelValues(query).Observe(float64(time.Since(start)))
}()
}
return exec(c.conn, query, encoder, decoder)
c.connMtx.Lock()
defer c.connMtx.Unlock()
conn := c.ensureConn()
defer func() {
c.timer.Reset(c.db.connIdleTimeout)
}()
return exec(conn, query, encoder, decoder)
}

func mapSqliteError(err error) error {
Expand Down
30 changes: 30 additions & 0 deletions sql/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
Expand Down Expand Up @@ -659,3 +660,32 @@ func TestConnection(t *testing.T) {
return errors.New("error")
}))
}

func TestConnection_Idle(t *testing.T) {
dbName := t.Name()
numConns := func() int {
return int(testutil.ToFloat64(PoolUsage.WithLabelValues(dbName)))
}
require.Zero(t, numConns())
db := InMemoryTest(t, WithDBName(dbName))
require.NoError(t, db.WithConnection(context.Background(), func(ex Executor) error {
for range 3 {
for range 3 {
_, err := ex.Exec("select 1", nil, func(stmt *Statement) bool {
require.Equal(t, 1, numConns())
return true
})
require.NoError(t, err)
// The connection should still be in use right after the query
require.Equal(t, 1, numConns())
}
// The connection should be released after idle interval,
// but reacquired on the next query
require.Eventually(t, func() bool { return numConns() == 0 },
time.Second, 10*time.Millisecond)
}
return nil
}))

require.Zero(t, numConns())
}
Loading

0 comments on commit 8496629

Please sign in to comment.