Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `Config.ReindexerIndexNames` and `ReindexerIndexNamesDefault()` so the reindexer's target indexes can be customized from the public API. [PR #1194](https://github.com/riverqueue/river/pull/1194).

### Fixed

- Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184).
Expand Down
13 changes: 13 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ type Config struct {
// reindexer will run at midnight UTC every day.
ReindexerSchedule PeriodicSchedule

// ReindexerIndexNames customizes which indexes River periodically reindexes.
// If nil, River uses [ReindexerIndexNamesDefault]. If non-nil, the provided
// slice is used as the exact list.
ReindexerIndexNames []string

// ReindexerTimeout is the amount of time to wait for the reindexer to run a
// single reindex operation before cancelling it via context. Set to -1 to
// disable the timeout.
Expand Down Expand Up @@ -380,6 +385,12 @@ func (c *Config) WithDefaults() *Config {
c = &Config{}
}

var reindexerIndexNames []string
if c.ReindexerIndexNames != nil {
reindexerIndexNames = make([]string, len(c.ReindexerIndexNames))
copy(reindexerIndexNames, c.ReindexerIndexNames)
}

// Use the existing logger if set, otherwise create a default one.
logger := c.Logger
if logger == nil {
Expand Down Expand Up @@ -420,6 +431,7 @@ func (c *Config) WithDefaults() *Config {
PeriodicJobs: c.PeriodicJobs,
PollOnly: c.PollOnly,
Queues: c.Queues,
ReindexerIndexNames: reindexerIndexNames,
ReindexerSchedule: c.ReindexerSchedule,
ReindexerTimeout: cmp.Or(c.ReindexerTimeout, maintenance.ReindexerTimeoutDefault),
RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter),
Expand Down Expand Up @@ -936,6 +948,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
}

reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{
IndexNames: config.ReindexerIndexNames,
ScheduleFunc: scheduleFunc,
Schema: config.Schema,
Timeout: config.ReindexerTimeout,
Expand Down
84 changes: 77 additions & 7 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7185,6 +7185,39 @@ func Test_Client_Start_Error(t *testing.T) {
})
}

func Test_Config_WithDefaults(t *testing.T) {
t.Parallel()

t.Run("ReindexerIndexNamesEmptyStaysNonNil", func(t *testing.T) {
t.Parallel()

config := (&Config{ReindexerIndexNames: []string{}}).WithDefaults()

require.NotNil(t, config.ReindexerIndexNames)
require.Empty(t, config.ReindexerIndexNames)
})

t.Run("ReindexerIndexNamesNilStaysNil", func(t *testing.T) {
t.Parallel()

config := (&Config{}).WithDefaults()

require.Nil(t, config.ReindexerIndexNames)
})

t.Run("ReindexerIndexNamesSliceIsCopied", func(t *testing.T) {
t.Parallel()

input := []string{"custom_index", "other_index"}
config := (&Config{ReindexerIndexNames: input}).WithDefaults()

require.Equal(t, input, config.ReindexerIndexNames)

input[0] = "mutated"
require.Equal(t, []string{"custom_index", "other_index"}, config.ReindexerIndexNames)
})
}

func Test_NewClient_BaseServiceName(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -7279,13 +7312,16 @@ func Test_NewClient_Defaults(t *testing.T) {
require.False(t, enqueuer.StaggerStartupIsDisabled())

reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
require.Contains(t, reindexer.Config.IndexNames, "river_job_args_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_kind")
require.Contains(t, reindexer.Config.IndexNames, "river_job_metadata_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_pkey")
require.Contains(t, reindexer.Config.IndexNames, "river_job_prioritized_fetching_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_state_and_finalized_at_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_unique_idx")
// Assert the exact list so index list changes require explicit test updates.
require.Equal(t, []string{
"river_job_args_index",
"river_job_kind",
"river_job_metadata_index",
"river_job_pkey",
"river_job_prioritized_fetching_index",
"river_job_state_and_finalized_at_index",
"river_job_unique_idx",
}, reindexer.Config.IndexNames)
now := time.Now().UTC()
nextMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
require.Equal(t, nextMidnight, reindexer.Config.ScheduleFunc(now))
Expand Down Expand Up @@ -7349,6 +7385,7 @@ func Test_NewClient_Overrides(t *testing.T) {
Logger: logger,
MaxAttempts: 5,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
ReindexerIndexNames: []string{"custom_index", "other_index"},
ReindexerSchedule: &periodicIntervalSchedule{interval: time.Hour},
ReindexerTimeout: 125 * time.Millisecond,
RetryPolicy: retryPolicy,
Expand All @@ -7373,6 +7410,8 @@ func Test_NewClient_Overrides(t *testing.T) {
require.True(t, enqueuer.StaggerStartupIsDisabled())

reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
// Assert the exact list so index list changes require explicit test updates.
require.Equal(t, []string{"custom_index", "other_index"}, reindexer.Config.IndexNames)
now := time.Now().UTC()
require.Equal(t, now.Add(time.Hour), reindexer.Config.ScheduleFunc(now))

Expand All @@ -7391,6 +7430,37 @@ func Test_NewClient_Overrides(t *testing.T) {
require.Len(t, client.config.WorkerMiddleware, 1)
}

func Test_NewClient_ReindexerIndexNamesExplicitEmptyOverride(t *testing.T) {
t.Parallel()

ctx := context.Background()

var (
dbPool = riversharedtest.DBPool(ctx, t)
driver = riverpgxv5.New(dbPool)
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
)

workers := NewWorkers()
AddWorker(workers, &noOpWorker{})

client, err := NewClient(driver, &Config{
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
ReindexerIndexNames: []string{},
Schema: schema,
TestOnly: true,
Workers: workers,
})
require.NoError(t, err)

require.NotNil(t, client.config.ReindexerIndexNames)
require.Empty(t, client.config.ReindexerIndexNames)

reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
require.NotNil(t, reindexer.Config.IndexNames)
require.Empty(t, reindexer.Config.IndexNames)
}

func Test_NewClient_MissingParameters(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 1 addition & 1 deletion insert_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals
// what to do about the job that can't be scheduled. We can't send feedback to
// the caller at this point, so probably the best we could do is leave it in
// this untransitionable state until the `running` job finished, which isn't
// particularly satsifactory.
// particularly satisfactory.
var requiredV3states = []rivertype.JobState{ //nolint:gochecknoglobals
rivertype.JobStateAvailable,
rivertype.JobStatePending,
Expand Down
59 changes: 52 additions & 7 deletions internal/maintenance/reindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ var defaultIndexNames = []string{ //nolint:gochecknoglobals
"river_job_unique_idx",
}

// DefaultReindexerIndexNames returns the default set of indexes reindexed by River.
func DefaultReindexerIndexNames() []string {
return append([]string(nil), defaultIndexNames...)
}

// ReindexerTestSignals are internal signals used exclusively in tests.
type ReindexerTestSignals struct {
Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
Expand Down Expand Up @@ -133,11 +138,27 @@ func (s *Reindexer) Start(ctx context.Context) error {
s.Logger.DebugContext(ctx, s.Name+": Scheduling first run", slog.Time("next_run_at", nextRunAt))

timerUntilNextRun := time.NewTimer(time.Until(nextRunAt))
scheduleNextRun := func() {
// Advance from the previous scheduled time, not "now", so retries
// stay aligned with the configured cadence and don't immediately
// refire after a timer that has already elapsed.
nextRunAt = s.Config.ScheduleFunc(nextRunAt)
timerUntilNextRun.Reset(time.Until(nextRunAt))
}

for {
select {
case <-timerUntilNextRun.C:
for _, indexName := range s.Config.IndexNames {
reindexableIndexNames, err := s.reindexableIndexNames(ctx)
if err != nil {
if !errors.Is(err, context.Canceled) {
s.Logger.ErrorContext(ctx, s.Name+": Error listing reindexable indexes", slog.String("error", err.Error()))
}
scheduleNextRun()
continue
}

for _, indexName := range reindexableIndexNames {
if _, err := s.reindexOne(ctx, indexName); err != nil {
if !errors.Is(err, context.Canceled) {
s.Logger.ErrorContext(ctx, s.Name+": Error reindexing", slog.String("error", err.Error()), slog.String("index_name", indexName))
Expand All @@ -151,15 +172,11 @@ func (s *Reindexer) Start(ctx context.Context) error {
// On each run, we calculate the new schedule based on the
// previous run's start time. This ensures that we don't
// accidentally skip a run as time elapses during the run.
nextRunAt = s.Config.ScheduleFunc(nextRunAt)
scheduleNextRun()

// TODO: maybe we should log differently if some of these fail?
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(s.Config.IndexNames)))

// Reset the timer after the insert loop has finished so it's
// paused during work. Makes its firing more deterministic.
timerUntilNextRun.Reset(time.Until(nextRunAt))
slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(reindexableIndexNames)))

case <-ctx.Done():
// Clean up timer resources. We know it has _not_ received from
Expand All @@ -176,6 +193,34 @@ func (s *Reindexer) Start(ctx context.Context) error {
return nil
}

func (s *Reindexer) reindexableIndexNames(ctx context.Context) ([]string, error) {
indexesExist, err := s.exec.IndexesExist(ctx, &riverdriver.IndexesExistParams{
IndexNames: s.Config.IndexNames,
Schema: s.Config.Schema,
})
if err != nil {
return nil, err
}

indexNames := make([]string, 0, len(s.Config.IndexNames))
missingIndexNames := make([]string, 0)
for _, indexName := range s.Config.IndexNames {
if indexesExist[indexName] {
indexNames = append(indexNames, indexName)
continue
}

missingIndexNames = append(missingIndexNames, indexName)
}

if len(missingIndexNames) > 0 {
s.Logger.WarnContext(ctx, s.Name+": Configured reindex indexes do not exist; run migrations or update ReindexerIndexNames",
slog.Any("index_names", missingIndexNames))
}

return indexNames, nil
}

func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, error) {
var cancel func()
if s.Config.Timeout > -1 {
Expand Down
Loading
Loading