Skip to content

Commit

Permalink
Refector HTTP protocol analyzer in access log module (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Feb 14, 2025
1 parent 40d03c1 commit 045ace9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
19 changes: 11 additions & 8 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import (
var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
var http1AnalyzeMaxRetryCount = 3

type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) error
type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, connection *PartitionConnection,
request *reader.Request, response *reader.Response) error

type HTTP1Protocol struct {
ctx *common.AccessLogContext
Expand Down Expand Up @@ -82,7 +83,7 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
buf := connection.Buffer(enums.ConnectionProtocolHTTP)
http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d",
metrics.ConnectionID, metrics.RandomID, buf.DataLength())
p.handleUnFinishedEvents(metrics)
p.handleUnFinishedEvents(metrics, connection)
buf.ResetForLoopReading()
for {
if !buf.PrepareForReading() {
Expand All @@ -103,7 +104,7 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
case reader.MessageTypeRequest:
result, err = p.handleRequest(metrics, buf)
case reader.MessageTypeResponse:
result, err = p.handleResponse(metrics, buf)
result, err = p.handleResponse(metrics, connection, buf)
case reader.MessageTypeUnknown:
result = enums.ParseResultSkipPackage
}
Expand Down Expand Up @@ -148,7 +149,8 @@ func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer)
return result, nil
}

func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer) (enums.ParseResult, error) {
func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, connection *PartitionConnection,
b *buffer.Buffer) (enums.ParseResult, error) {
request := metrics.findMatchesRequest(b.Position().DataID(), b.Position().PrevDataID())
if request == nil {
log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d, "+
Expand All @@ -172,7 +174,7 @@ func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer)
}

// getting the request and response, then send to the forwarder
if analyzeError := p.analyze(metrics, request, response); analyzeError != nil {
if analyzeError := p.analyze(metrics, connection, request, response); analyzeError != nil {
p.appendAnalyzeUnFinished(metrics, request, response)
}
return enums.ParseResultSuccess, nil
Expand All @@ -186,10 +188,10 @@ func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics *HTTP1Metrics, request *
})
}

func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics) {
func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics, connection *PartitionConnection) {
for element := m.analyzeUnFinished.Front(); element != nil; {
unFinished := element.Value.(*HTTP1AnalyzeUnFinished)
err := p.analyze(m, unFinished.request, unFinished.response)
err := p.analyze(m, connection, unFinished.request, unFinished.response)
if err != nil {
unFinished.retryCount++
if unFinished.retryCount < http1AnalyzeMaxRetryCount {
Expand All @@ -205,7 +207,8 @@ func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics) {
}
}

func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) error {
func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, connection *PartitionConnection,
request *reader.Request, response *reader.Response) error {
details := make([]events.SocketDetail, 0)
var allInclude = true
var idRange *buffer.DataIDRange
Expand Down
6 changes: 4 additions & 2 deletions pkg/accesslog/collector/protocols/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type HTTP2Streaming struct {
Status int
RespHeaderBuffer *buffer.Buffer
RespBodyBuffer *buffer.Buffer
connection *PartitionConnection
}

func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) ProtocolMetrics {
Expand Down Expand Up @@ -114,7 +115,7 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
var result enums.ParseResult
switch header.Type {
case http2.FrameHeaders:
result, protocolBreak, _ = r.handleHeader(&header, startPosition, http2Metrics, buf)
result, protocolBreak, _ = r.handleHeader(connection, &header, startPosition, http2Metrics, buf)
case http2.FrameData:
result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, buf)
default:
Expand Down Expand Up @@ -158,7 +159,7 @@ func (r *HTTP2Protocol) ForProtocol() enums.ConnectionProtocol {
return enums.ConnectionProtocolHTTP2
}

func (r *HTTP2Protocol) handleHeader(header *http2.FrameHeader, startPos *buffer.Position,
func (r *HTTP2Protocol) handleHeader(connection *PartitionConnection, header *http2.FrameHeader, startPos *buffer.Position,
metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, error) {
bytes := make([]byte, header.Length)
if err := buf.ReadUntilBufferFull(bytes); err != nil {
Expand All @@ -177,6 +178,7 @@ func (r *HTTP2Protocol) handleHeader(header *http2.FrameHeader, startPos *buffer
ReqHeader: headers,
RespHeader: make(map[string]string),
ReqHeaderBuffer: buf.Slice(true, startPos, buf.Position()),
connection: connection,
}
metrics.streams[header.StreamID] = streaming
return enums.ParseResultSuccess, false, nil
Expand Down

0 comments on commit 045ace9

Please sign in to comment.