Skip to content

Commit

Permalink
fix(rpc): a custom readCloser is used as request.body to count the si…
Browse files Browse the repository at this point in the history
…ze of the request body

in previous version,blobstore access upload no metric if there is no context-length in the request header,now the size is counted when read request body

Fixes cubefs#2120

Signed-off-by: shuqiang-zheng <[email protected]>
  • Loading branch information
shuqiang-zheng authored and sejust committed Jul 7, 2023
1 parent a4537b5 commit 46d1f78
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 19 deletions.
18 changes: 18 additions & 0 deletions blobstore/common/rpc/auditlog/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package auditlog
import (
"bytes"
"encoding/json"
"io"
"net/http"
"runtime/debug"
"strconv"
Expand All @@ -36,6 +37,18 @@ const (
defaultFileChunkBits = 29
)

type reqBodyReadCloser struct {
bodyRead int64

io.ReadCloser
}

func (reqBody *reqBodyReadCloser) Read(p []byte) (n int, err error) {
n, err = reqBody.ReadCloser.Read(p)
reqBody.bodyRead += int64(n)
return
}

type jsonAuditlog struct {
module string
decoder Decoder
Expand Down Expand Up @@ -178,7 +191,12 @@ func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(
w.WriteHeader(597)
}
}()

rc := &reqBodyReadCloser{ReadCloser: req.Body}
req.Body = rc
f(_w, req)
bodySize := rc.bodyRead
decodeReq.Header["BodySize"] = bodySize

endTime := time.Now().UnixNano() / 1000
b := j.logPool.Get().(*bytes.Buffer)
Expand Down
90 changes: 76 additions & 14 deletions blobstore/common/rpc/auditlog/auditlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,7 @@ type testRespData struct {
Result string `json:"result"`
}

var (
server *httptest.Server
tmpDir string
lc LogCloser
)

func initServer(t *testing.T) {
func initServer(t *testing.T) (server *httptest.Server, tmpDir string, lc LogCloser) {
moduleName := "TESTMOULE"
tracer := trace.NewTracer(moduleName)
trace.SetGlobalTracer(tracer)
Expand All @@ -56,12 +50,17 @@ func initServer(t *testing.T) {
ah, lc, err = Open(moduleName, &Config{
LogDir: tmpDir, ChunkBits: 29,
KeywordsFilter: []string{"Get"},
MetricConfig: PrometheusConfig{
Idc: moduleName,
},
})
require.NoError(t, err)
require.NotNil(t, ah)
require.NotNil(t, lc)

bussinessHandler := func(w http.ResponseWriter, req *http.Request) {
_, err := ioutil.ReadAll(req.Body)
require.NoError(t, err)
w.Header().Set("testh1", "testh1value")
w.Header().Set("Content-Type", rpc.MIMEJSON)
w.WriteHeader(http.StatusOK)
Expand All @@ -74,22 +73,59 @@ func initServer(t *testing.T) {
}

server = httptest.NewServer(http.HandlerFunc(entryHandler))
return server, tmpDir, lc
}

func close() {
server.Close()
os.RemoveAll(tmpDir)
lc.Close()
func initNoContentLengthServer(t *testing.T) (server *httptest.Server, tmpDir string, lc LogCloser) {
moduleName := "TESTNOCONTENTLENGTHMOULE"
tracer := trace.NewTracer(moduleName)
trace.SetGlobalTracer(tracer)

tmpDir = os.TempDir() + "/test-NoContentLength-log" + strconv.FormatInt(time.Now().Unix(), 10) + strconv.Itoa(rand.Intn(100000))
err := os.Mkdir(tmpDir, 0o755)
require.NoError(t, err)

var ah rpc.ProgressHandler
ah, lc, err = Open(moduleName, &Config{
LogDir: tmpDir, ChunkBits: 29,
MetricConfig: PrometheusConfig{
Idc: moduleName,
},
})
require.NoError(t, err)
require.NotNil(t, ah)
require.NotNil(t, lc)

noContentLengthHandler := func(w http.ResponseWriter, req *http.Request) {
buffered, err := ioutil.ReadAll(req.Body)
require.NoError(t, err)
bodySize := req.Body.(*reqBodyReadCloser).bodyRead
readSting := string(buffered[:bodySize])
w.Header().Set("Content-Type", rpc.MIMEJSON)
w.WriteHeader(http.StatusOK)
data, err := json.Marshal(testRespData{Result: readSting})
require.NoError(t, err)
w.Write(data)
}
entryHandler := func(w http.ResponseWriter, req *http.Request) {
ah.Handler(w, req, noContentLengthHandler)
}

server = httptest.NewServer(http.HandlerFunc(entryHandler))
return server, tmpDir, lc
}

func TestOpen(t *testing.T) {
initServer(t)
defer close()
server, tmpDir, lc := initServer(t)
defer func() {
server.Close()
os.RemoveAll(tmpDir)
lc.Close()
}()

url := server.URL
client := http.DefaultClient

// test keywords filter
req, err := http.NewRequest(http.MethodGet, url, nil)
require.NoError(t, err)
resp, err := client.Do(req)
Expand Down Expand Up @@ -132,6 +168,32 @@ func TestOpen(t *testing.T) {
require.Greater(t, len(dirEntries), 0)
}

func TestNoContentLength(t *testing.T) {
server, tmpDir, lc := initNoContentLengthServer(t)
defer func() {
server.Close()
os.RemoveAll(tmpDir)
lc.Close()
}()

url := server.URL
client := http.DefaultClient

body := strings.NewReader("{\"test1\":\"test1value\"}")
req, err := http.NewRequest(http.MethodPost, url, body)
require.NoError(t, err)
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
b, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
respData := &testRespData{}
err = json.Unmarshal(b, respData)
require.NoError(t, err)
require.Equal(t, "{\"test1\":\"test1value\"}", respData.Result)
resp.Body.Close()
}

func TestBodylimit(t *testing.T) {
for _, limit := range []struct {
actual, expected int
Expand Down
6 changes: 1 addition & 5 deletions blobstore/common/rpc/auditlog/request_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var vre = regexp.MustCompile("^v[0-9]+$")

type ReqHeader struct {
ContentLength string `json:"Content-Length"`
BodySize int64 `json:"bs"` // body size
BodySize int64 `json:"BodySize"` // body size
RawQuery string `json:"RawQuery"`
Host string `json:"Host"`
Token *Token `json:"Token"`
Expand Down Expand Up @@ -354,10 +354,6 @@ func (a *RequestRow) ReqLength() (reqLength int64) {
if reqHeader == nil {
return
}
reqLength, _ = strconv.ParseInt(reqHeader.ContentLength, 10, 64)
if reqLength > 0 {
return reqLength
}
return reqHeader.BodySize
}

Expand Down

0 comments on commit 46d1f78

Please sign in to comment.