Skip to content

Commit

Permalink
Fix R1 streams exceeding quota limits (#5771)
Browse files Browse the repository at this point in the history
Fixes #5770 

Signed-off-by: Waldemar Quevedo <[email protected]>

---------

Signed-off-by: Waldemar Quevedo <[email protected]>
Co-authored-by: John Weldon <[email protected]>
  • Loading branch information
wallyqs and johnweldon committed Aug 9, 2024
1 parent 6f953c7 commit b041481
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 2 deletions.
68 changes: 68 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6433,3 +6433,71 @@ Consume3:
t.Errorf("Consumers to same stream are at different sequences: %d vs %d", a, b)
}
}

func TestJetStreamClusterAccountFileStoreLimits(t *testing.T) {
c := createJetStreamClusterExplicit(t, "limits", 3)
defer c.shutdown()

limits := map[string]JetStreamAccountLimits{
"R1": {
MaxMemory: 1 << 10,
MaxStore: 1 << 10,
MaxStreams: -1,
MaxConsumers: -1,
},
"R3": {
MaxMemory: 1 << 10,
MaxStore: 1 << 10,
MaxStreams: -1,
MaxConsumers: -1,
},
}

// Update the limits in all servers.
for _, s := range c.servers {
acc := s.GlobalAccount()
if err := acc.UpdateJetStreamLimits(limits); err != nil {
t.Fatalf("Unexpected error updating jetstream account limits: %v", err)
}
}
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

for _, replicas := range []int64{1, 3} {
sname := fmt.Sprintf("test-stream:%d", replicas)
t.Run(sname, func(t *testing.T) {
sconfig := &nats.StreamConfig{
Name: sname,
Replicas: int(replicas),
Storage: nats.FileStorage,
Retention: nats.LimitsPolicy,
}
_, err := js.AddStream(sconfig)
if err != nil {
t.Fatalf("Unexpected error creating stream: %v", err)
}

data := []byte(strings.Repeat("A", 1<<8))
for i := 0; i < 30; i++ {
if _, err = js.Publish(sname, data); err != nil && !strings.Contains(err.Error(), "resource limits exceeded for account") {
t.Errorf("Error publishing random data (iteration %d): %v", i, err)
}

if err = nc.Flush(); err != nil {
t.Fatalf("Unexpected error flushing connection: %v", err)
}

_, err = js.StreamInfo(sname)
require_NoError(t, err)
}

si, err := js.StreamInfo(sname)
require_NoError(t, err)
st := si.State
maxStore := limits[fmt.Sprintf("R%d", replicas)].MaxStore
if int64(st.Bytes) > replicas*maxStore {
t.Errorf("Unexpected size of stream: got %d, expected less than %d\nstate: %#v", st.Bytes, maxStore, st)
}
})
}
}
43 changes: 41 additions & 2 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5444,11 +5444,20 @@ func TestJWTJetStreamTiers(t *testing.T) {
_, err = js.Publish("testR1-2", msg[:])
require_NoError(t, err)

ainfo, err := js.AccountInfo()
require_NoError(t, err)
require_Equal(t, ainfo.Tiers["R1"].Store, 1100)

// test exceeding tiered storage limit
_, err = js.Publish("testR1-1", []byte("1"))
require_Error(t, err)
require_Equal(t, err.Error(), "nats: resource limits exceeded for account")

// Check that storage has not increased after the rejected publish.
ainfo, err = js.AccountInfo()
require_NoError(t, err)
require_Equal(t, ainfo.Tiers["R1"].Store, 1100)

time.Sleep(time.Second - time.Since(start)) // make sure the time stamp changes
accClaim.Limits.JetStreamTieredLimits["R1"] = jwt.JetStreamLimits{
DiskStorage: 1650, MemoryStorage: 0, Consumer: 1, Streams: 3}
Expand All @@ -5466,9 +5475,22 @@ func TestJWTJetStreamTiers(t *testing.T) {
_, err = js.AddConsumer("testR1-3", &nats.ConsumerConfig{Durable: "dur7", AckPolicy: nats.AckExplicitPolicy})
require_Error(t, err)
require_Equal(t, err.Error(), "nats: maximum consumers limit reached")

// At this point it will be exactly at the DiskStorage limit so it should not fail.
_, err = js.Publish("testR1-3", msg[:])
require_NoError(t, err)
ainfo, err = js.AccountInfo()
require_NoError(t, err)
require_Equal(t, ainfo.Tiers["R1"].Store, 1650)

_, err = js.Publish("testR1-3", []byte("1"))
require_Error(t, err)
require_Equal(t, err.Error(), "nats: resource limits exceeded for account")

// Should remain at the same usage.
ainfo, err = js.AccountInfo()
require_NoError(t, err)
require_Equal(t, ainfo.Tiers["R1"].Store, 1650)
}

func TestJWTJetStreamMaxAckPending(t *testing.T) {
Expand Down Expand Up @@ -5625,11 +5647,28 @@ func TestJWTJetStreamMaxStreamBytes(t *testing.T) {
_, err = js.AddStream(&nats.StreamConfig{Name: "bar", Replicas: 1, MaxBytes: 2048})
require_NoError(t, err)

// test if we can push more messages into the stream
_, err = js.Publish("baz", msg[:]) // exceeds max stream bytes
ainfo, err := js.AccountInfo()
require_NoError(t, err)
require_Equal(t, ainfo.Tiers["R1"].Store, 933)

// This should be exactly at the limit of the account.
_, err = js.Publish("baz", []byte(strings.Repeat("A", 1082)))
require_NoError(t, err)

ainfo, err = js.AccountInfo()
require_NoError(t, err)
require_Equal(t, ainfo.Tiers["R1"].Store, 2048)

// Exceed max stream bytes limit.
_, err = js.Publish("baz", []byte("1"))
require_Error(t, err)
require_Equal(t, err.Error(), "nats: resource limits exceeded for account")

// Confirm no changes after rejected publish.
ainfo, err = js.AccountInfo()
require_NoError(t, err)
require_Equal(t, ainfo.Tiers["R1"].Store, 2048)

// test disabling max bytes required
_, err = js.UpdateStream(&nats.StreamConfig{Name: "bar", Replicas: 1})
require_Error(t, err)
Expand Down
2 changes: 2 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4573,6 +4573,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
response, _ = json.Marshal(resp)
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0))
}
mset.mu.Unlock()
return err
}
}

Expand Down

0 comments on commit b041481

Please sign in to comment.