Skip to content

Commit 9e07519

Browse files
committed
rivertest: enable metadata assertions
This enables basic assertions against metadata in `rivertest.RequireX` methods. The approach here isn't anything special: it allows a `map[string]any` to be specified and considers it a failure if all keys & values in it are not also in the metadata. This should make it possible to build workflow-related assertions and other metadata assertions much more easily.
1 parent a8d8127 commit 9e07519

File tree

3 files changed

+175
-0
lines changed

3 files changed

+175
-0
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Add metadata assertions to rivertest. [PR #1137](https://github.com/riverqueue/river/pull/1137).
13+
1014
## [0.30.2] - 2026-01-26
1115

1216
### Fixed

rivertest/rivertest.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"encoding/json"
88
"fmt"
9+
"reflect"
910
"slices"
1011
"strings"
1112
"testing"
@@ -50,6 +51,13 @@ type RequireInsertedOpts struct {
5051
// No assertion is made if left the zero value.
5152
MaxAttempts int
5253

54+
// Metadata is a subset of job metadata to assert against. Only the keys and
55+
// values provided are compared, and any extra metadata on the job is
56+
// ignored.
57+
//
58+
// No assertion is made if left nil or empty.
59+
Metadata map[string]any
60+
5361
// Priority is the expected priority for the inserted job.
5462
//
5563
// No assertion is made if left the zero value.
@@ -501,6 +509,16 @@ func compareJobToInsertOpts(t testingT, jobRow *rivertype.JobRow, expectedOpts *
501509
}
502510
}
503511

512+
if len(expectedOpts.Metadata) > 0 {
513+
metadataMatches, metadataFailures := compareMetadataSubset(t, jobRow.Metadata, expectedOpts.Metadata, requireNotInserted)
514+
515+
if !metadataMatches && requireNotInserted {
516+
return true
517+
}
518+
519+
failures = append(failures, metadataFailures...)
520+
}
521+
504522
if expectedOpts.Priority != 0 {
505523
if jobRow.Priority == expectedOpts.Priority {
506524
if requireNotInserted {
@@ -594,6 +612,94 @@ func compareJobToInsertOpts(t testingT, jobRow *rivertype.JobRow, expectedOpts *
594612
return false
595613
}
596614

615+
func compareMetadataSubset(t testingT, jobMetadataBytes []byte, expectedMetadata map[string]any, requireNotInserted bool) (bool, []string) {
616+
t.Helper()
617+
618+
jobMetadata := map[string]any{}
619+
if len(jobMetadataBytes) > 0 {
620+
if err := json.Unmarshal(jobMetadataBytes, &jobMetadata); err != nil {
621+
failuref(t, "Internal failure: error unmarshaling job metadata: %s", err)
622+
}
623+
}
624+
625+
keys := make([]string, 0, len(expectedMetadata))
626+
for key := range expectedMetadata {
627+
keys = append(keys, key)
628+
}
629+
slices.Sort(keys)
630+
631+
failures := make([]string, 0, len(keys))
632+
allMatch := true
633+
for _, key := range keys {
634+
expectedValue := expectedMetadata[key]
635+
636+
actualValue, ok := jobMetadata[key]
637+
if !ok {
638+
allMatch = false
639+
if requireNotInserted {
640+
return false, nil
641+
}
642+
failures = append(failures, fmt.Sprintf("metadata missing key '%s'", key))
643+
continue
644+
}
645+
646+
if expectedValue == nil {
647+
if actualValue == nil {
648+
if requireNotInserted {
649+
failures = append(failures, fmt.Sprintf("metadata[%s] equal to excluded null", key))
650+
}
651+
} else {
652+
allMatch = false
653+
if requireNotInserted {
654+
return false, nil
655+
}
656+
failures = append(failures, fmt.Sprintf("metadata[%s] %s not equal to expected null", key, formatMetadataValue(actualValue)))
657+
}
658+
continue
659+
}
660+
661+
normalizedExpected, err := normalizeMetadataValue(expectedValue)
662+
if err != nil {
663+
failuref(t, "Internal failure: error normalizing metadata for key '%s': %s", key, err)
664+
}
665+
666+
if reflect.DeepEqual(actualValue, normalizedExpected) {
667+
if requireNotInserted {
668+
failures = append(failures, fmt.Sprintf("metadata[%s] equal to excluded %s", key, formatMetadataValue(normalizedExpected)))
669+
}
670+
} else {
671+
allMatch = false
672+
if requireNotInserted {
673+
return false, nil
674+
}
675+
failures = append(failures, fmt.Sprintf("metadata[%s] %s not equal to expected %s", key, formatMetadataValue(actualValue), formatMetadataValue(normalizedExpected)))
676+
}
677+
}
678+
679+
return allMatch, failures
680+
}
681+
682+
func formatMetadataValue(value any) string {
683+
encoded, err := json.Marshal(value)
684+
if err != nil {
685+
return fmt.Sprintf("%v", value)
686+
}
687+
return string(encoded)
688+
}
689+
690+
func normalizeMetadataValue(value any) (any, error) {
691+
encoded, err := json.Marshal(value)
692+
if err != nil {
693+
return nil, err
694+
}
695+
696+
var normalized any
697+
if err := json.Unmarshal(encoded, &normalized); err != nil {
698+
return nil, err
699+
}
700+
return normalized, nil
701+
}
702+
597703
// failuref takes a printf-style directive and is a shortcut for failing an
598704
// assertion.
599705
func failuref(t testingT, format string, a ...any) {

rivertest/rivertest_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,41 @@ func TestRequireInsertedTx(t *testing.T) {
253253
mockT.LogOutput())
254254
})
255255

256+
t.Run("Metadata", func(t *testing.T) {
257+
t.Parallel()
258+
259+
riverClient, bundle := setup(t)
260+
261+
_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
262+
Metadata: []byte(`{"key":"value","list":[1,2],"nested":{"enabled":true},"num":1}`),
263+
})
264+
require.NoError(t, err)
265+
266+
mockT := testutil.NewMockT(t)
267+
opts := &RequireInsertedOpts{
268+
Metadata: map[string]any{
269+
"key": "value",
270+
"nested": map[string]any{"enabled": true},
271+
"num": int64(1),
272+
},
273+
}
274+
_ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
275+
require.False(t, mockT.Failed, "Should have succeeded, but failed with: "+mockT.LogOutput())
276+
277+
mockT = testutil.NewMockT(t)
278+
opts = &RequireInsertedOpts{
279+
Metadata: map[string]any{
280+
"key": "wrong",
281+
"missing": "value",
282+
},
283+
}
284+
_ = requireInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
285+
require.True(t, mockT.Failed)
286+
require.Equal(t,
287+
failureString("Job with kind 'job2' metadata[key] \"value\" not equal to expected \"wrong\", metadata missing key 'missing'")+"\n",
288+
mockT.LogOutput())
289+
})
290+
256291
t.Run("Priority", func(t *testing.T) {
257292
t.Parallel()
258293

@@ -587,6 +622,36 @@ func TestRequireNotInsertedTx(t *testing.T) {
587622
mockT.LogOutput())
588623
})
589624

625+
t.Run("Metadata", func(t *testing.T) {
626+
t.Parallel()
627+
628+
riverClient, bundle := setup(t)
629+
630+
_, err := riverClient.InsertTx(ctx, bundle.tx, Job2Args{Int: 123}, &river.InsertOpts{
631+
Metadata: []byte(`{"key":"value"}`),
632+
})
633+
require.NoError(t, err)
634+
635+
mockT := testutil.NewMockT(t)
636+
opts := emptyOpts()
637+
opts.Metadata = map[string]any{
638+
"key": "value",
639+
}
640+
requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
641+
require.True(t, mockT.Failed)
642+
require.Equal(t,
643+
failureString("Job with kind 'job2' metadata[key] equal to excluded \"value\"")+"\n",
644+
mockT.LogOutput())
645+
646+
mockT = testutil.NewMockT(t)
647+
opts = emptyOpts()
648+
opts.Metadata = map[string]any{
649+
"key": "other",
650+
}
651+
requireNotInsertedTx[*riverpgxv5.Driver](ctx, mockT, bundle.tx, &Job2Args{}, opts)
652+
require.False(t, mockT.Failed)
653+
})
654+
590655
t.Run("Priority", func(t *testing.T) {
591656
t.Parallel()
592657

0 commit comments

Comments
 (0)