Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: nolint dupl #918

Merged
merged 4 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/BurntSushi/toml v1.2.1
github.com/alibaba/sentinel-golang v1.0.4
github.com/aliyun/aliyun-tablestore-go-sdk v1.7.7
github.com/apache/rocketmq-client-go/v2 v2.0.0-20230518020902-2a8172bb9174
github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230628073434-533de03048e1
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0
github.com/coocood/freecache v1.2.3
github.com/cosmtrek/air v1.43.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ github.com/aliyun/aliyun-tablestore-go-sdk v1.7.7/go.mod h1:mZCxM44kLKLY5ci+0j6b
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/rocketmq-client-go/v2 v2.0.0-20230518020902-2a8172bb9174 h1:RXutpBr8h9zJH1zcmCCptgm6+q8N0oyMrpzbIjRess4=
github.com/apache/rocketmq-client-go/v2 v2.0.0-20230518020902-2a8172bb9174/go.mod h1:6I6vgxHR3hzrvn+6n/4mrhS+UTulzK/X9LB2Vk1U5gE=
github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230628073434-533de03048e1 h1:vVViC77QvxLOUhbbMGSaVnOwkUnFCnXZQ7mX6SDuOZA=
github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230628073434-533de03048e1/go.mod h1:6I6vgxHR3hzrvn+6n/4mrhS+UTulzK/X9LB2Vk1U5gE=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
Expand Down
29 changes: 19 additions & 10 deletions pkg/client/rocketmq/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package rocketmq
import (
"crypto/md5"
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -144,13 +145,15 @@ func StdProducerConfig(name string) *ProducerConfig {
}

// RawPushConsumerConfig 返push consume回配置
// nolint:dupl
func RawPushConsumerConfig(name string) *PushConsumerConfig {
var defaultConfig = DefaultConfig()
var pushConsumerConfig = defaultConfig.PushConsumer
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pushConsumerConfig))
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil ||
(len(pushConsumerConfig.Addr) == 0 && len(defaultConfig.Addresses) == 0) ||
len(pushConsumerConfig.Topic) == 0 {
xlog.Jupiter().Panic("pushConsumerConfig fail", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pushConsumerConfig))
}

// 兼容rocket_client_mq变更,addr需要携带shceme
if len(pushConsumerConfig.Addr) == 0 {
pushConsumerConfig.Addr = defaultConfig.Addresses
Expand All @@ -162,7 +165,7 @@ func RawPushConsumerConfig(name string) *PushConsumerConfig {
// 这里根据mq集群地址的md5,生成默认InstanceName
// 实现自动支持多集群,解决官方库默认不支持多集群消费的问题
if pushConsumerConfig.InstanceName == "" {
pushConsumerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pushConsumerConfig.Addr, ","))))
pushConsumerConfig.InstanceName = fmt.Sprintf("%x@%d", md5.Sum([]byte(strings.Join(pushConsumerConfig.Addr, ","))), os.Getpid())
}

if xdebug.IsDevelopmentMode() {
Expand All @@ -172,11 +175,14 @@ func RawPushConsumerConfig(name string) *PushConsumerConfig {
}

// RawPullConsumerConfig 返回pull consume配置
// nolint:dupl
func RawPullConsumerConfig(name string) *PullConsumerConfig {
var defaultConfig = DefaultConfig()
var pullConsumerConfig = defaultConfig.PullConsumer
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pullConsumerConfig))
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil ||
(len(pullConsumerConfig.Addr) == 0 && len(defaultConfig.Addresses) == 0) ||
len(pullConsumerConfig.Topic) == 0 {
xlog.Jupiter().Panic("PullConsumerConfig fail", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pullConsumerConfig))
}

// 兼容rocket_client_mq变更,addr需要携带shceme
Expand All @@ -190,7 +196,7 @@ func RawPullConsumerConfig(name string) *PullConsumerConfig {
// 这里根据mq集群地址的md5,生成默认InstanceName
// 实现自动支持多集群,解决官方库默认不支持多集群消费的问题
if pullConsumerConfig.InstanceName == "" {
pullConsumerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pullConsumerConfig.Addr, ","))))
pullConsumerConfig.InstanceName = fmt.Sprintf("%x@%d", md5.Sum([]byte(strings.Join(pullConsumerConfig.Addr, ","))), os.Getpid())
}

if xdebug.IsDevelopmentMode() {
Expand All @@ -200,11 +206,14 @@ func RawPullConsumerConfig(name string) *PullConsumerConfig {
}

// RawProducerConfig 返回produce配置
// nolint:dupl
func RawProducerConfig(name string) *ProducerConfig {
var defaultConfig = DefaultConfig()
var producerConfig = defaultConfig.Producer
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", defaultConfig))
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil ||
(len(producerConfig.Addr) == 0 && len(defaultConfig.Addresses) == 0) ||
len(producerConfig.Topic) == 0 {
xlog.Jupiter().Panic("RawProducerConfig fail", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", producerConfig))
}

// 兼容rocket_client_mq变更,addr需要携带shceme
Expand All @@ -217,7 +226,7 @@ func RawProducerConfig(name string) *ProducerConfig {
// 这里根据mq集群地址的md5,生成默认InstanceName
// 实现自动支持多集群,解决官方库默认不支持多集群消费的问题
if producerConfig.InstanceName == "" {
producerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(producerConfig.Addr, ","))))
producerConfig.InstanceName = fmt.Sprintf("%x@%d", md5.Sum([]byte(strings.Join(producerConfig.Addr, ","))), os.Getpid())
}

if xdebug.IsDevelopmentMode() {
Expand Down
Loading