Skip to content

Commit 0819eec

Browse files
committed
Add leadership "domains" so multiple Rivers can operate in one schema
We've gotten a couple requests so far (see #342 and #1105) to be able to start multiple River clients targeting different queues within the same database/schema, and giving them the capacity to operate independently enough to be functional. This is currently not possible because a single leader is elected given a single schema and it handles all maintenance operations including non-queue ones like periodic job enqueuing. Here, add the idea of a `LeaderDomain`. This lets a user set the "domain" on which a client will elect its leader, allowing multiple leaders to be elected in a single schema. Each leader will run its own maintenance services. Setting `LeaderDomain` has the additional effect of having maintenance services start to operate only on the queues that their client is configured for. The idea here is to give us backwards compatibility in that the default behavior (in case of an unset `LeaderDomain`) is the same, while providing a path for multiple leaders to be interoperable with each other. There are still a few edges: for example, reindexing is not queue-specific, so multiple leaders could be running a reindexer. I've provided guidance in the config documentation that ideally, all clients but one should have their reindexer disabled.
1 parent ccbe042 commit 0819eec

Some content is hidden

Large commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1118
-298
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Added `Config.LeaderDomain` to allow multiple River clients to be elected leader within a single schema/database and run maintenance services on only their configured queues. [PR #1113](https://github.com/riverqueue/river/pull/1113).
13+
1014
## [0.29.0] - 2025-12-22
1115

1216
### Added

client.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"log/slog"
1010
"os"
1111
"regexp"
12+
"slices"
1213
"strings"
1314
"sync"
1415
"time"
@@ -209,6 +210,41 @@ type Config struct {
209210
// Jobs may have their own specific hooks by implementing JobArgsWithHooks.
210211
Hooks []rivertype.Hook
211212

213+
// LeaderDomain is an optional "domain" string to use for leader election.
214+
// Different clients sharing the same River schema can elect multiple
215+
// leaders as long as they're using different domains, with one leader
216+
// elected per domain.
217+
//
218+
// Setting this value also triggers the related behavior that maintenance
219+
// services start to only operate on the queues they're configured on. So
220+
// for example, given client1 handling queue_a and queue_b and client2
221+
// handling queue_c and queue_d, whichever client is elected leader will end
222+
// up running all maintenance services for all queues (queue_a, queue_b,
223+
// queue_c, and queue_d). But if client1 is using domain "domain1" and
224+
// client2 is using domain "domain2", then client1 (elected in domain1) will
225+
// only run maintenance services on queue_a and queue_b, while client2
226+
// (elected in domain2) will run maintenance services on queue_c and
227+
// queue_d.
228+
//
229+
// A warning though that River *does not protect against configuration
230+
// mistakes*. If client1 on domain1 is configured for queue_a and queue_b,
231+
// and client2 on domain2 is *also* configured for queue_a and queue_b, then
232+
// both clients may end up running maintenance services on the same queues
233+
// at the same time. It's the caller's responsibility to ensure that doesn't
234+
// happen.
235+
//
236+
// Certain maintenance services that aren't queue-related like the reindexer
237+
// will continue to run on all leaders regardless of domain. If using this
238+
// feature, it's a good idea to configure ReindexerSchedule on all but a
239+
// single leader domain to river.NeverSchedule().
240+
//
241+
// In general, most River users should not need LeaderDomain, and when
242+
// running multiple Rivers may want to consider using multiple databases and
243+
// multiple schemas instead.
244+
//
245+
// Defaults to "default".
246+
LeaderDomain string
247+
212248
// Logger is the structured logger to use for logging purposes. If none is
213249
// specified, logs will be emitted to STDOUT with messages at warn level
214250
// or higher.
@@ -415,6 +451,7 @@ func (c *Config) WithDefaults() *Config {
415451
Hooks: c.Hooks,
416452
JobInsertMiddleware: c.JobInsertMiddleware,
417453
JobTimeout: cmp.Or(c.JobTimeout, JobTimeoutDefault),
454+
LeaderDomain: c.LeaderDomain,
418455
Logger: logger,
419456
MaxAttempts: cmp.Or(c.MaxAttempts, MaxAttemptsDefault),
420457
Middleware: c.Middleware,
@@ -840,6 +877,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
840877

841878
client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
842879
ClientID: config.ID,
880+
Domain: config.LeaderDomain,
843881
Schema: config.Schema,
844882
})
845883
client.services = append(client.services, client.elector)
@@ -860,6 +898,14 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
860898
client.services = append(client.services, pluginPilot.PluginServices()...)
861899
}
862900

901+
// It's important for queuesIncluded to be `nil` when it's not in use
902+
// for the various driver queries to work correctly.
903+
var queuesIncluded []string
904+
if config.LeaderDomain != "" && config.LeaderDomain != leadership.DomainDefault && len(config.Queues) > 0 {
905+
queuesIncluded = maputil.Keys(config.Queues)
906+
slices.Sort(queuesIncluded)
907+
}
908+
863909
//
864910
// Maintenance services
865911
//
@@ -872,6 +918,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
872918
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
873919
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
874920
QueuesExcluded: client.pilot.JobCleanerQueuesExcluded(),
921+
QueuesIncluded: queuesIncluded,
875922
Schema: config.Schema,
876923
Timeout: config.JobCleanerTimeout,
877924
}, driver.GetExecutor())
@@ -882,6 +929,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
882929
{
883930
jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{
884931
ClientRetryPolicy: config.RetryPolicy,
932+
QueuesIncluded: queuesIncluded,
885933
RescueAfter: config.RescueStuckJobsAfter,
886934
Schema: config.Schema,
887935
WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
@@ -897,9 +945,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
897945

898946
{
899947
jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{
900-
Interval: config.schedulerInterval,
901-
NotifyInsert: client.maybeNotifyInsertForQueues,
902-
Schema: config.Schema,
948+
Interval: config.schedulerInterval,
949+
NotifyInsert: client.maybeNotifyInsertForQueues,
950+
QueuesIncluded: queuesIncluded,
951+
Schema: config.Schema,
903952
}, driver.GetExecutor())
904953
maintenanceServices = append(maintenanceServices, jobScheduler)
905954
client.testSignals.jobScheduler = &jobScheduler.TestSignals
@@ -925,6 +974,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
925974

926975
{
927976
queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{
977+
QueuesIncluded: queuesIncluded,
928978
RetentionPeriod: maintenance.QueueRetentionPeriodDefault,
929979
Schema: config.Schema,
930980
}, driver.GetExecutor())

client_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,6 +1491,154 @@ func Test_Client_Common(t *testing.T) {
14911491

14921492
startstoptest.Stress(ctx, t, clientWithStop)
14931493
})
1494+
1495+
t.Run("LeaderDomain_Alternate", func(t *testing.T) {
1496+
t.Parallel()
1497+
1498+
var client1 *Client[pgx.Tx]
1499+
{
1500+
config, bundle := setupConfig(t)
1501+
config.LeaderDomain = "domain1"
1502+
config.ReindexerSchedule = &neverSchedule{}
1503+
config.Queues = map[string]QueueConfig{
1504+
"queue_a": {MaxWorkers: 50},
1505+
"queue_b": {MaxWorkers: 50},
1506+
}
1507+
1508+
var err error
1509+
client1, err = NewClient(bundle.driver, config)
1510+
require.NoError(t, err)
1511+
client1.testSignals.Init(t)
1512+
}
1513+
1514+
var client2 *Client[pgx.Tx]
1515+
{
1516+
config, bundle := setupConfig(t)
1517+
config.LeaderDomain = "domain2"
1518+
config.Queues = map[string]QueueConfig{
1519+
"queue_c": {MaxWorkers: 50},
1520+
"queue_d": {MaxWorkers: 50},
1521+
}
1522+
config.Schema = client1.config.Schema
1523+
config.ReindexerSchedule = &neverSchedule{}
1524+
1525+
var err error
1526+
client2, err = NewClient(bundle.driver, config)
1527+
require.NoError(t, err)
1528+
client2.testSignals.Init(t)
1529+
}
1530+
1531+
startClient(ctx, t, client1)
1532+
startClient(ctx, t, client2)
1533+
1534+
// Both elected
1535+
client1.testSignals.electedLeader.WaitOrTimeout()
1536+
client2.testSignals.electedLeader.WaitOrTimeout()
1537+
})
1538+
1539+
t.Run("LeaderDomain_MaintenanceServiceConfigEmpty", func(t *testing.T) {
1540+
t.Parallel()
1541+
1542+
config, bundle := setupConfig(t)
1543+
config.Queues = map[string]QueueConfig{
1544+
"queue_a": {MaxWorkers: 50},
1545+
"queue_b": {MaxWorkers: 50},
1546+
}
1547+
1548+
client, err := NewClient(bundle.driver, config)
1549+
require.NoError(t, err)
1550+
client.testSignals.Init(t)
1551+
1552+
jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
1553+
require.Nil(t, jobCleaner.Config.QueuesIncluded)
1554+
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer)
1555+
require.Nil(t, jobRescuer.Config.QueuesIncluded)
1556+
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer)
1557+
require.Nil(t, jobScheduler.Config.QueuesIncluded)
1558+
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer)
1559+
require.Nil(t, queueCleaner.Config.QueuesIncluded)
1560+
})
1561+
1562+
// The domain "default" is special in that it behaves like if LeaderDomain
1563+
// was not set.
1564+
t.Run("LeaderDomain_MaintenanceServiceConfigDefault", func(t *testing.T) {
1565+
t.Parallel()
1566+
1567+
config, bundle := setupConfig(t)
1568+
config.LeaderDomain = "default"
1569+
config.Queues = map[string]QueueConfig{
1570+
"queue_a": {MaxWorkers: 50},
1571+
"queue_b": {MaxWorkers: 50},
1572+
}
1573+
1574+
client, err := NewClient(bundle.driver, config)
1575+
require.NoError(t, err)
1576+
client.testSignals.Init(t)
1577+
1578+
jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
1579+
require.Nil(t, jobCleaner.Config.QueuesIncluded)
1580+
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer)
1581+
require.Nil(t, jobRescuer.Config.QueuesIncluded)
1582+
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer)
1583+
require.Nil(t, jobScheduler.Config.QueuesIncluded)
1584+
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer)
1585+
require.Nil(t, queueCleaner.Config.QueuesIncluded)
1586+
})
1587+
1588+
// When non-default leader domains are configured, each client's maintenance
1589+
// services are limited to only their client's queues.
1590+
t.Run("LeaderDomain_MaintenanceServiceConfigAlternate", func(t *testing.T) {
1591+
t.Parallel()
1592+
1593+
var client1 *Client[pgx.Tx]
1594+
{
1595+
config, bundle := setupConfig(t)
1596+
config.LeaderDomain = "domain1"
1597+
config.ReindexerSchedule = &neverSchedule{}
1598+
config.Queues = map[string]QueueConfig{
1599+
"queue_a": {MaxWorkers: 50},
1600+
"queue_b": {MaxWorkers: 50},
1601+
}
1602+
1603+
var err error
1604+
client1, err = NewClient(bundle.driver, config)
1605+
require.NoError(t, err)
1606+
client1.testSignals.Init(t)
1607+
1608+
jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client1.queueMaintainer)
1609+
require.Equal(t, []string{"queue_a", "queue_b"}, jobCleaner.Config.QueuesIncluded)
1610+
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client1.queueMaintainer)
1611+
require.Equal(t, []string{"queue_a", "queue_b"}, jobRescuer.Config.QueuesIncluded)
1612+
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client1.queueMaintainer)
1613+
require.Equal(t, []string{"queue_a", "queue_b"}, jobScheduler.Config.QueuesIncluded)
1614+
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client1.queueMaintainer)
1615+
require.Equal(t, []string{"queue_a", "queue_b"}, queueCleaner.Config.QueuesIncluded)
1616+
}
1617+
1618+
{
1619+
config, bundle := setupConfig(t)
1620+
config.LeaderDomain = "domain2"
1621+
config.Queues = map[string]QueueConfig{
1622+
"queue_c": {MaxWorkers: 50},
1623+
"queue_d": {MaxWorkers: 50},
1624+
}
1625+
config.Schema = client1.config.Schema
1626+
config.ReindexerSchedule = &neverSchedule{}
1627+
1628+
client2, err := NewClient(bundle.driver, config)
1629+
require.NoError(t, err)
1630+
client2.testSignals.Init(t)
1631+
1632+
jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client2.queueMaintainer)
1633+
require.Equal(t, []string{"queue_c", "queue_d"}, jobCleaner.Config.QueuesIncluded)
1634+
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client2.queueMaintainer)
1635+
require.Equal(t, []string{"queue_c", "queue_d"}, jobRescuer.Config.QueuesIncluded)
1636+
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client2.queueMaintainer)
1637+
require.Equal(t, []string{"queue_c", "queue_d"}, jobScheduler.Config.QueuesIncluded)
1638+
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client2.queueMaintainer)
1639+
require.Equal(t, []string{"queue_c", "queue_d"}, queueCleaner.Config.QueuesIncluded)
1640+
}
1641+
})
14941642
}
14951643

14961644
type workerWithMiddleware[T JobArgs] struct {

0 commit comments

Comments
 (0)