From 97bddaf1fa459776c4bd3f93dc4254644a732283 Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Thu, 28 Nov 2024 15:37:57 +0800 Subject: [PATCH 1/4] Support analyze multiple language in the same connection --- .../collector/protocols/connection.go | 12 ++-- pkg/accesslog/collector/protocols/http1.go | 19 +++--- pkg/accesslog/collector/protocols/http2.go | 21 +++---- pkg/accesslog/collector/protocols/queue.go | 60 ++++++++++++------- pkg/accesslog/events/data.go | 6 +- .../task/network/analyze/events/data.go | 6 +- pkg/tools/buffer/buffer.go | 6 ++ pkg/tools/ssl/gotls.go | 2 +- 8 files changed, 84 insertions(+), 48 deletions(-) diff --git a/pkg/accesslog/collector/protocols/connection.go b/pkg/accesslog/collector/protocols/connection.go index 7ae93256..b6a9e244 100644 --- a/pkg/accesslog/collector/protocols/connection.go +++ b/pkg/accesslog/collector/protocols/connection.go @@ -29,8 +29,8 @@ import ( type PartitionConnection struct { connectionID, randomID uint64 - dataBuffer *buffer.Buffer - protocol map[enums.ConnectionProtocol]bool + dataBuffers map[enums.ConnectionProtocol]*buffer.Buffer + protocol map[enums.ConnectionProtocol]uint64 // protocol with minimal data id protocolAnalyzer map[enums.ConnectionProtocol]Protocol protocolMetrics map[enums.ConnectionProtocol]ProtocolMetrics closed bool @@ -48,8 +48,8 @@ func (p *PartitionConnection) IsExistProtocol(protocol enums.ConnectionProtocol) return exist } -func (p *PartitionConnection) Buffer() *buffer.Buffer { - return p.dataBuffer +func (p *PartitionConnection) Buffer(protocol enums.ConnectionProtocol) *buffer.Buffer { + return p.dataBuffers[protocol] } func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, detail events.SocketDetail) { @@ -58,12 +58,12 @@ func (p *PartitionConnection) AppendDetail(ctx *common.AccessLogContext, detail forwarder.SendTransferNoProtocolEvent(ctx, detail) return } - p.dataBuffer.AppendDetailEvent(detail) + p.dataBuffers[detail.GetProtocol()].AppendDetailEvent(detail) } func (p *PartitionConnection) AppendData(data buffer.SocketDataBuffer) { if p.skipAllDataAnalyze { return } - p.dataBuffer.AppendDataEvent(data) + p.dataBuffers[data.Protocol()].AppendDataEvent(data) } diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go index 06b1b49d..4fec8448 100644 --- a/pkg/accesslog/collector/protocols/http1.go +++ b/pkg/accesslog/collector/protocols/http1.go @@ -68,18 +68,19 @@ func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) Protoc func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error { metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics) + buffer := connection.Buffer(enums.ConnectionProtocolHTTP) http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d", - metrics.ConnectionID, metrics.RandomID, connection.Buffer().DataLength()) - connection.Buffer().ResetForLoopReading() + metrics.ConnectionID, metrics.RandomID, buffer.DataLength()) + buffer.ResetForLoopReading() for { - if !connection.Buffer().PrepareForReading() { + if !buffer.PrepareForReading() { return nil } - messageType, err := reader.IdentityMessageType(connection.Buffer()) + messageType, err := reader.IdentityMessageType(buffer) if err != nil { http1Log.Debugf("failed to identity message type, %v", err) - if connection.Buffer().SkipCurrentElement() { + if buffer.SkipCurrentElement() { break } continue @@ -88,9 +89,9 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe var result enums.ParseResult switch messageType { case reader.MessageTypeRequest: - result, _ = p.handleRequest(metrics, connection.Buffer()) + result, _ = p.handleRequest(metrics, buffer) case reader.MessageTypeResponse: - result, _ = p.handleResponse(metrics, connection.Buffer()) + result, _ = p.handleResponse(metrics, buffer) case reader.MessageTypeUnknown: result = enums.ParseResultSkipPackage } @@ -98,9 +99,9 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = connection.Buffer().RemoveReadElements() + finishReading = buffer.RemoveReadElements() case enums.ParseResultSkipPackage: - finishReading = connection.Buffer().SkipCurrentElement() + finishReading = buffer.SkipCurrentElement() } if finishReading { diff --git a/pkg/accesslog/collector/protocols/http2.go b/pkg/accesslog/collector/protocols/http2.go index 486533c8..b18226a6 100644 --- a/pkg/accesslog/collector/protocols/http2.go +++ b/pkg/accesslog/collector/protocols/http2.go @@ -89,19 +89,20 @@ func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) Protoc func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error { http2Metrics := connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics) + buf := connection.Buffer(enums.ConnectionProtocolHTTP2) http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID: %d, random ID: %d", http2Metrics.connectionID, http2Metrics.randomID) - connection.Buffer().ResetForLoopReading() + buf.ResetForLoopReading() for { - if !connection.Buffer().PrepareForReading() { + if !buf.PrepareForReading() { return nil } - startPosition := connection.Buffer().Position() - header, err := http2.ReadFrameHeader(connection.Buffer()) + startPosition := buf.Position() + header, err := http2.ReadFrameHeader(buf) if err != nil { http2Log.Debugf("failed to read frame header, %v", err) - if connection.Buffer().SkipCurrentElement() { + if buf.SkipCurrentElement() { break } continue @@ -112,12 +113,12 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze var result enums.ParseResult switch header.Type { case http2.FrameHeaders: - result, protocolBreak, _ = r.handleHeader(&header, startPosition, http2Metrics, connection.Buffer()) + result, protocolBreak, _ = r.handleHeader(&header, startPosition, http2Metrics, buf) case http2.FrameData: - result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, connection.Buffer()) + result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, buf) default: tmp := make([]byte, header.Length) - if err := connection.Buffer().ReadUntilBufferFull(tmp); err != nil { + if err := buf.ReadUntilBufferFull(tmp); err != nil { if errors.Is(err, buffer.ErrNotComplete) { result = enums.ParseResultSkipPackage } else { @@ -139,9 +140,9 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = connection.Buffer().RemoveReadElements() + finishReading = buf.RemoveReadElements() case enums.ParseResultSkipPackage: - finishReading = connection.Buffer().SkipCurrentElement() + finishReading = buf.SkipCurrentElement() } if finishReading { diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index e505d3e9..8aed76d5 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "os" + "sort" "sync" "time" @@ -119,25 +120,28 @@ type PartitionContext struct { analyzeLocker sync.Mutex } -func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) *PartitionConnection { +func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol, currentDataId uint64) *PartitionConnection { connection := &PartitionConnection{ connectionID: conID, randomID: randomID, - dataBuffer: buffer.NewBuffer(), - protocol: make(map[enums.ConnectionProtocol]bool), + dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer), + protocol: make(map[enums.ConnectionProtocol]uint64), protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol), protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics), } - connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol) + connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, currentDataId) return connection } -func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol) { - if _, exist := p.protocol[protocol]; !exist { +func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol, currentDataId uint64) { + if minDataId, exist := p.protocol[protocol]; !exist { analyzer := protocolMgr.GetProtocol(protocol) - p.protocol[protocol] = true + p.protocol[protocol] = currentDataId + p.dataBuffers[protocol] = buffer.NewBuffer() p.protocolAnalyzer[protocol] = analyzer p.protocolMetrics[protocol] = analyzer.GenerateConnection(conID, randomID) + } else if currentDataId < minDataId { + p.protocol[protocol] = currentDataId } } @@ -212,26 +216,26 @@ func (p *PartitionContext) Consume(data interface{}) { forwarder.SendTransferNoProtocolEvent(p.context, event) return } - connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol()) + connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol(), event.DataID()) connection.AppendDetail(p.context, event) case *events.SocketDataUploadEvent: pid, _ := events.ParseConnectionID(event.ConnectionID) log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d", - event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol) - connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol) + event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol0) + connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0) connection.AppendData(event) } } -func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, protocol enums.ConnectionProtocol) *PartitionConnection { +func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, protocol enums.ConnectionProtocol, currentDataId uint64) *PartitionConnection { conKey := p.buildConnectionKey(connectionID, randomID) conn, exist := p.connections.Get(conKey) if exist { connection := conn.(*PartitionConnection) - connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol) + connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol, currentDataId) return connection } - result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol) + result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol, currentDataId) p.connections.Set(conKey, result) return result } @@ -254,7 +258,10 @@ func (p *PartitionContext) processEvents() { p.processConnectionEvents(info) // if the connection already closed and not contains any buffer data, then delete the connection - bufLen := info.dataBuffer.DataLength() + var bufLen = 0 + for _, buf := range info.dataBuffers { + bufLen += buf.DataLength() + } if bufLen > 0 { return } @@ -309,9 +316,11 @@ func (p *PartitionContext) processExpireEvents() { } func (p *PartitionContext) processConnectionExpireEvents(connection *PartitionConnection) { - if c := connection.dataBuffer.DeleteExpireEvents(maxBufferExpireDuration); c > 0 { - log.Debugf("total removed %d expired socket data events from connection ID: %d, random ID: %d", c, - connection.connectionID, connection.randomID) + for _, buf := range connection.dataBuffers { + if c := buf.DeleteExpireEvents(maxBufferExpireDuration); c > 0 { + log.Debugf("total removed %d expired socket data events from connection ID: %d, random ID: %d", c, + connection.connectionID, connection.randomID) + } } } @@ -320,8 +329,17 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti return } helper := &AnalyzeHelper{} - for protocol, analyzer := range connection.protocolAnalyzer { - if err := analyzer.Analyze(connection, helper); err != nil { + + // since the socket data/detail are getting unsorted, so rover need to using the minimal data id to analyze to ensure the order + sortedProtocols := make([]enums.ConnectionProtocol, 0, len(connection.protocol)) + for protocol, _ := range connection.protocol { + sortedProtocols = append(sortedProtocols, protocol) + } + sort.Slice(sortedProtocols, func(i, j int) bool { + return connection.protocol[sortedProtocols[i]] < connection.protocol[sortedProtocols[j]] + }) + for _, protocol := range sortedProtocols { + if err := connection.protocolAnalyzer[protocol].Analyze(connection, helper); err != nil { log.Warnf("failed to analyze the %s protocol data: %v", enums.ConnectionProtocolString(protocol), err) } } @@ -330,6 +348,8 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti // notify the connection manager to skip analyze all data(just sending the detail) connection.skipAllDataAnalyze = true p.context.ConnectionMgr.SkipAllDataAnalyze(connection.connectionID, connection.randomID) - connection.dataBuffer.Clean() + for _, buf := range connection.dataBuffers { + buf.Clean() + } } } diff --git a/pkg/accesslog/events/data.go b/pkg/accesslog/events/data.go index 747ff9fc..ed480596 100644 --- a/pkg/accesslog/events/data.go +++ b/pkg/accesslog/events/data.go @@ -24,7 +24,7 @@ import ( ) type SocketDataUploadEvent struct { - Protocol enums.ConnectionProtocol + Protocol0 enums.ConnectionProtocol HaveReduce uint8 Direction0 enums.SocketDataDirection Finished uint8 @@ -39,6 +39,10 @@ type SocketDataUploadEvent struct { Buffer [2048]byte } +func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol { + return s.Protocol0 +} + func (s *SocketDataUploadEvent) GenerateConnectionID() string { return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID) } diff --git a/pkg/profiling/task/network/analyze/events/data.go b/pkg/profiling/task/network/analyze/events/data.go index 9ca242d0..804e96b5 100644 --- a/pkg/profiling/task/network/analyze/events/data.go +++ b/pkg/profiling/task/network/analyze/events/data.go @@ -24,7 +24,7 @@ import ( ) type SocketDataUploadEvent struct { - Protocol enums.ConnectionProtocol + Protocol0 enums.ConnectionProtocol HaveReduce uint8 Direction0 enums.SocketDataDirection Finished uint8 @@ -39,6 +39,10 @@ type SocketDataUploadEvent struct { Buffer [2048]byte } +func (s *SocketDataUploadEvent) Protocol() enums.ConnectionProtocol { + return s.Protocol0 +} + func (s *SocketDataUploadEvent) GenerateConnectionID() string { return fmt.Sprintf("%d_%d", s.ConnectionID, s.RandomID) } diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go index 1c71f7fe..0bfc5d9c 100644 --- a/pkg/tools/buffer/buffer.go +++ b/pkg/tools/buffer/buffer.go @@ -33,6 +33,8 @@ var ( ) type SocketDataBuffer interface { + // Protocol of the buffer + Protocol() enums.ConnectionProtocol // GenerateConnectionID for identity the buffer belong which connection GenerateConnectionID() string // BufferData of the buffer @@ -88,6 +90,10 @@ type SocketDataEventLimited struct { Size int } +func (s *SocketDataEventLimited) Protocol() enums.ConnectionProtocol { + return s.SocketDataBuffer.Protocol() +} + func (s *SocketDataEventLimited) BufferData() []byte { return s.SocketDataBuffer.BufferData()[s.From:s.Size] } diff --git a/pkg/tools/ssl/gotls.go b/pkg/tools/ssl/gotls.go index 2b2b3d7c..573854e0 100644 --- a/pkg/tools/ssl/gotls.go +++ b/pkg/tools/ssl/gotls.go @@ -206,7 +206,7 @@ func (r *Register) generateGOTLSSymbolOffsets(register *Register, elfFile *elf.F sym := register.SearchSymbol(func(a, b string) bool { return a == b - }, "go.itab.*net.TCPConn,net.Conn") + }, "go.itab.*net.TCPConn,net.Conn", "go:itab.*net.TCPConn,net.Conn") if sym == nil { log.Warnf("could not found the tcp connection symbol: go.itab.*net.TCPConn,net.Conn") return nil, nil From 171e9854ea6d815923dbc994f7576fc5e6a01de1 Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Thu, 28 Nov 2024 15:49:50 +0800 Subject: [PATCH 2/4] Fix UT --- .../task/network/analyze/layer7/protocols/protocols.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go index 60bcd710..2f59d0b7 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go @@ -71,10 +71,10 @@ func (a *Analyzer) Start(ctx context.Context) { } func (a *Analyzer) ReceiveSocketDataEvent(event *events.SocketDataUploadEvent) { - analyzer := a.protocols[event.Protocol] + analyzer := a.protocols[event.Protocol()] if analyzer == nil { log.Warnf("could not found any protocol to handle socket data, connection id: %s, protocol: %s(%d)", - event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol), event.Protocol) + event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol()), event.Protocol) return } analyzer.ReceiveSocketData(a.ctx, event) From 861ad3bf37fd44ce922ba98d613215e241d69b7f Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Thu, 28 Nov 2024 16:04:58 +0800 Subject: [PATCH 3/4] Fix UT --- .../task/network/analyze/layer7/protocols/protocols.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go index 2f59d0b7..0b8c887c 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go @@ -74,7 +74,7 @@ func (a *Analyzer) ReceiveSocketDataEvent(event *events.SocketDataUploadEvent) { analyzer := a.protocols[event.Protocol()] if analyzer == nil { log.Warnf("could not found any protocol to handle socket data, connection id: %s, protocol: %s(%d)", - event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol()), event.Protocol) + event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol()), event.Protocol()) return } analyzer.ReceiveSocketData(a.ctx, event) From 312ba5e1909dd9c2d51e3db8219ec88b6fccbce5 Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Thu, 28 Nov 2024 16:32:54 +0800 Subject: [PATCH 4/4] Fix lint --- pkg/accesslog/collector/protocols/http1.go | 20 ++++++++--------- pkg/accesslog/collector/protocols/queue.go | 25 ++++++++++++---------- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/pkg/accesslog/collector/protocols/http1.go b/pkg/accesslog/collector/protocols/http1.go index 4fec8448..92a81736 100644 --- a/pkg/accesslog/collector/protocols/http1.go +++ b/pkg/accesslog/collector/protocols/http1.go @@ -68,19 +68,19 @@ func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) Protoc func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error { metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics) - buffer := connection.Buffer(enums.ConnectionProtocolHTTP) + buf := connection.Buffer(enums.ConnectionProtocolHTTP) http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d", - metrics.ConnectionID, metrics.RandomID, buffer.DataLength()) - buffer.ResetForLoopReading() + metrics.ConnectionID, metrics.RandomID, buf.DataLength()) + buf.ResetForLoopReading() for { - if !buffer.PrepareForReading() { + if !buf.PrepareForReading() { return nil } - messageType, err := reader.IdentityMessageType(buffer) + messageType, err := reader.IdentityMessageType(buf) if err != nil { http1Log.Debugf("failed to identity message type, %v", err) - if buffer.SkipCurrentElement() { + if buf.SkipCurrentElement() { break } continue @@ -89,9 +89,9 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe var result enums.ParseResult switch messageType { case reader.MessageTypeRequest: - result, _ = p.handleRequest(metrics, buffer) + result, _ = p.handleRequest(metrics, buf) case reader.MessageTypeResponse: - result, _ = p.handleResponse(metrics, buffer) + result, _ = p.handleResponse(metrics, buf) case reader.MessageTypeUnknown: result = enums.ParseResultSkipPackage } @@ -99,9 +99,9 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe finishReading := false switch result { case enums.ParseResultSuccess: - finishReading = buffer.RemoveReadElements() + finishReading = buf.RemoveReadElements() case enums.ParseResultSkipPackage: - finishReading = buffer.SkipCurrentElement() + finishReading = buf.SkipCurrentElement() } if finishReading { diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index 8aed76d5..ad665841 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -120,7 +120,8 @@ type PartitionContext struct { analyzeLocker sync.Mutex } -func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol, currentDataId uint64) *PartitionConnection { +func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64, + protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection { connection := &PartitionConnection{ connectionID: conID, randomID: randomID, @@ -129,19 +130,20 @@ func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64 protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol), protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics), } - connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, currentDataId) + connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, currentDataID) return connection } -func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64, protocol enums.ConnectionProtocol, currentDataId uint64) { - if minDataId, exist := p.protocol[protocol]; !exist { +func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64, + protocol enums.ConnectionProtocol, currentDataID uint64) { + if minDataID, exist := p.protocol[protocol]; !exist { analyzer := protocolMgr.GetProtocol(protocol) - p.protocol[protocol] = currentDataId + p.protocol[protocol] = currentDataID p.dataBuffers[protocol] = buffer.NewBuffer() p.protocolAnalyzer[protocol] = analyzer p.protocolMetrics[protocol] = analyzer.GenerateConnection(conID, randomID) - } else if currentDataId < minDataId { - p.protocol[protocol] = currentDataId + } else if currentDataID < minDataID { + p.protocol[protocol] = currentDataID } } @@ -227,15 +229,16 @@ func (p *PartitionContext) Consume(data interface{}) { } } -func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, protocol enums.ConnectionProtocol, currentDataId uint64) *PartitionConnection { +func (p *PartitionContext) getConnectionContext(connectionID, randomID uint64, + protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection { conKey := p.buildConnectionKey(connectionID, randomID) conn, exist := p.connections.Get(conKey) if exist { connection := conn.(*PartitionConnection) - connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol, currentDataId) + connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol, currentDataID) return connection } - result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol, currentDataId) + result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol, currentDataID) p.connections.Set(conKey, result) return result } @@ -332,7 +335,7 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti // since the socket data/detail are getting unsorted, so rover need to using the minimal data id to analyze to ensure the order sortedProtocols := make([]enums.ConnectionProtocol, 0, len(connection.protocol)) - for protocol, _ := range connection.protocol { + for protocol := range connection.protocol { sortedProtocols = append(sortedProtocols, protocol) } sort.Slice(sortedProtocols, func(i, j int) bool {