Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support analyze multiple protocol in the same connection #160

Merged
merged 5 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/accesslog/collector/protocols/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (

type PartitionConnection struct {
connectionID, randomID uint64
dataBuffer *buffer.Buffer
protocol map[enums.ConnectionProtocol]bool
dataBuffers map[enums.ConnectionProtocol]*buffer.Buffer
protocol map[enums.ConnectionProtocol]uint64 // protocol with minimal data id
protocolAnalyzer map[enums.ConnectionProtocol]Protocol
protocolMetrics map[enums.ConnectionProtocol]ProtocolMetrics
closed bool
Expand All @@ -48,8 +48,8 @@ func (p *PartitionConnection) IsExistProtocol(protocol enums.ConnectionProtocol)
return exist
}

func (p *PartitionConnection) Buffer() *buffer.Buffer {
return p.dataBuffer
func (p *PartitionConnection) Buffer(protocol enums.ConnectionProtocol) *buffer.Buffer {
return p.dataBuffers[protocol]
}

func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, detail events.SocketDetail) {
Expand All @@ -58,12 +58,12 @@ func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, detail
forwarder.SendTransferNoProtocolEvent(ctx, detail)
return
}
p.dataBuffer.AppendDetailEvent(detail)
p.dataBuffers[detail.GetProtocol()].AppendDetailEvent(detail)
}

func (p *PartitionConnection) AppendData(data buffer.SocketDataBuffer) {
if p.skipAllDataAnalyze {
return
}
p.dataBuffer.AppendDataEvent(data)
p.dataBuffers[data.Protocol()].AppendDataEvent(data)
}
19 changes: 10 additions & 9 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,19 @@ func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) Protoc

func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error {
metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
buf := connection.Buffer(enums.ConnectionProtocolHTTP)
http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d",
metrics.ConnectionID, metrics.RandomID, connection.Buffer().DataLength())
connection.Buffer().ResetForLoopReading()
metrics.ConnectionID, metrics.RandomID, buf.DataLength())
buf.ResetForLoopReading()
for {
if !connection.Buffer().PrepareForReading() {
if !buf.PrepareForReading() {
return nil
}

messageType, err := reader.IdentityMessageType(connection.Buffer())
messageType, err := reader.IdentityMessageType(buf)
if err != nil {
http1Log.Debugf("failed to identity message type, %v", err)
if connection.Buffer().SkipCurrentElement() {
if buf.SkipCurrentElement() {
break
}
continue
Expand All @@ -88,19 +89,19 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
var result enums.ParseResult
switch messageType {
case reader.MessageTypeRequest:
result, _ = p.handleRequest(metrics, connection.Buffer())
result, _ = p.handleRequest(metrics, buf)
case reader.MessageTypeResponse:
result, _ = p.handleResponse(metrics, connection.Buffer())
result, _ = p.handleResponse(metrics, buf)
case reader.MessageTypeUnknown:
result = enums.ParseResultSkipPackage
}

finishReading := false
switch result {
case enums.ParseResultSuccess:
finishReading = connection.Buffer().RemoveReadElements()
finishReading = buf.RemoveReadElements()
case enums.ParseResultSkipPackage:
finishReading = connection.Buffer().SkipCurrentElement()
finishReading = buf.SkipCurrentElement()
}

if finishReading {
Expand Down
21 changes: 11 additions & 10 deletions pkg/accesslog/collector/protocols/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,20 @@ func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) Protoc

func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error {
http2Metrics := connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
buf := connection.Buffer(enums.ConnectionProtocolHTTP2)
http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID: %d, random ID: %d",
http2Metrics.connectionID, http2Metrics.randomID)
connection.Buffer().ResetForLoopReading()
buf.ResetForLoopReading()
for {
if !connection.Buffer().PrepareForReading() {
if !buf.PrepareForReading() {
return nil
}

startPosition := connection.Buffer().Position()
header, err := http2.ReadFrameHeader(connection.Buffer())
startPosition := buf.Position()
header, err := http2.ReadFrameHeader(buf)
if err != nil {
http2Log.Debugf("failed to read frame header, %v", err)
if connection.Buffer().SkipCurrentElement() {
if buf.SkipCurrentElement() {
break
}
continue
Expand All @@ -112,12 +113,12 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
var result enums.ParseResult
switch header.Type {
case http2.FrameHeaders:
result, protocolBreak, _ = r.handleHeader(&header, startPosition, http2Metrics, connection.Buffer())
result, protocolBreak, _ = r.handleHeader(&header, startPosition, http2Metrics, buf)
case http2.FrameData:
result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, connection.Buffer())
result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, buf)
default:
tmp := make([]byte, header.Length)
if err := connection.Buffer().ReadUntilBufferFull(tmp); err != nil {
if err := buf.ReadUntilBufferFull(tmp); err != nil {
if errors.Is(err, buffer.ErrNotComplete) {
result = enums.ParseResultSkipPackage
} else {
Expand All @@ -139,9 +140,9 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
finishReading := false
switch result {
case enums.ParseResultSuccess:
finishReading = connection.Buffer().RemoveReadElements()
finishReading = buf.RemoveReadElements()
case enums.ParseResultSkipPackage:
finishReading = connection.Buffer().SkipCurrentElement()
finishReading = buf.SkipCurrentElement()
}

if finishReading {
Expand Down
63 changes: 43 additions & 20 deletions pkg/accesslog/collector/protocols/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"os"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -119,25 +120,30 @@ type PartitionContext struct {
analyzeLocker sync.Mutex
}

func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) *PartitionConnection {
func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection {
connection := &PartitionConnection{
connectionID: conID,
randomID: randomID,
dataBuffer: buffer.NewBuffer(),
protocol: make(map[enums.ConnectionProtocol]bool),
dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer),
protocol: make(map[enums.ConnectionProtocol]uint64),
protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics),
}
connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol)
connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, currentDataID)
return connection
}

func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) {
if _, exist := p.protocol[protocol]; !exist {
func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) {
if minDataID, exist := p.protocol[protocol]; !exist {
analyzer := protocolMgr.GetProtocol(protocol)
p.protocol[protocol] = true
p.protocol[protocol] = currentDataID
p.dataBuffers[protocol] = buffer.NewBuffer()
p.protocolAnalyzer[protocol] = analyzer
p.protocolMetrics[protocol] = analyzer.GenerateConnection(conID, randomID)
} else if currentDataID < minDataID {
p.protocol[protocol] = currentDataID
}
}

Expand Down Expand Up @@ -212,26 +218,27 @@ func (p *PartitionContext) Consume(data interface{}) {
forwarder.SendTransferNoProtocolEvent(p.context, event)
return
}
connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol())
connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol(), event.DataID())
connection.AppendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d",
event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol)
connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol)
event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol0)
connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0)
connection.AppendData(event)
}
}

func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, protocol enums.ConnectionProtocol) *PartitionConnection {
func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection {
conKey := p.buildConnectionKey(connectionID, randomID)
conn, exist := p.connections.Get(conKey)
if exist {
connection := conn.(*PartitionConnection)
connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol)
connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol, currentDataID)
return connection
}
result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol)
result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol, currentDataID)
p.connections.Set(conKey, result)
return result
}
Expand All @@ -254,7 +261,10 @@ func (p *PartitionContext) processEvents() {
p.processConnectionEvents(info)

// if the connection already closed and not contains any buffer data, then delete the connection
bufLen := info.dataBuffer.DataLength()
var bufLen = 0
for _, buf := range info.dataBuffers {
bufLen += buf.DataLength()
}
if bufLen > 0 {
return
}
Expand Down Expand Up @@ -309,9 +319,11 @@ func (p *PartitionContext) processExpireEvents() {
}

func (p *PartitionContext) processConnectionExpireEvents(connection *PartitionConnection) {
if c := connection.dataBuffer.DeleteExpireEvents(maxBufferExpireDuration); c > 0 {
log.Debugf("total removed %d expired socket data events from connection ID: %d, random ID: %d", c,
connection.connectionID, connection.randomID)
for _, buf := range connection.dataBuffers {
if c := buf.DeleteExpireEvents(maxBufferExpireDuration); c > 0 {
log.Debugf("total removed %d expired socket data events from connection ID: %d, random ID: %d", c,
connection.connectionID, connection.randomID)
}
}
}

Expand All @@ -320,8 +332,17 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti
return
}
helper := &AnalyzeHelper{}
for protocol, analyzer := range connection.protocolAnalyzer {
if err := analyzer.Analyze(connection, helper); err != nil {

// since the socket data/detail are getting unsorted, so rover need to using the minimal data id to analyze to ensure the order
sortedProtocols := make([]enums.ConnectionProtocol, 0, len(connection.protocol))
for protocol := range connection.protocol {
sortedProtocols = append(sortedProtocols, protocol)
}
sort.Slice(sortedProtocols, func(i, j int) bool {
return connection.protocol[sortedProtocols[i]] < connection.protocol[sortedProtocols[j]]
})
for _, protocol := range sortedProtocols {
if err := connection.protocolAnalyzer[protocol].Analyze(connection, helper); err != nil {
log.Warnf("failed to analyze the %s protocol data: %v", enums.ConnectionProtocolString(protocol), err)
}
}
Expand All @@ -330,6 +351,8 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti
// notify the connection manager to skip analyze all data(just sending the detail)
connection.skipAllDataAnalyze = true
p.context.ConnectionMgr.SkipAllDataAnalyze(connection.connectionID, connection.randomID)
connection.dataBuffer.Clean()
for _, buf := range connection.dataBuffers {
buf.Clean()
}
}
}
6 changes: 5 additions & 1 deletion pkg/accesslog/events/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type SocketDataUploadEvent struct {
Protocol enums.ConnectionProtocol
Protocol0 enums.ConnectionProtocol
HaveReduce uint8
Direction0 enums.SocketDataDirection
Finished uint8
Expand All @@ -39,6 +39,10 @@ type SocketDataUploadEvent struct {
Buffer [2048]byte
}

func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
return s.Protocol0
}

func (s *SocketDataUploadEvent) GenerateConnectionID() string {
return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/profiling/task/network/analyze/events/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type SocketDataUploadEvent struct {
Protocol enums.ConnectionProtocol
Protocol0 enums.ConnectionProtocol
HaveReduce uint8
Direction0 enums.SocketDataDirection
Finished uint8
Expand All @@ -39,6 +39,10 @@ type SocketDataUploadEvent struct {
Buffer [2048]byte
}

func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol {
return s.Protocol0
}

func (s *SocketDataUploadEvent) GenerateConnectionID() string {
return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func (a *Analyzer) Start(ctx context.Context) {
}

func (a *Analyzer) ReceiveSocketDataEvent(event *events.SocketDataUploadEvent) {
analyzer := a.protocols[event.Protocol]
analyzer := a.protocols[event.Protocol()]
if analyzer == nil {
log.Warnf("could not found any protocol to handle socket data, connection id: %s, protocol: %s(%d)",
event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol), event.Protocol)
event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol()), event.Protocol())
return
}
analyzer.ReceiveSocketData(a.ctx, event)
Expand Down
6 changes: 6 additions & 0 deletions pkg/tools/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ var (
)

type SocketDataBuffer interface {
// Protocol of the buffer
Protocol() enums.ConnectionProtocol
// GenerateConnectionID for identity the buffer belong which connection
GenerateConnectionID() string
// BufferData of the buffer
Expand Down Expand Up @@ -88,6 +90,10 @@ type SocketDataEventLimited struct {
Size int
}

func (s *SocketDataEventLimited) Protocol() enums.ConnectionProtocol {
return s.SocketDataBuffer.Protocol()
}

func (s *SocketDataEventLimited) BufferData() []byte {
return s.SocketDataBuffer.BufferData()[s.From:s.Size]
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tools/ssl/gotls.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (r *Register) generateGOTLSSymbolOffsets(register *Register, elfFile *elf.F

sym := register.SearchSymbol(func(a, b string) bool {
return a == b
}, "go.itab.*net.TCPConn,net.Conn")
}, "go.itab.*net.TCPConn,net.Conn", "go:itab.*net.TCPConn,net.Conn")
if sym == nil {
log.Warnf("could not found the tcp connection symbol: go.itab.*net.TCPConn,net.Conn")
return nil, nil
Expand Down
Loading