diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index ab999f2fdf..2a3eebef50 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -6433,3 +6433,71 @@ Consume3: t.Errorf("Consumers to same stream are at different sequences: %d vs %d", a, b) } } + +func TestJetStreamClusterAccountFileStoreLimits(t *testing.T) { + c := createJetStreamClusterExplicit(t, "limits", 3) + defer c.shutdown() + + limits := map[string]JetStreamAccountLimits{ + "R1": { + MaxMemory: 1 << 10, + MaxStore: 1 << 10, + MaxStreams: -1, + MaxConsumers: -1, + }, + "R3": { + MaxMemory: 1 << 10, + MaxStore: 1 << 10, + MaxStreams: -1, + MaxConsumers: -1, + }, + } + + // Update the limits in all servers. + for _, s := range c.servers { + acc := s.GlobalAccount() + if err := acc.UpdateJetStreamLimits(limits); err != nil { + t.Fatalf("Unexpected error updating jetstream account limits: %v", err) + } + } + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + for _, replicas := range []int64{1, 3} { + sname := fmt.Sprintf("test-stream:%d", replicas) + t.Run(sname, func(t *testing.T) { + sconfig := &nats.StreamConfig{ + Name: sname, + Replicas: int(replicas), + Storage: nats.FileStorage, + Retention: nats.LimitsPolicy, + } + _, err := js.AddStream(sconfig) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + data := []byte(strings.Repeat("A", 1<<8)) + for i := 0; i < 30; i++ { + if _, err = js.Publish(sname, data); err != nil && !strings.Contains(err.Error(), "resource limits exceeded for account") { + t.Errorf("Error publishing random data (iteration %d): %v", i, err) + } + + if err = nc.Flush(); err != nil { + t.Fatalf("Unexpected error flushing connection: %v", err) + } + + _, err = js.StreamInfo(sname) + require_NoError(t, err) + } + + si, err := js.StreamInfo(sname) + require_NoError(t, err) + st := si.State + maxStore := limits[fmt.Sprintf("R%d", replicas)].MaxStore + if int64(st.Bytes) > replicas*maxStore { + t.Errorf("Unexpected size of stream: got %d, expected less than %d\nstate: %#v", st.Bytes, maxStore, st) + } + }) + } +} diff --git a/server/jwt_test.go b/server/jwt_test.go index d6cdd1dd7d..cb3a3391b5 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -5444,11 +5444,20 @@ func TestJWTJetStreamTiers(t *testing.T) { _, err = js.Publish("testR1-2", msg[:]) require_NoError(t, err) + ainfo, err := js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 1100) + // test exceeding tiered storage limit _, err = js.Publish("testR1-1", []byte("1")) require_Error(t, err) require_Equal(t, err.Error(), "nats: resource limits exceeded for account") + // Check that storage has not increased after the rejected publish. + ainfo, err = js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 1100) + time.Sleep(time.Second - time.Since(start)) // make sure the time stamp changes accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{ DiskStorage: 1650, MemoryStorage: 0, Consumer: 1, Streams: 3} @@ -5466,9 +5475,22 @@ func TestJWTJetStreamTiers(t *testing.T) { _, err = js.AddConsumer("testR1-3", &nats.ConsumerConfig{Durable: "dur7", AckPolicy: nats.AckExplicitPolicy}) require_Error(t, err) require_Equal(t, err.Error(), "nats: maximum consumers limit reached") + + // At this point it will be exactly at the DiskStorage limit so it should not fail. _, err = js.Publish("testR1-3", msg[:]) + require_NoError(t, err) + ainfo, err = js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 1650) + + _, err = js.Publish("testR1-3", []byte("1")) require_Error(t, err) require_Equal(t, err.Error(), "nats: resource limits exceeded for account") + + // Should remain at the same usage. + ainfo, err = js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 1650) } func TestJWTJetStreamMaxAckPending(t *testing.T) { @@ -5625,11 +5647,28 @@ func TestJWTJetStreamMaxStreamBytes(t *testing.T) { _, err = js.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 1, MaxBytes: 2048}) require_NoError(t, err) - // test if we can push more messages into the stream - _, err = js.Publish("baz", msg[:]) // exceeds max stream bytes + ainfo, err := js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 933) + + // This should be exactly at the limit of the account. + _, err = js.Publish("baz", []byte(strings.Repeat("A", 1082))) + require_NoError(t, err) + + ainfo, err = js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 2048) + + // Exceed max stream bytes limit. + _, err = js.Publish("baz", []byte("1")) require_Error(t, err) require_Equal(t, err.Error(), "nats: resource limits exceeded for account") + // Confirm no changes after rejected publish. + ainfo, err = js.AccountInfo() + require_NoError(t, err) + require_Equal(t, ainfo.Tiers["R1"].Store, 2048) + // test disabling max bytes required _, err = js.UpdateStream(&nats.StreamConfig{Name: "bar", Replicas: 1}) require_Error(t, err) diff --git a/server/stream.go b/server/stream.go index 9137254607..1279efae58 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4573,6 +4573,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, response, _ = json.Marshal(resp) mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0)) } + mset.mu.Unlock() + return err } }