Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the performance of access log module #171

Merged
merged 2 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ Release Notes.
* Support parallel parsing protocol data in the access log module.
* Upgrade Go library to `1.22`, eBPF library to `0.16.0`.
* Reduce missing details issue in the access log module.
* Introduce ringbuf queue to improve performance in the access log module.
* Improve HTTP/1.x protocol parsing strategy to encase missing data.
* Add gRPC sender to sending the access log to the backend.
* Add warning log when the event queue almost full in the access log module.
* Reduce unessential `conntrack` query when detect new connection.

#### Bug Fixes
* Fix the base image cannot run in the arm64.
Expand Down
6 changes: 4 additions & 2 deletions bpf/accesslog/common/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "data_args.h"
#include "socket_opts.h"
#include "queue.h"
#include "socket_data.h"

// syscall:connect
struct connect_args_t {
Expand Down Expand Up @@ -107,7 +108,7 @@ struct socket_connect_event_t {
__u64 conntrack_upstream_iph;
__u32 conntrack_upstream_port;
};
DATA_QUEUE(socket_connection_event_queue, 1024 * 1024);
DATA_QUEUE(socket_connection_event_queue);

// active connection cached into the hashmap
// if connection closed, then deleted
Expand Down Expand Up @@ -159,7 +160,7 @@ struct socket_close_event_t {
// close success
__u32 success;
};
DATA_QUEUE(socket_close_event_queue, 1024 * 1024);
DATA_QUEUE(socket_close_event_queue);

static __inline bool family_should_trace(const __u32 family) {
return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? false : true;
Expand Down Expand Up @@ -303,6 +304,7 @@ static __inline void submit_connection_when_not_exists(void *ctx, __u64 id, stru
}

static __inline void notify_close_connection(void* ctx, __u64 conid, struct active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
bpf_map_delete_elem(&socket_data_last_id_map, &conid);
struct socket_close_event_t *close_event;
close_event = rover_reserve_buf(&socket_close_event_queue, sizeof(*close_event));
if (close_event == NULL) {
Expand Down
2 changes: 1 addition & 1 deletion bpf/accesslog/syscalls/transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct socket_detail_t {
__u8 ssl;
};

DATA_QUEUE(socket_detail_queue, 1024 * 1024);
DATA_QUEUE(socket_detail_queue);

static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_data_args_t *args, ssize_t bytes_count,
__u32 data_direction, const bool vecs, __u8 func_name, bool ssl) {
Expand Down
26 changes: 5 additions & 21 deletions bpf/include/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

#include "api.h"

#define DATA_QUEUE(name, size) \
struct { \
__uint(type, BPF_MAP_TYPE_RINGBUF); \
__uint(max_entries, size); \
} name SEC(".maps"); \
const void *rover_data_queue_##name __attribute__((unused));
#define DATA_QUEUE(name) \
struct { \
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);\
} name SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
Expand All @@ -36,26 +34,12 @@ struct {
static __always_inline void *rover_reserve_buf(void *map, __u64 size) {
static const int zero = 0;

if (bpf_core_enum_value_exists(enum bpf_func_id,
BPF_FUNC_ringbuf_reserve))
return bpf_ringbuf_reserve(map, size, 0);

return bpf_map_lookup_elem(&rover_data_heap, &zero);
}

static __always_inline void rover_discard_buf(void *buf)
{
if (bpf_core_enum_value_exists(enum bpf_func_id,
BPF_FUNC_ringbuf_discard))
bpf_ringbuf_discard(buf, 0);
static __always_inline void rover_discard_buf(void *buf) {
}

static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf, __u64 size) {
if (bpf_core_enum_value_exists(enum bpf_func_id,
BPF_FUNC_ringbuf_submit)) {
bpf_ringbuf_submit(buf, 0);
return 0;
}

return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size);
}
29 changes: 28 additions & 1 deletion bpf/include/socket_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct socket_data_upload_event {
__u64 conid;
__u64 randomid;
__u64 data_id;
__u64 prev_data_id;
__u64 total_size;
char buffer[MAX_TRANSMIT_SOCKET_READ_LENGTH];
};
Expand All @@ -44,7 +45,7 @@ struct {
__type(value, struct socket_data_upload_event);
__uint(max_entries, 1);
} socket_data_upload_event_per_cpu_map SEC(".maps");
DATA_QUEUE(socket_data_upload_queue, 1024 * 1024);
DATA_QUEUE(socket_data_upload_queue);

struct socket_data_sequence_t {
__u64 data_id;
Expand Down Expand Up @@ -81,6 +82,7 @@ struct upload_data_args {
__u64 random_id;

__u64 socket_data_id;
__u64 prev_socket_data_id;
struct iovec *socket_data_iovec;
size_t socket_data_iovlen;
ssize_t bytes_count;
Expand Down Expand Up @@ -129,11 +131,13 @@ static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 ind
socket_data_event->randomid = args->random_id;
socket_data_event->total_size = args->bytes_count;
socket_data_event->data_id = args->socket_data_id;
socket_data_event->prev_data_id = args->prev_socket_data_id;

socket_data_event->sequence = index;
socket_data_event->data_len = size;
socket_data_event->finished = is_finished;
socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk;
asm volatile("%[size] &= 0x7ff;\n" ::[size] "+r"(size) :);
bpf_probe_read(&socket_data_event->buffer, size, buf);
rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event, sizeof(*socket_data_event));
}
Expand Down Expand Up @@ -208,15 +212,38 @@ static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov,
UPLOAD_PER_SOCKET_DATA_IOV();
}

struct socket_data_last_id_t {
__u64 random_id;
__u64 socket_data_id;
};
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, 10000);
__type(key, __u64);
__type(value, struct socket_data_last_id_t);
} socket_data_last_id_map SEC(".maps");

static __inline void upload_socket_data(void *ctx, struct upload_data_args *args) {
// must have protocol and ssl must same(plain)
// if the connection data is needs to skip upload, then skip
if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN || args->connection_ssl != args->socket_data_ssl || args->connection_skip_data_upload == 1) {
return;
}
struct socket_data_last_id_t *latest = bpf_map_lookup_elem(&socket_data_last_id_map, &args->con_id);
args->prev_socket_data_id = 0;
if (latest != NULL && latest->random_id == args->random_id) {
args->prev_socket_data_id = latest->socket_data_id;
}
if (args->socket_data_buf != NULL) {
upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, args, args->socket_ssl_buffer_force_unfinished);
} else if (args->socket_data_iovec != NULL) {
upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, args);
}

if (latest == NULL || latest->socket_data_id != args->socket_data_id) {
struct socket_data_last_id_t data = {};
data.random_id = args->random_id;
data.socket_data_id = args->socket_data_id;
bpf_map_update_elem(&socket_data_last_id_map, &args->con_id, &data, BPF_ANY);
}
}
2 changes: 1 addition & 1 deletion pkg/accesslog/collector/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
if err != nil {
connectionLogger.Warnf("cannot create the connection tracker, %v", err)
}
c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels,
c.eventQueue = btf.NewEventQueue("connection resolver", ctx.Config.ConnectionAnalyze.AnalyzeParallels,
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
return newConnectionPartitionContext(ctx, track)
})
Expand Down
29 changes: 21 additions & 8 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
}

messageType, err := p.reader.IdentityMessageType(buf)
log.Debugf("ready to reading message type, messageType: %v, buf: %p, data id: %d, "+
"connection ID: %d, random ID: %d, error: %v", messageType, buf, buf.Position().DataID(),
metrics.ConnectionID, metrics.RandomID, err)
if err != nil {
http1Log.Debugf("failed to identity message type, %v", err)
if buf.SkipCurrentElement() {
Expand All @@ -115,6 +112,9 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
metrics.ConnectionID, metrics.RandomID, buf.Position().DataID(), err)
}

http1Log.Debugf("readed message, messageType: %v, buf: %p, data id: %d, "+
"connection ID: %d, random ID: %d, metrics : %p, handle result: %d",
messageType, buf, buf.Position().DataID(), metrics.ConnectionID, metrics.RandomID, metrics, result)
finishReading := false
switch result {
case enums.ParseResultSuccess:
Expand Down Expand Up @@ -149,13 +149,13 @@ func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer)
}

func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer) (enums.ParseResult, error) {
firstRequest := metrics.halfRequests.Front()
if firstRequest == nil {
log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d",
metrics.ConnectionID, metrics.RandomID)
request := metrics.findMatchesRequest(b.Position().DataID(), b.Position().PrevDataID())
if request == nil {
log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d, "+
"required prev data id: %d, current data id: %d",
metrics.ConnectionID, metrics.RandomID, b.Position().PrevDataID(), b.Position().DataID())
return enums.ParseResultSkipPackage, nil
}
request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)

// parsing response
response, result, err := p.reader.ReadResponse(request, b, true)
Expand Down Expand Up @@ -286,3 +286,16 @@ func (m *HTTP1Metrics) appendRequestToList(req *reader.Request) {
m.halfRequests.PushBack(req)
}
}

func (m *HTTP1Metrics) findMatchesRequest(currentDataID, prevDataID uint64) *reader.Request {
for element := m.halfRequests.Front(); element != nil; element = element.Next() {
req := element.Value.(*reader.Request)
// if the tail data id of request is equals to the prev data id of response
// or tail request data id+1==first response data id, then return the request
if uint64(req.MaxDataID()) == prevDataID || uint64(req.MaxDataID()+1) == currentDataID {
m.halfRequests.Remove(element)
return req
}
}
return nil
}
22 changes: 12 additions & 10 deletions pkg/accesslog/collector/protocols/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
}

func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue = btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize,
q.eventQueue = btf.NewEventQueue("socket data analyzer",
q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize,
func(num int) btf.PartitionContext {
return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context))
})
Expand Down Expand Up @@ -128,12 +129,13 @@ type PartitionContext struct {
func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection {
connection := &PartitionConnection{
connectionID: conID,
randomID: randomID,
dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer),
protocol: make(map[enums.ConnectionProtocol]uint64),
protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics),
connectionID: conID,
randomID: randomID,
dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer),
protocol: make(map[enums.ConnectionProtocol]uint64),
protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics),
lastCheckCloseTime: time.Now(),
}
connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, currentDataID)
return connection
Expand Down Expand Up @@ -173,7 +175,6 @@ func (p *PartitionContext) OnConnectionClose(event *events.SocketCloseEvent, clo
}
connection := conn.(*PartitionConnection)
connection.closeCallback = closeCallback
connection.closed = true
log.Debugf("receive the connection close event and mark is closable, connection ID: %d, random ID: %d, partition number: %d",
event.GetConnectionID(), event.GetRandomID(), p.partitionNum)
}
Expand Down Expand Up @@ -227,8 +228,9 @@ func (p *PartitionContext) Consume(data interface{}) {
connection.AppendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d",
event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol0)
log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, prev data id: %d, "+
"data id: %d, sequence: %d, protocol: %d",
event.ConnectionID, event.RandomID, pid, event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0)
connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0)
connection.AppendData(event)
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/accesslog/collector/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package collector

import (
"sync"

"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
Expand All @@ -33,6 +35,7 @@ type TLSCollector struct {
context *common.AccessLogContext
monitoredProcesses map[int32]bool
linker *btf.Linker
mutex sync.Mutex
}

func NewTLSCollector() *TLSCollector {
Expand All @@ -57,6 +60,14 @@ func (c *TLSCollector) Stop() {
}

func (c *TLSCollector) OnNewProcessMonitoring(pid int32) {
go func() {
c.addProcess(pid)
}()
}

func (c *TLSCollector) addProcess(pid int32) {
c.mutex.Lock()
defer c.mutex.Unlock()
if _, ok := c.monitoredProcesses[pid]; ok {
return
}
Expand All @@ -83,5 +94,7 @@ func (c *TLSCollector) OnNewProcessMonitoring(pid int32) {
}

func (c *TLSCollector) OnProcessRemoved(pid int32) {
c.mutex.Lock()
defer c.mutex.Unlock()
delete(c.monitoredProcesses, pid)
}
Loading
Loading