From 41dab33066d0f5f0e3743ddd38fe8afa6b61b8f8 Mon Sep 17 00:00:00 2001 From: slasher Date: Wed, 19 Jul 2023 17:42:43 +0800 Subject: [PATCH] feat(dial): add dial testing for blobstore Signed-off-by: slasher --- blobstore/testing/dial/connect.go | 152 ++++++++++++++++++++++++++ blobstore/testing/dial/dial.go | 63 +++++++++++ blobstore/testing/dial/main/dial.conf | 33 ++++++ blobstore/testing/dial/main/main.go | 26 +++++ blobstore/testing/dial/metric.go | 95 ++++++++++++++++ blobstore/testing/dial/service.go | 133 ++++++++++++++++++++++ build/build.sh | 10 +- 7 files changed, 510 insertions(+), 2 deletions(-) create mode 100644 blobstore/testing/dial/connect.go create mode 100644 blobstore/testing/dial/dial.go create mode 100644 blobstore/testing/dial/main/dial.conf create mode 100644 blobstore/testing/dial/main/main.go create mode 100644 blobstore/testing/dial/metric.go create mode 100644 blobstore/testing/dial/service.go diff --git a/blobstore/testing/dial/connect.go b/blobstore/testing/dial/connect.go new file mode 100644 index 0000000000..8469b30b62 --- /dev/null +++ b/blobstore/testing/dial/connect.go @@ -0,0 +1,152 @@ +// Copyright 2023 The CubeFS Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package dial + +import ( + "context" + "errors" + "io" + "math/rand" + + "github.com/cubefs/cubefs/blobstore/api/access" + "github.com/cubefs/cubefs/blobstore/common/trace" + "github.com/cubefs/cubefs/blobstore/util/retry" + "github.com/cubefs/cubefs/blobstore/util/task" +) + +var dollars = newBuffer(36) + +func newBuffer(val byte) (buff [1 << 20]byte) { + for idx := range buff { + buff[idx] = val + } + return +} + +type anyReader struct { + remain int +} + +func (r *anyReader) Read(p []byte) (n int, err error) { + if r.remain <= 0 { + err = io.EOF + return + } + for r.remain > 0 && len(p) > 0 { + remain := r.remain + if remain > len(dollars) { + remain = len(dollars) + } + nn := copy(p, dollars[:remain]) + n += nn + p = p[nn:] + r.remain -= nn + } + return +} + +func runConnection(conn Connection) { + if conn.Sequentially { + for ii := 0; ii < conn.N; ii++ { + connect(conn) + } + return + } + + tasks := make([]func() error, conn.N) + for idx := range tasks { + tasks[idx] = func() error { + connect(conn) + return nil + } + } + task.Run(context.Background(), tasks...) +} + +func connect(conn Connection) { + var ( + span, ctx = trace.StartSpanFromContextWithTraceID( + context.Background(), "dial", "dial-"+hostname+"-"+trace.RandomID().String()) + body io.ReadCloser + location access.Location + err error + ) + defer func() { + if err == nil { + return + } + if err = retry.Timed(10, 1).On(func() error { + _, e := conn.api.Delete(ctx, &access.DeleteArgs{ + Locations: []access.Location{location}, + }) + return e + }); err != nil { + span.Error("delete", err) + } + }() + + span.Infof("to upload conn:%v", conn) + runTimer(conn, "put", conn.Size, func() error { + location, _, err = conn.api.Put(ctx, &access.PutArgs{ + Size: int64(conn.Size), + Hashes: access.HashAlgMD5 | access.HashAlgSHA1, + Body: &anyReader{remain: conn.Size}, + }) + return err + }) + if err != nil { + span.Error(conn, "put error:", err) + err = nil + return + } + + readSize := 10 + if readSize > conn.Size-1 { + readSize = conn.Size - 1 + } + readOffset := rand.Intn(conn.Size - readSize) + + span.Infof("to download conn:%v offset:%d-(%d) location:%+v", conn, readOffset, readSize, location) + runTimer(conn, "get", readSize, func() error { + body, err = conn.api.Get(ctx, &access.GetArgs{ + Location: location, + Offset: uint64(readOffset), + ReadSize: uint64(readSize), + }) + if err != nil { + return err + } + defer body.Close() + + buff, e := io.ReadAll(body) + if e != nil { + return e + } + for idx := range buff { + if buff[idx] != dollars[0] { + return errors.New("checksum mismatched") + } + } + return nil + }) + + span.Infof("to delete conn:%v", conn) + runTimer(conn, "del", 0, func() error { + _, err = conn.api.Delete(ctx, &access.DeleteArgs{ + Locations: []access.Location{location}, + }) + return err + }) +} diff --git a/blobstore/testing/dial/dial.go b/blobstore/testing/dial/dial.go new file mode 100644 index 0000000000..bda2308178 --- /dev/null +++ b/blobstore/testing/dial/dial.go @@ -0,0 +1,63 @@ +// Copyright 2023 The CubeFS Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package dial + +import ( + "net/http" + + "github.com/cubefs/cubefs/blobstore/cmd" + "github.com/cubefs/cubefs/blobstore/common/config" + "github.com/cubefs/cubefs/blobstore/common/rpc" +) + +var ( + gService *serviceDial + gConfig serviceConfig +) + +func init() { + cmd.RegisterModule(&cmd.Module{ + Name: "DialTest", + InitConfig: initConfig, + SetUp: setUp, + TearDown: tearDown, + }) +} + +func initConfig(args []string) (*cmd.Config, error) { + config.Init("f", "", "dial.conf") + if err := config.Load(&gConfig); err != nil { + return nil, err + } + return &gConfig.Config, nil +} + +func setUp() (*rpc.Router, []rpc.ProgressHandler) { + gService = newService(gConfig) + if err := gService.Start(); err != nil { + panic(err) + } + return newHandler(gService), nil +} + +func tearDown() { + gService.Close() +} + +func newHandler(service *serviceDial) *rpc.Router { + router := rpc.New() + router.Handle(http.MethodGet, "/status", service.Status) + return router +} diff --git a/blobstore/testing/dial/main/dial.conf b/blobstore/testing/dial/main/dial.conf new file mode 100644 index 0000000000..71b82ab7a6 --- /dev/null +++ b/blobstore/testing/dial/main/dial.conf @@ -0,0 +1,33 @@ +{ + "max_procs": 4, + "bind_addr": ":9590", + "log": { + "level": "debug", + "filename": "/tmp/dial.log" + }, + "dial_setting": { + "interval_s": 300, + "size": 1048576, + "n": 10, + "sequentially": false + }, + "dial_connections": [ + { + "region": "region-1", + "cluster": "cluster-1", + "idc": "idc-1", + "consul_addr": "localhost:8500", + "host_addrs": [] + }, + { + "size": 1024, + "n": 7, + "sequentially": true, + "region": "region-2", + "cluster": "cluster-2", + "idc": "idc-2", + "consul_addr": "localhost:8500", + "host_addrs": [] + } + ] +} diff --git a/blobstore/testing/dial/main/main.go b/blobstore/testing/dial/main/main.go new file mode 100644 index 0000000000..06695d5ac5 --- /dev/null +++ b/blobstore/testing/dial/main/main.go @@ -0,0 +1,26 @@ +// Copyright 2023 The CubeFS Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package main + +import ( + "os" + + "github.com/cubefs/cubefs/blobstore/cmd" + _ "github.com/cubefs/cubefs/blobstore/testing/dial" +) + +func main() { + cmd.Main(os.Args) +} diff --git a/blobstore/testing/dial/metric.go b/blobstore/testing/dial/metric.go new file mode 100644 index 0000000000..c7354f0b1b --- /dev/null +++ b/blobstore/testing/dial/metric.go @@ -0,0 +1,95 @@ +// Copyright 2023 The CubeFS Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package dial + +import ( + "os" + "strconv" + "time" + + "github.com/cubefs/cubefs/blobstore/common/rpc" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "dialtest" + subsystem = "blobstore" +) + +var ( + labels = []string{"region", "cluster", "idc", "consul", "method"} + buckets = []float64{1, 5, 10, 50, 200, 1000, 5000} + + hostname = getHostname() + constLabels = map[string]string{"host": hostname} +) + +var ( + httpcodeMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "httpcode", + Help: "http code counter", + ConstLabels: constLabels, + }, append(labels, "code"), + ) + throughputMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "throughput", + Help: "http bytes throughput", + ConstLabels: constLabels, + }, labels, + ) + latencyMetric = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "latency", + Help: "http latency duration ms", + Buckets: buckets[:], + ConstLabels: constLabels, + }, labels, + ) +) + +func init() { + prometheus.MustRegister(httpcodeMetric) + prometheus.MustRegister(throughputMetric) + prometheus.MustRegister(latencyMetric) +} + +func getHostname() string { + hostname, _ := os.Hostname() + return hostname +} + +func runTimer(conn Connection, method string, size int, f func() error) { + st := time.Now() + err := f() + duration := time.Since(st) + + code := rpc.DetectStatusCode(err) + region, cluster, idc, consul := conn.Region, conn.Cluster, conn.IDC, conn.ConsulAddr + httpcodeMetric.WithLabelValues(region, cluster, idc, consul, method, strconv.Itoa(code)).Inc() + + d := duration.Milliseconds() + latencyMetric.WithLabelValues(region, cluster, idc, consul, method).Observe(float64(d)) + if code < 300 && size > 0 && d > 0 { + throughputMetric.WithLabelValues(region, cluster, idc, consul, method).Add(float64(size) / (float64(d) / 1000)) + } +} diff --git a/blobstore/testing/dial/service.go b/blobstore/testing/dial/service.go new file mode 100644 index 0000000000..1f36629f39 --- /dev/null +++ b/blobstore/testing/dial/service.go @@ -0,0 +1,133 @@ +// Copyright 2023 The CubeFS Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package dial + +import ( + "encoding/json" + "time" + + "github.com/cubefs/cubefs/blobstore/api/access" + "github.com/cubefs/cubefs/blobstore/cmd" + "github.com/cubefs/cubefs/blobstore/common/rpc" + "github.com/cubefs/cubefs/blobstore/common/trace" + "github.com/cubefs/cubefs/blobstore/util/closer" + "github.com/cubefs/cubefs/blobstore/util/defaulter" + "github.com/cubefs/cubefs/blobstore/util/log" +) + +type ( + serviceConfig struct { + cmd.Config + + DialSetting Setting `json:"dial_setting"` + DialConnections []Connection `json:"dial_connections"` + } + serviceDial struct { + closer.Closer + config serviceConfig + } + + // Setting dial setting + Setting struct { + IntervalS int `json:"interval_s"` + Size int `json:"size"` + N int `json:"n"` + Sequentially bool `json:"sequentially"` + } + + // Connection dial connection + Connection struct { + Setting + + Region string `json:"region"` + Cluster string `json:"cluster"` + IDC string `json:"idc"` + ConsulAddr string `json:"consul_addr"` + HostAddrs []string `json:"host_addrs"` + + api access.API `json:"-"` + } +) + +func (c *serviceConfig) defaulter() { + defaulter.LessOrEqual(&c.DialSetting.IntervalS, 5*60*1000) + defaulter.LessOrEqual(&c.DialSetting.Size, 1<<20) + defaulter.LessOrEqual(&c.DialSetting.N, 10) + for idx := range c.DialConnections { + defaulter.LessOrEqual(&c.DialConnections[idx].Setting.IntervalS, c.DialSetting.IntervalS) + defaulter.LessOrEqual(&c.DialConnections[idx].Setting.Size, c.DialSetting.Size) + defaulter.LessOrEqual(&c.DialConnections[idx].Setting.N, c.DialSetting.N) + } +} + +func newService(cfg serviceConfig) *serviceDial { + cfg.defaulter() + val, _ := json.MarshalIndent(cfg, " ", " ") + log.Infof("load config:\n%s", string(val)) + return &serviceDial{ + Closer: closer.New(), + config: cfg, + } +} + +func (s *serviceDial) Status(c *rpc.Context) { + span := trace.SpanFromContextSafe(c.Request.Context()) + span.Info("hello.") + c.Respond() +} + +func (s *serviceDial) Start() error { + conns := make([]Connection, 0, len(s.config.DialConnections)) + for _, conn := range s.config.DialConnections { + cfg := access.Config{ + ConnMode: access.GeneralConnMode, + Consul: access.ConsulConfig{Address: conn.ConsulAddr}, + LogLevel: s.config.LogConf.Level, + } + if conn.ConsulAddr == "" { + cfg.PriorityAddrs = conn.HostAddrs[:] + } + + api, err := access.New(cfg) + if err != nil { + log.Error(err) + return err + } + + conn.api = api + conns = append(conns, conn) + } + + for _, conn := range conns { + log.Infof("start %+v", conn) + s.StartConnection(conn) + } + return nil +} + +func (s *serviceDial) StartConnection(conn Connection) { + go func() { + ticker := time.NewTicker(time.Second * time.Duration(conn.IntervalS)) + defer ticker.Stop() + for { + select { + case <-s.Done(): + return + case <-ticker.C: + runConnection(conn) + } + } + }() +} diff --git a/build/build.sh b/build/build.sh index 9fbb601bc3..e6bcb131ff 100755 --- a/build/build.sh +++ b/build/build.sh @@ -10,7 +10,7 @@ VendorPath=${RootPath}/vendor DependsPath=${RootPath}/depends use_clang=$(echo ${CC} | grep "clang" | grep -v "grep") cgo_ldflags="-L${BuildDependsLibPath} -lrocksdb -lz -lbz2 -lsnappy -llz4 -lzstd -lstdc++" -if [ "${use_clang}" != "" ]; then +if [ "${use_clang}" != "" ]; then cgo_ldflags="-L${BuildDependsLibPath} -lrocksdb -lz -lbz2 -lsnappy -llz4 -lzstd -lc++" fi cgo_cflags="-I${BuildDependsIncludePath}" @@ -302,10 +302,16 @@ build_blobstore_cli() { CGO_ENABLED=1 go build ${MODFLAGS} -gcflags=all=-trimpath=${SrcPath} -asmflags=all=-trimpath=${SrcPath} -ldflags="${LDFlags}" -o ${BuildBinPath}/blobstore/blobstore-cli ${SrcPath}/blobstore/cli/cli } +build_blobstore_dialtest() { + CGO_ENABLED=0 go build ${MODFLAGS} -gcflags=all=-trimpath=${SrcPath} -asmflags=all=-trimpath=${SrcPath} -ldflags="${LDFlags}" -o ${BuildBinPath}/blobstore/blobstore-dialtest ${SrcPath}/blobstore/testing/dial/main +} + build_blobstore() { pushd $SrcPath >/dev/null echo -n "build blobstore " - build_clustermgr && build_blobnode && build_access && build_scheduler && build_proxy && build_blobstore_cli && echo "success" || echo "failed" + build_clustermgr && build_blobnode && build_access && build_scheduler && build_proxy \ + && build_blobstore_cli && build_blobstore_dialtest \ + && echo "success" || echo "failed" popd >/dev/null }