Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - sql: reduce syncv2 conn pool usage and add pool utilization metrics #6596

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type BaseConfig struct {
DatabaseQueryCache bool `mapstructure:"db-query-cache"`
DatabaseQueryCacheSizes DatabaseQueryCacheSizes `mapstructure:"db-query-cache-sizes"`
DatabaseSchemaAllowDrift bool `mapstructure:"db-allow-schema-drift"`
DatabaseConnIdleTimeout time.Duration `mapstructure:"db-conn-idle-timeout"`

PruneActivesetsFrom types.EpochID `mapstructure:"prune-activesets-from"`

Expand Down Expand Up @@ -240,9 +241,10 @@ func defaultBaseConfig() BaseConfig {
ATXBlob: 10000,
ActiveSetBlob: 200,
},
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
PostValidDelay: 12 * time.Hour,
DatabaseConnIdleTimeout: 10 * time.Millisecond,
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
PostValidDelay: 12 * time.Hour,

PprofHTTPServerListener: "localhost:6060",
}
Expand Down
17 changes: 9 additions & 8 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ func MainnetConfig() Config {

return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
MetricsPort: 1010,
DatabaseConnections: 16,
DatabasePruneInterval: 30 * time.Minute,
DatabaseVacuumState: 21,
PruneActivesetsFrom: 12, // starting from epoch 13 activesets below 12 will be pruned
NetworkHRP: "sm",
DataDirParent: defaultDataDir,
FileLock: filepath.Join(os.TempDir(), "spacemesh.lock"),
MetricsPort: 1010,
DatabaseConnections: 16,
DatabasePruneInterval: 30 * time.Minute,
DatabaseVacuumState: 21,
DatabaseConnIdleTimeout: 10 * time.Millisecond,
PruneActivesetsFrom: 12, // starting from epoch 13 activesets below 12 will be pruned
NetworkHRP: "sm",

LayerDuration: 5 * time.Minute,
LayerAvgSize: 50,
Expand Down
1 change: 1 addition & 0 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func testnet() config.Config {
DatabaseConnections: 16,
DatabaseSizeMeteringInterval: 10 * time.Minute,
DatabasePruneInterval: 30 * time.Minute,
DatabaseConnIdleTimeout: 10 * time.Millisecond,
NetworkHRP: "stest",

LayerDuration: 5 * time.Minute,
Expand Down
6 changes: 6 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2019,6 +2019,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
atxs.CacheKindATXBlob: app.Config.DatabaseQueryCacheSizes.ATXBlob,
activesets.CacheKindActiveSetBlob: app.Config.DatabaseQueryCacheSizes.ActiveSetBlob,
}),
sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
sql.WithDBName("state"),
}
sqlDB, err := statesql.Open("file:"+filepath.Join(dbPath, dbFile), dbopts...)
if err != nil {
Expand All @@ -2033,6 +2035,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
sql.WithConnections(app.Config.API.DatabaseConnections),
sql.WithNoCheckSchemaDrift(), // already checked above
sql.WithMigrationsDisabled(),
sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
sql.WithDBName("state-api"),
)
if err != nil {
return fmt.Errorf("open sqlite db: %w", err)
Expand Down Expand Up @@ -2081,6 +2085,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
sql.WithDatabaseSchema(lSchema),
sql.WithConnections(app.Config.DatabaseConnections),
sql.WithAllowSchemaDrift(app.Config.DatabaseSchemaAllowDrift),
sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
sql.WithDBName("local"),
)
if err != nil {
return fmt.Errorf("open sqlite db: %w", err)
Expand Down
146 changes: 127 additions & 19 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"github.com/go-llsqlite/crawshaw/sqlitex"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/common/types"
)
Expand Down Expand Up @@ -70,6 +71,8 @@
schema: &Schema{},
checkSchemaDrift: true,
handleIncompleteMigrations: true,
connIdleTimeout: 10 * time.Millisecond,
dbName: "sqlite",
}
}

Expand All @@ -91,6 +94,8 @@
handleIncompleteMigrations bool
exclusive bool
readOnly bool
dbName string
connIdleTimeout time.Duration
}

// WithConnections overwrites number of pooled connections.
Expand Down Expand Up @@ -194,6 +199,23 @@
}
}

// WithDBName sets the name of the database which is used for metrics.
func WithDBName(name string) Opt {
return func(c *conf) {
c.dbName = name
}
}

// WithConnIdleTimeout sets idle timeout for connections from the pool
// which are acquired upon first statement executed against a Connection
// passed to the callback of Database.WithConnection. After the timeout,
// the connection is released back to the pool until the next statement.
func WithConnIdleTimeout(timeout time.Duration) Opt {
return func(c *conf) {
c.connIdleTimeout = timeout
}
}

func withDisableIncompleteMigrationHandling() Opt {
return func(c *conf) {
c.handleIncompleteMigrations = false
Expand Down Expand Up @@ -307,7 +329,7 @@
return nil, fmt.Errorf("create db %s: %w", config.uri, err)
}
}
db = &sqliteDatabase{pool: pool}
db = &sqliteDatabase{pool: pool, connIdleTimeout: config.connIdleTimeout}
defer func() {
// If something goes wrong, close the database even in case of a
// panic. This is important for tests that verify incomplete migration.
Expand All @@ -323,6 +345,8 @@
db.Close()
return nil, err
}
actualDB.connWaitLatency = ConnWaitLatency.WithLabelValues(config.dbName)
actualDB.poolUsage = PoolUsage.WithLabelValues(config.dbName)
return actualDB, nil
}

Expand Down Expand Up @@ -654,19 +678,32 @@

interceptMtx sync.Mutex
interceptors map[string]Interceptor

connWaitLatency prometheus.Observer
poolUsage prometheus.Gauge

connIdleTimeout time.Duration
}

var _ Database = &sqliteDatabase{}

func (db *sqliteDatabase) getConn(ctx context.Context) *sqlite.Conn {
start := time.Now()
conn := db.pool.Get(ctx)
if conn != nil {
connWaitLatency.Observe(time.Since(start).Seconds())
if conn != nil && db.connWaitLatency != nil {
db.connWaitLatency.Observe(time.Since(start).Seconds())
db.poolUsage.Inc()
}
return conn
}

func (db *sqliteDatabase) putConn(conn *sqlite.Conn) {
db.pool.Put(conn)
if db.poolUsage != nil {
db.poolUsage.Dec()
fasmat marked this conversation as resolved.
Show resolved Hide resolved
}
}

fasmat marked this conversation as resolved.
Show resolved Hide resolved
func (db *sqliteDatabase) getTx(ctx context.Context, initstmt string) (*sqliteTx, error) {
if db.closed {
return nil, ErrClosed
Expand All @@ -680,7 +717,7 @@
tx := &sqliteTx{queryCache: db.queryCache, db: db, conn: conn, freeConn: cancel}
if err := tx.begin(initstmt); err != nil {
cancel()
db.pool.Put(conn)
db.putConn(conn)

Check warning on line 720 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L720

Added line #L720 was not covered by tests
return nil, err
}
return tx, nil
Expand All @@ -707,7 +744,7 @@
if conn == nil {
return ErrNoConnection
}
defer db.pool.Put(conn)
defer db.putConn(conn)
// We don't need to wait for long if the database is busy
conn.SetBusyTimeout(1 * time.Millisecond)
// From SQLite docs:
Expand Down Expand Up @@ -787,7 +824,7 @@
if conn == nil {
return 0, ErrNoConnection
}
defer db.pool.Put(conn)
defer db.putConn(conn)
if db.latency != nil {
start := time.Now()
defer func() {
Expand All @@ -812,18 +849,15 @@
}

// WithConnection implements Database.
func (db *sqliteDatabase) WithConnection(ctx context.Context, exec func(Executor) error) error {
func (db *sqliteDatabase) WithConnection(ctx context.Context, toCall func(Executor) error) error {
if db.closed {
return ErrClosed
}
conCtx, cancel := context.WithCancel(ctx)
defer cancel()
conn := db.getConn(conCtx)
if conn == nil {
return ErrNoConnection
}
defer db.pool.Put(conn)
return exec(&sqliteConn{queryCache: db.queryCache, db: db, conn: conn})
c := newLazyConn(conCtx, db)
defer c.release()
return toCall(c)
}

// Intercept adds an interceptor function to the database. The interceptor functions
Expand Down Expand Up @@ -1120,7 +1154,7 @@

// Release transaction. Every transaction that was created must be released.
func (tx *sqliteTx) Release() error {
defer tx.db.pool.Put(tx.conn)
defer tx.db.putConn(tx.conn)
if tx.committed {
tx.freeConn()
return nil
Expand All @@ -1147,13 +1181,83 @@
return exec(tx.conn, query, encoder, decoder)
}

type sqliteConn struct {
// lazyConn is a connection that is acquired lazily from the pool, that is, upon the first
// query, and released after a certain period of inactivity.
type lazyConn struct {
*queryCache
db *sqliteDatabase
conn *sqlite.Conn
db *sqliteDatabase
getConn func() *sqlite.Conn
eg errgroup.Group
conn *sqlite.Conn
timer *time.Timer
doneCh chan struct{}
connMtx sync.Mutex
}

func newLazyConn(ctx context.Context, db *sqliteDatabase) *lazyConn {
return &lazyConn{
queryCache: db.queryCache,
db: db,
getConn: func() *sqlite.Conn {
return db.getConn(ctx)
},
}
}

func (c *lazyConn) ensureConn() *sqlite.Conn {
if c.conn != nil {
return c.conn
}

c.conn = c.getConn()
if c.timer != nil {
c.timer.Reset(c.db.connIdleTimeout)
return c.conn
}
c.timer = time.NewTimer(c.db.connIdleTimeout)
c.doneCh = make(chan struct{})
c.eg.Go(func() error {
for {
select {
case <-c.timer.C:
// Although TryLock docs say that it's not recommended to use it
// in most cases, this use case is justified.
// If the mutex is already locked here, this means that an SQL
// statement is being executed on the connection, after which
// the idle timer will be restarted, or the connection is currently
// being released.
if c.connMtx.TryLock() {
c.releaseConn()
c.connMtx.Unlock()
}
case <-c.doneCh:
return nil
}
}
})
return c.conn
}

func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
func (c *lazyConn) releaseConn() {
if c.conn != nil {
c.timer.Stop()
c.db.putConn(c.conn)
c.conn = nil
}
}

func (c *lazyConn) release() {
// Lock the mutex so that we don't get concurrent releaseConn() from the timer handler.
c.connMtx.Lock()
defer c.connMtx.Unlock()
c.releaseConn()
if c.doneCh != nil {
close(c.doneCh)
c.eg.Wait()
}
}

func (c *lazyConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
if err := c.db.runInterceptors(query); err != nil {
return 0, fmt.Errorf("running query interceptors: %w", err)
}
Expand All @@ -1165,7 +1269,11 @@
c.db.latency.WithLabelValues(query).Observe(float64(time.Since(start)))
}()
}
return exec(c.conn, query, encoder, decoder)
c.connMtx.Lock()
defer c.connMtx.Unlock()
conn := c.ensureConn()
defer c.timer.Reset(c.db.connIdleTimeout)
return exec(conn, query, encoder, decoder)
}

func mapSqliteError(err error) error {
Expand Down
30 changes: 30 additions & 0 deletions sql/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
Expand Down Expand Up @@ -659,3 +660,32 @@ func TestConnection(t *testing.T) {
return errors.New("error")
}))
}

func TestConnection_Idle(t *testing.T) {
dbName := t.Name()
numConns := func() int {
return int(testutil.ToFloat64(PoolUsage.WithLabelValues(dbName)))
}
fasmat marked this conversation as resolved.
Show resolved Hide resolved
require.Zero(t, numConns())
db := InMemoryTest(t, WithDBName(dbName))
require.NoError(t, db.WithConnection(context.Background(), func(ex Executor) error {
for range 3 {
for range 3 {
_, err := ex.Exec("select 1", nil, func(stmt *Statement) bool {
require.Equal(t, 1, numConns())
return true
})
require.NoError(t, err)
// The connection should still be in use right after the query
require.Equal(t, 1, numConns())
}
// The connection should be released after idle interval,
// but reacquired on the next query
require.Eventually(t, func() bool { return numConns() == 0 },
time.Second, 10*time.Millisecond)
}
return nil
}))

require.Zero(t, numConns())
}
Loading
Loading