From 9730341e47dc27f1724eeb950e00d90075b53f94 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sun, 31 May 2026 22:08:07 +0200 Subject: [PATCH] Add `statement_timeout` on fetching available jobs Follows up #1255 to add a `statement_timeout` in addition to the Go context timeout. `statement_timeout` will give us a better error message, and also minimizes the chances of accidentally locking rows that won't be worked if we had an operation that ran long, succeeded, but then was immediately cancelled as Go's context timeout ran out. `statement_timeout` is Postgres only, so the code is a little gnarlier than would be desirable in that we add a `SetLocalStatementTimeout` function to driver `ExecutorTx`, which is a no-op on some databases like SQLite. We try to clarify in documentation that it needs to be used in addition to context timeout, not instead of it, because it may no-op depending on the database. I also increased the timeout to 30 seconds. This matches our timeouts in the various maintenance modules, and seems a little safer as job locks on tables with huge numbers of dead rows could potentially take over 10 seconds, and maybe some users have this happening. IMO, it's still too random where we put this stuff in, but we'll have to figure that out in follow-up changes. E.g. why do we do a statement timeout for locking jobs but not for maintenance operations? Hard to justify. 
--- CHANGELOG.md | 4 +--- riverdriver/river_driver_interface.go | 8 +++++++ riverdriver/river_driver_interface_test.go | 11 ++++++++++ .../river_database_sql_driver.go | 8 +++++++ riverdriver/riverdrivertest/executor_tx.go | 22 +++++++++++++++++++ riverdriver/riverpgxv5/river_pgx_v5_driver.go | 4 ++++ .../riversqlite/river_sqlite_driver.go | 8 +++++++ riverdriver/statement_timeout.go | 21 ++++++++++++++++++ rivershared/riverpilot/standard_pilot.go | 22 +++++++++++++++---- rivershared/riverpilot/standard_pilot_test.go | 15 +++++++++++++ 10 files changed, 116 insertions(+), 7 deletions(-) create mode 100644 riverdriver/statement_timeout.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2401f5bb..6851d96e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -### Fixed - ⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259) ### Changed @@ -18,7 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Add a 10-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255) +- Add a 30-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. 
[PR #1255](https://github.com/riverqueue/river/pull/1255) [PR #1263](https://github.com/riverqueue/river/pull/1263) - Fixed `rivertest.Worker.Work` and `WorkJob` to honor a configured custom `Config.Schema` when transitioning a job to its running state. Previously, the running-state update ran unqualified and could fail on a connection whose `search_path` didn't include the configured schema. [PR #1262](https://github.com/riverqueue/river/pull/1262) ## [0.38.0] - 2026-05-22 diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 8727a6e3..8b345edb 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -295,6 +295,14 @@ type ExecutorTx interface { // // API is not stable. DO NOT USE. Rollback(ctx context.Context) error + + // SetLocalStatementTimeout sets a statement timeout local to the current + // transaction if supported by the underlying database. Some databases don't + // support this behavior, so this should be used in addition to context + // timeouts, not instead of them. + // + // API is not stable. DO NOT USE. 
+ SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error } type GetListenenerParams struct { diff --git a/riverdriver/river_driver_interface_test.go b/riverdriver/river_driver_interface_test.go index 978e299c..9e43da14 100644 --- a/riverdriver/river_driver_interface_test.go +++ b/riverdriver/river_driver_interface_test.go @@ -49,6 +49,17 @@ func TestJobSetStateCancelled(t *testing.T) { }) } +func TestPostgresStatementTimeoutValue(t *testing.T) { + t.Parallel() + + require.Equal(t, "0ms", PostgresStatementTimeoutValue(0)) + require.Equal(t, "1ms", PostgresStatementTimeoutValue(time.Nanosecond)) + require.Equal(t, "1ms", PostgresStatementTimeoutValue(999*time.Microsecond)) + require.Equal(t, "1ms", PostgresStatementTimeoutValue(time.Millisecond)) + require.Equal(t, "2ms", PostgresStatementTimeoutValue(time.Millisecond+time.Nanosecond)) + require.Equal(t, "1234ms", PostgresStatementTimeoutValue(1234*time.Millisecond)) +} + func TestJobSetStateCompleted(t *testing.T) { t.Parallel() diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 16a3ae35..86a4af4e 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -1050,6 +1050,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error { return t.tx.Rollback() } +func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", riverdriver.PostgresStatementTimeoutValue(timeout)) +} + type ExecutorSubTx struct { Executor @@ -1103,6 +1107,10 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error { return nil } +func (t *ExecutorSubTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", 
riverdriver.PostgresStatementTimeoutValue(timeout)) +} + func interpretError(err error) error { if errors.Is(err, sql.ErrNoRows) { return rivertype.ErrNotFound diff --git a/riverdriver/riverdrivertest/executor_tx.go b/riverdriver/riverdrivertest/executor_tx.go index 451c65c2..cf168387 100644 --- a/riverdriver/riverdrivertest/executor_tx.go +++ b/riverdriver/riverdrivertest/executor_tx.go @@ -3,6 +3,7 @@ package riverdrivertest import ( "context" "testing" + "time" "github.com/stretchr/testify/require" @@ -161,6 +162,27 @@ func exerciseExecutorTx[TTx any](ctx context.Context, t *testing.T, }) }) + t.Run("SetLocalStatementTimeout", func(t *testing.T) { + t.Parallel() + + exec, driver := executorWithTx(ctx, t) + + tx, err := exec.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { _ = tx.Rollback(ctx) }) + + require.NoError(t, tx.SetLocalStatementTimeout(ctx, 999*time.Microsecond)) + require.NoError(t, tx.SetLocalStatementTimeout(ctx, 1234*time.Millisecond)) + + if driver.DatabaseName() == databaseNameSQLite { + return + } + + var timeoutMilliseconds int64 + require.NoError(t, tx.QueryRow(ctx, "SELECT setting::bigint FROM pg_settings WHERE name = 'statement_timeout'").Scan(&timeoutMilliseconds)) + require.Equal(t, int64(1234), timeoutMilliseconds) + }) + t.Run("PGAdvisoryXactLock", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index efbddf1f..ebbdb741 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -1024,6 +1024,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error { return t.tx.Rollback(ctx) } +func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return t.Exec(ctx, "SELECT set_config('statement_timeout', $1, true)", riverdriver.PostgresStatementTimeoutValue(timeout)) +} + type Listener struct { afterConnectExec string // should only ever be used in 
testing conn *pgx.Conn diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index b7d729ae..bd948bc3 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -1500,6 +1500,10 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error { return t.tx.Rollback() } +func (t *ExecutorTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return nil +} + type ExecutorSubTx struct { Executor @@ -1560,6 +1564,10 @@ func (t *ExecutorSubTx) Rollback(ctx context.Context) error { return nil } +func (t *ExecutorSubTx) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return nil +} + func interpretError(err error) error { if errors.Is(err, sql.ErrNoRows) { return rivertype.ErrNotFound diff --git a/riverdriver/statement_timeout.go b/riverdriver/statement_timeout.go new file mode 100644 index 00000000..c787e4d8 --- /dev/null +++ b/riverdriver/statement_timeout.go @@ -0,0 +1,21 @@ +package riverdriver + +import ( + "strconv" + "time" +) + +// PostgresStatementTimeoutValue formats a duration for Postgres' +// statement_timeout setting. +// +// Postgres accepts statement_timeout values as whole milliseconds. Round +// positive sub-millisecond values up so they don't truncate to 0ms, which would +// disable the timeout. 
+func PostgresStatementTimeoutValue(timeout time.Duration) string { + milliseconds := timeout / time.Millisecond + if timeout > 0 && timeout%time.Millisecond != 0 { + milliseconds++ + } + + return strconv.FormatInt(int64(milliseconds), 10) + "ms" +} diff --git a/rivershared/riverpilot/standard_pilot.go b/rivershared/riverpilot/standard_pilot.go index 45e2ff2b..2c3eb6ee 100644 --- a/rivershared/riverpilot/standard_pilot.go +++ b/rivershared/riverpilot/standard_pilot.go @@ -2,16 +2,16 @@ package riverpilot import ( "context" + "fmt" "sync/atomic" "time" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivertype" ) -const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second - type StandardPilot struct { seq atomic.Int64 } @@ -23,10 +23,24 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex return nil, nil } - ctx, cancel := context.WithTimeoutCause(ctx, standardPilotJobGetAvailableTimeoutDefault, context.DeadlineExceeded) + // Set an outer context timeout on locking jobs, and where possible (i.e. in + // Postgres, but not SQLite), set an inner `statement_timeout` inside a + // transaction so the configuration isn't durable. The error from the + // Postgres version will be better, so try to have that trigger first. It + // also minimizes the chances of a successful operation that locks jobs but + // then accidentally errors because it's run time was so close to the Go + // timeout. 
+ const timeout = 30 * time.Second + + ctx, cancel := context.WithTimeoutCause(ctx, timeout, context.DeadlineExceeded) defer cancel() - return exec.JobGetAvailable(ctx, params) + return dbutil.WithTxV(ctx, exec, func(ctx context.Context, execTx riverdriver.ExecutorTx) ([]*rivertype.JobRow, error) { + if err := execTx.SetLocalStatementTimeout(ctx, timeout-1*time.Second); err != nil { + return nil, fmt.Errorf("error setting statement timeout: %w", err) + } + return execTx.JobGetAvailable(ctx, params) + }) } func (p *StandardPilot) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) { diff --git a/rivershared/riverpilot/standard_pilot_test.go b/rivershared/riverpilot/standard_pilot_test.go index ae7d4254..d625f13c 100644 --- a/rivershared/riverpilot/standard_pilot_test.go +++ b/rivershared/riverpilot/standard_pilot_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/stretchr/testify/require" @@ -17,10 +18,24 @@ type standardPilotExecutorMock struct { jobGetAvailableFunc func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) } +func (m *standardPilotExecutorMock) Begin(ctx context.Context) (riverdriver.ExecutorTx, error) { + return &standardPilotExecutorTxMock{standardPilotExecutorMock: m}, nil +} + func (m *standardPilotExecutorMock) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) { return m.jobGetAvailableFunc(ctx, params) } +type standardPilotExecutorTxMock struct { + *standardPilotExecutorMock +} + +func (m *standardPilotExecutorTxMock) Commit(ctx context.Context) error { return nil } +func (m *standardPilotExecutorTxMock) Rollback(ctx context.Context) error { return nil } +func (m *standardPilotExecutorTxMock) SetLocalStatementTimeout(ctx context.Context, timeout time.Duration) error { + return nil +} + func TestStandardPilot_JobGetAvailable(t *testing.T) { 
t.Parallel()