Skip to content

Commit

Permalink
[FIXED] (2.11) Respect consumer's starting seq, even if in the future (
Browse files Browse the repository at this point in the history
…#6253)

When using a replicated stream a consumer with a starting sequence could
be created on a follower that's ever so slightly behind on applies. If
the starting sequence is higher than the stream's last sequence we would
clamp it down.
We should not be clamping it down, and must respect what the user/client
gave us. That ensures any messages the user/client gets are always
`message.seq>=starting_seq` as well.

However, if the consumer is used for mirroring/sourcing then we should
still clamp down.

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
derekcollison authored Jan 23, 2025
2 parents e54cc94 + ddcc364 commit 9c7cf29
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
12 changes: 9 additions & 3 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5364,12 +5364,18 @@ func (o *consumer) selectStartingSeqNo() {
o.sseq = o.cfg.OptStartSeq
}

if state.FirstSeq == 0 {
if state.FirstSeq == 0 && (o.cfg.Direct || o.cfg.OptStartSeq == 0) {
// If the stream is empty, deliver only new.
// But only if mirroring/sourcing, or start seq is unset, otherwise need to respect provided value.
o.sseq = 1
} else if o.sseq > state.LastSeq && (o.cfg.Direct || o.cfg.OptStartSeq == 0) {
// If selected sequence is in the future, clamp back down.
// But only if mirroring/sourcing, or start seq is unset, otherwise need to respect provided value.
o.sseq = state.LastSeq + 1
} else if o.sseq < state.FirstSeq {
// If the first sequence is further ahead than the starting sequence,
// there are no messages there anymore, so move the sequence up.
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
}
}

Expand Down
45 changes: 45 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7323,6 +7323,51 @@ func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) {
checkNodeIsClosed(ca)
}

func TestJetStreamClusterRespectConsumerStartSeq(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Create replicated stream.
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

// We could have published messages into the stream that have not yet been applied on the follower.
// If we create a consumer with a starting sequence in the future, we must respect it.
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
DeliverPolicy: nats.DeliverByStartSequencePolicy,
OptStartSeq: 20,
})
require_NoError(t, err)
require_Equal(t, ci.Delivered.Stream, 19)

// Same thing if the first sequence is not 0.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 10})
require_NoError(t, err)

ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
DeliverPolicy: nats.DeliverByStartSequencePolicy,
OptStartSeq: 20,
})
require_NoError(t, err)
require_Equal(t, ci.Delivered.Stream, 19)

// Only if we're requested to start at a sequence that's not available anymore
// can we safely move it up. That data is gone already, so can't do anything else.
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
DeliverPolicy: nats.DeliverByStartSequencePolicy,
OptStartSeq: 5,
})
require_NoError(t, err)
require_Equal(t, ci.Delivered.Stream, 9)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down

0 comments on commit 9c7cf29

Please sign in to comment.