diff --git a/go.mod b/go.mod index f5f2d8f400..3418d19cd5 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/BurntSushi/toml v1.2.1 github.com/alibaba/sentinel-golang v1.0.4 github.com/aliyun/aliyun-tablestore-go-sdk v1.7.7 - github.com/apache/rocketmq-client-go/v2 v2.0.0-20230518020902-2a8172bb9174 + github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230628073434-533de03048e1 github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0 github.com/coocood/freecache v1.2.3 github.com/cosmtrek/air v1.43.0 diff --git a/go.sum b/go.sum index 15ef714118..2ef068fade 100644 --- a/go.sum +++ b/go.sum @@ -65,8 +65,8 @@ github.com/aliyun/aliyun-tablestore-go-sdk v1.7.7/go.mod h1:mZCxM44kLKLY5ci+0j6b github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/apache/rocketmq-client-go/v2 v2.0.0-20230518020902-2a8172bb9174 h1:RXutpBr8h9zJH1zcmCCptgm6+q8N0oyMrpzbIjRess4= -github.com/apache/rocketmq-client-go/v2 v2.0.0-20230518020902-2a8172bb9174/go.mod h1:6I6vgxHR3hzrvn+6n/4mrhS+UTulzK/X9LB2Vk1U5gE= +github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230628073434-533de03048e1 h1:vVViC77QvxLOUhbbMGSaVnOwkUnFCnXZQ7mX6SDuOZA= +github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230628073434-533de03048e1/go.mod h1:6I6vgxHR3hzrvn+6n/4mrhS+UTulzK/X9LB2Vk1U5gE= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= diff --git a/pkg/client/rocketmq/option.go b/pkg/client/rocketmq/option.go index 21fd2b054f..312c98e038 100644 --- a/pkg/client/rocketmq/option.go +++ b/pkg/client/rocketmq/option.go @@ -17,6 +17,7 @@ package rocketmq import ( "crypto/md5" "fmt" + "os" "strings" "time" @@ -147,10 +148,11 @@ func StdProducerConfig(name string) *ProducerConfig { func RawPushConsumerConfig(name string) *PushConsumerConfig { var defaultConfig = DefaultConfig() var pushConsumerConfig = defaultConfig.PushConsumer - if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil { - xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pushConsumerConfig)) + if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil || + (len(pushConsumerConfig.Addr) == 0 && len(defaultConfig.Addresses) == 0) || + len(pushConsumerConfig.Topic) == 0 { + xlog.Jupiter().Panic("pushConsumerConfig fail", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pushConsumerConfig)) } - // 兼容rocket_client_mq变更,addr需要携带shceme if len(pushConsumerConfig.Addr) == 0 { pushConsumerConfig.Addr = defaultConfig.Addresses @@ -162,7 +164,7 @@ func RawPushConsumerConfig(name string) *PushConsumerConfig { // 这里根据mq集群地址的md5,生成默认InstanceName // 实现自动支持多集群,解决官方库默认不支持多集群消费的问题 if pushConsumerConfig.InstanceName == "" { - pushConsumerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pushConsumerConfig.Addr, ",")))) + pushConsumerConfig.InstanceName = fmt.Sprintf("%x@%d", md5.Sum([]byte(strings.Join(pushConsumerConfig.Addr, ","))), os.Getpid()) } if xdebug.IsDevelopmentMode() { @@ -175,8 +177,10 @@ func RawPushConsumerConfig(name string) *PushConsumerConfig { func RawPullConsumerConfig(name string) *PullConsumerConfig { var defaultConfig = DefaultConfig() var pullConsumerConfig = defaultConfig.PullConsumer - if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil { - xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pullConsumerConfig)) + if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil || + (len(pullConsumerConfig.Addr) == 0 && len(defaultConfig.Addresses) == 0) || + len(pullConsumerConfig.Topic) == 0 { + xlog.Jupiter().Panic("PullConsumerConfig fail", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pullConsumerConfig)) } // 兼容rocket_client_mq变更,addr需要携带shceme @@ -190,7 +194,7 @@ func RawPullConsumerConfig(name string) *PullConsumerConfig { // 这里根据mq集群地址的md5,生成默认InstanceName // 实现自动支持多集群,解决官方库默认不支持多集群消费的问题 if pullConsumerConfig.InstanceName == "" { - pullConsumerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pullConsumerConfig.Addr, ",")))) + pullConsumerConfig.InstanceName = fmt.Sprintf("%x@%d", md5.Sum([]byte(strings.Join(pullConsumerConfig.Addr, ","))), os.Getpid()) } if xdebug.IsDevelopmentMode() { @@ -203,8 +207,10 @@ func RawPullConsumerConfig(name string) *PullConsumerConfig { func RawProducerConfig(name string) *ProducerConfig { var defaultConfig = DefaultConfig() var producerConfig = defaultConfig.Producer - if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil { - xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", defaultConfig)) + if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil || + (len(producerConfig.Addr) == 0 && len(defaultConfig.Addresses) == 0) || + len(producerConfig.Topic) == 0 { + xlog.Jupiter().Panic("RawProducerConfig fail", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", producerConfig)) } // 兼容rocket_client_mq变更,addr需要携带shceme @@ -217,7 +223,7 @@ func RawProducerConfig(name string) *ProducerConfig { // 这里根据mq集群地址的md5,生成默认InstanceName // 实现自动支持多集群,解决官方库默认不支持多集群消费的问题 if producerConfig.InstanceName == "" { - producerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(producerConfig.Addr, ",")))) + producerConfig.InstanceName = fmt.Sprintf("%x@%d", md5.Sum([]byte(strings.Join(producerConfig.Addr, ","))), os.Getpid()) } if xdebug.IsDevelopmentMode() {