Skip to content

Commit ecfa6a0

Browse files
authored
Profile base to dev (#352)
profile done
1 parent 64558b2 commit ecfa6a0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2408
-840
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
testing:
2020
strategy:
2121
matrix:
22-
go-version: [1.12.x,1.13.x,1.14.x,1.15.x,1.16.x,1.17.x,1.18.x,1.19.x]
22+
go-version: [1.12.x,1.13.x,1.14.x,1.15.x,1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x]
2323
platform: [ubuntu-latest]
2424
runs-on: ${{ matrix.platform }}
2525
steps:

agent.go

Lines changed: 40 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (a *Agent) RegisterCommandHandler(f CommandHandler) {
133133
a.commandHandlers = append(a.commandHandlers, f)
134134
}
135135

136-
func (a *Agent) GetDynamicRegistryInfo() *registrySnapInfoStorage {
136+
func (a *Agent) GetDynamicRegistryInfo() *RegistrySnapInfoStorage {
137137
return a.configurer.getRegistryInfo()
138138
}
139139

@@ -150,7 +150,7 @@ func (a *Agent) RuntimeDir() string {
150150
return a.runtimedir
151151
}
152152

153-
// get Agent server
153+
// GetAgentServer get Agent server
154154
func (a *Agent) GetAgentServer() motan.Server {
155155
return a.agentServer
156156
}
@@ -300,7 +300,7 @@ func (a *Agent) initStatus() {
300300

301301
func (a *Agent) saveStatus() {
302302
statSnapFile := a.runtimedir + string(filepath.Separator) + defaultStatusSnap
303-
err := ioutil.WriteFile(statSnapFile, []byte(strconv.Itoa(int(http.StatusOK))), 0644)
303+
err := ioutil.WriteFile(statSnapFile, []byte(strconv.Itoa(http.StatusOK)), 0644)
304304
if err != nil {
305305
vlog.Errorln("Save status error: " + err.Error())
306306
return
@@ -351,6 +351,14 @@ func (a *Agent) initParam() {
351351
initLog(logDir, section)
352352
registerSwitchers(a.Context)
353353

354+
processPoolSize := 0
355+
if section != nil && section["processPoolSize"] != nil {
356+
processPoolSize = section["processPoolSize"].(int)
357+
}
358+
if processPoolSize > 0 {
359+
mserver.SetProcessPoolSize(processPoolSize)
360+
}
361+
354362
port := *motan.Port
355363
if port == 0 && section != nil && section["port"] != nil {
356364
port = section["port"].(int)
@@ -474,7 +482,7 @@ func (a *Agent) reloadClusters(ctx *motan.Context) {
474482
serviceItemKeep := make(map[string]bool)
475483
clusterMap := make(map[interface{}]interface{})
476484
serviceMap := make(map[interface{}]interface{})
477-
var allRefersURLs = []*motan.URL{}
485+
var allRefersURLs []*motan.URL
478486
if a.configurer != nil {
479487
//keep all dynamic refers
480488
for _, url := range a.configurer.subscribeNodes {
@@ -490,7 +498,7 @@ func (a *Agent) reloadClusters(ctx *motan.Context) {
490498
}
491499

492500
service := url.Path
493-
mapKey := getClusterKey(url.Group, url.GetStringParamsWithDefault(motan.VersionKey, "0.1"), url.Protocol, url.Path)
501+
mapKey := getClusterKey(url.Group, url.GetStringParamsWithDefault(motan.VersionKey, motan.DefaultReferVersion), url.Protocol, url.Path)
494502

495503
// find exists old serviceMap
496504
var serviceMapValue serviceMapItem
@@ -589,7 +597,7 @@ func (a *Agent) initCluster(url *motan.URL) {
589597
}
590598
a.serviceMap.UnsafeStore(url.Path, serviceMapItemArr)
591599
})
592-
mapKey := getClusterKey(url.Group, url.GetStringParamsWithDefault(motan.VersionKey, "0.1"), url.Protocol, url.Path)
600+
mapKey := getClusterKey(url.Group, url.GetStringParamsWithDefault(motan.VersionKey, motan.DefaultReferVersion), url.Protocol, url.Path)
593601
a.clsLock.Lock() // Mutually exclusive with the reloadClusters method
594602
defer a.clsLock.Unlock()
595603
a.clusterMap.Store(mapKey, c)
@@ -748,7 +756,9 @@ func (a *agentMessageHandler) httpCall(request motan.Request, ck string, httpClu
748756
if err != nil {
749757
return getDefaultResponse(request.GetRequestID(), "do http request failed : "+err.Error())
750758
}
751-
res = &motan.MotanResponse{RequestID: request.GetRequestID()}
759+
httpMotanResp := mhttp.AcquireHttpMotanResponse()
760+
httpMotanResp.RequestID = request.GetRequestID()
761+
res = httpMotanResp
752762
mhttp.FasthttpResponseToMotanResponse(res, httpResponse)
753763
return res
754764
}
@@ -794,61 +804,38 @@ func (a *agentMessageHandler) Call(request motan.Request) (res motan.Response) {
794804
}
795805
return res
796806
}
797-
func (a *agentMessageHandler) matchRule(typ, cond, key string, data []serviceMapItem, f func(u *motan.URL) string) (foundClusters []serviceMapItem, err error) {
798-
if cond == "" {
799-
err = fmt.Errorf("empty %s is not supported", typ)
807+
808+
func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.MotanCluster, key string, err error) {
809+
service := request.GetServiceName()
810+
if service == "" {
811+
err = fmt.Errorf("empty service is not supported. service: %s", service)
800812
return
801813
}
802-
for _, item := range data {
803-
if f(item.url) == cond {
804-
foundClusters = append(foundClusters, item)
805-
}
814+
serviceItemArrI, exists := a.agent.serviceMap.Load(service)
815+
if !exists {
816+
err = fmt.Errorf("cluster not found. service: %s", service)
817+
return
806818
}
807-
if len(foundClusters) == 0 {
808-
err = fmt.Errorf("cluster not found. cluster:%s", key)
819+
clusters := serviceItemArrI.([]serviceMapItem)
820+
if len(clusters) == 1 {
821+
//TODO: add strict mode to avoid incorrect group call
822+
c = clusters[0].cluster
809823
return
810824
}
811-
return
812-
}
813-
func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.MotanCluster, key string, err error) {
814-
service := request.GetServiceName()
815825
group := request.GetAttachment(mpro.MGroup)
816-
version := request.GetAttachment(mpro.MVersion)
817-
protocol := request.GetAttachment(mpro.MProxyProtocol)
818-
reqInfo := fmt.Sprintf("request information: {service: %s, group: %s, protocol: %s, version: %s}",
819-
service, group, protocol, version)
820-
serviceItemArrI, exists := a.agent.serviceMap.Load(service)
821-
if !exists {
822-
err = fmt.Errorf("cluster not found. cluster:%s, %s", service, reqInfo)
826+
if group == "" {
827+
err = fmt.Errorf("multiple clusters are matched with service: %s, but the group is empty", service)
823828
return
824829
}
825-
search := []struct {
826-
tip string
827-
cond string
828-
condFn func(u *motan.URL) string
829-
}{
830-
{"service", service, func(u *motan.URL) string { return u.Path }},
831-
{"group", group, func(u *motan.URL) string { return u.Group }},
832-
{"protocol", protocol, func(u *motan.URL) string { return u.Protocol }},
833-
{"version", version, func(u *motan.URL) string { return u.GetParam(motan.VersionKey, "") }},
834-
}
835-
foundClusters := serviceItemArrI.([]serviceMapItem)
836-
for i, rule := range search {
837-
if i == 0 {
838-
key = rule.cond
839-
} else {
840-
key += "_" + rule.cond
841-
}
842-
foundClusters, err = a.matchRule(rule.tip, rule.cond, key, foundClusters, rule.condFn)
843-
if err != nil {
844-
return
845-
}
846-
if len(foundClusters) == 1 {
847-
c = foundClusters[0].cluster
830+
version := request.GetAttachment(mpro.MVersion)
831+
protocol := request.GetAttachment(mpro.MProxyProtocol)
832+
for _, j := range clusters {
833+
if j.url.IsMatch(service, group, protocol, version) {
834+
c = j.cluster
848835
return
849836
}
850837
}
851-
err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, reqInfo)
838+
err = fmt.Errorf("no cluster matches the request; info: {service: %s, group: %s, protocol: %s, version: %s}", service, group, protocol, version)
852839
return
853840
}
854841

@@ -1145,7 +1132,7 @@ func (a *Agent) startMServer() {
11451132
continue
11461133
}
11471134
a.mport = port
1148-
managementListener = motan.TCPKeepAliveListener{listener.(*net.TCPListener)}
1135+
managementListener = motan.TCPKeepAliveListener{TCPListener: listener.(*net.TCPListener)}
11491136
break
11501137
}
11511138
if managementListener == nil {
@@ -1158,7 +1145,7 @@ func (a *Agent) startMServer() {
11581145
vlog.Infof("listen manage port %d failed:%s", a.mport, err.Error())
11591146
return
11601147
}
1161-
managementListener = motan.TCPKeepAliveListener{listener.(*net.TCPListener)}
1148+
managementListener = motan.TCPKeepAliveListener{TCPListener: listener.(*net.TCPListener)}
11621149
}
11631150

11641151
vlog.Infof("start listen manage for address: %s", managementListener.Addr().String())

agent_test.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
vlog "github.com/weibocom/motan-go/log"
1212
"github.com/weibocom/motan-go/registry"
1313
"github.com/weibocom/motan-go/serialize"
14+
"github.com/weibocom/motan-go/server"
1415
_ "github.com/weibocom/motan-go/server"
1516
_ "golang.org/x/net/context"
1617
"io/ioutil"
@@ -63,6 +64,7 @@ motan-agent:
6364
log_dir: "stdout"
6465
snapshot_dir: "./snapshot"
6566
application: "testing"
67+
processPoolSize: 100
6668
6769
motan-registry:
6870
direct:
@@ -87,6 +89,7 @@ motan-refer:
8789
resp := c1.BaseCall(req, nil)
8890
assert.Nil(t, resp.GetException())
8991
assert.Equal(t, "Hello jack from motan server", resp.GetValue())
92+
assert.Equal(t, 100, server.GetProcessPoolSize())
9093
}
9194
func Test_unixClientCall2(t *testing.T) {
9295
t.Parallel()
@@ -387,7 +390,7 @@ func TestAgent_InitCall(t *testing.T) {
387390
}
388391

389392
//test init cluster with one path and one groups in clusterMap
390-
temp := agent.clusterMap.LoadOrNil(getClusterKey("test1", "0.1", "", ""))
393+
temp := agent.clusterMap.LoadOrNil(getClusterKey("test1", "1.0", "", ""))
391394
assert.NotNil(t, temp, "init cluster with one path and two groups in clusterMap fail")
392395

393396
//test agentHandler call with group
@@ -413,15 +416,18 @@ func TestAgent_InitCall(t *testing.T) {
413416
version string
414417
except string
415418
}{
419+
// only input service,and there is only one cluster,findCluster would return successfully
416420
{"test0", "", "", "", "No refers for request"},
417-
{"test-1", "111", "222", "333", "cluster not found. cluster:test-1"},
418-
{"test3", "", "", "", "empty group is not supported"},
421+
{"test0", "g0", "", "", "No refers for request"},
422+
{"test0", "g0", "http", "", "No refers for request"},
423+
{"test0", "g0", "", "1.3", "No refers for request"},
424+
{"test-1", "111", "222", "333", "cluster not found"},
419425
{"test", "g2", "", "", "No refers for request"},
420-
{"test", "g1", "", "", "empty protocol is not supported"},
421426
{"test", "g1", "motan2", "", "No refers for request"},
422-
{"test", "g1", "motan", "", "empty version is not supported"},
423427
{"test", "g1", "http", "1.3", "No refers for request"},
424-
{"test", "g1", "http", "1.2", "less condition to select cluster"},
428+
{"test", "b", "c", "d", "no cluster matches the request"},
429+
// one service matches multiple clusters, without passing group
430+
{"test", "", "c", "d", "multiple clusters are matched with service"},
425431
} {
426432
request.ServiceName = v.service
427433
request.SetAttachment(mpro.MGroup, v.group)
@@ -479,10 +485,9 @@ func TestAgent_InitCall(t *testing.T) {
479485
version string
480486
except string
481487
}{
482-
{"test3", "111", "222", "333", "cluster not found. cluster:test3"},
483-
{"test4", "", "", "", "empty group is not supported"},
488+
{"test3", "111", "222", "333", "cluster not found. service: test3"},
484489
{"test5", "", "", "", "No refers for request"},
485-
{"helloService2", "", "", "", "cluster not found. cluster:helloService2"},
490+
{"helloService2", "", "", "", "cluster not found. service: helloService2"},
486491
} {
487492
request = newRequest(v.service, "")
488493
request.SetAttachment(mpro.MGroup, v.group)
@@ -633,7 +638,7 @@ motan-service:
633638
c1.Initialize()
634639
var reply []byte
635640
req := c1.BuildRequestWithGroup("helloService", "/unixclient", []interface{}{}, "hello")
636-
req.SetAttachment("HTTP_HOST", "test.com")
641+
req.SetAttachment("http_Host", "test.com")
637642
resp := c1.BaseCall(req, &reply)
638643
assert.Nil(t, resp.GetException())
639644
assert.Equal(t, "okay", string(reply))

cluster/command.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,11 @@ type CmdList []ClientCommand
108108
func (c CmdList) Len() int {
109109
return len(c)
110110
}
111+
111112
func (c CmdList) Swap(i, j int) {
112113
c[i], c[j] = c[j], c[i]
113114
}
115+
114116
func (c CmdList) Less(i, j int) bool {
115117
return c[i].Index < c[j].Index
116118
}
@@ -149,7 +151,7 @@ func GetCommandRegistryWrapper(cluster *MotanCluster, registry motan.Registry) m
149151
mixGroups := cluster.GetURL().GetParam(motan.MixGroups, "")
150152
if mixGroups != "" {
151153
groups := strings.Split(mixGroups, ",")
152-
command := &ClientCommand{CommandType: CMDTrafficControl, Index: 0, Version: "1.0", MergeGroups: make([]string, 0, len(groups)+1)}
154+
command := &ClientCommand{CommandType: CMDTrafficControl, Index: 0, Version: motan.DefaultReferVersion, MergeGroups: make([]string, 0, len(groups)+1)}
153155
ownGroup := cluster.GetURL().Group
154156
command.MergeGroups = append(command.MergeGroups, ownGroup)
155157
for _, group := range groups {

cluster/motanCluster.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func (m *MotanCluster) GetURL() *motan.URL {
6060
func (m *MotanCluster) SetURL(url *motan.URL) {
6161
m.url = url
6262
}
63+
6364
func (m *MotanCluster) Call(request motan.Request) (res motan.Response) {
6465
defer motan.HandlePanic(func() {
6566
res = motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 500, ErrMsg: "cluster call panic", ErrType: motan.ServiceException})
@@ -71,6 +72,7 @@ func (m *MotanCluster) Call(request motan.Request) (res motan.Response) {
7172
vlog.Infoln("cluster:" + m.GetIdentity() + "is not available!")
7273
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 500, ErrMsg: "cluster not available, maybe caused by degrade", ErrType: motan.ServiceException})
7374
}
75+
7476
func (m *MotanCluster) initCluster() bool {
7577
m.registryRefers = make(map[string][]motan.EndPoint)
7678
//ha
@@ -99,15 +101,19 @@ func (m *MotanCluster) initCluster() bool {
99101
vlog.Infof("init MotanCluster %s", m.GetIdentity())
100102
return true
101103
}
104+
102105
func (m *MotanCluster) SetLoadBalance(loadBalance motan.LoadBalance) {
103106
m.LoadBalance = loadBalance
104107
}
108+
105109
func (m *MotanCluster) SetHaStrategy(haStrategy motan.HaStrategy) {
106110
m.HaStrategy = haStrategy
107111
}
112+
108113
func (m *MotanCluster) GetRefers() []motan.EndPoint {
109114
return m.Refers
110115
}
116+
111117
func (m *MotanCluster) refresh() {
112118
newRefers := make([]motan.EndPoint, 0, 32)
113119
for _, v := range m.registryRefers {
@@ -120,14 +126,17 @@ func (m *MotanCluster) refresh() {
120126
m.Refers = newRefers
121127
m.LoadBalance.OnRefresh(newRefers)
122128
}
129+
123130
func (m *MotanCluster) ShuffleEndpoints(endpoints []motan.EndPoint) []motan.EndPoint {
124131
rand.Seed(time.Now().UnixNano())
125132
rand.Shuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
126133
return endpoints
127134
}
135+
128136
func (m *MotanCluster) AddRegistry(registry motan.Registry) {
129137
m.Registries = append(m.Registries, registry)
130138
}
139+
131140
func (m *MotanCluster) Notify(registryURL *motan.URL, urls []*motan.URL) {
132141
vlog.Infof("cluster %s receive notify size %d. ", m.GetIdentity(), len(urls))
133142
m.notifyLock.Lock()

0 commit comments

Comments
 (0)