From 6f953c7dc0362ce821dc3dc5620833a7af3ba772 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 9 Aug 2024 15:45:16 +0100 Subject: [PATCH 1/2] Cancel pending `checkForOrphans` on cluster monitor exit This was leaving dangling goroutines and creating confusing logging in unit test runs. Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 402fd728946..ce18b77ca15 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1339,6 +1339,11 @@ func (js *jetStream) monitorCluster() { updateConsumers: make(map[string]*consumerAssignment), } + // Make sure to cancel any pending checkForOrphans calls if the + // monitor goroutine exits. + var oc *time.Timer + defer stopAndClearTimer(&oc) + for { select { case <-s.quitCh: @@ -1375,7 +1380,7 @@ func (js *jetStream) monitorCluster() { // Clear. ru = nil s.Debugf("Recovered JetStream cluster metadata") - time.AfterFunc(30*time.Second, js.checkForOrphans) + oc = time.AfterFunc(30*time.Second, js.checkForOrphans) // Do a health check here as well. go checkHealth() continue From b0414811c3ab0fd322ed0a3193af0869fa1a07fe Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 9 Aug 2024 12:24:16 -0700 Subject: [PATCH 2/2] Fix R1 streams exceeding quota limits (#5771) Fixes #5770 Signed-off-by: Waldemar Quevedo --------- Signed-off-by: Waldemar Quevedo Co-authored-by: John Weldon --- server/jetstream_cluster_3_test.go | 68 ++++++++++++++++++++++++++++++ server/jwt_test.go | 43 ++++++++++++++++++- server/stream.go | 2 + 3 files changed, 111 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index ab999f2fdf2..2a3eebef50c 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6433,3 +6433,71 @@ Consume3: t.Errorf("Consumers to same stream are at different sequences: %d vs %d", a, b) } } + +func TestJetStreamClusterAccountFileStoreLimits(t *testing.T) { + c := createJetStreamClusterExplicit(t, "limits", 3) + defer c.shutdown() + + limits := map[string]JetStreamAccountLimits{ + "R1": { + MaxMemory: 1 << 10, + MaxStore: 1 << 10, + MaxStreams: -1, + MaxConsumers: -1, + }, + "R3": { + MaxMemory: 1 << 10, + MaxStore: 1 << 10, + MaxStreams: -1, + MaxConsumers: -1, + }, + } + + // Update the limits in all servers. + for _, s := range c.servers { + acc := s.GlobalAccount() + if err := acc.UpdateJetStreamLimits(limits); err != nil { + t.Fatalf("Unexpected error updating jetstream account limits: %v", err) + } + } + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + for _, replicas := range []int64{1, 3} { + sname := fmt.Sprintf("test-stream:%d", replicas) + t.Run(sname, func(t *testing.T) { + sconfig := &nats.StreamConfig{ + Name: sname, + Replicas: int(replicas), + Storage: nats.FileStorage, + Retention: nats.LimitsPolicy, + } + _, err := js.AddStream(sconfig) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + data := []byte(strings.Repeat("A", 1<<8)) + for i := 0; i < 30; i++ { + if _, err = js.Publish(sname, data); err != nil && !strings.Contains(err.Error(), "resource limits exceeded for account") { + t.Errorf("Error publishing random data (iteration %d): %v", i, err) + } + + if err = nc.Flush(); err != nil { + t.Fatalf("Unexpected error flushing connection: %v", err) + } + + _, err = js.StreamInfo(sname) + require_NoError(t, err) + } + + si, err := js.StreamInfo(sname) + require_NoError(t, err) + st := si.State + maxStore := limits[fmt.Sprintf("R%d", replicas)].MaxStore + if int64(st.Bytes) > replicas*maxStore { + t.Errorf("Unexpected size of stream: got %d, expected less than %d\nstate: %#v", st.Bytes, maxStore, st) + } + }) + } +} diff --git a/server/jwt_test.go b/server/jwt_test.go index d6cdd1dd7d9..cb3a3391b5c 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -5444,11 +5444,20 @@ func TestJWTJetStreamTiers(t *testing.T) { _, err = js.Publish("testR1-2", msg[:]) require_NoError(t, err) + ainfo, err := js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 1100) + // test exceeding tiered storage limit _, err = js.Publish("testR1-1", []byte("1")) require_Error(t, err) require_Equal(t, err.Error(), "nats: resource limits exceeded for account") + // Check that storage has not increased after the rejected publish. + ainfo, err = js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 1100) + time.Sleep(time.Second - time.Since(start)) // make sure the time stamp changes accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ DiskStorage: 1650, MemoryStorage: 0, Consumer: 1, Streams: 3} @@ -5466,9 +5475,22 @@ func TestJWTJetStreamTiers(t *testing.T) { _, err = js.AddConsumer("testR1-3", &nats.ConsumerConfig{Durable: "dur7", AckPolicy: nats.AckExplicitPolicy}) require_Error(t, err) require_Equal(t, err.Error(), "nats: maximum consumers limit reached") + + // At this point it will be exactly at the DiskStorage limit so it should not fail. _, err = js.Publish("testR1-3", msg[:]) + require_NoError(t, err) + ainfo, err = js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 1650) + + _, err = js.Publish("testR1-3", []byte("1")) require_Error(t, err) require_Equal(t, err.Error(), "nats: resource limits exceeded for account") + + // Should remain at the same usage. + ainfo, err = js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 1650) } func TestJWTJetStreamMaxAckPending(t *testing.T) { @@ -5625,11 +5647,28 @@ func TestJWTJetStreamMaxStreamBytes(t *testing.T) { _, err = js.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 1, MaxBytes: 2048}) require_NoError(t, err) - // test if we can push more messages into the stream - _, err = js.Publish("baz", msg[:]) // exceeds max stream bytes + ainfo, err := js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 933) + + // This should be exactly at the limit of the account. + _, err = js.Publish("baz", []byte(strings.Repeat("A", 1082))) + require_NoError(t, err) + + ainfo, err = js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 2048) + + // Exceed max stream bytes limit. + _, err = js.Publish("baz", []byte("1")) require_Error(t, err) require_Equal(t, err.Error(), "nats: resource limits exceeded for account") + // Confirm no changes after rejected publish. + ainfo, err = js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 2048) + // test disabling max bytes required _, err = js.UpdateStream(&nats.StreamConfig{Name: "bar", Replicas: 1}) require_Error(t, err) diff --git a/server/stream.go b/server/stream.go index 9137254607a..1279efae581 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4573,6 +4573,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, response, _ = json.Marshal(resp) mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) } + mset.mu.Unlock() + return err } }