Skip to content

Commit

Permalink
Merge pull request #15 from grcevski/sqlhostaddress_ext_1
Browse files Browse the repository at this point in the history
Added an integration test
  • Loading branch information
esara authored Oct 17, 2024
2 parents cea47c9 + f1da56c commit a254148
Show file tree
Hide file tree
Showing 334 changed files with 2,198 additions and 36,348 deletions.
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ KIND = $(TOOLS_DIR)/kind
DASHBOARD_LINTER = $(TOOLS_DIR)/dashboard-linter
GINKGO = $(TOOLS_DIR)/ginkgo

GOIMPORTS_REVISER_ARGS = -company-prefixes github.com/grafana -project-name github.com/grafana/beyla/

define check_format
$(shell $(foreach FILE, $(shell find . -name "*.go" -not -path "**/vendor/*"), \
$(GOIMPORTS_REVISER) -company-prefixes github.com/grafana -list-diff -output stdout $(FILE);))
$(GOIMPORTS_REVISER) $(GOIMPORTS_REVISER_ARGS) -list-diff -output stdout $(FILE);))
endef


Expand Down Expand Up @@ -121,7 +123,7 @@ prereqs: install-hooks
fmt: prereqs
@echo "### Formatting code and fixing imports"
@$(foreach FILE, $(shell find . -name "*.go" -not -path "**/vendor/*"), \
$(GOIMPORTS_REVISER) -company-prefixes github.com/grafana $(FILE);)
$(GOIMPORTS_REVISER) $(GOIMPORTS_REVISER_ARGS) $(FILE);)

.PHONY: checkfmt
checkfmt:
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/grafana/beyla

go 1.23
go 1.23.0

require (
github.com/AlessandroPomponio/go-gibberish v0.0.0-20191004143433-a2d4156f0396
Expand All @@ -13,6 +13,7 @@ require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
github.com/grafana/beyla-k8s-cache v0.0.0-20241017152938-23a6d2229caf
github.com/grafana/go-offsets-tracker v0.1.7
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/mariomac/guara v0.0.0-20230621100729-42bd7716e524
Expand Down Expand Up @@ -62,7 +63,6 @@ require (
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
sigs.k8s.io/e2e-framework v0.3.0
Expand Down Expand Up @@ -171,9 +171,9 @@ require (
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/api v0.31.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240620174524-b456828f718b // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/beyla-k8s-cache v0.0.0-20241017152938-23a6d2229caf h1:OQnoOTdx+7aCdNZqYGRZMNNdmyToL6U3DG3IoZJOeX8=
github.com/grafana/beyla-k8s-cache v0.0.0-20241017152938-23a6d2229caf/go.mod h1:SMZM7CaUstB0UGj+Wf4PWHPRCJ6cO/ax9ebUyb0Dt1o=
github.com/grafana/go-offsets-tracker v0.1.7 h1:2zBQ7iiGzvyXY7LA8kaaSiEqH/Yx82UcfRabbY5aOG4=
github.com/grafana/go-offsets-tracker v0.1.7/go.mod h1:qcQdu7zlUKIFNUdBJlLyNHuJGW0SKWKjkrN6jtt+jds=
github.com/grafana/opentelemetry-go v1.28.0-grafana.3 h1:vExZiZKDZTdDi7fP1GG3GOGuoZ0GNu76408tNXfsnD0=
Expand Down Expand Up @@ -435,8 +437,6 @@ google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWn
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4=
gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
5 changes: 0 additions & 5 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ func RunBeyla(ctx context.Context, cfg *beyla.Config) error {

func setupAppO11y(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) error {
slog.Info("starting Beyla in Application Observability mode")
// TODO: when we split Beyla in two processes with different permissions, this code can be split:
// in two parts:
// 1st process (privileged) - Invoke FindTarget, which also mounts the BPF maps
// 2nd executable (unprivileged) - Invoke ReadAndForward, receiving the BPF map mountpoint as argument

instr := appolly.New(ctx, ctxInfo, config)
if err := instr.FindAndInstrument(); err != nil {
return fmt.Errorf("can't find target process: %w", err)
Expand Down
14 changes: 6 additions & 8 deletions pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/transform/kube"
)

func log() *slog.Logger {
Expand Down Expand Up @@ -124,17 +123,16 @@ func setupKubernetes(ctx context.Context, ctxInfo *global.ContextInfo) {
return
}

informer, err := ctxInfo.K8sInformer.Get(ctx)
if err != nil {
if err := refreshK8sInformerCache(ctx, ctxInfo); err != nil {
slog.Error("can't init Kubernetes informer. You can't setup Kubernetes discovery and your"+
" traces won't be decorated with Kubernetes metadata", "error", err)
ctxInfo.K8sInformer.ForceDisable()
return
}
}

if ctxInfo.AppO11y.K8sDatabase, err = kube.StartDatabase(informer); err != nil {
slog.Error("can't setup Kubernetes database. Your traces won't be decorated with Kubernetes metadata",
"error", err)
ctxInfo.K8sInformer.ForceDisable()
}
func refreshK8sInformerCache(ctx context.Context, ctxInfo *global.ContextInfo) error {
// force the cache to be populated and cached
_, err := ctxInfo.K8sInformer.Get(ctx)
return err
}
3 changes: 2 additions & 1 deletion pkg/internal/discover/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func (ta *TraceAttacher) attacherLoop() (pipe.FinalFunc[[]Event[ebpf.Instrumenta
mainLoop:
for instrumentables := range in {
for _, instr := range instrumentables {
ta.log.Debug("Instrumentable", "len", len(instrumentables), "inst", instr)
ta.log.Debug("Instrumentable", "created", instr.Type, "type", instr.Obj.Type,
"exec", instr.Obj.FileInfo.CmdExePath, "pid", instr.Obj.FileInfo.Pid)
switch instr.Type {
case EventCreated:
ta.processInstances.Inc(instr.Obj.FileInfo.Ino)
Expand Down
22 changes: 15 additions & 7 deletions pkg/internal/discover/container_updater.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,45 @@
package discover

import (
"context"
"fmt"
"log/slog"

"github.com/mariomac/pipes/pipe"

"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/transform/kube"
"github.com/grafana/beyla/pkg/internal/kube"
)

// ContainerDBUpdaterProvider is a stage in the Process Finder pipeline that will be
// enabled only if Kubernetes decoration is enabled.
// It just updates part of the kubernetes database when a new process is discovered.
func ContainerDBUpdaterProvider(enabled bool, db *kube.Database) pipe.MiddleProvider[[]Event[ebpf.Instrumentable], []Event[ebpf.Instrumentable]] {
func ContainerDBUpdaterProvider(ctx context.Context, meta kubeMetadataProvider) pipe.MiddleProvider[[]Event[ebpf.Instrumentable], []Event[ebpf.Instrumentable]] {
return func() (pipe.MiddleFunc[[]Event[ebpf.Instrumentable], []Event[ebpf.Instrumentable]], error) {
if !enabled {
if !meta.IsKubeEnabled() {
return pipe.Bypass[[]Event[ebpf.Instrumentable]](), nil
}
return updateLoop(db), nil
store, err := meta.Get(ctx)
if err != nil {
return nil, fmt.Errorf("instantiating ContainerDBUpdater: %w", err)
}
return updateLoop(store), nil
}
}

func updateLoop(db *kube.Database) pipe.MiddleFunc[[]Event[ebpf.Instrumentable], []Event[ebpf.Instrumentable]] {
func updateLoop(db *kube.Store) pipe.MiddleFunc[[]Event[ebpf.Instrumentable], []Event[ebpf.Instrumentable]] {
log := slog.With("component", "ContainerDBUpdater")
return func(in <-chan []Event[ebpf.Instrumentable], out chan<- []Event[ebpf.Instrumentable]) {
for instrumentables := range in {
for i := range instrumentables {
ev := &instrumentables[i]
switch ev.Type {
case EventCreated:
log.Debug("adding process", "pid", ev.Obj.FileInfo.Pid)
db.AddProcess(uint32(ev.Obj.FileInfo.Pid))
case EventDeleted:
// we don't need to handle process deletion from here, as the Kubernetes informer will
// remove the process from the database when the Pod that contains it is deleted.
// However we clean-up the performance related caches, in case we miss pod removal event
db.CleanProcessCaches(ev.Obj.FileInfo.Ns)
}
}
out <- instrumentables
Expand Down
3 changes: 1 addition & 2 deletions pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func (pf *ProcessFinder) Start() (<-chan *ebpf.Instrumentable, <-chan *ebpf.Inst
WatcherKubeEnricherProvider(pf.ctx, pf.ctxInfo.K8sInformer))
pipe.AddMiddleProvider(gb, criteriaMatcher, CriteriaMatcherProvider(pf.cfg))
pipe.AddMiddleProvider(gb, execTyper, ExecTyperProvider(pf.cfg, pf.ctxInfo.Metrics))
pipe.AddMiddleProvider(gb, containerDBUpdater,
ContainerDBUpdaterProvider(pf.ctxInfo.K8sInformer.IsKubeEnabled(), pf.ctxInfo.AppO11y.K8sDatabase))
pipe.AddMiddleProvider(gb, containerDBUpdater, ContainerDBUpdaterProvider(pf.ctx, pf.ctxInfo.K8sInformer))
pipe.AddFinalProvider(gb, traceAttacher, TraceAttacherProvider(&TraceAttacher{
Cfg: pf.cfg,
Ctx: pf.ctx,
Expand Down
Loading

0 comments on commit a254148

Please sign in to comment.