Skip to content

Commit 95b46f4

Browse files
hjxphulongjun
andauthored
feat: mq push consume supports ratelimit (#948)
Co-authored-by: hulongjun <[email protected]>
1 parent 5393a4d commit 95b46f4

File tree

3 files changed

+8
-1
lines changed

3 files changed

+8
-1
lines changed

pkg/client/rocketmq/push_consumer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,17 @@ func (conf *PushConsumerConfig) Build() *PushConsumer {
4848

4949
xlog.Jupiter().Debug("rocketmq's config: ", xlog.String("name", name), xlog.Any("conf", conf))
5050

51+
var bucket *ratelimit.Bucket
52+
if conf.Rate > 0 && conf.Capacity > 0 {
53+
bucket = ratelimit.NewBucketWithRate(conf.Rate, conf.Capacity)
54+
}
55+
5156
cc := &PushConsumer{
5257
name: name,
5358
PushConsumerConfig: *conf,
5459
subscribers: make(map[string]func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error)),
5560
interceptors: []primitive.Interceptor{},
61+
bucket: bucket,
5662
}
5763
cc.interceptors = append(cc.interceptors,
5864
consumerMetricInterceptor(),

test/e2e/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ require (
108108
go.opentelemetry.io/otel/trace v1.16.0 // indirect
109109
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
110110
go.uber.org/atomic v1.10.0 // indirect
111-
go.uber.org/automaxprocs v1.5.2 // indirect
111+
go.uber.org/automaxprocs v1.5.3 // indirect
112112
go.uber.org/multierr v1.11.0 // indirect
113113
go.uber.org/zap v1.24.0 // indirect
114114
golang.org/x/arch v0.3.0 // indirect

test/e2e/go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,7 @@ go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
623623
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
624624
go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME=
625625
go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
626+
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
626627
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
627628
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
628629
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=

0 commit comments

Comments
 (0)