Skip to content

Commit

Permalink
Fix some connections not called close syscall, causing unnecessary me…
Browse files Browse the repository at this point in the history
…mory usage (#136)
  • Loading branch information
mrproliu authored Aug 1, 2024
1 parent aa05b04 commit 608d1e6
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Release Notes.
* Fix the profiling cannot found process issue.
* Fix cannot translate peer address in some UDP scenarios.
* Fix the protocol logs may be missing if the process is short-lived.
* Fix some connections not called close syscall, causing unnecessary memory usage.

#### Documentation

Expand Down
3 changes: 2 additions & 1 deletion docker/Dockerfile.base
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ RUN apt update && \
cd bpftool && make -C src install && cp $(which bpftool) /usr/sbin/bpftool && \
wget https://apt.llvm.org/llvm.sh && \
chmod +x llvm.sh && \
./llvm.sh 18
./llvm.sh 18 && \
apt install -y llvm-18

ENV PATH="${PATH}:/usr/lib/llvm-18/bin"
72 changes: 64 additions & 8 deletions pkg/accesslog/common/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package common

import (
"context"
"errors"
"fmt"
"strings"
Expand All @@ -34,7 +35,9 @@ import (
"github.com/apache/skywalking-rover/pkg/process/finders/kubernetes"
"github.com/apache/skywalking-rover/pkg/tools"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
"github.com/apache/skywalking-rover/pkg/tools/ip"
"github.com/apache/skywalking-rover/pkg/tools/path"

"github.com/cilium/ebpf"

Expand All @@ -46,8 +49,13 @@ import (
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)

// only using to match the remote IP address
const localAddressPairCacheTime = time.Second * 15
const (
// only using to match the remote IP address
localAddressPairCacheTime = time.Second * 15

// clean the active connection in BPF interval
cleanActiveConnectionInterval = time.Second * 20
)

type addressProcessType int

Expand Down Expand Up @@ -157,8 +165,56 @@ func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *
return mgr
}

func (c *ConnectionManager) Start() {
func (c *ConnectionManager) Start(ctx context.Context, accessLogContext *AccessLogContext) {
c.processOP.AddListener(c)

// starting to clean up the un-active connection in BPF
go func() {
ticker := time.NewTicker(cleanActiveConnectionInterval)
for {
select {
case <-ticker.C:
activeConnections := c.activeConnectionMap.Iterate()
var conID uint64
var activateConn ActiveConnection
for activeConnections.Next(&conID, &activateConn) {
// if the connection is existed, then check the next one
pid, fd := events.ParseConnectionID(conID)
if c.checkProcessFDExist(pid, fd) {
continue
}

// if the connection is not existed, then delete it
if err := c.activeConnectionMap.Delete(conID); err != nil {
log.Warnf("failed to delete the active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error: %v",
pid, fd, conID, activateConn.RandomID, err)
continue
}
log.Debugf("deleted the active connection as not exist in file system, pid: %d, fd: %d, connection ID: %d, random ID: %d",
pid, fd, conID, activateConn.RandomID)

// building and send the close event
wapperedEvent := c.OnConnectionClose(&events.SocketCloseEvent{
ConnectionID: conID,
RandomID: activateConn.RandomID,
StartTime: 0,
EndTime: 0,
PID: activateConn.PID,
SocketFD: activateConn.SocketFD,
Success: 0,
})
accessLogContext.Queue.AppendKernelLog(LogTypeClose, wapperedEvent)
}

case <-ctx.Done():
return
}
}
}()
}

func (c *ConnectionManager) checkProcessFDExist(pid, fd uint32) bool {
return path.Exists(host.GetFileInHost(fmt.Sprintf("/proc/%d/fd/%d", pid, fd)))
}

func (c *ConnectionManager) Stop() {
Expand Down Expand Up @@ -358,20 +414,20 @@ func (c *ConnectionManager) OnConnectionClose(event *events.SocketCloseEvent) *C
return result
}

func (c *ConnectionManager) savingTheAddress(host string, port uint16, localPid bool, pid uint32) {
func (c *ConnectionManager) savingTheAddress(hostAddress string, port uint16, localPid bool, pid uint32) {
localAddrInfo := &addressInfo{
pid: pid,
}
c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", host, port, localPid), localAddrInfo, localAddressPairCacheTime)
c.addressWithPid.Set(fmt.Sprintf("%s_%d_%t", hostAddress, port, localPid), localAddrInfo, localAddressPairCacheTime)
localStr := strRemote
if localPid {
localStr = strLocal
}
log.Debugf("saving the %s address with pid cache, address: %s:%d, pid: %d", localStr, host, port, pid)
log.Debugf("saving the %s address with pid cache, address: %s:%d, pid: %d", localStr, hostAddress, port, pid)
}

func (c *ConnectionManager) getAddressPid(host string, port uint16, localPid bool) *addressInfo {
addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t", host, port, localPid))
func (c *ConnectionManager) getAddressPid(hostAddress string, port uint16, localPid bool) *addressInfo {
addrInfo, ok := c.addressWithPid.Get(fmt.Sprintf("%s_%d_%t", hostAddress, port, localPid))
if ok && addrInfo != nil {
return addrInfo.(*addressInfo)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/accesslog/forwarder/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ func SendCloseEvent(context *common.AccessLogContext, event *common.CloseEventWi

func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog {
closeEvent := event.(*common.CloseEventWithNotify)
if closeEvent.StartTime == 0 {
return nil
}
closeOp := &v3.AccessLogKernelCloseOperation{}
closeOp.StartTime = BuildOffsetTimestamp(closeEvent.StartTime)
closeOp.EndTime = BuildOffsetTimestamp(closeEvent.EndTime)
Expand Down
2 changes: 1 addition & 1 deletion pkg/accesslog/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (r *Runner) Start(ctx context.Context) error {
r.ctx = ctx
r.context.RuntimeContext = ctx
r.context.Queue.Start(ctx)
r.context.ConnectionMgr.Start()
r.context.ConnectionMgr.Start(ctx, r.context)
for _, c := range r.collectors {
err := c.Start(r.mgr, r.context)
if err != nil {
Expand Down

0 comments on commit 608d1e6

Please sign in to comment.