From 4b7d6c8fb8f1f82412c263705a89bc2424332a23 Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Tue, 24 Oct 2023 09:28:57 +0200 Subject: [PATCH] Avoid duplication/triplication/n-plication of traces for the same process (#371) * Failing tests for multi-process * Avoid multi-instrumentation of the same process --- pkg/internal/discover/attacher.go | 47 +++++++++-- pkg/internal/discover/typer.go | 5 +- .../testserver/Dockerfile_duplicate | 28 +++++++ .../testserver/duplicate_testserver.sh | 12 +++ test/integration/docker-compose-multiexec.yml | 22 ++++++ test/integration/multiprocess_test.go | 79 +++++++++++++++++++ test/integration/suites_test.go | 20 ----- 7 files changed, 185 insertions(+), 28 deletions(-) create mode 100644 test/integration/components/testserver/Dockerfile_duplicate create mode 100644 test/integration/components/testserver/duplicate_testserver.sh create mode 100644 test/integration/multiprocess_test.go diff --git a/pkg/internal/discover/attacher.go b/pkg/internal/discover/attacher.go index 3a6400c54..8a87459ef 100644 --- a/pkg/internal/discover/attacher.go +++ b/pkg/internal/discover/attacher.go @@ -3,6 +3,7 @@ package discover import ( "context" "fmt" + "hash/fnv" "log/slog" "os" "path" @@ -21,21 +22,25 @@ import ( // for each received Instrumentable process and forwards an ebpf.ProcessTracer instance ready to run and start // instrumenting the executable type TraceAttacher struct { + log *slog.Logger Cfg *pipe.Config Ctx context.Context DiscoveredTracers chan *ebpf.ProcessTracer Metrics imetrics.Reporter - log *slog.Logger + // keeps a copy of all the tracers for a given executable path + existingTracers map[string]*ebpf.ProcessTracer } func TraceAttacherProvider(ta TraceAttacher) (node.TerminalFunc[[]Event[Instrumentable]], error) { ta.log = slog.With("component", "discover.TraceAttacher") + ta.existingTracers = map[string]*ebpf.ProcessTracer{} + return func(in <-chan []Event[Instrumentable]) { mainLoop: for instrumentables := range in { for _, 
instr := range instrumentables { - if pt, ok := ta.getTracer(instr.Obj); ok { + if pt, ok := ta.getTracer(&instr.Obj); ok { // we can create multiple tracers for the same executable (ran from different processes) // even if we just need to instrument the executable once. TODO: deduplicate ta.DiscoveredTracers <- pt @@ -51,8 +56,17 @@ func TraceAttacherProvider(ta TraceAttacher) (node.TerminalFunc[[]Event[Instrume }, nil } -func (ta *TraceAttacher) getTracer(ie Instrumentable) (*ebpf.ProcessTracer, bool) { - // gets the +func (ta *TraceAttacher) getTracer(ie *Instrumentable) (*ebpf.ProcessTracer, bool) { + if tracer, ok := ta.existingTracers[ie.FileInfo.CmdExePath]; ok { + ta.log.Info("new process for already instrumented executable", + "pid", ie.FileInfo.Pid, + "exec", ie.FileInfo.CmdExePath) + _ = tracer + return nil, false + } + ta.log.Info("instrumenting process", "cmd", ie.FileInfo.CmdExePath, "pid", ie.FileInfo.Pid) + + // builds a tracer for that executable var programs []ebpf.Tracer switch ie.Type { case svc.InstrumentableGolang: @@ -82,14 +96,33 @@ func (ta *TraceAttacher) getTracer(ie Instrumentable) (*ebpf.ProcessTracer, bool return nil, false } - return &ebpf.ProcessTracer{ + tracer := &ebpf.ProcessTracer{ Programs: programs, ELFInfo: ie.FileInfo, Goffsets: ie.Offsets, Exe: exe, - PinPath: path.Join(ta.Cfg.EBPF.BpfBaseDir, fmt.Sprintf("%d-%d", os.Getpid(), ie.FileInfo.Pid)), + PinPath: ta.buildPinPath(ie), SystemWide: ta.Cfg.Discovery.SystemWide, - }, true + } + ta.existingTracers[ie.FileInfo.CmdExePath] = tracer + return tracer, true +} + +// pinpath must be unique for a given executable group +// it will be: +// - current beyla PID +// - PID of the first process that matched that executable +// (don't mind if that process stops and other processes of the same executable keep using this pinPath) +// - Hash of the executable path +// +// This way we prevent improbable (almost impossible) collisions of the exec hash +// or that the first process stopped 
and a different process with the same PID +// started, with a different executable +func (ta *TraceAttacher) buildPinPath(ie *Instrumentable) string { + execHash := fnv.New32() + _, _ = execHash.Write([]byte(ie.FileInfo.CmdExePath)) + return path.Join(ta.Cfg.EBPF.BpfBaseDir, + fmt.Sprintf("%d-%d-%x", os.Getpid(), ie.FileInfo.Pid, execHash.Sum32())) } // filterNotFoundPrograms will filter these programs whose required functions (as diff --git a/pkg/internal/discover/typer.go b/pkg/internal/discover/typer.go index d7838ddc4..40b23df8a 100644 --- a/pkg/internal/discover/typer.go +++ b/pkg/internal/discover/typer.go @@ -91,7 +91,10 @@ func (t *typer) FilterClassify(evs []Event[ProcessMatch]) []Event[Instrumentable // if we found a process and returned its parent, it might be already // instrumented. We skip it in that case if _, ok := t.instrumentedPids[inst.FileInfo.Pid]; !ok { - t.log.Info("instrumenting process", "cmd", inst.FileInfo.CmdExePath, "pid", inst.FileInfo.Pid) + t.log.Debug( + "found an instrumentable process", + "type", inst.Type.String(), + "exec", inst.FileInfo.CmdExePath, "pid", inst.FileInfo.Pid) out = append(out, Event[Instrumentable]{Type: EventCreated, Obj: inst}) t.instrumentedPids[inst.FileInfo.Pid] = struct{}{} } diff --git a/test/integration/components/testserver/Dockerfile_duplicate b/test/integration/components/testserver/Dockerfile_duplicate new file mode 100644 index 000000000..afb403312 --- /dev/null +++ b/test/integration/components/testserver/Dockerfile_duplicate @@ -0,0 +1,28 @@ +# Build the testserver binary +# Docker command must be invoked from the projec root directory +FROM golang:1.21 as builder + +ARG TARGETARCH + +ENV GOARCH=$TARGETARCH + +WORKDIR /src + +# Copy the go manifests and source +COPY vendor/ vendor/ +COPY test/ test/ +COPY go.mod go.mod +COPY go.sum go.sum + +# Build +RUN go build -o testserver ./test/integration/components/testserver/testserver.go + +# Create final image from minimal + built binary +FROM 
debian:bookworm-slim + +WORKDIR / +COPY --from=builder /src/test/integration/components/testserver/duplicate_testserver.sh . +COPY --from=builder /src/testserver dupe_testserver +USER 0:0 + +CMD [ "sh", "/duplicate_testserver.sh" ] \ No newline at end of file diff --git a/test/integration/components/testserver/duplicate_testserver.sh b/test/integration/components/testserver/duplicate_testserver.sh new file mode 100644 index 000000000..5c447b4c9 --- /dev/null +++ b/test/integration/components/testserver/duplicate_testserver.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env sh + +run_testserver() +{ + # prefix for start ports. E.g. 808 + sp=$1 + STD_PORT=${1}0 GIN_PORT=${1}1 GORILLA_PORT=${1}2 GORILLA_MID_PORT=${1}3 ./dupe_testserver -port ${1}4 +} + +# runs testserver twice +run_testserver 1808 & +run_testserver 1809 diff --git a/test/integration/docker-compose-multiexec.yml b/test/integration/docker-compose-multiexec.yml index 041d1ab10..2361d3d24 100644 --- a/test/integration/docker-compose-multiexec.yml +++ b/test/integration/docker-compose-multiexec.yml @@ -10,6 +10,14 @@ services: - "8080:8080" environment: LOG_LEVEL: DEBUG + # another instance of the above image. Used to test the deduplication + # of metrics when they come from the same executable file + testserver-unused: + image: hatest-testserver + ports: + - "38080:8080" + environment: + LOG_LEVEL: DEBUG testserver1: build: @@ -20,6 +28,20 @@ services: - "8900:8900" environment: LOG_LEVEL: DEBUG + + # image that runs two instances of the 'testserver' executable + # Used to test the deduplication + # of metrics when they come from the same executable file + testserver-duplicate: + build: + context: ../.. 
+ dockerfile: test/integration/components/testserver/Dockerfile_duplicate + image: hatest-testserver-duplicate + ports: + - "18080:18080" + - "18090:18090" + environment: + LOG_LEVEL: DEBUG autoinstrumenter: build: diff --git a/test/integration/multiprocess_test.go b/test/integration/multiprocess_test.go new file mode 100644 index 000000000..96126c1b2 --- /dev/null +++ b/test/integration/multiprocess_test.go @@ -0,0 +1,79 @@ +//go:build integration + +package integration + +import ( + "net/http" + "path" + "testing" + "time" + + "github.com/mariomac/guara/pkg/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/beyla/test/integration/components/docker" + "github.com/grafana/beyla/test/integration/components/prom" +) + +func TestMultiProcess(t *testing.T) { + compose, err := docker.ComposeSuite("docker-compose-multiexec.yml", path.Join(pathOutput, "test-suite-multiexec.log")) + // we are going to setup discovery directly in the configuration file + compose.Env = append(compose.Env, `EXECUTABLE_NAME=`, `OPEN_PORT=`) + require.NoError(t, err) + require.NoError(t, compose.Up()) + t.Run("Go RED metrics: usual service", func(t *testing.T) { + waitForTestComponents(t, instrumentedServiceStdURL) + testREDMetricsForHTTPLibrary(t, instrumentedServiceStdURL, "testserver", "initial-set") + // checks that, instrumenting the process from this container, + // it doesn't instrument too the process from the other container + checkReportedOnlyOnce(t, instrumentedServiceStdURL, "testserver") + }) + t.Run("Go RED metrics: service 1", func(t *testing.T) { + waitForTestComponents(t, "http://localhost:8900") + testREDMetricsForHTTPLibrary(t, "http://localhost:8900", "rename1", "initial-set") + // checks that, instrumenting the process from this container, + // it doesn't instrument too the process from the other container + checkReportedOnlyOnce(t, "http://localhost:8900", "rename1") + }) + t.Run("Processes in the same host are 
instrumented once and only once", func(t *testing.T) { + waitForTestComponents(t, "http://localhost:18080") + checkReportedOnlyOnce(t, "http://localhost:18080", "dupe_testserver") + }) + + t.Run("BPF pinning folders mounted", func(t *testing.T) { + // 1 pinned map for testserver and testserver-unused containers + // 1 pinned map for testserver1 container + // 1 pinned map for all the processes in testserver-duplicate container + testBPFPinningMountedWithCount(t, 3) + }) + + require.NoError(t, compose.Close()) + t.Run("BPF pinning folder unmounted", testBPFPinningUnmounted) +} + +// Addresses bug https://github.com/grafana/beyla/issues/370 for Go executables +// Prevents that two instances of the same process report traces or metrics by duplicate +func checkReportedOnlyOnce(t *testing.T, baseURL, serviceName string) { + const path = "/check-only-once" + for i := 0; i < 3; i++ { + resp, err := http.Get(baseURL + path) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + } + pq := prom.Client{HostPort: prometheusHostPort} + var results []prom.Result + test.Eventually(t, testTimeout, func(t require.TestingT) { + var err error + results, err = pq.Query(`http_server_duration_seconds_count{` + + `http_method="GET",` + + `http_status_code="200",` + + `service_name="` + serviceName + `",` + + `http_target="` + path + `"}`) + require.NoError(t, err) + // check duration_count has 3 calls and all the arguments + require.Len(t, results, 1) + assert.Equal(t, 3, totalPromCount(t, results)) + }, test.Interval(1000*time.Millisecond)) + +} diff --git a/test/integration/suites_test.go b/test/integration/suites_test.go index 7453fa72b..fe1e740d1 100644 --- a/test/integration/suites_test.go +++ b/test/integration/suites_test.go @@ -377,23 +377,3 @@ func TestSuiteNoRoutes(t *testing.T) { require.NoError(t, compose.Close()) t.Run("BPF pinning folder unmounted", testBPFPinningUnmounted) } - -func TestSuite_MultiExec(t *testing.T) { - compose, err := 
docker.ComposeSuite("docker-compose-multiexec.yml", path.Join(pathOutput, "test-suite-multiexec.log")) - // we are going to setup discovery directly in the configuration file - compose.Env = append(compose.Env, `EXECUTABLE_NAME=`, `OPEN_PORT=`) - require.NoError(t, err) - require.NoError(t, compose.Up()) - t.Run("Go RED metrics: usual service", func(t *testing.T) { - waitForTestComponents(t, instrumentedServiceStdURL) - testREDMetricsForHTTPLibrary(t, instrumentedServiceStdURL, "testserver", "initial-set") - }) - t.Run("Go RED metrics: service 1", func(t *testing.T) { - waitForTestComponents(t, "http://localhost:8900") - testREDMetricsForHTTPLibrary(t, "http://localhost:8900", "rename1", "initial-set") - }) - t.Run("BPF pinning folders mounted", func(t *testing.T) { testBPFPinningMountedWithCount(t, 2) }) - - require.NoError(t, compose.Close()) - t.Run("BPF pinning folder unmounted", testBPFPinningUnmounted) -}