Skip to content

Commit

Permalink
feat(dial): add dial testing for blobstore
Browse files Browse the repository at this point in the history
Signed-off-by: slasher <[email protected]>
  • Loading branch information
sejust committed Jul 21, 2023
1 parent ca609d6 commit 41dab33
Show file tree
Hide file tree
Showing 7 changed files with 510 additions and 2 deletions.
152 changes: 152 additions & 0 deletions blobstore/testing/dial/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2023 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package dial

import (
"context"
"errors"
"io"
"math/rand"

"github.com/cubefs/cubefs/blobstore/api/access"
"github.com/cubefs/cubefs/blobstore/common/trace"
"github.com/cubefs/cubefs/blobstore/util/retry"
"github.com/cubefs/cubefs/blobstore/util/task"
)

var dollars = newBuffer(36)

func newBuffer(val byte) (buff [1 << 20]byte) {
for idx := range buff {
buff[idx] = val
}
return
}

type anyReader struct {
remain int
}

func (r *anyReader) Read(p []byte) (n int, err error) {
if r.remain <= 0 {
err = io.EOF
return
}
for r.remain > 0 && len(p) > 0 {
remain := r.remain
if remain > len(dollars) {
remain = len(dollars)
}
nn := copy(p, dollars[:remain])
n += nn
p = p[nn:]
r.remain -= nn
}
return
}

func runConnection(conn Connection) {
if conn.Sequentially {
for ii := 0; ii < conn.N; ii++ {
connect(conn)
}
return
}

tasks := make([]func() error, conn.N)
for idx := range tasks {
tasks[idx] = func() error {
connect(conn)
return nil
}
}
task.Run(context.Background(), tasks...)
}

func connect(conn Connection) {
var (
span, ctx = trace.StartSpanFromContextWithTraceID(
context.Background(), "dial", "dial-"+hostname+"-"+trace.RandomID().String())
body io.ReadCloser
location access.Location
err error
)
defer func() {
if err == nil {
return
}
if err = retry.Timed(10, 1).On(func() error {
_, e := conn.api.Delete(ctx, &access.DeleteArgs{
Locations: []access.Location{location},
})
return e
}); err != nil {
span.Error("delete", err)
}
}()

span.Infof("to upload conn:%v", conn)
runTimer(conn, "put", conn.Size, func() error {
location, _, err = conn.api.Put(ctx, &access.PutArgs{
Size: int64(conn.Size),
Hashes: access.HashAlgMD5 | access.HashAlgSHA1,
Body: &anyReader{remain: conn.Size},
})
return err
})
if err != nil {
span.Error(conn, "put error:", err)
err = nil
return
}

readSize := 10
if readSize > conn.Size-1 {
readSize = conn.Size - 1
}
readOffset := rand.Intn(conn.Size - readSize)

span.Infof("to download conn:%v offset:%d-(%d) location:%+v", conn, readOffset, readSize, location)
runTimer(conn, "get", readSize, func() error {
body, err = conn.api.Get(ctx, &access.GetArgs{
Location: location,
Offset: uint64(readOffset),
ReadSize: uint64(readSize),
})
if err != nil {
return err
}
defer body.Close()

buff, e := io.ReadAll(body)
if e != nil {
return e
}
for idx := range buff {
if buff[idx] != dollars[0] {
return errors.New("checksum mismatched")
}
}
return nil
})

span.Infof("to delete conn:%v", conn)
runTimer(conn, "del", 0, func() error {
_, err = conn.api.Delete(ctx, &access.DeleteArgs{
Locations: []access.Location{location},
})
return err
})
}
63 changes: 63 additions & 0 deletions blobstore/testing/dial/dial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2023 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package dial

import (
"net/http"

"github.com/cubefs/cubefs/blobstore/cmd"
"github.com/cubefs/cubefs/blobstore/common/config"
"github.com/cubefs/cubefs/blobstore/common/rpc"
)

var (
gService *serviceDial
gConfig serviceConfig
)

func init() {
cmd.RegisterModule(&cmd.Module{
Name: "DialTest",
InitConfig: initConfig,
SetUp: setUp,
TearDown: tearDown,
})
}

func initConfig(args []string) (*cmd.Config, error) {
config.Init("f", "", "dial.conf")
if err := config.Load(&gConfig); err != nil {
return nil, err
}
return &gConfig.Config, nil
}

func setUp() (*rpc.Router, []rpc.ProgressHandler) {
gService = newService(gConfig)
if err := gService.Start(); err != nil {
panic(err)
}
return newHandler(gService), nil
}

func tearDown() {
gService.Close()
}

func newHandler(service *serviceDial) *rpc.Router {
router := rpc.New()
router.Handle(http.MethodGet, "/status", service.Status)
return router
}
33 changes: 33 additions & 0 deletions blobstore/testing/dial/main/dial.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"max_procs": 4,
"bind_addr": ":9590",
"log": {
"level": "debug",
"filename": "/tmp/dial.log"
},
"dial_setting": {
"interval_s": 300,
"size": 1048576,
"n": 10,
"sequentially": false
},
"dial_connections": [
{
"region": "region-1",
"cluster": "cluster-1",
"idc": "idc-1",
"consul_addr": "localhost:8500",
"host_addrs": []
},
{
"size": 1024,
"n": 7,
"sequentially": true,
"region": "region-2",
"cluster": "cluster-2",
"idc": "idc-2",
"consul_addr": "localhost:8500",
"host_addrs": []
}
]
}
26 changes: 26 additions & 0 deletions blobstore/testing/dial/main/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2023 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package main

import (
"os"

"github.com/cubefs/cubefs/blobstore/cmd"
_ "github.com/cubefs/cubefs/blobstore/testing/dial"
)

func main() {
cmd.Main(os.Args)
}
95 changes: 95 additions & 0 deletions blobstore/testing/dial/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2023 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package dial

import (
"os"
"strconv"
"time"

"github.com/cubefs/cubefs/blobstore/common/rpc"
"github.com/prometheus/client_golang/prometheus"
)

const (
namespace = "dialtest"
subsystem = "blobstore"
)

var (
labels = []string{"region", "cluster", "idc", "consul", "method"}
buckets = []float64{1, 5, 10, 50, 200, 1000, 5000}

hostname = getHostname()
constLabels = map[string]string{"host": hostname}
)

var (
httpcodeMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "httpcode",
Help: "http code counter",
ConstLabels: constLabels,
}, append(labels, "code"),
)
throughputMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "throughput",
Help: "http bytes throughput",
ConstLabels: constLabels,
}, labels,
)
latencyMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "latency",
Help: "http latency duration ms",
Buckets: buckets[:],
ConstLabels: constLabels,
}, labels,
)
)

func init() {
prometheus.MustRegister(httpcodeMetric)
prometheus.MustRegister(throughputMetric)
prometheus.MustRegister(latencyMetric)
}

func getHostname() string {
hostname, _ := os.Hostname()
return hostname
}

func runTimer(conn Connection, method string, size int, f func() error) {
st := time.Now()
err := f()
duration := time.Since(st)

code := rpc.DetectStatusCode(err)
region, cluster, idc, consul := conn.Region, conn.Cluster, conn.IDC, conn.ConsulAddr
httpcodeMetric.WithLabelValues(region, cluster, idc, consul, method, strconv.Itoa(code)).Inc()

d := duration.Milliseconds()
latencyMetric.WithLabelValues(region, cluster, idc, consul, method).Observe(float64(d))
if code < 300 && size > 0 && d > 0 {
throughputMetric.WithLabelValues(region, cluster, idc, consul, method).Add(float64(size) / (float64(d) / 1000))
}
}
Loading

0 comments on commit 41dab33

Please sign in to comment.