diff --git a/CHANGES.md b/CHANGES.md index 3b22bbae..d9fc284c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -17,6 +17,7 @@ Release Notes. * Fix the profiling cannot found process issue. * Fix cannot translate peer address in some UDP scenarios. * Fix the protocol logs may be missing if the process is short-lived. +* Fix some connections not called close syscall, causing unnecessary memory usage. #### Documentation diff --git a/docker/Dockerfile.base b/docker/Dockerfile.base index f4bc05e7..0d57cf14 100644 --- a/docker/Dockerfile.base +++ b/docker/Dockerfile.base @@ -26,6 +26,7 @@ RUN apt update && \ cd bpftool && make -C src install && cp $(which bpftool) /usr/sbin/bpftool && \ wget https://apt.llvm.org/llvm.sh && \ chmod +x llvm.sh && \ - ./llvm.sh 18 + ./llvm.sh 18 && \ + apt install -y llvm-18 ENV PATH="${PATH}:/usr/lib/llvm-18/bin" \ No newline at end of file diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go index 72fbce47..71d10566 100644 --- a/pkg/accesslog/common/connection.go +++ b/pkg/accesslog/common/connection.go @@ -18,6 +18,7 @@ package common import ( + "context" "errors" "fmt" "strings" @@ -34,7 +35,9 @@ import ( "github.com/apache/skywalking-rover/pkg/process/finders/kubernetes" "github.com/apache/skywalking-rover/pkg/tools" "github.com/apache/skywalking-rover/pkg/tools/enums" + "github.com/apache/skywalking-rover/pkg/tools/host" "github.com/apache/skywalking-rover/pkg/tools/ip" + "github.com/apache/skywalking-rover/pkg/tools/path" "github.com/cilium/ebpf" @@ -46,8 +49,13 @@ import ( v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" ) -// only using to match the remote IP address -const localAddressPairCacheTime = time.Second * 15 +const ( + // only using to match the remote IP address + localAddressPairCacheTime = time.Second * 15 + + // clean the active connection in BPF interval + cleanActiveConnectionInterval = time.Second * 20 +) type addressProcessType int @@ -157,8 +165,56 @@ func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader * return mgr } -func (c *ConnectionManager) Start() { +func (c *ConnectionManager) Start(ctx context.Context, accessLogContext *AccessLogContext) { c.processOP.AddListener(c) + + // starting to clean up the un-active connection in BPF + go func() { + ticker := time.NewTicker(cleanActiveConnectionInterval) + for { + select { + case <-ticker.C: + activeConnections := c.activeConnectionMap.Iterate() + var conID uint64 + var activateConn ActiveConnection + for activeConnections.Next(&conID, &activateConn) { + // if the connection is existed, then check the next one + pid, fd := events.ParseConnectionID(conID) + if c.checkProcessFDExist(pid, fd) { + continue + } + + // if the connection is not existed, then delete it + if err := c.activeConnectionMap.Delete(conID); err != nil { + log.Warnf("failed to delete the active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error: %v", + pid, fd, conID, activateConn.RandomID, err) + continue + } + log.Debugf("deleted the active connection as not exist in file system, pid: %d, fd: %d, connection ID: %d, random ID: %d", + pid, fd, conID, activateConn.RandomID) + + // building and send the close event + wapperedEvent := c.OnConnectionClose(&events.SocketCloseEvent{ + ConnectionID: conID, + RandomID: activateConn.RandomID, + StartTime: 0, + EndTime: 0, + PID: activateConn.PID, + SocketFD: activateConn.SocketFD, + Success: 0, + }) + accessLogContext.Queue.AppendKernelLog(LogTypeClose, wapperedEvent) + } + + case <-ctx.Done(): + return + } + } + }() +} + +func (c *ConnectionManager) checkProcessFDExist(pid, fd uint32) bool { + return path.Exists(host.GetFileInHost(fmt.Sprintf("/proc/%d/fd/%d", pid, fd))) } func (c *ConnectionManager) Stop() { @@ -358,20 +414,20 @@ func (c *ConnectionManager) OnConnectionClose(event *events.SocketCloseEvent) *C return result } -func (c *ConnectionManager) savingTheAddress(host string, port uint16, localPid bool, pid uint32) { +func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16, localPid bool, pid uint32) { localAddrInfo := &addressInfo{ pid: pid, } - c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", host, port, localPid), localAddrInfo, localAddressPairCacheTime) + c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", hostAddress, port, localPid), localAddrInfo, localAddressPairCacheTime) localStr := strRemote if localPid { localStr = strLocal } - log.Debugf("saving the %s address with pid cache, address: %s:%d, pid: %d", localStr, host, port, pid) + log.Debugf("saving the %s address with pid cache, address: %s:%d, pid: %d", localStr, hostAddress, port, pid) } -func (c *ConnectionManager) getAddressPid(host string, port uint16, localPid bool) *addressInfo { - addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t", host, port, localPid)) +func (c *ConnectionManager) getAddressPid(hostAddress string, port uint16, localPid bool) *addressInfo { + addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t", hostAddress, port, localPid)) if ok && addrInfo != nil { return addrInfo.(*addressInfo) } diff --git a/pkg/accesslog/forwarder/close.go b/pkg/accesslog/forwarder/close.go index 239c18dd..5afbec3e 100644 --- a/pkg/accesslog/forwarder/close.go +++ b/pkg/accesslog/forwarder/close.go @@ -34,6 +34,9 @@ func SendCloseEvent(context *common.AccessLogContext, event *common.CloseEventWi func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog { closeEvent := event.(*common.CloseEventWithNotify) + if closeEvent.StartTime == 0 { + return nil + } closeOp := &v3.AccessLogKernelCloseOperation{} closeOp.StartTime = BuildOffsetTimestamp(closeEvent.StartTime) closeOp.EndTime = BuildOffsetTimestamp(closeEvent.EndTime) diff --git a/pkg/accesslog/runner.go b/pkg/accesslog/runner.go index 8164e814..e79a81b3 100644 --- a/pkg/accesslog/runner.go +++ b/pkg/accesslog/runner.go @@ -88,7 +88,7 @@ func (r *Runner) Start(ctx context.Context) error { r.ctx = ctx r.context.RuntimeContext = ctx r.context.Queue.Start(ctx) - r.context.ConnectionMgr.Start() + r.context.ConnectionMgr.Start(ctx, r.context) for _, c := range r.collectors { err := c.Start(r.mgr, r.context) if err != nil {