diff --git a/config/config.go b/config/config.go
index 47e08cc12b..d93e5ca159 100644
--- a/config/config.go
+++ b/config/config.go
@@ -122,6 +122,7 @@ type BaseConfig struct {
 	DatabaseQueryCache           bool                    `mapstructure:"db-query-cache"`
 	DatabaseQueryCacheSizes      DatabaseQueryCacheSizes `mapstructure:"db-query-cache-sizes"`
 	DatabaseSchemaAllowDrift     bool                    `mapstructure:"db-allow-schema-drift"`
+	DatabaseConnIdleTimeout      time.Duration           `mapstructure:"db-conn-idle-timeout"`
 
 	PruneActivesetsFrom types.EpochID `mapstructure:"prune-activesets-from"`
 
@@ -240,9 +241,10 @@ func defaultBaseConfig() BaseConfig {
 			ATXBlob:       10000,
 			ActiveSetBlob: 200,
 		},
-		NetworkHRP:     "sm",
-		ATXGradeDelay:  10 * time.Second,
-		PostValidDelay: 12 * time.Hour,
+		DatabaseConnIdleTimeout: 10 * time.Millisecond,
+		NetworkHRP:              "sm",
+		ATXGradeDelay:           10 * time.Second,
+		PostValidDelay:          12 * time.Hour,
 
 		PprofHTTPServerListener: "localhost:6060",
 	}
diff --git a/config/mainnet.go b/config/mainnet.go
index 2af24825c9..ed2ef099e7 100644
--- a/config/mainnet.go
+++ b/config/mainnet.go
@@ -88,14 +88,15 @@ func MainnetConfig() Config {
 
 	return Config{
 		BaseConfig: BaseConfig{
-			DataDirParent:         defaultDataDir,
-			FileLock:              filepath.Join(os.TempDir(), "spacemesh.lock"),
-			MetricsPort:           1010,
-			DatabaseConnections:   16,
-			DatabasePruneInterval: 30 * time.Minute,
-			DatabaseVacuumState:   21,
-			PruneActivesetsFrom:   12, // starting from epoch 13 activesets below 12 will be pruned
-			NetworkHRP:            "sm",
+			DataDirParent:           defaultDataDir,
+			FileLock:                filepath.Join(os.TempDir(), "spacemesh.lock"),
+			MetricsPort:             1010,
+			DatabaseConnections:     16,
+			DatabasePruneInterval:   30 * time.Minute,
+			DatabaseVacuumState:     21,
+			DatabaseConnIdleTimeout: 10 * time.Millisecond,
+			PruneActivesetsFrom:     12, // starting from epoch 13 activesets below 12 will be pruned
+			NetworkHRP:              "sm",
 
 			LayerDuration:  5 * time.Minute,
 			LayerAvgSize:   50,
diff --git a/config/presets/testnet.go b/config/presets/testnet.go
index 476b4fe16b..614bcdf61f 100644
--- a/config/presets/testnet.go
+++ b/config/presets/testnet.go
@@ -82,6 +82,7 @@ func testnet() config.Config {
 			DatabaseConnections:          16,
 			DatabaseSizeMeteringInterval: 10 * time.Minute,
 			DatabasePruneInterval:        30 * time.Minute,
+			DatabaseConnIdleTimeout:      10 * time.Millisecond,
 			NetworkHRP:                   "stest",
 
 			LayerDuration:  5 * time.Minute,
diff --git a/node/node.go b/node/node.go
index 20b8905485..6ef58e58f9 100644
--- a/node/node.go
+++ b/node/node.go
@@ -2019,6 +2019,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
 			atxs.CacheKindATXBlob:             app.Config.DatabaseQueryCacheSizes.ATXBlob,
 			activesets.CacheKindActiveSetBlob: app.Config.DatabaseQueryCacheSizes.ActiveSetBlob,
 		}),
+		sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
+		sql.WithDBName("state"),
 	}
 	sqlDB, err := statesql.Open("file:"+filepath.Join(dbPath, dbFile), dbopts...)
 	if err != nil {
@@ -2033,6 +2035,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
 		sql.WithConnections(app.Config.API.DatabaseConnections),
 		sql.WithNoCheckSchemaDrift(), // already checked above
 		sql.WithMigrationsDisabled(),
+		sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
+		sql.WithDBName("state-api"),
 	)
 	if err != nil {
 		return fmt.Errorf("open sqlite db: %w", err)
@@ -2081,6 +2085,8 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
 		sql.WithDatabaseSchema(lSchema),
 		sql.WithConnections(app.Config.DatabaseConnections),
 		sql.WithAllowSchemaDrift(app.Config.DatabaseSchemaAllowDrift),
+		sql.WithConnIdleTimeout(app.Config.DatabaseConnIdleTimeout),
+		sql.WithDBName("local"),
 	)
 	if err != nil {
 		return fmt.Errorf("open sqlite db: %w", err)
diff --git a/sql/database.go b/sql/database.go
index da63a3b4b6..fe832cd3a8 100644
--- a/sql/database.go
+++ b/sql/database.go
@@ -19,6 +19,7 @@ import (
 	"github.com/go-llsqlite/crawshaw/sqlitex"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/spacemeshos/go-spacemesh/common/types"
 )
@@ -70,6 +71,8 @@ func defaultConf() *conf {
 		schema:                     &Schema{},
 		checkSchemaDrift:           true,
 		handleIncompleteMigrations: true,
+		connIdleTimeout:            10 * time.Millisecond,
+		dbName:                     "sqlite",
 	}
 }
 
@@ -91,6 +94,8 @@ type conf struct {
 	handleIncompleteMigrations bool
 	exclusive                  bool
 	readOnly                   bool
+	dbName                     string        // database name used as the metrics label
+	connIdleTimeout            time.Duration // idle timeout for lazily acquired pool connections
 }
 
 // WithConnections overwrites number of pooled connections.
@@ -194,6 +199,23 @@ func WithTemp() Opt {
 	}
 }
 
+// WithDBName sets the name of the database which is used for metrics.
+func WithDBName(name string) Opt {
+	return func(c *conf) {
+		c.dbName = name
+	}
+}
+
+// WithConnIdleTimeout sets idle timeout for connections from the pool
+// which are acquired upon first statement executed against a Connection
+// passed to the callback of Database.WithConnection. After the timeout,
+// the connection is released back to the pool until the next statement.
+func WithConnIdleTimeout(timeout time.Duration) Opt {
+	return func(c *conf) {
+		c.connIdleTimeout = timeout
+	}
+}
+
 func withDisableIncompleteMigrationHandling() Opt {
 	return func(c *conf) {
 		c.handleIncompleteMigrations = false
@@ -307,7 +329,7 @@ func openDB(config *conf) (db *sqliteDatabase, err error) {
 			return nil, fmt.Errorf("create db %s: %w", config.uri, err)
 		}
 	}
-	db = &sqliteDatabase{pool: pool}
+	db = &sqliteDatabase{pool: pool, connIdleTimeout: config.connIdleTimeout}
 	defer func() {
 		// If something goes wrong, close the database even in case of a
 		// panic. This is important for tests that verify incomplete migration.
@@ -323,6 +345,8 @@ func openDB(config *conf) (db *sqliteDatabase, err error) {
 		db.Close()
 		return nil, err
 	}
+	actualDB.connWaitLatency = ConnWaitLatency.WithLabelValues(config.dbName)
+	actualDB.poolUsage = PoolUsage.WithLabelValues(config.dbName)
 	return actualDB, nil
 }
 
@@ -654,6 +678,11 @@ type sqliteDatabase struct {
 
 	interceptMtx sync.Mutex
 	interceptors map[string]Interceptor
+
+	connWaitLatency prometheus.Observer // time spent waiting for a pooled connection; may be nil
+	poolUsage       prometheus.Gauge    // connections currently taken from the pool; may be nil
+
+	connIdleTimeout time.Duration // idle period after which a lazyConn puts its connection back
 }
 
 var _ Database = &sqliteDatabase{}
@@ -661,12 +690,20 @@ var _ Database = &sqliteDatabase{}
 func (db *sqliteDatabase) getConn(ctx context.Context) *sqlite.Conn {
 	start := time.Now()
 	conn := db.pool.Get(ctx)
-	if conn != nil {
-		connWaitLatency.Observe(time.Since(start).Seconds())
+	if conn != nil && db.connWaitLatency != nil {
+		db.connWaitLatency.Observe(time.Since(start).Seconds())
+		db.poolUsage.Inc()
 	}
 	return conn
 }
 
+func (db *sqliteDatabase) putConn(conn *sqlite.Conn) {
+	db.pool.Put(conn)
+	if db.poolUsage != nil {
+		db.poolUsage.Dec()
+	}
+}
+
 func (db *sqliteDatabase) getTx(ctx context.Context, initstmt string) (*sqliteTx, error) {
 	if db.closed {
 		return nil, ErrClosed
@@ -680,7 +717,7 @@ func (db *sqliteDatabase) getTx(ctx context.Context, initstmt string) (*sqliteTx
 	tx := &sqliteTx{queryCache: db.queryCache, db: db, conn: conn, freeConn: cancel}
 	if err := tx.begin(initstmt); err != nil {
 		cancel()
-		db.pool.Put(conn)
+		db.putConn(conn)
 		return nil, err
 	}
 	return tx, nil
@@ -707,7 +744,7 @@ func (db *sqliteDatabase) startExclusive() error {
 	if conn == nil {
 		return ErrNoConnection
 	}
-	defer db.pool.Put(conn)
+	defer db.putConn(conn)
 	// We don't need to wait for long if the database is busy
 	conn.SetBusyTimeout(1 * time.Millisecond)
 	// From SQLite docs:
@@ -787,7 +824,7 @@ func (db *sqliteDatabase) Exec(query string, encoder Encoder, decoder Decoder) (
 	if conn == nil {
 		return 0, ErrNoConnection
 	}
-	defer db.pool.Put(conn)
+	defer db.putConn(conn)
 	if db.latency != nil {
 		start := time.Now()
 		defer func() {
@@ -812,18 +849,15 @@ func (db *sqliteDatabase) Close() error {
 }
 
 // WithConnection implements Database.
-func (db *sqliteDatabase) WithConnection(ctx context.Context, exec func(Executor) error) error {
+func (db *sqliteDatabase) WithConnection(ctx context.Context, toCall func(Executor) error) error {
 	if db.closed {
 		return ErrClosed
 	}
 	conCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
-	conn := db.getConn(conCtx)
-	if conn == nil {
-		return ErrNoConnection
-	}
-	defer db.pool.Put(conn)
-	return exec(&sqliteConn{queryCache: db.queryCache, db: db, conn: conn})
+	c := newLazyConn(conCtx, db)
+	defer c.release()
+	return toCall(c)
 }
 
 // Intercept adds an interceptor function to the database. The interceptor functions
@@ -1120,7 +1154,7 @@ func (tx *sqliteTx) Commit() error {
 
 // Release transaction. Every transaction that was created must be released.
 func (tx *sqliteTx) Release() error {
-	defer tx.db.pool.Put(tx.conn)
+	defer tx.db.putConn(tx.conn)
 	if tx.committed {
 		tx.freeConn()
 		return nil
@@ -1147,13 +1181,83 @@ func (tx *sqliteTx) Exec(query string, encoder Encoder, decoder Decoder) (int, e
 	return exec(tx.conn, query, encoder, decoder)
 }
 
-type sqliteConn struct {
+// lazyConn is a connection that is acquired lazily from the pool, that is, upon the first
+// query, and released after a certain period of inactivity.
+type lazyConn struct {
 	*queryCache
-	db   *sqliteDatabase
-	conn *sqlite.Conn
+	db      *sqliteDatabase     // owning database: pool access, interceptors and idle timeout
+	getConn func() *sqlite.Conn // acquires a pooled connection with the context given to newLazyConn
+	eg      errgroup.Group      // runs the idle-timer goroutine so release can wait for it
+	conn    *sqlite.Conn        // connection currently held, nil while none is held
+	timer   *time.Timer         // idle timer, created on the first acquisition attempt
+	doneCh  chan struct{}       // closed by release to stop the idle-timer goroutine
+	connMtx sync.Mutex          // held while a statement runs and while the connection is released
+}
+
+func newLazyConn(ctx context.Context, db *sqliteDatabase) *lazyConn {
+	return &lazyConn{
+		queryCache: db.queryCache,
+		db:         db,
+		getConn: func() *sqlite.Conn {
+			return db.getConn(ctx)
+		},
+	}
+}
+
+func (c *lazyConn) ensureConn() *sqlite.Conn {
+	if c.conn != nil {
+		return c.conn
+	}
+
+	c.conn = c.getConn()
+	if c.timer != nil {
+		c.timer.Reset(c.db.connIdleTimeout)
+		return c.conn
+	}
+	c.timer = time.NewTimer(c.db.connIdleTimeout)
+	c.doneCh = make(chan struct{})
+	c.eg.Go(func() error {
+		for {
+			select {
+			case <-c.timer.C:
+				// Although TryLock docs say that it's not recommended to use it
+				// in most cases, this use case is justified.
+				// If the mutex is already locked here, this means that an SQL
+				// statement is being executed on the connection, after which
+				// the idle timer will be restarted, or the connection is currently
+				// being released.
+				if c.connMtx.TryLock() {
+					c.releaseConn()
+					c.connMtx.Unlock()
+				}
+			case <-c.doneCh:
+				return nil
+			}
+		}
+	})
+	return c.conn
 }
 
-func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
+func (c *lazyConn) releaseConn() {
+	if c.conn != nil {
+		c.timer.Stop()
+		c.db.putConn(c.conn)
+		c.conn = nil
+	}
+}
+
+func (c *lazyConn) release() {
+	// Lock the mutex so that we don't get concurrent releaseConn() from the timer handler.
+	c.connMtx.Lock()
+	defer c.connMtx.Unlock()
+	c.releaseConn()
+	if c.doneCh != nil {
+		close(c.doneCh)
+		c.eg.Wait()
+	}
+}
+
+func (c *lazyConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
 	if err := c.db.runInterceptors(query); err != nil {
 		return 0, fmt.Errorf("running query interceptors: %w", err)
 	}
@@ -1165,7 +1269,16 @@ func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int,
 			c.db.latency.WithLabelValues(query).Observe(float64(time.Since(start)))
 		}()
 	}
-	return exec(c.conn, query, encoder, decoder)
+	c.connMtx.Lock()
+	defer c.connMtx.Unlock()
+	conn := c.ensureConn()
+	if conn == nil {
+		// No connection could be obtained from the pool; report it the same
+		// way the eagerly-acquiring code paths do instead of executing on nil.
+		return 0, ErrNoConnection
+	}
+	defer c.timer.Reset(c.db.connIdleTimeout)
+	return exec(conn, query, encoder, decoder)
 }
 
 func mapSqliteError(err error) error {
diff --git a/sql/database_test.go b/sql/database_test.go
index 60b81ff9cf..1e9acf9b56 100644
--- a/sql/database_test.go
+++ b/sql/database_test.go
@@ -9,6 +9,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/mock/gomock"
 	"go.uber.org/zap"
@@ -659,3 +660,32 @@ func TestConnection(t *testing.T) {
 		return errors.New("error")
 	}))
 }
+
+func TestConnection_Idle(t *testing.T) {
+	dbName := t.Name()
+	numConns := func() int {
+		return int(testutil.ToFloat64(PoolUsage.WithLabelValues(dbName)))
+	}
+	require.Zero(t, numConns())
+	db := InMemoryTest(t, WithDBName(dbName))
+	require.NoError(t, db.WithConnection(context.Background(), func(ex Executor) error {
+		for range 3 {
+			for range 3 {
+				_, err := ex.Exec("select 1", nil, func(stmt *Statement) bool {
+					require.Equal(t, 1, numConns())
+					return true
+				})
+				require.NoError(t, err)
+				// The connection should still be in use right after the query
+				require.Equal(t, 1, numConns())
+			}
+			// The connection should be released after idle interval,
+			// but reacquired on the next query
+			require.Eventually(t, func() bool { return numConns() == 0 },
+				time.Second, 10*time.Millisecond)
+		}
+		return nil
+	}))
+
+	require.Zero(t, numConns())
+}
diff --git a/sql/metrics.go b/sql/metrics.go
index 2d50b1469a..462e6ed059 100644
--- a/sql/metrics.go
+++ b/sql/metrics.go
@@ -6,7 +6,10 @@ import (
 	"github.com/spacemeshos/go-spacemesh/metrics"
 )
 
-const namespace = "database"
+const (
+	namespace = "database"
+	dbLabel   = "db"
+)
 
 func newQueryLatency() *prometheus.HistogramVec {
 	return metrics.NewHistogramWithBuckets(
@@ -18,10 +21,21 @@ func newQueryLatency() *prometheus.HistogramVec {
 	)
 }
 
-var connWaitLatency = metrics.NewHistogramWithBuckets(
-	"conn_wait_seconds",
-	namespace,
-	"time spent in waiting for a connection from a pool",
-	[]string{},
-	prometheus.ExponentialBuckets(0.01, 2, 20),
-).WithLabelValues()
+var (
+	// ConnWaitLatency records the time spent waiting for a connection from a pool, labeled by database name.
+	ConnWaitLatency = metrics.NewHistogramWithBuckets(
+		"conn_wait_seconds",
+		namespace,
+		"time spent in waiting for a connection from a pool",
+		[]string{dbLabel},
+		prometheus.ExponentialBuckets(0.01, 2, 20),
+	)
+
+	// PoolUsage tracks the number of connections in use, labeled by database name.
+	PoolUsage = metrics.NewGauge(
+		"pool_usage",
+		namespace,
+		"number of connections in use",
+		[]string{dbLabel},
+	)
+)