Skip to content

Commit

Permalink
feat(auditlog): use structured data directly to reduce the parsing pr…
Browse files Browse the repository at this point in the history
…ocess

close: cubefs#1913

Signed-off-by: setcy <[email protected]>
  • Loading branch information
setcy authored and sejust committed Jul 7, 2023
1 parent 019b302 commit 2d5176f
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 37 deletions.
111 changes: 81 additions & 30 deletions blobstore/common/rpc/auditlog/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package auditlog

import (
"bytes"
"encoding/json"
"net/http"
"runtime/debug"
"strconv"
Expand Down Expand Up @@ -47,6 +48,55 @@ type jsonAuditlog struct {
cfg *Config
}

// AuditLog Define a struct to represent the structured log data
type AuditLog struct {
ReqType string `json:"req_type"`
Module string `json:"module"`
StartTime int64 `json:"start_time"`
Method string `json:"method"`
Path string `json:"path"`
ReqHeader M `json:"req_header"`
ReqParams string `json:"req_params"`
StatusCode int `json:"status_code"`
RespHeader M `json:"resp_header"`
RespBody string `json:"resp_body"`
RespLength int64 `json:"resp_length"`
Duration int64 `json:"duration"`
}

func (a *AuditLog) ToBytesWithTab(buf *bytes.Buffer) (b []byte) {
buf.WriteString(a.ReqType)
buf.WriteByte('\t')
buf.WriteString(a.Module)
buf.WriteByte('\t')
buf.WriteString(strconv.FormatInt(a.StartTime, 10))
buf.WriteByte('\t')
buf.WriteString(a.Method)
buf.WriteByte('\t')
buf.WriteString(a.Path)
buf.WriteByte('\t')
buf.Write(a.ReqHeader.Encode())
buf.WriteByte('\t')
buf.WriteString(a.ReqParams)
buf.WriteByte('\t')
buf.WriteString(strconv.Itoa(a.StatusCode))
buf.WriteByte('\t')
buf.Write(a.RespHeader.Encode())
buf.WriteByte('\t')
buf.WriteString(a.RespBody)
buf.WriteByte('\t')
buf.WriteString(strconv.FormatInt(a.RespLength, 10))
buf.WriteByte('\t')
buf.WriteString(strconv.FormatInt(a.Duration, 10))
buf.WriteByte('\n')
return buf.Bytes()
}

func (a *AuditLog) ToJson() (b []byte) {
b, _ = json.Marshal(a)
return
}

func Open(module string, cfg *Config) (ph rpc.ProgressHandler, logFile LogCloser, err error) {
if cfg.BodyLimit < 0 {
cfg.BodyLimit = 0
Expand Down Expand Up @@ -96,6 +146,10 @@ func Open(module string, cfg *Config) (ph rpc.ProgressHandler, logFile LogCloser
}

func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(http.ResponseWriter, *http.Request)) {
var (
logBytes []byte
err error
)
startTime := time.Now().UnixNano()

span, ctx := trace.StartSpanFromHTTPHeaderSafe(req, j.module)
Expand Down Expand Up @@ -131,28 +185,22 @@ func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(
defer j.logPool.Put(b)
b.Reset()

b.WriteString("REQ\t")
b.WriteString(j.module)
b.WriteByte('\t')

// record request info
b.WriteString(strconv.FormatInt(startTime/100, 10))
b.WriteByte('\t')
b.WriteString(req.Method)
b.WriteByte('\t')
b.WriteString(decodeReq.Path)
b.WriteByte('\t')
b.Write(decodeReq.Header.Encode())
b.WriteByte('\t')
auditLog := &AuditLog{
ReqType: "REQ",
Module: j.module,
StartTime: startTime / 100,
Method: req.Method,
Path: decodeReq.Path,
ReqHeader: decodeReq.Header,
}

if len(decodeReq.Params) <= maxSeekableBodyLength && len(decodeReq.Params) > 0 {
b.Write(decodeReq.Params)
auditLog.ReqParams = string(decodeReq.Params)
}
b.WriteByte('\t')

// record response info
respContentType := _w.Header().Get("Content-Type")
b.Write(_w.getStatusCode())
b.WriteByte('\t')
auditLog.StatusCode = _w.getStatusCode()

// Check if track-log and tags changed or not,
// if changed, we should set into response header again.
Expand All @@ -167,28 +215,31 @@ func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(
if len(wHeader[rpc.HeaderTraceTags]) < len(tags) {
wHeader[rpc.HeaderTraceTags] = tags
}
b.Write(_w.getHeader())
b.WriteByte('\t')
auditLog.RespHeader = _w.getHeader()

// record body in json or xml content type
if (respContentType == rpc.MIMEJSON || respContentType == rpc.MIMEXML) &&
_w.Header().Get("Content-Encoding") != rpc.GzipEncodingType {
b.Write(_w.getBody())
auditLog.RespBody = string(_w.getBody())
}
b.WriteByte('\t')
b.WriteString(strconv.FormatInt(_w.getBodyWritten(), 10))
b.WriteByte('\t')
b.WriteString(strconv.FormatInt(endTime-startTime/1000, 10))
b.WriteByte('\n')

// report request metric
j.metricSender.Send(b.Bytes())
auditLog.RespLength = _w.getBodyWritten()
auditLog.Duration = endTime - startTime/1000

j.metricSender.Send(auditLog.ToBytesWithTab(b))

// log filter
if j.logFile == nil || (len(j.cfg.KeywordsFilter) > 0 && defaultLogFilter(req, j.cfg.KeywordsFilter)) {
return
}
err := j.logFile.Log(b.Bytes())

switch j.cfg.LogFormat {
case LogFormatText:
logBytes = auditLog.ToBytesWithTab(b)
case LogFormatJSON:
logBytes = auditLog.ToJson()
default:
logBytes = auditLog.ToBytesWithTab(b)
}
err = j.logFile.Log(logBytes)
if err != nil {
span.Errorf("jsonlog.Handler Log failed, err: %s", err.Error())
return
Expand Down
8 changes: 8 additions & 0 deletions blobstore/common/rpc/auditlog/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ package auditlog

import "net/http"

const (
LogFormatText = "text"
LogFormatJSON = "json"
)

type Config struct {
// LogDir audit log whether to enable depend on whether config log dir
LogDir string `json:"logdir"`
Expand All @@ -33,6 +38,9 @@ type Config struct {

// KeywordsFilter log filter based on uri and request method
KeywordsFilter []string `json:"keywords_filter"`

// LogFormat valid value is "text" or "json", default is "text"
LogFormat string `json:"log_format"`
}

// LogCloser a implemented audit logger should implements ProgressHandler
Expand Down
11 changes: 4 additions & 7 deletions blobstore/common/rpc/auditlog/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func (w *responseWriter) getBody() []byte {
return w.body[:w.n]
}

func (w *responseWriter) getStatusCode() []byte {
return []byte(strconv.Itoa(w.statusCode))
func (w *responseWriter) getStatusCode() int {
return w.statusCode
}

func (w *responseWriter) getHeader() []byte {
func (w *responseWriter) getHeader() M {
header := w.ResponseWriter.Header()
headerM := make(M)
for k := range header {
Expand All @@ -111,10 +111,7 @@ func (w *responseWriter) getHeader() []byte {
headerM[k] = header.Get(k)
}
}
if len(headerM) > 0 {
return headerM.Encode()
}
return nil
return headerM
}

func (w *responseWriter) getBodyWritten() int64 {
Expand Down

0 comments on commit 2d5176f

Please sign in to comment.