Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259)

### Changed
Expand All @@ -16,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Add a 10-second timeout around `StandardPilot.JobGetAvailable` so a stalled standard-pilot fetch no longer hangs a producer indefinitely. [PR #1255](https://github.com/riverqueue/river/pull/1255)
- Fixed `rivertest.Worker.Work` and `WorkJob` to honor a configured custom `Config.Schema` when transitioning a job to its running state. Previously, the running-state update ran unqualified and could fail on a connection whose `search_path` didn't include the configured schema. [PR #1262](https://github.com/riverqueue/river/pull/1262)

## [0.38.0] - 2026-05-22
Expand Down
7 changes: 7 additions & 0 deletions rivershared/riverpilot/standard_pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package riverpilot
import (
"context"
"sync/atomic"
"time"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivertype"
)

const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second

type StandardPilot struct {
seq atomic.Int64
}
Expand All @@ -19,6 +22,10 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex
if params.MaxToLock <= 0 {
return nil, nil
}

ctx, cancel := context.WithTimeoutCause(ctx, standardPilotJobGetAvailableTimeoutDefault, context.DeadlineExceeded)
defer cancel()

return exec.JobGetAvailable(ctx, params)
}

Expand Down
69 changes: 69 additions & 0 deletions rivershared/riverpilot/standard_pilot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package riverpilot

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivertype"
)

type standardPilotExecutorMock struct {
riverdriver.Executor

jobGetAvailableFunc func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error)
}

func (m *standardPilotExecutorMock) JobGetAvailable(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
return m.jobGetAvailableFunc(ctx, params)
}

func TestStandardPilot_JobGetAvailable(t *testing.T) {
t.Parallel()

type testBundle struct {
exec *standardPilotExecutorMock
pilot *StandardPilot
}

setup := func(t *testing.T) *testBundle {
t.Helper()

return &testBundle{
exec: &standardPilotExecutorMock{},
pilot: &StandardPilot{},
}
}

t.Run("ReturnsNilWhenMaxToLockIsZero", func(t *testing.T) {
t.Parallel()

bundle := setup(t)

res, err := bundle.pilot.JobGetAvailable(context.Background(), bundle.exec, nil, &riverdriver.JobGetAvailableParams{})
require.NoError(t, err)
require.Nil(t, res)
})

t.Run("PreservesParentCancellation", func(t *testing.T) {
t.Parallel()

bundle := setup(t)
parentErr := errors.New("parent cancelled")
parentCtx, cancel := context.WithCancelCause(context.Background())
cancel(parentErr)

bundle.exec.jobGetAvailableFunc = func(ctx context.Context, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
<-ctx.Done()
return nil, context.Cause(ctx)
}

_, err := bundle.pilot.JobGetAvailable(parentCtx, bundle.exec, nil, &riverdriver.JobGetAvailableParams{
MaxToLock: 1,
})
require.ErrorIs(t, err, parentErr)
})
}
Loading