diff --git a/pkg/components/beyla.go b/pkg/components/beyla.go index dee2cb957..99f714137 100644 --- a/pkg/components/beyla.go +++ b/pkg/components/beyla.go @@ -113,6 +113,7 @@ func buildCommonContextInfo( ResyncPeriod: config.Attributes.Kubernetes.InformersResyncPeriod, DisabledInformers: config.Attributes.Kubernetes.DisableInformers, MetaCacheAddr: config.Attributes.Kubernetes.MetaCacheAddress, + MetaSourceLabels: config.Attributes.Kubernetes.MetaSourceLabels, }), } switch { diff --git a/pkg/export/debug/debug.go b/pkg/export/debug/debug.go index 8adf724bc..608fd5516 100644 --- a/pkg/export/debug/debug.go +++ b/pkg/export/debug/debug.go @@ -83,8 +83,8 @@ func textPrinter(input <-chan []request.Span) { hn := "" if spans[i].IsClientSpan() { - if spans[i].ServiceID.Namespace != "" { - pn = "." + spans[i].ServiceID.Namespace + if spans[i].ServiceID.UID.Namespace != "" { + pn = "." + spans[i].ServiceID.UID.Namespace } if spans[i].OtherNamespace != "" { hn = "." + spans[i].OtherNamespace @@ -93,8 +93,8 @@ func textPrinter(input <-chan []request.Span) { if spans[i].OtherNamespace != "" { pn = "." + spans[i].OtherNamespace } - if spans[i].ServiceID.Namespace != "" { - hn = "." + spans[i].ServiceID.Namespace + if spans[i].ServiceID.UID.Namespace != "" { + hn = "." + spans[i].ServiceID.UID.Namespace } } diff --git a/pkg/export/debug/debug_test.go b/pkg/export/debug/debug_test.go index 995f98450..eab27807e 100644 --- a/pkg/export/debug/debug_test.go +++ b/pkg/export/debug/debug_test.go @@ -40,7 +40,7 @@ func TestTracePrinterValidEnabled(t *testing.T) { func traceFuncHelper(t *testing.T, tracePrinter TracePrinter) string { fakeSpan := request.Span{ - ServiceID: svc.ID{Name: "bar", Namespace: "foo", SDKLanguage: svc.InstrumentableGolang}, + ServiceID: svc.ID{UID: svc.UID{Name: "bar", Namespace: "foo"}, SDKLanguage: svc.InstrumentableGolang}, Type: request.EventTypeHTTP, Method: "method", Path: "path", diff --git a/pkg/export/otel/common.go b/pkg/export/otel/common.go index 0d6b817aa..fae9d6d06 100644 --- a/pkg/export/otel/common.go +++ b/pkg/export/otel/common.go @@ -62,13 +62,13 @@ var DefaultBuckets = Buckets{ func getAppResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue { return append(getResourceAttrs(hostID, service), - semconv.ServiceInstanceID(string(service.UID)), + semconv.ServiceInstanceID(service.UID.Instance), ) } func getResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue { attrs := []attribute.KeyValue{ - semconv.ServiceName(service.Name), + semconv.ServiceName(service.UID.Name), // SpanMetrics requires an extra attribute besides service name // to generate the traces_target_info metric, // so the service is visible in the ServicesList @@ -80,8 +80,8 @@ func getResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue { semconv.HostID(hostID), } - if service.Namespace != "" { - attrs = append(attrs, semconv.ServiceNamespace(service.Namespace)) + if service.UID.Namespace != "" { + attrs = append(attrs, semconv.ServiceNamespace(service.UID.Namespace)) } for k, v := range service.Metadata { @@ -142,6 +142,8 @@ func NewReporterPool[K uidGetter, T any]( } } +var emptyUID = svc.UID{} + // For retrieves the associated item for the given service name, or // creates a new one if it does not exist func (rp *ReporterPool[K, T]) For(service K) (T, error) { @@ -154,7 +156,7 @@ func (rp *ReporterPool[K, T]) For(service K) (T, error) { // In multi-process tracing, this is likely to happen as most // tracers group traces belonging to the same service in the same slice. svcUID := service.GetUID() - if rp.lastServiceUID == "" || svcUID != rp.lastService.GetUID() { + if rp.lastServiceUID == emptyUID || svcUID != rp.lastService.GetUID() { lm, err := rp.get(svcUID, service) if err != nil { var t T diff --git a/pkg/export/otel/expirer_test.go b/pkg/export/otel/expirer_test.go index 36a632ffd..2def7d7d4 100644 --- a/pkg/export/otel/expirer_test.go +++ b/pkg/export/otel/expirer_test.go @@ -165,8 +165,8 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) { // WHEN it receives metrics metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175}, } // THEN the metrics are exported @@ -189,7 +189,7 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) { // AND WHEN it keeps receiving a subset of the initial metrics during the TTL now.Advance(2 * time.Minute) metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280}, } // THEN THE metrics that have been received during the TTL period are still visible @@ -203,7 +203,7 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) { now.Advance(2 * time.Minute) metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310}, } // makes sure that the records channel is emptied and any remaining @@ -231,7 +231,7 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) { // AND WHEN the metrics labels that disappeared are received again now.Advance(2 * time.Minute) metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520}, } // THEN they are reported again, starting from zero in the case of counters @@ -284,8 +284,8 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) { // WHEN it receives metrics metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200}, - {ServiceID: svc.ID{UID: "bar"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "bar"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175}, } // THEN the metrics are exported @@ -308,7 +308,7 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) { // AND WHEN it keeps receiving a subset of the initial metrics during the TTL now.Advance(2 * time.Minute) metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280}, } // THEN THE metrics that have been received during the TTL period are still visible @@ -322,7 +322,7 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) { now.Advance(2 * time.Minute) metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310}, } // BUT not the metrics that haven't been received during that time. @@ -351,7 +351,7 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) { // AND WHEN the metrics labels that disappeared are received again now.Advance(2 * time.Minute) metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "bar"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "bar"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520}, } // THEN they are reported again, starting from zero in the case of counters diff --git a/pkg/export/otel/metrics.go b/pkg/export/otel/metrics.go index 3cc0f3e28..0f0ced9d6 100644 --- a/pkg/export/otel/metrics.go +++ b/pkg/export/otel/metrics.go @@ -709,9 +709,9 @@ func otelHistogramConfig(metricName string, buckets []float64, useExponentialHis func (mr *MetricsReporter) metricResourceAttributes(service *svc.ID) attribute.Set { attrs := []attribute.KeyValue{ - request.ServiceMetric(service.Name), - semconv.ServiceInstanceID(string(service.UID)), - semconv.ServiceNamespace(service.Namespace), + request.ServiceMetric(service.UID.Name), + semconv.ServiceInstanceID(service.UID.Instance), + semconv.ServiceNamespace(service.UID.Namespace), semconv.TelemetrySDKLanguageKey.String(service.SDKLanguage.String()), semconv.TelemetrySDKNameKey.String("beyla"), request.SourceMetric("beyla"), diff --git a/pkg/export/otel/metrics_proc.go b/pkg/export/otel/metrics_proc.go index 0b14e4079..8d27471c1 100644 --- a/pkg/export/otel/metrics_proc.go +++ b/pkg/export/otel/metrics_proc.go @@ -194,7 +194,7 @@ func newProcMetricsExporter( func getProcessResourceAttrs(hostID string, procID *process.ID) []attribute.KeyValue { return append( getResourceAttrs(hostID, procID.Service), - semconv.ServiceInstanceID(string(procID.UID)), + semconv.ServiceInstanceID(procID.UID.Instance), attr2.ProcCommand.OTEL().String(procID.Command), attr2.ProcOwner.OTEL().String(procID.User), attr2.ProcParentPid.OTEL().String(strconv.Itoa(int(procID.ParentProcessID))), diff --git a/pkg/export/otel/metrics_proc_test.go b/pkg/export/otel/metrics_proc_test.go index 980c9bc85..fef835f88 100644 --- a/pkg/export/otel/metrics_proc_test.go +++ b/pkg/export/otel/metrics_proc_test.go @@ -55,13 +55,13 @@ func TestProcMetrics_Aggregated(t *testing.T) { // WHEN it receives process metrics metrics <- []*process.Status{ - {ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: "foo"}, + {ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: svc.UID{Instance: "foo"}}, CPUUtilisationWait: 3, CPUUtilisationSystem: 2, CPUUtilisationUser: 1, CPUTimeUserDelta: 30, CPUTimeWaitDelta: 20, CPUTimeSystemDelta: 10, IOReadBytesDelta: 123, IOWriteBytesDelta: 456, NetRcvBytesDelta: 11, NetTxBytesDelta: 22, }, - {ID: process.ID{Command: "bar", Service: &svc.ID{}, UID: "bar"}, + {ID: process.ID{Command: "bar", Service: &svc.ID{}, UID: svc.UID{Instance: "bar"}}, CPUUtilisationWait: 31, CPUUtilisationSystem: 21, CPUUtilisationUser: 11, CPUTimeUserDelta: 301, CPUTimeWaitDelta: 201, CPUTimeSystemDelta: 101, IOReadBytesDelta: 321, IOWriteBytesDelta: 654, @@ -129,7 +129,7 @@ func TestProcMetrics_Aggregated(t *testing.T) { // AND WHEN new metrics are received metrics <- []*process.Status{ - {ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: "foo"}, + {ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: svc.UID{Instance: "foo"}}, CPUUtilisationWait: 4, CPUUtilisationSystem: 1, CPUUtilisationUser: 2, CPUTimeUserDelta: 3, CPUTimeWaitDelta: 2, CPUTimeSystemDelta: 1, IOReadBytesDelta: 1, IOWriteBytesDelta: 2, @@ -220,7 +220,7 @@ func TestProcMetrics_Disaggregated(t *testing.T) { // WHEN it receives process metrics metrics <- []*process.Status{ - {ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: "foo"}, + {ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: svc.UID{Instance: "foo"}}, CPUUtilisationWait: 3, CPUUtilisationSystem: 2, CPUUtilisationUser: 1, CPUTimeUserDelta: 30, CPUTimeWaitDelta: 20, CPUTimeSystemDelta: 10, IOReadBytesDelta: 123, IOWriteBytesDelta: 456, diff --git a/pkg/export/otel/metrics_test.go b/pkg/export/otel/metrics_test.go index 6c7474eef..34e9a9f76 100644 --- a/pkg/export/otel/metrics_test.go +++ b/pkg/export/otel/metrics_test.go @@ -492,15 +492,15 @@ func TestAppMetrics_ByInstrumentation(t *testing.T) { */ // WHEN it receives metrics metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTPClient, Path: "/bar", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeGRPC, Path: "/foo", RequestStart: 100, End: 200}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeGRPCClient, Path: "/bar", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeSQLClient, Path: "SELECT", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeRedisClient, Method: "SET", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeRedisServer, Method: "GET", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeKafkaClient, Method: "publish", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeKafkaServer, Method: "process", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTPClient, Path: "/bar", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeGRPC, Path: "/foo", RequestStart: 100, End: 200}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeGRPCClient, Path: "/bar", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeSQLClient, Path: "SELECT", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeRedisClient, Method: "SET", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeRedisServer, Method: "GET", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeKafkaClient, Method: "publish", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeKafkaServer, Method: "process", RequestStart: 150, End: 175}, } // Read the exported metrics, add +extraColl for HTTP size metrics @@ -545,7 +545,7 @@ func TestAppMetrics_ResourceAttributes(t *testing.T) { go otelExporter(metrics) metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200}, } res := readNChan(t, otlp.Records(), 1, timeout) diff --git a/pkg/export/otel/traces.go b/pkg/export/otel/traces.go index 2c414a4d2..95cde49ed 100644 --- a/pkg/export/otel/traces.go +++ b/pkg/export/otel/traces.go @@ -410,7 +410,8 @@ func getRetrySettings(cfg TracesConfig) configretry.BackOffConfig { } func traceAppResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue { - if service.UID == "" { + // TODO: remove? + if service.UID == emptyUID { return getAppResourceAttrs(hostID, service) } diff --git a/pkg/export/otel/traces_test.go b/pkg/export/otel/traces_test.go index ab4b53b0a..4bc863e0f 100644 --- a/pkg/export/otel/traces_test.go +++ b/pkg/export/otel/traces_test.go @@ -841,13 +841,13 @@ func TestTracesInstrumentations(t *testing.T) { } spans := []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Method: "GET", Route: "/foo", RequestStart: 100, End: 200}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTPClient, Method: "PUT", Route: "/bar", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeGRPC, Path: "/grpcFoo", RequestStart: 100, End: 200}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeGRPCClient, Path: "/grpcGoo", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Method: "GET", Route: "/foo", RequestStart: 100, End: 200}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTPClient, Method: "PUT", Route: "/bar", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeGRPC, Path: "/grpcFoo", RequestStart: 100, End: 200}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeGRPCClient, Path: "/grpcGoo", RequestStart: 150, End: 175}, makeSQLRequestSpan("SELECT password FROM credentials WHERE username=\"bill\""), - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeRedisClient, Method: "SET", Path: "redis_db", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeRedisServer, Method: "GET", Path: "redis_db", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeRedisClient, Method: "SET", Path: "redis_db", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeRedisServer, Method: "GET", Path: "redis_db", RequestStart: 150, End: 175}, {Type: request.EventTypeKafkaClient, Method: "process", Path: "important-topic", Statement: "test"}, {Type: request.EventTypeKafkaServer, Method: "publish", Path: "important-topic", Statement: "test"}, } @@ -976,12 +976,12 @@ func TestTracesAttrReuse(t *testing.T) { same bool }{ { - name: "Reuses the trace attributes, with svc.UID defined", - span: request.Span{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Method: "GET", Route: "/foo", RequestStart: 100, End: 200}, + name: "Reuses the trace attributes, with svc.Instance defined", + span: request.Span{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Method: "GET", Route: "/foo", RequestStart: 100, End: 200}, same: true, }, { - name: "No UID, no caching of trace attributes", + name: "No Instance, no caching of trace attributes", span: request.Span{ServiceID: svc.ID{}, Type: request.EventTypeHTTP, Method: "GET", Route: "/foo", RequestStart: 100, End: 200}, same: false, }, @@ -1210,91 +1210,91 @@ func TestHostPeerAttributes(t *testing.T) { }{ { name: "Same namespaces HTTP", - span: request.Span{Type: request.EventTypeHTTP, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeHTTP, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server", }, { name: "Client in different namespace", - span: request.Span{Type: request.EventTypeHTTP, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeHTTP, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client.far", server: "server", }, { name: "Same namespaces for HTTP client", - span: request.Span{Type: request.EventTypeHTTPClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeHTTPClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server", }, { name: "Server in different namespace ", - span: request.Span{Type: request.EventTypeHTTPClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeHTTPClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server.far", }, { name: "Same namespaces GRPC", - span: request.Span{Type: request.EventTypeGRPC, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeGRPC, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server", }, { name: "Client in different namespace GRPC", - span: request.Span{Type: request.EventTypeGRPC, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeGRPC, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client.far", server: "server", }, { name: "Same namespaces for GRPC client", - span: request.Span{Type: request.EventTypeGRPCClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeGRPCClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server", }, { name: "Server in different namespace GRPC", - span: request.Span{Type: request.EventTypeGRPCClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeGRPCClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server.far", }, { name: "Same namespaces for SQL client", - span: request.Span{Type: request.EventTypeSQLClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeSQLClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "", server: "server", }, { name: "Server in different namespace SQL", - span: request.Span{Type: request.EventTypeSQLClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeSQLClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "", server: "server.far", }, { name: "Same namespaces for Redis client", - span: request.Span{Type: request.EventTypeRedisClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeRedisClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "", server: "server", }, { name: "Server in different namespace Redis", - span: request.Span{Type: request.EventTypeRedisClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeRedisClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "", server: "server.far", }, { name: "Client in different namespace Redis", - span: request.Span{Type: request.EventTypeRedisServer, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeRedisServer, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "", server: "server", }, { name: "Server in different namespace Kafka", - span: request.Span{Type: request.EventTypeKafkaClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeKafkaClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "", server: "server.far", }, { name: "Client in different namespace Kafka", - span: request.Span{Type: request.EventTypeKafkaServer, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: request.Span{Type: request.EventTypeKafkaServer, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "", server: "server", }, diff --git a/pkg/export/prom/prom.go b/pkg/export/prom/prom.go index 0e78ae0a3..61da41300 100644 --- a/pkg/export/prom/prom.go +++ b/pkg/export/prom/prom.go @@ -705,12 +705,12 @@ func labelNamesSpans() []string { func (r *metricsReporter) labelValuesSpans(span *request.Span) []string { return []string{ - span.ServiceID.Name, - span.ServiceID.Namespace, + span.ServiceID.UID.Name, + span.ServiceID.UID.Namespace, span.TraceName(), strconv.Itoa(int(request.SpanStatusCode(span))), span.ServiceGraphKind(), - string(span.ServiceID.UID), // app instance ID + string(span.ServiceID.UID.Instance), // app instance ID span.ServiceID.Job(), "beyla", } @@ -730,9 +730,9 @@ func (r *metricsReporter) labelValuesTargetInfo(service svc.ID) []string { values := []string{ r.hostID, service.HostName, - service.Name, - service.Namespace, - string(service.UID), // app instance ID + service.UID.Name, + service.UID.Namespace, + service.UID.Instance, // app instance ID service.Job(), service.SDKLanguage.String(), "beyla", @@ -754,7 +754,7 @@ func (r *metricsReporter) labelValuesServiceGraph(span *request.Span) []string { if span.IsClientSpan() { return []string{ request.SpanPeer(span), - span.ServiceID.Namespace, + span.ServiceID.UID.Namespace, request.SpanHost(span), span.OtherNamespace, "beyla", @@ -764,7 +764,7 @@ func (r *metricsReporter) labelValuesServiceGraph(span *request.Span) []string { request.SpanPeer(span), span.OtherNamespace, request.SpanHost(span), - span.ServiceID.Namespace, + span.ServiceID.UID.Namespace, "beyla", } } diff --git a/pkg/export/prom/prom_test.go b/pkg/export/prom/prom_test.go index b3c0c6398..2816cf569 100644 --- a/pkg/export/prom/prom_test.go +++ b/pkg/export/prom/prom_test.go @@ -274,15 +274,15 @@ func TestAppMetrics_ByInstrumentation(t *testing.T) { go exporter(metrics) metrics <- []request.Span{ - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTPClient, Path: "/bar", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeGRPC, Path: "/foo", RequestStart: 100, End: 200}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeGRPCClient, Path: "/bar", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeSQLClient, Path: "SELECT", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeRedisClient, Method: "SET", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeRedisServer, Method: "GET", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeKafkaClient, Method: "publish", RequestStart: 150, End: 175}, - {ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeKafkaServer, Method: "process", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTPClient, Path: "/bar", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeGRPC, Path: "/foo", RequestStart: 100, End: 200}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeGRPCClient, Path: "/bar", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeSQLClient, Path: "SELECT", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeRedisClient, Method: "SET", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeRedisServer, Method: "GET", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeKafkaClient, Method: "publish", RequestStart: 150, End: 175}, + {ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeKafkaServer, Method: "process", RequestStart: 150, End: 175}, } var exported string diff --git a/pkg/internal/discover/attacher.go b/pkg/internal/discover/attacher.go index 6508aa1e9..589a65860 100644 --- a/pkg/internal/discover/attacher.go +++ b/pkg/internal/discover/attacher.go @@ -275,8 +275,8 @@ func (ta *TraceAttacher) monitorPIDs(tracer *ebpf.ProcessTracer, ie *ebpf.Instru // the service name is the name of the found executable // Unless the case of system-wide tracing, where the name of the // executable will be dynamically set for each traced http request call. - if ie.FileInfo.Service.Name == "" { - ie.FileInfo.Service.Name = ie.FileInfo.ExecutableName() + if ie.FileInfo.Service.UID.Name == "" { + ie.FileInfo.Service.UID.Name = ie.FileInfo.ExecutableName() // we mark the service ID as automatically named in case we want to look, // in later stages of the pipeline, for better automatic service name ie.FileInfo.Service.SetAutoName() diff --git a/pkg/internal/discover/typer.go b/pkg/internal/discover/typer.go index f392ab186..898a698fc 100644 --- a/pkg/internal/discover/typer.go +++ b/pkg/internal/discover/typer.go @@ -71,9 +71,11 @@ func (t *typer) FilterClassify(evs []Event[ProcessMatch]) []Event[ebpf.Instrumen switch evs[i].Type { case EventCreated: svcID := svc.ID{ - Name: ev.Obj.Criteria.Name, - Namespace: ev.Obj.Criteria.Namespace, - ProcPID: ev.Obj.Process.Pid, + UID: svc.UID{ + Name: ev.Obj.Criteria.Name, + Namespace: ev.Obj.Criteria.Namespace, + }, + ProcPID: ev.Obj.Process.Pid, } if elfFile, err := exec.FindExecELF(ev.Obj.Process, svcID, t.k8sInformer.IsKubeEnabled()); err != nil { t.log.Warn("error finding process ELF. Ignoring", "error", err) diff --git a/pkg/internal/discover/watcher_kube_test.go b/pkg/internal/discover/watcher_kube_test.go index 66b03981b..6c8d0c3df 100644 --- a/pkg/internal/discover/watcher_kube_test.go +++ b/pkg/internal/discover/watcher_kube_test.go @@ -67,7 +67,7 @@ func TestWatcherKubeEnricher(t *testing.T) { // Setup a fake K8s API connected to the watcherKubeEnricher fInformer := &fakeInformer{} - store := kube.NewStore(fInformer) + store := kube.NewStore(fInformer, kube.MetaSourceLabels{}) wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &fakeMetadataProvider{store: store})() require.NoError(t, err) inputCh, outputCh := make(chan []Event[processAttrs], 10), make(chan []Event[processAttrs], 10) @@ -107,7 +107,7 @@ func TestWatcherKubeEnricherWithMatcher(t *testing.T) { processInfo = fakeProcessInfo // Setup a fake K8s API connected to the watcherKubeEnricher fInformer := &fakeInformer{} - store := kube.NewStore(fInformer) + store := kube.NewStore(fInformer, kube.MetaSourceLabels{}) wkeNodeFunc, err := WatcherKubeEnricherProvider(context.TODO(), &fakeMetadataProvider{store: store})() require.NoError(t, err) pipeConfig := beyla.Config{} diff --git a/pkg/internal/ebpf/common/pids.go b/pkg/internal/ebpf/common/pids.go index 3faa8601f..a69a46098 100644 --- a/pkg/internal/ebpf/common/pids.go +++ b/pkg/internal/ebpf/common/pids.go @@ -227,7 +227,7 @@ func serviceInfo(pid uint32) *svc.ID { name := commName(pid) lang := exec.FindProcLanguage(int32(pid), nil, name) - result := svc.ID{Name: name, SDKLanguage: lang, ProcPID: int32(pid)} + result := svc.ID{UID: svc.UID{Name: name}, SDKLanguage: lang, ProcPID: int32(pid)} activePids.Add(pid, &result) diff --git a/pkg/internal/ebpf/common/ringbuf_test.go b/pkg/internal/ebpf/common/ringbuf_test.go index 3f1440ac3..01d6ccfb7 100644 --- a/pkg/internal/ebpf/common/ringbuf_test.go +++ b/pkg/internal/ebpf/common/ringbuf_test.go @@ -32,7 +32,7 @@ func TestForwardRingbuf_CapacityFull(t *testing.T) { metrics := &metricsReporter{} forwardedMessages := make(chan []request.Span, 100) fltr := TestPidsFilter{services: map[uint32]svc.ID{}} - fltr.AllowPID(1, 1, &svc.ID{Name: "myService"}, PIDTypeGo) + fltr.AllowPID(1, 1, &svc.ID{UID: svc.UID{Name: "myService"}}, PIDTypeGo) go ForwardRingbuf( &config.EPPFTracer{BatchLength: 10}, nil, // the source ring buffer can be null @@ -55,13 +55,13 @@ func TestForwardRingbuf_CapacityFull(t *testing.T) { batch := testutil.ReadChannel(t, forwardedMessages, testTimeout) require.Len(t, batch, 10) for i := range batch { - assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(i), ServiceID: svc.ID{Name: "myService"}, Pid: request.PidInfo{HostPID: 1}}, batch[i]) + assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(i), ServiceID: svc.ID{UID: svc.UID{Name: "myService"}}, Pid: request.PidInfo{HostPID: 1}}, batch[i]) } batch = testutil.ReadChannel(t, forwardedMessages, testTimeout) require.Len(t, batch, 10) for i := range batch { - assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(10 + i), ServiceID: svc.ID{Name: "myService"}, Pid: request.PidInfo{HostPID: 1}}, batch[i]) + assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(10 + i), ServiceID: svc.ID{UID: svc.UID{Name: "myService"}}, Pid: request.PidInfo{HostPID: 1}}, batch[i]) } // AND metrics are properly updated assert.Equal(t, 2, metrics.flushes) @@ -84,7 +84,7 @@ func TestForwardRingbuf_Deadline(t *testing.T) { metrics := &metricsReporter{} forwardedMessages := make(chan []request.Span, 100) fltr := TestPidsFilter{services: map[uint32]svc.ID{}} - fltr.AllowPID(1, 1, &svc.ID{Name: "myService"}, PIDTypeGo) + fltr.AllowPID(1, 1, &svc.ID{UID: svc.UID{Name: "myService"}}, PIDTypeGo) go ForwardRingbuf( &config.EPPFTracer{BatchLength: 10, BatchTimeout: 20 * time.Millisecond}, nil, // the source ring buffer can be null @@ -110,7 +110,7 @@ func TestForwardRingbuf_Deadline(t *testing.T) { } require.Len(t, batch, 7) for i := range batch { - assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(i), ServiceID: svc.ID{Name: "myService"}, Pid: request.PidInfo{HostPID: 1}}, batch[i]) + assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(i), ServiceID: svc.ID{UID: svc.UID{Name: "myService"}}, Pid: request.PidInfo{HostPID: 1}}, batch[i]) } // AND metrics are properly updated diff --git a/pkg/internal/exec/file.go b/pkg/internal/exec/file.go index 8e9710803..fe4f0b17c 100644 --- a/pkg/internal/exec/file.go +++ b/pkg/internal/exec/file.go @@ -77,7 +77,7 @@ func FindExecELF(p *services.ProcessInfo, svcID svc.ID, k8sEnabled bool) (*FileI if svcName, ok := file.Service.EnvVars[envServiceName]; ok { // If Kubernetes is enabled we use the K8S metadata as the source of truth if !k8sEnabled { - file.Service.Name = svcName + file.Service.UID.Name = svcName } } else { if resourceAttrs, ok := file.Service.EnvVars[envResourceAttrs]; ok { @@ -87,7 +87,7 @@ func FindExecELF(p *services.ProcessInfo, svcID svc.ID, k8sEnabled bool) (*FileI } attributes.ParseOTELResourceVariable(resourceAttrs, collect) if result, ok := allVars[serviceNameKey]; ok { - file.Service.Name = result + file.Service.UID.Name = result } } } diff --git a/pkg/internal/infraolly/process/status.go b/pkg/internal/infraolly/process/status.go index e50135d20..e92f8ef31 100644 --- a/pkg/internal/infraolly/process/status.go +++ b/pkg/internal/infraolly/process/status.go @@ -77,7 +77,11 @@ func NewStatus(pid int32, svcID *svc.ID) *Status { return &Status{ID: ID{ ProcessID: pid, Service: svcID, - UID: svcID.UID.AppendUint32(uint32(pid)), + UID: svc.UID{ + Name: svcID.UID.Name, + Namespace: svcID.UID.Namespace, + Instance: svcID.UID.Instance + ":" + strconv.Itoa(int(pid)), + }, }} } @@ -120,7 +124,7 @@ func PromGetters(name attr.Name) (attributes.Getter[*Status, string], bool) { // the attributes are handled explicitly by the prometheus exporter, but we need to // ignore them to avoid that the default case tries to report them from service metadata case attr.Instance: - g = func(s *Status) string { return string(s.ID.UID) } + g = func(s *Status) string { return s.ID.UID.Instance } case attr.Job: g = func(s *Status) string { return s.ID.Service.Job() } default: diff --git a/pkg/internal/kube/informer_provider.go b/pkg/internal/kube/informer_provider.go index 3e45ed251..1a6518bbb 100644 --- a/pkg/internal/kube/informer_provider.go +++ b/pkg/internal/kube/informer_provider.go @@ -34,6 +34,7 @@ type MetadataConfig struct { SyncTimeout time.Duration ResyncPeriod time.Duration MetaCacheAddr string + MetaSourceLabels MetaSourceLabels } type MetadataProvider struct { @@ -103,7 +104,7 @@ func (mp *MetadataProvider) Get(ctx context.Context) (*Store, error) { return nil, err } - mp.metadata = NewStore(informer) + mp.metadata = NewStore(informer, mp.cfg.MetaSourceLabels) return mp.metadata, nil } diff --git a/pkg/internal/kube/store.go b/pkg/internal/kube/store.go index 885f54135..86238e6b8 100644 --- a/pkg/internal/kube/store.go +++ b/pkg/internal/kube/store.go @@ -2,12 +2,12 @@ package kube import ( "log/slog" - "slices" "strings" "sync" "github.com/grafana/beyla/pkg/export/attributes" "github.com/grafana/beyla/pkg/internal/helpers/container" + "github.com/grafana/beyla/pkg/internal/helpers/maps" "github.com/grafana/beyla/pkg/kubecache/informer" "github.com/grafana/beyla/pkg/kubecache/meta" ) @@ -36,6 +36,12 @@ func qName(om *informer.ObjectMeta) qualifiedName { return qualifiedName{name: om.Name, namespace: om.Namespace, kind: om.Kind} } +// MetaSourceLabels allow overriding some metadata from kubernetes labels +type MetaSourceLabels struct { + ServiceName string `yaml:"service_name" env:"BEYLA_KUBE_META_SOURCE_LABEL_SERVICE_NAME"` + ServiceNamespace string `yaml:"service_namespace" env:"BEYLA_KUBE_META_SOURCE_LABEL_SERVICE_NAMESPACE"` +} + // Store aggregates Kubernetes information from multiple sources: // - the informer that keep an indexed copy of the existing pods and replicasets. // - the inspected container.Info objects, indexed either by container ID and PID namespace @@ -57,8 +63,9 @@ type Store struct { namespaces map[uint32]*container.Info // container ID to pod matcher - podsByContainer map[string]*informer.ObjectMeta - containersByOwner map[string][]*informer.ContainerInfo + podsByContainer map[string]*informer.ObjectMeta + // first key: pod owner ID, second key: container ID + containersByOwner maps.Map2[string, string, *informer.ContainerInfo] // ip to generic IP info (Node, Service, *including* Pods) objectMetaByIP map[string]*informer.ObjectMeta @@ -71,9 +78,11 @@ type Store struct { // will subscribe to this store, to make sure that any "new object" notification // they receive is already present in the store meta.BaseNotifier + + sourceLabels MetaSourceLabels } -func NewStore(kubeMetadata meta.Notifier) *Store { +func NewStore(kubeMetadata meta.Notifier, sourceLabels MetaSourceLabels) *Store { log := dblog() db := &Store{ log: log, @@ -83,10 +92,11 @@ func NewStore(kubeMetadata meta.Notifier) *Store { containerByPID: map[uint32]*container.Info{}, objectMetaByIP: map[string]*informer.ObjectMeta{}, objectMetaByQName: map[qualifiedName]*informer.ObjectMeta{}, - containersByOwner: map[string][]*informer.ContainerInfo{}, + containersByOwner: maps.Map2[string, string, *informer.ContainerInfo]{}, otelServiceInfoByIP: map[string]OTelServiceNamePair{}, metadataNotifier: kubeMetadata, BaseNotifier: meta.NewBaseNotifier(log), + sourceLabels: sourceLabels, } kubeMetadata.Subscribe(db) return db @@ -144,33 +154,27 @@ func (s *Store) addObjectMeta(meta *informer.ObjectMeta) { s.access.Lock() defer s.access.Unlock() - s.unlockedAddObjectMeta(qName(meta), meta) + s.unlockedAddObjectMeta(meta) } func (s *Store) updateObjectMeta(meta *informer.ObjectMeta) { s.access.Lock() defer s.access.Unlock() - // if the update removes IPs from the original object meta, - // we remove them from the indexes - qn := qName(meta) - if om, ok := s.objectMetaByQName[qn]; ok { - for _, ip := range om.Ips { - // theoretically, linear search into a list is not efficient and we should first build a map - // with all the IPs - // however, the IPs slice is expected to have a small size (few entries), so - // it's more efficient, also in terms of memory generation, to keep it as a slice - // and avoid generating temporary maps - if !slices.Contains(meta.Ips, ip) { - delete(s.objectMetaByIP, ip) - } - } + // atomically remove the previously stored version of the updated object + // then re-adding it + // this will avoid to leak some IPs and containers that exist in the + // stored snapshot but not in the updated snapshot + if previousObject, ok := s.objectMetaByQName[qName(meta)]; ok { + s.unlockedDeleteObjectMeta(previousObject) } - - s.unlockedAddObjectMeta(qn, meta) + s.unlockedAddObjectMeta(meta) } -func (s *Store) unlockedAddObjectMeta(qn qualifiedName, meta *informer.ObjectMeta) { +// it's important to make sure that any element added here is removed when +// calling unlockedDeleteObjectMeta with the same ObjectMeta +func (s *Store) unlockedAddObjectMeta(meta *informer.ObjectMeta) { + qn := qName(meta) s.objectMetaByQName[qn] = meta for _, ip := range meta.Ips { @@ -180,6 +184,7 @@ func (s *Store) unlockedAddObjectMeta(qn qualifiedName, meta *informer.ObjectMet s.otelServiceInfoByIP = map[string]OTelServiceNamePair{} if meta.Pod != nil { + oID := fetchOwnerID(meta) s.log.Debug("adding pod to store", "ips", meta.Ips, "pod", meta.Name, "namespace", meta.Namespace, "containers", meta.Pod.Containers) for _, c := range meta.Pod.Containers { @@ -189,15 +194,7 @@ func (s *Store) unlockedAddObjectMeta(qn qualifiedName, meta *informer.ObjectMet if ok { s.namespaces[info.PIDNamespace] = info } - } - if owner := TopOwner(meta.Pod); owner != nil { - oID := ownerID(meta.Namespace, owner.Name) - containers, ok := s.containersByOwner[oID] - if !ok { - containers = []*informer.ContainerInfo{} - } - containers = append(containers, meta.Pod.Containers...) - s.containersByOwner[oID] = containers + s.containersByOwner.Put(oID, c.Id, c) } } } @@ -225,40 +222,28 @@ func (s *Store) unlockedDeleteObjectMeta(meta *informer.ObjectMeta) { delete(s.objectMetaByIP, ip) } if meta.Pod != nil { + oID := fetchOwnerID(meta) s.log.Debug("deleting pod from store", "ips", meta.Ips, "pod", meta.Name, "namespace", meta.Namespace, "containers", meta.Pod.Containers) - toRemove := map[string]struct{}{} for _, c := range meta.Pod.Containers { - toRemove[c.Id] = struct{}{} - info, ok := s.containerIDs[c.Id] if ok { delete(s.containerIDs, c.Id) delete(s.namespaces, info.PIDNamespace) } delete(s.podsByContainer, c.Id) + s.containersByOwner.Delete(oID, c.Id) } + } +} - // clean up the owner to container map - if owner := TopOwner(meta.Pod); owner != nil { - oID := ownerID(meta.Namespace, owner.Name) - if containers, ok := s.containersByOwner[oID]; ok { - withoutPod := []*informer.ContainerInfo{} - // filter out all containers owned by this pod - for _, c := range containers { - if _, ok := toRemove[c.Id]; !ok { - withoutPod = append(withoutPod, c) - } - } - // update the owner to container mapping or remove if empty - if len(withoutPod) > 0 { - s.containersByOwner[oID] = withoutPod - } else { - delete(s.containersByOwner, oID) - } - } - } +func fetchOwnerID(meta *informer.ObjectMeta) string { + ownerName := meta.Name + if owner := TopOwner(meta.Pod); owner != nil { + ownerName = owner.Name } + oID := ownerID(meta.Namespace, ownerName) + return oID } func (s *Store) PodByContainerID(cid string) *informer.ObjectMeta { @@ -267,13 +252,21 @@ func (s *Store) PodByContainerID(cid string) *informer.ObjectMeta { return s.podsByContainer[cid] } -func (s *Store) PodByPIDNs(pidns uint32) *informer.ObjectMeta { +// PodContainerByPIDNs second return value: container Name +func (s *Store) PodContainerByPIDNs(pidns uint32) (*informer.ObjectMeta, string) { s.access.RLock() defer s.access.RUnlock() if info, ok := s.namespaces[pidns]; ok { - return s.podsByContainer[info.ContainerID] + if om, ok := s.podsByContainer[info.ContainerID]; ok { + oID := fetchOwnerID(om) + containerName := "" + if containerInfo, ok := s.containersByOwner.Get(oID, info.ContainerID); ok { + containerName = containerInfo.Name + } + return om, containerName + } } - return nil + return nil, "" } func (s *Store) ObjectMetaByIP(ip string) *informer.ObjectMeta { @@ -296,6 +289,16 @@ func (s *Store) serviceNameNamespaceForMetadata(om *informer.ObjectMeta) (string } else { name, namespace = s.serviceNameNamespaceForOwner(om) } + if s.sourceLabels.ServiceName != "" { + if on, ok := om.Labels[s.sourceLabels.ServiceName]; ok { + name = on + } + } + if s.sourceLabels.ServiceNamespace != "" { + if ons, ok := om.Labels[s.sourceLabels.ServiceNamespace]; ok { + namespace = ons + } + } return name, namespace } diff --git a/pkg/internal/kube/store_test.go b/pkg/internal/kube/store_test.go index 370e0eaf4..4bf30b05b 100644 --- a/pkg/internal/kube/store_test.go +++ b/pkg/internal/kube/store_test.go @@ -90,7 +90,7 @@ func TestContainerInfo(t *testing.T) { fInformer := &fakeInformer{} - store := NewStore(fInformer) + store := NewStore(fInformer, MetaSourceLabels{}) _ = store.On(&informer.Event{Type: informer.EventType_CREATED, Resource: &service}) _ = store.On(&informer.Event{Type: informer.EventType_CREATED, Resource: &podMetaA}) @@ -257,7 +257,7 @@ func TestMemoryCleanedUp(t *testing.T) { fInformer := &fakeInformer{} - store := NewStore(fInformer) + store := NewStore(fInformer, MetaSourceLabels{}) _ = store.On(&informer.Event{Type: informer.EventType_CREATED, Resource: &service}) _ = store.On(&informer.Event{Type: informer.EventType_CREATED, Resource: &podMetaA}) @@ -281,7 +281,7 @@ func TestMemoryCleanedUp(t *testing.T) { // Fixes a memory leak in the store where the objectMetaByIP map was not cleaned up func TestMetaByIPEntryRemovedIfIPGroupChanges(t *testing.T) { // GIVEN a store with - store := NewStore(&fakeInformer{}) + store := NewStore(&fakeInformer{}, MetaSourceLabels{}) // WHEN an object is created with several IPs _ = store.On(&informer.Event{ Type: informer.EventType_CREATED, diff --git a/pkg/internal/pipe/instrumenter_test.go b/pkg/internal/pipe/instrumenter_test.go index 567c9f886..3f139f745 100644 --- a/pkg/internal/pipe/instrumenter_test.go +++ b/pkg/internal/pipe/instrumenter_test.go @@ -647,7 +647,7 @@ func newRequest(serviceName string, method, path, peer string, status int) []req Start: 2, RequestStart: 1, End: 3, - ServiceID: svc.ID{HostName: "the-host", Namespace: "ns", Name: serviceName, SDKLanguage: svc.InstrumentableGolang}, + ServiceID: svc.ID{HostName: "the-host", UID: svc.UID{Namespace: "ns", Name: serviceName}, SDKLanguage: svc.InstrumentableGolang}, }} } @@ -663,7 +663,7 @@ func newRequestWithTiming(svcName string, kind request.EventType, method, path, RequestStart: int64(goStart), Start: int64(start), End: int64(end), - ServiceID: svc.ID{HostName: "the-host", Name: svcName, SDKLanguage: svc.InstrumentableGolang}, + ServiceID: svc.ID{HostName: "the-host", UID: svc.UID{Name: svcName}, SDKLanguage: svc.InstrumentableGolang}, }} } @@ -678,7 +678,7 @@ func newGRPCRequest(svcName string, path string, status int) []request.Span { Start: 2, RequestStart: 1, End: 3, - ServiceID: svc.ID{HostName: "the-host", Name: svcName, SDKLanguage: svc.InstrumentableGolang}, + ServiceID: svc.ID{HostName: "the-host", UID: svc.UID{Name: svcName}, SDKLanguage: svc.InstrumentableGolang}, }} } @@ -807,7 +807,7 @@ func newHTTPInfo(method, path, peer string, status int) []request.Span { Start: 2, RequestStart: 2, End: 3, - ServiceID: svc.ID{HostName: "the-host", Name: "comm", SDKLanguage: svc.InstrumentableGolang}, + ServiceID: svc.ID{HostName: "the-host", UID: svc.UID{Name: "comm"}, SDKLanguage: svc.InstrumentableGolang}, }} } diff --git a/pkg/internal/request/metric_attributes.go b/pkg/internal/request/metric_attributes.go index 08269ce30..d8f582c33 100644 --- a/pkg/internal/request/metric_attributes.go +++ b/pkg/internal/request/metric_attributes.go @@ -120,7 +120,7 @@ func SpanPeer(span *Span) string { } func HostAsServer(span *Span) string { - if span.OtherNamespace != "" && span.OtherNamespace != span.ServiceID.Namespace && span.HostName != "" { + if span.OtherNamespace != "" && span.OtherNamespace != span.ServiceID.UID.Namespace && span.HostName != "" { if span.IsClientSpan() { return SpanHost(span) + "." + span.OtherNamespace } @@ -130,7 +130,7 @@ func HostAsServer(span *Span) string { } func PeerAsClient(span *Span) string { - if span.OtherNamespace != "" && span.OtherNamespace != span.ServiceID.Namespace && span.PeerName != "" { + if span.OtherNamespace != "" && span.OtherNamespace != span.ServiceID.UID.Namespace && span.PeerName != "" { if !span.IsClientSpan() { return SpanPeer(span) + "." + span.OtherNamespace } diff --git a/pkg/internal/request/span.go b/pkg/internal/request/span.go index 1b39df211..3010a44a8 100644 --- a/pkg/internal/request/span.go +++ b/pkg/internal/request/span.go @@ -495,5 +495,5 @@ func (s *Span) IsExportTracesSpan() bool { } func (s *Span) IsSelfReferenceSpan() bool { - return s.Peer == s.Host && (s.ServiceID.Namespace == s.OtherNamespace || s.OtherNamespace == "") + return s.Peer == s.Host && (s.ServiceID.UID.Namespace == s.OtherNamespace || s.OtherNamespace == "") } diff --git a/pkg/internal/request/span_getters.go b/pkg/internal/request/span_getters.go index 0d095bc9f..2a15d2052 100644 --- a/pkg/internal/request/span_getters.go +++ b/pkg/internal/request/span_getters.go @@ -22,7 +22,7 @@ func SpanOTELGetters(name attr.Name) (attributes.Getter[*Span, attribute.KeyValu case attr.ClientNamespace: getter = func(s *Span) attribute.KeyValue { if s.IsClientSpan() { - return ClientNamespaceMetric(s.ServiceID.Namespace) + return ClientNamespaceMetric(s.ServiceID.UID.Namespace) } return ClientNamespaceMetric(s.OtherNamespace) } @@ -53,16 +53,16 @@ func SpanOTELGetters(name attr.Name) (attributes.Getter[*Span, attribute.KeyValu if s.IsClientSpan() { return ServerNamespaceMetric(s.OtherNamespace) } - return ServerNamespaceMetric(s.ServiceID.Namespace) + return ServerNamespaceMetric(s.ServiceID.UID.Namespace) } case attr.Service: - getter = func(s *Span) attribute.KeyValue { return ServiceMetric(s.ServiceID.Name) } + getter = func(s *Span) attribute.KeyValue { return ServiceMetric(s.ServiceID.UID.Name) } case attr.ServiceInstanceID: - getter = func(s *Span) attribute.KeyValue { return semconv.ServiceInstanceID(string(s.ServiceID.UID)) } + getter = func(s *Span) attribute.KeyValue { return semconv.ServiceInstanceID(string(s.ServiceID.UID.Instance)) } case attr.ServiceName: - getter = func(s *Span) attribute.KeyValue { return semconv.ServiceName(s.ServiceID.Name) } + getter = func(s *Span) attribute.KeyValue { return semconv.ServiceName(s.ServiceID.UID.Name) } case attr.ServiceNamespace: - getter = func(s *Span) attribute.KeyValue { return semconv.ServiceNamespace(s.ServiceID.Namespace) } + getter = func(s *Span) attribute.KeyValue { return semconv.ServiceNamespace(s.ServiceID.UID.Namespace) } case attr.SpanKind: getter = func(s *Span) attribute.KeyValue { return SpanKindMetric(s.ServiceGraphKind()) } case attr.SpanName: @@ -177,17 +177,17 @@ func SpanPromGetters(attrName attr.Name) (attributes.Getter[*Span, string], bool return "" } case attr.ServiceInstanceID: - getter = func(s *Span) string { return string(s.ServiceID.UID) } + getter = func(s *Span) string { return string(s.ServiceID.UID.Instance) } // resource metadata values below. Unlike OTEL, they are included here because they // belong to the metric, instead of the Resource case attr.Instance: - getter = func(s *Span) string { return string(s.ServiceID.UID) } + getter = func(s *Span) string { return string(s.ServiceID.UID.Instance) } case attr.Job: getter = func(s *Span) string { return s.ServiceID.Job() } case attr.ServiceName: - getter = func(s *Span) string { return s.ServiceID.Name } + getter = func(s *Span) string { return s.ServiceID.UID.Name } case attr.ServiceNamespace: - getter = func(s *Span) string { return s.ServiceID.Namespace } + getter = func(s *Span) string { return s.ServiceID.UID.Namespace } default: getter = func(s *Span) string { return s.ServiceID.Metadata[attrName] } } diff --git a/pkg/internal/request/span_test.go b/pkg/internal/request/span_test.go index b3f78bf45..4340f0c3c 100644 --- a/pkg/internal/request/span_test.go +++ b/pkg/internal/request/span_test.go @@ -361,17 +361,17 @@ func TestSelfReferencingSpan(t *testing.T) { }{ { name: "Not a self-reference", - span: Span{Type: EventTypeHTTP, Method: "GET", Path: "/v1/metrics", RequestStart: 100, End: 200, Status: 200, Host: "10.10.10.10", Peer: "10.11.10.11", OtherNamespace: "", ServiceID: svc.ID{Namespace: ""}}, + span: Span{Type: EventTypeHTTP, Method: "GET", Path: "/v1/metrics", RequestStart: 100, End: 200, Status: 200, Host: "10.10.10.10", Peer: "10.11.10.11", OtherNamespace: "", ServiceID: svc.ID{UID: svc.UID{Namespace: ""}}}, selfref: false, }, { name: "Not a self-reference, same IP, different namespace", - span: Span{Type: EventTypeHTTP, Method: "GET", Path: "/v1/metrics", RequestStart: 100, End: 200, Status: 200, Host: "10.10.10.10", Peer: "10.10.10.10", OtherNamespace: "B", ServiceID: svc.ID{Namespace: "A"}}, + span: Span{Type: EventTypeHTTP, Method: "GET", Path: "/v1/metrics", RequestStart: 100, End: 200, Status: 200, Host: "10.10.10.10", Peer: "10.10.10.10", OtherNamespace: "B", ServiceID: svc.ID{UID: svc.UID{Namespace: "A"}}}, selfref: false, }, { name: "Same IP different namespace, but the other namespace is empty", - span: Span{Type: EventTypeHTTP, Method: "GET", Path: "/v1/metrics", RequestStart: 100, End: 200, Status: 200, Host: "10.10.10.10", Peer: "10.10.10.10", OtherNamespace: "", ServiceID: svc.ID{Namespace: "A"}}, + span: Span{Type: EventTypeHTTP, Method: "GET", Path: "/v1/metrics", RequestStart: 100, End: 200, Status: 200, Host: "10.10.10.10", Peer: "10.10.10.10", OtherNamespace: "", ServiceID: svc.ID{UID: svc.UID{Namespace: "A"}}}, selfref: true, }, } @@ -393,91 +393,91 @@ func TestHostPeerClientServer(t *testing.T) { }{ { name: "Same namespaces HTTP", - span: Span{Type: EventTypeHTTP, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeHTTP, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server", }, { name: "Client in different namespace", - span: Span{Type: EventTypeHTTP, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeHTTP, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client.far", server: "server", }, { name: "Client in different namespace", - span: Span{Type: EventTypeHTTP, Peer: "1.1.1.1", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeHTTP, Peer: "1.1.1.1", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "1.1.1.1", server: "server", }, { name: "Same namespaces for HTTP client", - span: Span{Type: EventTypeHTTPClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeHTTPClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server", }, { name: "Server in different namespace ", - span: Span{Type: EventTypeHTTPClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeHTTPClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server.far", }, { name: "Server in different namespace ", - span: Span{Type: EventTypeHTTPClient, PeerName: "client", Host: "2.2.2.2", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeHTTPClient, PeerName: "client", Host: "2.2.2.2", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "2.2.2.2", }, { name: "Same namespaces GRPC", - span: Span{Type: EventTypeGRPC, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeGRPC, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server", }, { name: "Client in different namespace GRPC", - span: Span{Type: EventTypeGRPC, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeGRPC, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client.far", server: "server", }, { name: "Same namespaces for GRPC client", - span: Span{Type: EventTypeGRPCClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeGRPCClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server", }, { name: "Server in different namespace GRPC", - span: Span{Type: EventTypeGRPCClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeGRPCClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server.far", }, { name: "Same namespaces for SQL client", - span: Span{Type: EventTypeSQLClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeSQLClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server", }, { name: "Server in different namespace SQL", - span: Span{Type: EventTypeSQLClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeSQLClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server.far", }, { name: "Same namespaces for Redis client", - span: Span{Type: EventTypeRedisClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeRedisClient, PeerName: "client", HostName: "server", OtherNamespace: "same", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server", }, { name: "Server in different namespace Redis", - span: Span{Type: EventTypeRedisClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeRedisClient, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client", server: "server.far", }, { name: "Client in different namespace Redis", - span: Span{Type: EventTypeRedisServer, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{Namespace: "same"}}, + span: Span{Type: EventTypeRedisServer, PeerName: "client", HostName: "server", OtherNamespace: "far", ServiceID: svc.ID{UID: svc.UID{Namespace: "same"}}}, client: "client.far", server: "server", }, diff --git a/pkg/internal/svc/svc.go b/pkg/internal/svc/svc.go index 443b5dc66..6350258b2 100644 --- a/pkg/internal/svc/svc.go +++ b/pkg/internal/svc/svc.go @@ -54,18 +54,21 @@ const ( exportsOTelTraces idFlags = 0x4 ) +// UID uniquely identifies a service instance across the whole system +// according to the OpenTelemetry specification: (name, namespace, instance) +type UID struct { + Name string + Namespace string + Instance string +} + // ID stores the metadata attributes of a service/resource // TODO: rename to svc.Attributes type ID struct { - // UID uniquely identifies a service instance. It is not exported - // in the metrics or traces, but it is used to compose the InstanceID + // Instance uniquely identifies a service instance. It is not exported + // in the metrics or traces, but it is used to compose the Instance UID UID - Name string - // AutoName is true if the Name has been automatically set by Beyla (e.g. executable name when - // the Name is empty). This will allow later refinement of the Name value (e.g. to override it - // again with Kubernetes metadata). - Namespace string SDKLanguage InstrumentableType Metadata map[attr.Name]string @@ -93,10 +96,10 @@ func (i *ID) String() string { } func (i *ID) Job() string { - if i.Namespace != "" { - return i.Namespace + "/" + i.Name + if i.UID.Namespace != "" { + return i.UID.Namespace + "/" + i.UID.Name } - return i.Name + return i.UID.Name } func (i *ID) setFlag(flag idFlags) { diff --git a/pkg/internal/svc/svc_test.go b/pkg/internal/svc/svc_test.go index 8ad9b9cb9..16da4a6a1 100644 --- a/pkg/internal/svc/svc_test.go +++ b/pkg/internal/svc/svc_test.go @@ -7,6 +7,6 @@ import ( ) func TestToString(t *testing.T) { - assert.Equal(t, "thens/thename", (&ID{Namespace: "thens", Name: "thename"}).String()) - assert.Equal(t, "thename", (&ID{Name: "thename"}).String()) + assert.Equal(t, "thens/thename", (&ID{UID: UID{Namespace: "thens", Name: "thename"}}).String()) + assert.Equal(t, "thename", (&ID{UID: UID{Name: "thename"}}).String()) } diff --git a/pkg/internal/svc/uid.go b/pkg/internal/svc/uid.go deleted file mode 100644 index d40249a88..000000000 --- a/pkg/internal/svc/uid.go +++ /dev/null @@ -1,43 +0,0 @@ -package svc - -import ( - "bytes" - "encoding/base32" - "encoding/binary" - "hash/fnv" -) - -var encoding = base32.NewEncoding("0123456789abcdefghijklmnopqrstuv").WithPadding('w') - -// UID uniquely identifies a service instance. -// When built through the Append function, it will contain a FNV64 hash string in Base32 -type UID string - -// NewUID creates a UID containing the argument hash as a Base32 string -func NewUID(fromString string) UID { - return UID("").Append(fromString) -} - -// Append returns a UID whose contents are the hash concatenating the current UID value -// (which is already a hash) with the passed string -func (u UID) Append(str string) UID { - return u.append([]byte(str)) -} - -// AppendUint32 returns a UID whose contents are the hash concatenating the current UID value -// (which is already a hash) with the bytes of the passed integer -func (u UID) AppendUint32(i uint32) UID { - return u.append(binary.LittleEndian.AppendUint32(nil, i)) -} - -func (u UID) append(content []byte) UID { - hasher := fnv.New64a() - _, _ = hasher.Write([]byte(u)) - _, _ = hasher.Write(content) - buf := bytes.Buffer{} - - encoder := base32.NewEncoder(encoding, &buf) - _, _ = encoder.Write(hasher.Sum(nil)) - - return UID(buf.String()) -} diff --git a/pkg/internal/svc/uid_test.go b/pkg/internal/svc/uid_test.go deleted file mode 100644 index 4d53f0b92..000000000 --- a/pkg/internal/svc/uid_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package svc - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestUID_Append(t *testing.T) { - u := NewUID("test") - require.NotEmpty(t, string(u)) - - u1 := u.Append("a") - u2 := u.Append("a") - u3 := u.Append("b") - - // same operations in the same order must provide exact results - assert.Equal(t, u1, u2) - // different operations must provide different results - assert.NotEqual(t, u1, u3) -} - -func TestUID_AppendUint32(t *testing.T) { - u := NewUID("test") - require.NotEmpty(t, string(u)) - - u1 := u.AppendUint32(1) - u2 := u.AppendUint32(1) - u3 := u.AppendUint32(2) - - // same operations in the same order must provide exact results - assert.Equal(t, u1, u2) - // different operations must provide different results - assert.NotEqual(t, u1, u3) -} diff --git a/pkg/internal/traces/read_decorator.go b/pkg/internal/traces/read_decorator.go index 89ff0e947..3d60e4ad7 100644 --- a/pkg/internal/traces/read_decorator.go +++ b/pkg/internal/traces/read_decorator.go @@ -3,17 +3,14 @@ package traces import ( "context" "log/slog" + "strconv" - lru "github.com/hashicorp/golang-lru/v2" "github.com/mariomac/pipes/pipe" "github.com/grafana/beyla/pkg/internal/request" - "github.com/grafana/beyla/pkg/internal/svc" "github.com/grafana/beyla/pkg/internal/traces/hostname" ) -const defaultIDCacheLen = 128 - func rlog() *slog.Logger { return slog.With("component", "traces.ReadDecorator") } @@ -28,12 +25,6 @@ type InstanceIDConfig struct { // value. Beyla will anyway attach the process ID to the given hostname for composing // the instance ID. OverrideHostname string `yaml:"override_hostname" env:"BEYLA_HOSTNAME"` - - // Undocumented properties aimed at fine-grained tuning - - // InternalIDCacheLen will need to be increased if the number of instrumented processes by - // a single instance is larger than defaultIDCacheLen - InternalIDCacheLen int `yaml:"internal_cache_len" env:"BEYLA_INSTANCE_ID_INTERNAL_CACHE_LEN"` } // ReadDecorator is the input node of the processing graph. The eBPF tracers will send their @@ -84,20 +75,9 @@ func hostNamePIDDecorator(cfg *InstanceIDConfig) decorator { } // caching instance ID composition for speed and saving memory generation - cacheLen := defaultIDCacheLen - if cfg.InternalIDCacheLen != 0 { - cacheLen = cfg.InternalIDCacheLen - } - uidsCache, _ := lru.New[uint32, svc.UID](cacheLen) - return func(spans []request.Span) { for i := range spans { - uid, ok := uidsCache.Get(spans[i].Pid.HostPID) - if !ok { - uid = svc.NewUID(fullHostName).AppendUint32(spans[i].Pid.HostPID) - uidsCache.Add(spans[i].Pid.HostPID, uid) - } - spans[i].ServiceID.UID = uid + spans[i].ServiceID.UID.Instance = fullHostName + ":" + strconv.Itoa(int(spans[i].Pid.HostPID)) spans[i].ServiceID.HostName = fullHostName } } diff --git a/pkg/internal/traces/read_decorator_test.go b/pkg/internal/traces/read_decorator_test.go index 19cae32b8..9bfaf6ce7 100644 --- a/pkg/internal/traces/read_decorator_test.go +++ b/pkg/internal/traces/read_decorator_test.go @@ -25,25 +25,25 @@ func TestReadDecorator(t *testing.T) { require.NotEmpty(t, dnsHostname) type testCase struct { - desc string - cfg ReadDecorator - expectedUID svc.UID - expectedHN string + desc string + cfg ReadDecorator + expectedInstance string + expectedHN string } for _, tc := range []testCase{{ - desc: "dns", - cfg: ReadDecorator{InstanceID: InstanceIDConfig{HostnameDNSResolution: true}}, - expectedUID: svc.NewUID(dnsHostname).AppendUint32(1234), - expectedHN: dnsHostname, + desc: "dns", + cfg: ReadDecorator{InstanceID: InstanceIDConfig{HostnameDNSResolution: true}}, + expectedInstance: dnsHostname + ":1234", + expectedHN: dnsHostname, }, { - desc: "no-dns", - expectedUID: svc.NewUID(localHostname).AppendUint32(1234), - expectedHN: localHostname, + desc: "no-dns", + expectedInstance: localHostname + ":1234", + expectedHN: localHostname, }, { - desc: "override hostname", - cfg: ReadDecorator{InstanceID: InstanceIDConfig{OverrideHostname: "foooo"}}, - expectedUID: svc.NewUID("foooo").AppendUint32(1234), - expectedHN: "foooo", + desc: "override hostname", + cfg: ReadDecorator{InstanceID: InstanceIDConfig{OverrideHostname: "foooo"}}, + expectedInstance: "foooo:1234", + expectedHN: "foooo", }} { t.Run(tc.desc, func(t *testing.T) { cfg := tc.cfg @@ -60,9 +60,9 @@ func TestReadDecorator(t *testing.T) { } outSpans := testutil.ReadChannel(t, decoratedOutput, testTimeout) assert.Equal(t, []request.Span{ - {ServiceID: svc.ID{UID: tc.expectedUID, HostName: tc.expectedHN}, + {ServiceID: svc.ID{UID: svc.UID{Instance: tc.expectedInstance}, HostName: tc.expectedHN}, Path: "/foo", Pid: request.PidInfo{HostPID: 1234}}, - {ServiceID: svc.ID{UID: tc.expectedUID, HostName: tc.expectedHN}, + {ServiceID: svc.ID{UID: svc.UID{Instance: tc.expectedInstance}, HostName: tc.expectedHN}, Path: "/bar", Pid: request.PidInfo{HostPID: 1234}}, }, outSpans) }) diff --git a/pkg/kubecache/informer/informer.pb.go b/pkg/kubecache/informer/informer.pb.go index 58464c1b4..a3d4c6a1a 100644 --- a/pkg/kubecache/informer/informer.pb.go +++ b/pkg/kubecache/informer/informer.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc v5.28.2 // source: proto/informer.proto @@ -249,8 +249,9 @@ type ContainerInfo struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Env map[string]string `protobuf:"bytes,2,rep,name=env,proto3" json:"env,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` + Env map[string]string `protobuf:"bytes,2,rep,name=env,proto3" json:"env,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *ContainerInfo) Reset() { @@ -290,6 +291,13 @@ func (x *ContainerInfo) GetId() string { return "" } +func (x *ContainerInfo) GetName() string { + if x != nil { + return x.Name + } + return "" +} + func (x *ContainerInfo) GetEnv() map[string]string { if x != nil { return x.Env @@ -479,39 +487,40 @@ var file_proto_informer_proto_rawDesc = []byte{ 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x73, 0x12, 0x27, 0x0a, 0x06, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x4f, 0x77, 0x6e, - 0x65, 0x72, 0x52, 0x06, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x22, 0x8b, 0x01, 0x0a, 0x0d, 0x43, + 0x65, 0x72, 0x52, 0x06, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x73, 0x22, 0x9f, 0x01, 0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0e, 0x0a, 0x02, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x32, 0x0a, 0x03, - 0x65, 0x6e, 0x76, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x69, 0x6e, 0x66, 0x6f, - 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x49, 0x6e, - 0x66, 0x6f, 0x2e, 0x45, 0x6e, 0x76, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x03, 0x65, 0x6e, 0x76, - 0x1a, 0x36, 0x0a, 0x08, 0x45, 0x6e, 0x76, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, - 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, - 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x2f, 0x0a, 0x05, 0x4f, 0x77, 0x6e, 0x65, - 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x22, 0x74, 0x0a, 0x05, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x12, 0x27, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, - 0x32, 0x13, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x45, 0x76, 0x65, 0x6e, - 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x35, 0x0a, 0x08, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, - 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x4d, - 0x65, 0x74, 0x61, 0x48, 0x00, 0x52, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x88, - 0x01, 0x01, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x22, - 0x12, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x2a, 0x45, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, - 0x07, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, - 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x59, 0x4e, 0x43, 0x5f, - 0x46, 0x49, 0x4e, 0x49, 0x53, 0x48, 0x45, 0x44, 0x10, 0x03, 0x32, 0x50, 0x0a, 0x12, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x12, 0x3a, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1a, 0x2e, - 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, - 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x0f, 0x2e, 0x69, 0x6e, 0x66, 0x6f, - 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x30, 0x01, 0x42, 0x0c, 0x5a, 0x0a, - 0x2e, 0x2f, 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x32, 0x0a, 0x03, 0x65, 0x6e, 0x76, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, + 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, + 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x45, 0x6e, 0x76, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, + 0x03, 0x65, 0x6e, 0x76, 0x1a, 0x36, 0x0a, 0x08, 0x45, 0x6e, 0x76, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x2f, 0x0a, 0x05, + 0x4f, 0x77, 0x6e, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x22, 0x74, 0x0a, + 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x27, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, + 0x35, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x14, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x4f, 0x62, 0x6a, + 0x65, 0x63, 0x74, 0x4d, 0x65, 0x74, 0x61, 0x48, 0x00, 0x52, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x88, 0x01, 0x01, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x22, 0x12, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, 0x45, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, + 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0b, + 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x53, + 0x59, 0x4e, 0x43, 0x5f, 0x46, 0x49, 0x4e, 0x49, 0x53, 0x48, 0x45, 0x44, 0x10, 0x03, 0x32, 0x50, + 0x0a, 0x12, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x12, 0x3a, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x12, 0x1a, 0x2e, 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x0f, 0x2e, + 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x30, 0x01, + 0x42, 0x0c, 0x5a, 0x0a, 0x2e, 0x2f, 0x69, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/kubecache/meta/informers_init.go b/pkg/kubecache/meta/informers_init.go index 44dabb04c..b72728a8e 100644 --- a/pkg/kubecache/meta/informers_init.go +++ b/pkg/kubecache/meta/informers_init.go @@ -234,24 +234,27 @@ func (inf *Informers) initPodInformer(ctx context.Context, informerFactory infor for i := range pod.Status.ContainerStatuses { containers = append(containers, &informer.ContainerInfo{ - Id: rmContainerIDSchema(pod.Status.ContainerStatuses[i].ContainerID), - Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.Containers[i].Env), + Name: pod.Spec.Containers[i].Name, + Id: rmContainerIDSchema(pod.Status.ContainerStatuses[i].ContainerID), + Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.Containers[i].Env), }, ) } for i := range pod.Status.InitContainerStatuses { containers = append(containers, &informer.ContainerInfo{ - Id: rmContainerIDSchema(pod.Status.InitContainerStatuses[i].ContainerID), - Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.InitContainers[i].Env), + Name: pod.Spec.InitContainers[i].Name, + Id: rmContainerIDSchema(pod.Status.InitContainerStatuses[i].ContainerID), + Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.InitContainers[i].Env), }, ) } for i := range pod.Status.EphemeralContainerStatuses { containers = append(containers, &informer.ContainerInfo{ - Id: rmContainerIDSchema(pod.Status.EphemeralContainerStatuses[i].ContainerID), - Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.EphemeralContainers[i].Env), + Name: pod.Spec.EphemeralContainers[i].Name, + Id: rmContainerIDSchema(pod.Status.EphemeralContainerStatuses[i].ContainerID), + Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.EphemeralContainers[i].Env), }, ) } diff --git a/pkg/transform/k8s.go b/pkg/transform/k8s.go index 124d88c8e..24c91a979 100644 --- a/pkg/transform/k8s.go +++ b/pkg/transform/k8s.go @@ -12,7 +12,6 @@ import ( "github.com/grafana/beyla/pkg/internal/kube" "github.com/grafana/beyla/pkg/internal/pipe/global" "github.com/grafana/beyla/pkg/internal/request" - "github.com/grafana/beyla/pkg/internal/svc" "github.com/grafana/beyla/pkg/kubecache/informer" "github.com/grafana/beyla/pkg/kubeflags" ) @@ -49,6 +48,13 @@ type KubernetesDecorator struct { // MetaCacheAddress is the host:port address of the beyla-k8s-cache service instance MetaCacheAddress string `yaml:"meta_cache_address" env:"BEYLA_KUBE_META_CACHE_ADDRESS"` + + // MetaSourceLabels allows Beyla overriding the service name and namespace of an application from + // the given labels. + // TODO Beyla 2.0. Consider defaulting to (and report as a breaking change): + // Name: "app.kubernetes.io/name", + // Namespace: "app.kubernetes.io/part-of", + MetaSourceLabels kube.MetaSourceLabels `yaml:"meta_source_labels"` } const ( @@ -93,8 +99,8 @@ func (md *metadataDecorator) nodeLoop(in <-chan []request.Span, out chan<- []req } func (md *metadataDecorator) do(span *request.Span) { - if objectMeta := md.db.PodByPIDNs(span.Pid.Namespace); objectMeta != nil { - md.appendMetadata(span, objectMeta) + if podMeta, containerName := md.db.PodContainerByPIDNs(span.Pid.Namespace); podMeta != nil { + md.appendMetadata(span, podMeta, containerName) } else { // do not leave the service attributes map as nil span.ServiceID.Metadata = map[attr.Name]string{} @@ -108,7 +114,7 @@ func (md *metadataDecorator) do(span *request.Span) { } } -func (md *metadataDecorator) appendMetadata(span *request.Span, meta *informer.ObjectMeta) { +func (md *metadataDecorator) appendMetadata(span *request.Span, meta *informer.ObjectMeta, containerName string) { if meta.Pod == nil { // if this message happen, there is a bug klog().Debug("pod metadata for is nil. Ignoring decoration", "meta", meta) @@ -120,16 +126,16 @@ func (md *metadataDecorator) appendMetadata(span *request.Span, meta *informer.O // service name and namespace, we will automatically set it from // the kubernetes metadata if span.ServiceID.AutoName() { - span.ServiceID.Name = name + span.ServiceID.UID.Name = name } - if span.ServiceID.Namespace == "" { - span.ServiceID.Namespace = namespace + if span.ServiceID.UID.Namespace == "" { + span.ServiceID.UID.Namespace = namespace } - // overriding the UID here will avoid reusing the OTEL resource reporter + // overriding the Instance here will avoid reusing the OTEL resource reporter // if the application/process was discovered and reported information // before the kubernetes metadata was available // (related issue: https://github.com/grafana/beyla/issues/1124) - span.ServiceID.UID = svc.NewUID(meta.Pod.Uid) + span.ServiceID.UID.Instance = meta.Name + ":" + containerName // if, in the future, other pipeline steps modify the service metadata, we should // replace the map literal by individual entry insertions diff --git a/pkg/transform/k8s_test.go b/pkg/transform/k8s_test.go index dbb0fdd3e..f20e008e2 100644 --- a/pkg/transform/k8s_test.go +++ b/pkg/transform/k8s_test.go @@ -22,7 +22,10 @@ const timeout = 5 * time.Second func TestDecoration(t *testing.T) { inf := &fakeInformer{} - store := kube.NewStore(inf) + store := kube.NewStore(inf, kube.MetaSourceLabels{ + ServiceName: "app.kubernetes.io/name", + ServiceNamespace: "app.kubernetes.io/part-of", + }) // pre-populated kubernetes metadata database inf.Notify(&informer.Event{Type: informer.EventType_CREATED, Resource: &informer.ObjectMeta{ Name: "pod-12", Namespace: "the-ns", Kind: "Pod", @@ -31,7 +34,7 @@ func TestDecoration(t *testing.T) { StartTimeStr: "2020-01-02 12:12:56", Uid: "uid-12", Owners: []*informer.Owner{{Kind: "Deployment", Name: "deployment-12"}}, - Containers: []*informer.ContainerInfo{{Id: "container-12"}}, + Containers: []*informer.ContainerInfo{{Name: "a-container", Id: "container-12"}}, }, }}) inf.Notify(&informer.Event{Type: informer.EventType_CREATED, Resource: &informer.ObjectMeta{ @@ -41,7 +44,7 @@ func TestDecoration(t *testing.T) { StartTimeStr: "2020-01-02 12:34:56", Uid: "uid-34", Owners: []*informer.Owner{{Kind: "ReplicaSet", Name: "rs"}}, - Containers: []*informer.ContainerInfo{{Id: "container-34"}}, + Containers: []*informer.ContainerInfo{{Name: "a-container", Id: "container-34"}}, }, }}) inf.Notify(&informer.Event{Type: informer.EventType_CREATED, Resource: &informer.ObjectMeta{ @@ -50,7 +53,20 @@ func TestDecoration(t *testing.T) { NodeName: "the-node", Uid: "uid-56", StartTimeStr: "2020-01-02 12:56:56", - Containers: []*informer.ContainerInfo{{Id: "container-56"}}, + Containers: []*informer.ContainerInfo{{Name: "a-container", Id: "container-56"}}, + }, + }}) + inf.Notify(&informer.Event{Type: informer.EventType_CREATED, Resource: &informer.ObjectMeta{ + Name: "overridden-meta", Namespace: "the-ns", Kind: "Pod", + Labels: map[string]string{ + "app.kubernetes.io/name": "a-cool-name", + "app.kubernetes.io/part-of": "a-cool-namespace", + }, + Pod: &informer.PodInfo{ + NodeName: "the-node", + Uid: "uid-78", + StartTimeStr: "2020-01-02 12:56:56", + Containers: []*informer.ContainerInfo{{Name: "a-container", Id: "container-78"}}, }, }}) kube.InfoForPID = func(pid uint32) (container.Info, error) { @@ -62,6 +78,7 @@ func TestDecoration(t *testing.T) { store.AddProcess(12) store.AddProcess(34) store.AddProcess(56) + store.AddProcess(78) dec := metadataDecorator{db: store, clusterName: "the-cluster"} inputCh, outputhCh := make(chan []request.Span, 10), make(chan []request.Span, 10) @@ -77,8 +94,9 @@ func TestDecoration(t *testing.T) { }} deco := testutil.ReadChannel(t, outputhCh, timeout) require.Len(t, deco, 1) - assert.Equal(t, "the-ns", deco[0].ServiceID.Namespace) - assert.Equal(t, "deployment-12", deco[0].ServiceID.Name) + assert.Equal(t, "the-ns", deco[0].ServiceID.UID.Namespace) + assert.Equal(t, "deployment-12", deco[0].ServiceID.UID.Name) + assert.EqualValues(t, "pod-12:a-container", deco[0].ServiceID.UID.Instance) assert.Equal(t, map[attr.Name]string{ "k8s.node.name": "the-node", "k8s.namespace.name": "the-ns", @@ -96,8 +114,9 @@ func TestDecoration(t *testing.T) { }} deco := testutil.ReadChannel(t, outputhCh, timeout) require.Len(t, deco, 1) - assert.Equal(t, "the-ns", deco[0].ServiceID.Namespace) - assert.Equal(t, "rs", deco[0].ServiceID.Name) + assert.Equal(t, "the-ns", deco[0].ServiceID.UID.Namespace) + assert.Equal(t, "rs", deco[0].ServiceID.UID.Name) + assert.EqualValues(t, "pod-34:a-container", deco[0].ServiceID.UID.Instance) assert.Equal(t, map[attr.Name]string{ "k8s.node.name": "the-node", "k8s.namespace.name": "the-ns", @@ -115,8 +134,9 @@ func TestDecoration(t *testing.T) { }} deco := testutil.ReadChannel(t, outputhCh, timeout) require.Len(t, deco, 1) - assert.Equal(t, "the-ns", deco[0].ServiceID.Namespace) - assert.Equal(t, "the-pod", deco[0].ServiceID.Name) + assert.Equal(t, "the-ns", deco[0].ServiceID.UID.Namespace) + assert.Equal(t, "the-pod", deco[0].ServiceID.UID.Name) + assert.EqualValues(t, "the-pod:a-container", deco[0].ServiceID.UID.Instance) assert.Equal(t, map[attr.Name]string{ "k8s.node.name": "the-node", "k8s.namespace.name": "the-ns", @@ -126,26 +146,45 @@ func TestDecoration(t *testing.T) { "k8s.cluster.name": "the-cluster", }, deco[0].ServiceID.Metadata) }) + t.Run("user can override service name and annotations via labels", func(t *testing.T) { + inputCh <- []request.Span{{ + Pid: request.PidInfo{Namespace: 1078}, ServiceID: autoNameSvc, + }} + deco := testutil.ReadChannel(t, outputhCh, timeout) + require.Len(t, deco, 1) + assert.Equal(t, "a-cool-namespace", deco[0].ServiceID.UID.Namespace) + assert.Equal(t, "a-cool-name", deco[0].ServiceID.UID.Name) + assert.EqualValues(t, "overridden-meta:a-container", deco[0].ServiceID.UID.Instance) + assert.Equal(t, map[attr.Name]string{ + "k8s.node.name": "the-node", + "k8s.namespace.name": "the-ns", + "k8s.pod.name": "overridden-meta", + "k8s.pod.uid": "uid-78", + "k8s.pod.start_time": "2020-01-02 12:56:56", + "k8s.cluster.name": "the-cluster", + }, deco[0].ServiceID.Metadata) + }) t.Run("process without pod Info won't be decorated", func(t *testing.T) { - svc := svc.ID{Name: "exec"} + svc := svc.ID{UID: svc.UID{Name: "exec"}} svc.SetAutoName() inputCh <- []request.Span{{ - Pid: request.PidInfo{Namespace: 1078}, ServiceID: svc, + Pid: request.PidInfo{Namespace: 1099}, ServiceID: svc, }} deco := testutil.ReadChannel(t, outputhCh, timeout) require.Len(t, deco, 1) - assert.Empty(t, deco[0].ServiceID.Namespace) - assert.Equal(t, "exec", deco[0].ServiceID.Name) + assert.Empty(t, deco[0].ServiceID.UID.Namespace) + assert.Equal(t, "exec", deco[0].ServiceID.UID.Name) assert.Empty(t, deco[0].ServiceID.Metadata) }) t.Run("if service name or namespace are manually specified, don't override them", func(t *testing.T) { inputCh <- []request.Span{{ - Pid: request.PidInfo{Namespace: 1012}, ServiceID: svc.ID{Name: "tralari", Namespace: "tralara"}, + Pid: request.PidInfo{Namespace: 1012}, ServiceID: svc.ID{UID: svc.UID{Name: "tralari", Namespace: "tralara"}}, }} deco := testutil.ReadChannel(t, outputhCh, timeout) require.Len(t, deco, 1) - assert.Equal(t, "tralara", deco[0].ServiceID.Namespace) - assert.Equal(t, "tralari", deco[0].ServiceID.Name) + assert.Equal(t, "tralara", deco[0].ServiceID.UID.Namespace) + assert.Equal(t, "tralari", deco[0].ServiceID.UID.Name) + assert.EqualValues(t, "pod-12:a-container", deco[0].ServiceID.UID.Instance) assert.Equal(t, map[attr.Name]string{ "k8s.node.name": "the-node", "k8s.namespace.name": "the-ns", diff --git a/pkg/transform/name_resolver.go b/pkg/transform/name_resolver.go index aa4903320..922966241 100644 --- a/pkg/transform/name_resolver.go +++ b/pkg/transform/name_resolver.go @@ -121,8 +121,8 @@ func (nr *NameResolver) resolveNames(span *request.Span) { pn, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Peer) hn, ns = nr.resolve(&span.ServiceID, span.Host) } - if span.ServiceID.Namespace == "" && ns != "" { - span.ServiceID.Namespace = ns + if span.ServiceID.UID.Namespace == "" && ns != "" { + span.ServiceID.UID.Namespace = ns } // don't set names if the peer and host names have been already decorated // in a previous stage (e.g. Kubernetes decorator) @@ -153,10 +153,10 @@ func (nr *NameResolver) resolve(svc *svc.ID, ip string) (string, string) { func (nr *NameResolver) cleanName(svc *svc.ID, ip, n string) string { n = strings.TrimSuffix(n, ".") n = trimSuffixIgnoreCase(n, ".svc.cluster.local") - n = trimSuffixIgnoreCase(n, "."+svc.Namespace) + n = trimSuffixIgnoreCase(n, "."+svc.UID.Namespace) kubeNamespace, ok := svc.Metadata[attr.K8sNamespaceName] - if ok && kubeNamespace != "" && kubeNamespace != svc.Namespace { + if ok && kubeNamespace != "" && kubeNamespace != svc.UID.Namespace { n = trimSuffixIgnoreCase(n, "."+kubeNamespace) } @@ -186,10 +186,10 @@ func (nr *NameResolver) dnsResolve(svc *svc.ID, ip string) (string, string) { if nr.sources.Has(ResolverDNS) { n := nr.resolveIP(ip) if n == ip { - return n, svc.Namespace + return n, svc.UID.Namespace } n = nr.cleanName(svc, ip, n) - return n, svc.Namespace + return n, svc.UID.Namespace } return "", "" } diff --git a/pkg/transform/name_resolver_test.go b/pkg/transform/name_resolver_test.go index 49e82896b..89eafc82d 100644 --- a/pkg/transform/name_resolver_test.go +++ b/pkg/transform/name_resolver_test.go @@ -32,7 +32,7 @@ func TestSuffixPrefix(t *testing.T) { func TestResolvePodsFromK8s(t *testing.T) { inf := &fakeInformer{} - db := kube2.NewStore(inf) + db := kube2.NewStore(inf, kube2.MetaSourceLabels{}) pod1 := &informer.ObjectMeta{Name: "pod1", Kind: "Pod", Ips: []string{"10.0.0.1", "10.1.0.1"}} pod2 := &informer.ObjectMeta{Name: "pod2", Namespace: "something", Kind: "Pod", Ips: []string{"10.0.0.2", "10.1.0.2"}} pod3 := &informer.ObjectMeta{Name: "pod3", Kind: "Pod", Ips: []string{"10.0.0.3", "10.1.0.3"}} @@ -71,26 +71,26 @@ func TestResolvePodsFromK8s(t *testing.T) { Type: request.EventTypeHTTPClient, Peer: "10.0.0.1", Host: "10.0.0.2", - ServiceID: svc.ID{ + ServiceID: svc.ID{UID: svc.UID{ Name: "pod1", Namespace: "", - }, + }}, } serverSpan := request.Span{ Type: request.EventTypeHTTP, Peer: "10.0.0.1", Host: "10.0.0.2", - ServiceID: svc.ID{ + ServiceID: svc.ID{UID: svc.UID{ Name: "pod2", Namespace: "something", - }, + }}, } nr.resolveNames(&clientSpan) assert.Equal(t, "pod1", clientSpan.PeerName) - assert.Equal(t, "", clientSpan.ServiceID.Namespace) + assert.Equal(t, "", clientSpan.ServiceID.UID.Namespace) assert.Equal(t, "pod2", clientSpan.HostName) assert.Equal(t, "something", clientSpan.OtherNamespace) @@ -99,12 +99,12 @@ func TestResolvePodsFromK8s(t *testing.T) { assert.Equal(t, "pod1", serverSpan.PeerName) assert.Equal(t, "", serverSpan.OtherNamespace) assert.Equal(t, "pod2", serverSpan.HostName) - assert.Equal(t, "something", serverSpan.ServiceID.Namespace) + assert.Equal(t, "something", serverSpan.ServiceID.UID.Namespace) } func TestResolveServiceFromK8s(t *testing.T) { inf := &fakeInformer{} - db := kube2.NewStore(inf) + db := kube2.NewStore(inf, kube2.MetaSourceLabels{}) pod1 := &informer.ObjectMeta{Name: "pod1", Kind: "Service", Ips: []string{"10.0.0.1", "10.1.0.1"}} pod2 := &informer.ObjectMeta{Name: "pod2", Namespace: "something", Kind: "Service", Ips: []string{"10.0.0.2", "10.1.0.2"}} pod3 := &informer.ObjectMeta{Name: "pod3", Kind: "Service", Ips: []string{"10.0.0.3", "10.1.0.3"}} @@ -142,26 +142,26 @@ func TestResolveServiceFromK8s(t *testing.T) { Type: request.EventTypeHTTPClient, Peer: "10.0.0.1", Host: "10.0.0.2", - ServiceID: svc.ID{ + ServiceID: svc.ID{UID: svc.UID{ Name: "pod1", Namespace: "", - }, + }}, } serverSpan := request.Span{ Type: request.EventTypeHTTP, Peer: "10.0.0.1", Host: "10.0.0.2", - ServiceID: svc.ID{ + ServiceID: svc.ID{UID: svc.UID{ Name: "pod2", Namespace: "something", - }, + }}, } nr.resolveNames(&clientSpan) assert.Equal(t, "pod1", clientSpan.PeerName) - assert.Equal(t, "", clientSpan.ServiceID.Namespace) + assert.Equal(t, "", clientSpan.ServiceID.UID.Namespace) assert.Equal(t, "pod2", clientSpan.HostName) assert.Equal(t, "something", clientSpan.OtherNamespace) @@ -170,13 +170,15 @@ func TestResolveServiceFromK8s(t *testing.T) { assert.Equal(t, "pod1", serverSpan.PeerName) assert.Equal(t, "", serverSpan.OtherNamespace) assert.Equal(t, "pod2", serverSpan.HostName) - assert.Equal(t, "something", serverSpan.ServiceID.Namespace) + assert.Equal(t, "something", serverSpan.ServiceID.UID.Namespace) } func TestCleanName(t *testing.T) { s := svc.ID{ - Name: "service", - Namespace: "special.namespace", + UID: svc.UID{ + Name: "service", + Namespace: "special.namespace", + }, Metadata: map[attr.Name]string{ attr.K8sNamespaceName: "k8snamespace", }, @@ -194,7 +196,7 @@ func TestCleanName(t *testing.T) { func TestResolveNodesFromK8s(t *testing.T) { inf := &fakeInformer{} - db := kube2.NewStore(inf) + db := kube2.NewStore(inf, kube2.MetaSourceLabels{}) node1 := &informer.ObjectMeta{Name: "node1", Kind: "Node", Ips: []string{"10.0.0.1", "10.1.0.1"}} node2 := &informer.ObjectMeta{Name: "node2", Namespace: "something", Kind: "Node", Ips: []string{"10.0.0.2", "10.1.0.2"}} node3 := &informer.ObjectMeta{Name: "node3", Kind: "Node", Ips: []string{"10.0.0.3", "10.1.0.3"}} @@ -232,26 +234,26 @@ func TestResolveNodesFromK8s(t *testing.T) { Type: request.EventTypeHTTPClient, Peer: "10.0.0.1", Host: "10.0.0.2", - ServiceID: svc.ID{ + ServiceID: svc.ID{UID: svc.UID{ Name: "node1", Namespace: "", - }, + }}, } serverSpan := request.Span{ Type: request.EventTypeHTTP, Peer: "10.0.0.1", Host: "10.0.0.2", - ServiceID: svc.ID{ + ServiceID: svc.ID{UID: svc.UID{ Name: "node2", Namespace: "something", - }, + }}, } nr.resolveNames(&clientSpan) assert.Equal(t, "node1", clientSpan.PeerName) - assert.Equal(t, "", clientSpan.ServiceID.Namespace) + assert.Equal(t, "", clientSpan.ServiceID.UID.Namespace) assert.Equal(t, "node2", clientSpan.HostName) assert.Equal(t, "something", clientSpan.OtherNamespace) @@ -260,5 +262,5 @@ func TestResolveNodesFromK8s(t *testing.T) { assert.Equal(t, "node1", serverSpan.PeerName) assert.Equal(t, "", serverSpan.OtherNamespace) assert.Equal(t, "node2", serverSpan.HostName) - assert.Equal(t, "something", serverSpan.ServiceID.Namespace) + assert.Equal(t, "something", serverSpan.ServiceID.UID.Namespace) } diff --git a/proto/informer.proto b/proto/informer.proto index a9352c950..83071af38 100644 --- a/proto/informer.proto +++ b/proto/informer.proto @@ -34,6 +34,7 @@ message PodInfo { message ContainerInfo { string id = 1; + string name = 3; map env = 2; } diff --git a/test/integration/k8s/daemonset/k8s_daemonset_traces_test.go b/test/integration/k8s/daemonset/k8s_daemonset_traces_test.go index 81803c185..671aaaa9f 100644 --- a/test/integration/k8s/daemonset/k8s_daemonset_traces_test.go +++ b/test/integration/k8s/daemonset/k8s_daemonset_traces_test.go @@ -57,6 +57,7 @@ func TestBasicTracing(t *testing.T) { for _, proc := range trace.Processes { sd := jaeger.DiffAsRegexp([]jaeger.Tag{ {Key: "service.namespace", Type: "string", Value: "^default$"}, + {Key: "service.instance.id", Type: "string", Value: "^otherinstance-.+:otherinstance"}, }, proc.Tags) require.Empty(t, sd) } @@ -131,6 +132,7 @@ func TestBasicTracing(t *testing.T) { for _, proc := range trace.Processes { sd := jaeger.DiffAsRegexp([]jaeger.Tag{ {Key: "service.namespace", Type: "string", Value: "^default$"}, + {Key: "service.instance.id", Type: "string", Value: "^otherinstance-.+:otherinstance"}, }, proc.Tags) require.Empty(t, sd) } diff --git a/test/integration/k8s/daemonset_multi_node/k8s_daemonset_multi_node_traces_test.go b/test/integration/k8s/daemonset_multi_node/k8s_daemonset_multi_node_traces_test.go index 365d750b4..45b6444a6 100644 --- a/test/integration/k8s/daemonset_multi_node/k8s_daemonset_multi_node_traces_test.go +++ b/test/integration/k8s/daemonset_multi_node/k8s_daemonset_multi_node_traces_test.go @@ -61,9 +61,10 @@ func TestMultiNodeTracing(t *testing.T) { parent := res[0] require.NotEmpty(t, parent.TraceID) traceID = parent.TraceID - sd := jaeger.Diff([]jaeger.Tag{ - {Key: "service.namespace", Type: "string", Value: "integration-test"}, - {Key: "telemetry.sdk.language", Type: "string", Value: "go"}, + sd := jaeger.DiffAsRegexp([]jaeger.Tag{ + {Key: "service.namespace", Type: "string", Value: "^integration-test$"}, + {Key: "telemetry.sdk.language", Type: "string", Value: "^go$"}, + {Key: "service.instance.id", Type: "string", Value: "^testserver-.+:testserver$"}, }, trace.Processes[parent.ProcessID].Tags) require.Empty(t, sd, sd.String()) diff --git a/test/integration/k8s/daemonset_multi_node_l7/k8s_daemonset_multi_node_traces_test.go b/test/integration/k8s/daemonset_multi_node_l7/k8s_daemonset_multi_node_traces_test.go index 757f1775c..f7fc0242a 100644 --- a/test/integration/k8s/daemonset_multi_node_l7/k8s_daemonset_multi_node_traces_test.go +++ b/test/integration/k8s/daemonset_multi_node_l7/k8s_daemonset_multi_node_traces_test.go @@ -61,9 +61,10 @@ func TestMultiNodeTracingL7(t *testing.T) { parent := res[0] require.NotEmpty(t, parent.TraceID) traceID = parent.TraceID - sd := jaeger.Diff([]jaeger.Tag{ - {Key: "service.namespace", Type: "string", Value: "integration-test"}, - {Key: "telemetry.sdk.language", Type: "string", Value: "go"}, + sd := jaeger.DiffAsRegexp([]jaeger.Tag{ + {Key: "service.namespace", Type: "string", Value: "^integration-test$"}, + {Key: "telemetry.sdk.language", Type: "string", Value: "^go$"}, + {Key: "service.instance.id", Type: "string", Value: "^testserver-.+:testserver"}, }, trace.Processes[parent.ProcessID].Tags) require.Empty(t, sd, sd.String()) diff --git a/test/integration/k8s/daemonset_python/k8s_daemonset_traces_test.go b/test/integration/k8s/daemonset_python/k8s_daemonset_traces_test.go index 325b9c48d..763f2bbdc 100644 --- a/test/integration/k8s/daemonset_python/k8s_daemonset_traces_test.go +++ b/test/integration/k8s/daemonset_python/k8s_daemonset_traces_test.go @@ -51,9 +51,10 @@ func TestPythonBasicTracing(t *testing.T) { res := trace.FindByOperationName("GET /greeting") require.Len(t, res, 1) parent := res[0] - sd := jaeger.Diff([]jaeger.Tag{ - {Key: "service.namespace", Type: "string", Value: "integration-test"}, - {Key: "telemetry.sdk.language", Type: "string", Value: "python"}, + sd := jaeger.DiffAsRegexp([]jaeger.Tag{ + {Key: "service.namespace", Type: "string", Value: "^integration-test$"}, + {Key: "telemetry.sdk.language", Type: "string", Value: "^python$"}, + {Key: "service.instance.id", Type: "string", Value: "^pytestserver-.+:pytestserver$"}, }, trace.Processes[parent.ProcessID].Tags) require.Empty(t, sd, sd.String()) @@ -65,6 +66,7 @@ func TestPythonBasicTracing(t *testing.T) { {Key: "k8s.pod.start_time", Type: "string", Value: k8s.TimeRegex}, {Key: "k8s.namespace.name", Type: "string", Value: "^default$"}, {Key: "k8s.cluster.name", Type: "string", Value: "^beyla$"}, + {Key: "service.instance.id", Type: "string", Value: "^pytestserver-.+:pytestserver"}, }, trace.Processes[parent.ProcessID].Tags) require.Empty(t, sd, sd.String()) @@ -106,9 +108,10 @@ func TestPythonBasicTracing(t *testing.T) { res := trace.FindByOperationName("GET /smoke") require.Len(t, res, 1) parent := res[0] - sd := jaeger.Diff([]jaeger.Tag{ - {Key: "service.namespace", Type: "string", Value: "integration-test"}, - {Key: "telemetry.sdk.language", Type: "string", Value: "python"}, + sd := jaeger.DiffAsRegexp([]jaeger.Tag{ + {Key: "service.namespace", Type: "string", Value: "^integration-test$"}, + {Key: "telemetry.sdk.language", Type: "string", Value: "^python$"}, + {Key: "service.instance.id", Type: "string", Value: "^pytestserver-.+:pytestserver$"}, }, trace.Processes[parent.ProcessID].Tags) require.Empty(t, sd, sd.String()) @@ -120,6 +123,7 @@ func TestPythonBasicTracing(t *testing.T) { {Key: "k8s.pod.start_time", Type: "string", Value: k8s.TimeRegex}, {Key: "k8s.namespace.name", Type: "string", Value: "^default$"}, {Key: "k8s.cluster.name", Type: "string", Value: "^beyla$"}, + {Key: "service.instance.id", Type: "string", Value: "^pytestserver-.+:pytestserver"}, }, trace.Processes[parent.ProcessID].Tags) require.Empty(t, sd, sd.String()) diff --git a/test/integration/k8s/informer_cache/k8s_informer_cache_main_test.go b/test/integration/k8s/informer_cache/k8s_informer_cache_main_test.go index dbd15dc04..dace50eba 100644 --- a/test/integration/k8s/informer_cache/k8s_informer_cache_main_test.go +++ b/test/integration/k8s/informer_cache/k8s_informer_cache_main_test.go @@ -66,6 +66,9 @@ func TestInformersCache_MetricsDecoration_HTTP(t *testing.T) { map[string]string{ "server_service_namespace": "default", "k8s_cluster_name": "my-kube", + "service_name": "overridden-testserver-name", + "service_namespace": "overridden-testserver-namespace", + "service_instance_id": "testserver-.+:testserver", })) } @@ -73,6 +76,7 @@ func TestInformersCache_ProcessMetrics(t *testing.T) { cluster.TestEnv().Test(t, k8s.FeatureProcessMetricsDecoration( map[string]string{ "k8s_cluster_name": "my-kube", + "instance": "testserver-.+:testserver", })) } diff --git a/test/integration/k8s/manifests/05-uninstrumented-service.yml b/test/integration/k8s/manifests/05-uninstrumented-service.yml index 5cbf9a1aa..8c3d06cf4 100644 --- a/test/integration/k8s/manifests/05-uninstrumented-service.yml +++ b/test/integration/k8s/manifests/05-uninstrumented-service.yml @@ -33,6 +33,8 @@ metadata: name: testserver labels: app: testserver + app.kubernetes.io/name: 'overridden-testserver-name' + app.kubernetes.io/part-of: 'overridden-testserver-namespace' spec: replicas: 1 selector: diff --git a/test/integration/k8s/manifests/06-beyla-external-informer.yml b/test/integration/k8s/manifests/06-beyla-external-informer.yml index 9d6fc6ecb..438b32fc1 100644 --- a/test/integration/k8s/manifests/06-beyla-external-informer.yml +++ b/test/integration/k8s/manifests/06-beyla-external-informer.yml @@ -17,6 +17,9 @@ data: enable: true cluster_name: my-kube meta_cache_address: k8s-cache:50055 + meta_source_labels: + service_name: "app.kubernetes.io/name" + service_namespace: "app.kubernetes.io/part-of" select: "*": include: [ "*" ] diff --git a/test/integration/k8s/owners/k8s_daemonset_metadata_test.go b/test/integration/k8s/owners/k8s_daemonset_metadata_test.go index a5c8a2336..76aed8f98 100644 --- a/test/integration/k8s/owners/k8s_daemonset_metadata_test.go +++ b/test/integration/k8s/owners/k8s_daemonset_metadata_test.go @@ -51,6 +51,7 @@ func TestDaemonSetMetadata(t *testing.T) { for _, proc := range trace.Processes { sd := jaeger.DiffAsRegexp([]jaeger.Tag{ {Key: "service.namespace", Type: "string", Value: "^default$"}, + {Key: "service.instance.id", Type: "string", Value: "^dsservice-.+:dsservice"}, }, proc.Tags) require.Empty(t, sd) } @@ -67,6 +68,8 @@ func TestDaemonSetMetadata(t *testing.T) { {Key: "k8s.daemonset.name", Type: "string", Value: "^dsservice$"}, {Key: "k8s.namespace.name", Type: "string", Value: "^default$"}, {Key: "k8s.cluster.name", Type: "string", Value: "^beyla$"}, + {Key: "service.namespace", Type: "string", Value: "^default$"}, + {Key: "service.instance.id", Type: "string", Value: "^dsservice-.+:dsservice"}, }, trace.Processes[parent.ProcessID].Tags) require.Empty(t, sd) diff --git a/test/integration/k8s/owners/k8s_statefulset_metadata_test.go b/test/integration/k8s/owners/k8s_statefulset_metadata_test.go index 047ea1800..a21f77a70 100644 --- a/test/integration/k8s/owners/k8s_statefulset_metadata_test.go +++ b/test/integration/k8s/owners/k8s_statefulset_metadata_test.go @@ -51,6 +51,7 @@ func TestStatefulSetMetadata(t *testing.T) { for _, proc := range trace.Processes { sd := jaeger.DiffAsRegexp([]jaeger.Tag{ {Key: "service.namespace", Type: "string", Value: "^default$"}, + {Key: "service.instance.id", Type: "string", Value: "^statefulservice-.+:statefulservice"}, }, proc.Tags) require.Empty(t, sd) } @@ -67,6 +68,8 @@ func TestStatefulSetMetadata(t *testing.T) { {Key: "k8s.statefulset.name", Type: "string", Value: "^statefulservice$"}, {Key: "k8s.namespace.name", Type: "string", Value: "^default$"}, {Key: "k8s.cluster.name", Type: "string", Value: "^beyla$"}, + {Key: "service.namespace", Type: "string", Value: "^default$"}, + {Key: "service.instance.id", Type: "string", Value: "^statefulservice-.+:statefulservice"}, }, trace.Processes[parent.ProcessID].Tags) require.Empty(t, sd) diff --git a/test/integration/red_test_rust.go b/test/integration/red_test_rust.go index 67101396e..838071d7d 100644 --- a/test/integration/red_test_rust.go +++ b/test/integration/red_test_rust.go @@ -108,7 +108,7 @@ func testREDMetricsForRustHTTPLibrary(t *testing.T, url, comm, namespace string, assert.Equal(t, comm, process.ServiceName) serviceInstance, ok := jaeger.FindIn(process.Tags, "service.instance.id") require.Truef(t, ok, "service.instance.id not found in tags: %v", process.Tags) - assert.Regexp(t, `^\w+$`, serviceInstance.Value) + assert.Regexp(t, `^beyla:\d+$$`, serviceInstance.Value) sd = jaeger.Diff([]jaeger.Tag{ {Key: "otel.library.name", Type: "string", Value: "github.com/grafana/beyla"}, {Key: "telemetry.sdk.language", Type: "string", Value: "rust"}, diff --git a/test/integration/traces_test.go b/test/integration/traces_test.go index 4517abee9..0355242d3 100644 --- a/test/integration/traces_test.go +++ b/test/integration/traces_test.go @@ -142,7 +142,7 @@ func testHTTPTracesCommon(t *testing.T, doTraceID bool, httpCode int) { serviceInstance, ok := jaeger.FindIn(process.Tags, "service.instance.id") require.Truef(t, ok, "service.instance.id not found in tags: %v", process.Tags) - assert.Regexp(t, `^\w+$`, serviceInstance.Value) + assert.Regexp(t, `^beyla:\d+$$`, serviceInstance.Value) jaeger.Diff([]jaeger.Tag{ {Key: "otel.library.name", Type: "string", Value: "github.com/grafana/beyla"}, @@ -252,7 +252,7 @@ func testGRPCTracesForServiceName(t *testing.T, svcName string) { serviceInstance, ok := jaeger.FindIn(process.Tags, "service.instance.id") require.Truef(t, ok, "service.instance.id not found in tags: %v", process.Tags) - assert.Regexp(t, `^\w+$`, serviceInstance.Value) + assert.Regexp(t, `^beyla:\d+$$`, serviceInstance.Value) jaeger.Diff([]jaeger.Tag{ {Key: "otel.library.name", Type: "string", Value: "github.com/grafana/beyla"}, @@ -395,7 +395,7 @@ func testHTTPTracesKProbes(t *testing.T) { serviceInstance, ok := jaeger.FindIn(process.Tags, "service.instance.id") require.Truef(t, ok, "service.instance.id not found in tags: %v", process.Tags) - assert.Regexp(t, `^\w+$`, serviceInstance.Value) + assert.Regexp(t, `^beyla:\d+$$`, serviceInstance.Value) jaeger.Diff([]jaeger.Tag{ {Key: "otel.library.name", Type: "string", Value: "github.com/grafana/beyla"},