Riverboat is the job queue used in openlane based on the riverqueue project.
Jobs can be inserted into the job queue either from this server directly, or
from any codebase with an
Insert Only river client. All
jobs will be processed via the riverboat
server. Since jobs are committed to
the database within a transaction, and stored in the database we do not have to
worry about dropped events.
This repo includes several Taskfiles to assist with getting things running.
- Go 1.23+
- Docker (used for running Postgres and the river-ui)
- task
The following will start up postgres, the river-ui, and the riverboat server:
task run-dev
Included in the test/
directory are test jobs corresponding to the job types
in pkg/jobs
.
-
Start the
riverboat
server usingtask run-dev
-
Run the test main, for example the
email
:go run test/email/main.go
-
This should insert the job successfully, it should be processed by
river
and the email should be added tofixtures/email
-
New jobs should be added to the
pkg/jobs
directory in a new file, refer to the upstream docs for implementation details. The following is a stem job that could be copied to get you started.package jobs import ( "context" "github.com/riverqueue/river" "github.com/rs/zerolog/log" ) // ExampleArgs for the example worker to process the job type ExampleArgs struct { // ExampleArg is an example argument ExampleArg string `json:"example_arg"` } // Kind satisfies the river.Job interface func (ExampleArgs) Kind() string { return "example" } // ExampleWorker does all sorts of neat stuff type ExampleWorker struct { river.WorkerDefaults[ExampleArgs] ExampleConfig } // ExampleConfig contains the configuration for the example worker type ExampleConfig struct { // DevMode is a flag to enable dev mode so we don't actually send millions of carrier pigeons DevMode bool `koanf:"devMode" json:"devMode" jsonschema:"description=enable dev mode" default:"true"` } // Work satisfies the river.Worker interface for the example worker func (w *ExampleConfig) Work(ctx context.Context, job *river.Job[ExampleArgs]) error { // do some work return nil }
-
Add a test for the new job, see
email_test.go
as an example. There are additional helper functions that can be used, see river test helpers for details. -
If there are configuration settings, add the worker to
pkg/river/config.go
Workers
struct, this will allow the config variables to be set via thekoanf
config setup. Once added you will need to regenerate the config:task config:generate
-
Register the worker by adding the
river.AddWorkerSafely
to thepkg/river/workers.go
createWorkers
function. -
Add a
test
job totest/
directory by creating a new directory with amain.go
function that will insert the job into the queue.
See the contributing guide for more information.