Skip to content

Commit

Permalink
Set instance ID from pod:container and let setting metadata from anno…
Browse files Browse the repository at this point in the history
…tations (#1391)

* Set instance ID from container ID and let setting metadata from annotations

* amended tests

* refactored service unique ID

* some minor changes

* Fix kube tests and another memory leak in store
  • Loading branch information
mariomac authored Nov 22, 2024
1 parent 66b1975 commit 65111df
Show file tree
Hide file tree
Showing 52 changed files with 436 additions and 435 deletions.
1 change: 1 addition & 0 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func buildCommonContextInfo(
ResyncPeriod: config.Attributes.Kubernetes.InformersResyncPeriod,
DisabledInformers: config.Attributes.Kubernetes.DisableInformers,
MetaCacheAddr: config.Attributes.Kubernetes.MetaCacheAddress,
MetaSourceLabels: config.Attributes.Kubernetes.MetaSourceLabels,
}),
}
switch {
Expand Down
8 changes: 4 additions & 4 deletions pkg/export/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func textPrinter(input <-chan []request.Span) {
hn := ""

if spans[i].IsClientSpan() {
if spans[i].ServiceID.Namespace != "" {
pn = "." + spans[i].ServiceID.Namespace
if spans[i].ServiceID.UID.Namespace != "" {
pn = "." + spans[i].ServiceID.UID.Namespace
}
if spans[i].OtherNamespace != "" {
hn = "." + spans[i].OtherNamespace
Expand All @@ -93,8 +93,8 @@ func textPrinter(input <-chan []request.Span) {
if spans[i].OtherNamespace != "" {
pn = "." + spans[i].OtherNamespace
}
if spans[i].ServiceID.Namespace != "" {
hn = "." + spans[i].ServiceID.Namespace
if spans[i].ServiceID.UID.Namespace != "" {
hn = "." + spans[i].ServiceID.UID.Namespace
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/export/debug/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestTracePrinterValidEnabled(t *testing.T) {

func traceFuncHelper(t *testing.T, tracePrinter TracePrinter) string {
fakeSpan := request.Span{
ServiceID: svc.ID{Name: "bar", Namespace: "foo", SDKLanguage: svc.InstrumentableGolang},
ServiceID: svc.ID{UID: svc.UID{Name: "bar", Namespace: "foo"}, SDKLanguage: svc.InstrumentableGolang},
Type: request.EventTypeHTTP,
Method: "method",
Path: "path",
Expand Down
12 changes: 7 additions & 5 deletions pkg/export/otel/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ var DefaultBuckets = Buckets{

func getAppResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue {
return append(getResourceAttrs(hostID, service),
semconv.ServiceInstanceID(string(service.UID)),
semconv.ServiceInstanceID(service.UID.Instance),
)
}

func getResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue {
attrs := []attribute.KeyValue{
semconv.ServiceName(service.Name),
semconv.ServiceName(service.UID.Name),
// SpanMetrics requires an extra attribute besides service name
// to generate the traces_target_info metric,
// so the service is visible in the ServicesList
Expand All @@ -80,8 +80,8 @@ func getResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue {
semconv.HostID(hostID),
}

if service.Namespace != "" {
attrs = append(attrs, semconv.ServiceNamespace(service.Namespace))
if service.UID.Namespace != "" {
attrs = append(attrs, semconv.ServiceNamespace(service.UID.Namespace))
}

for k, v := range service.Metadata {
Expand Down Expand Up @@ -142,6 +142,8 @@ func NewReporterPool[K uidGetter, T any](
}
}

var emptyUID = svc.UID{}

// For retrieves the associated item for the given service name, or
// creates a new one if it does not exist
func (rp *ReporterPool[K, T]) For(service K) (T, error) {
Expand All @@ -154,7 +156,7 @@ func (rp *ReporterPool[K, T]) For(service K) (T, error) {
// In multi-process tracing, this is likely to happen as most
// tracers group traces belonging to the same service in the same slice.
svcUID := service.GetUID()
if rp.lastServiceUID == "" || svcUID != rp.lastService.GetUID() {
if rp.lastServiceUID == emptyUID || svcUID != rp.lastService.GetUID() {
lm, err := rp.get(svcUID, service)
if err != nil {
var t T
Expand Down
20 changes: 10 additions & 10 deletions pkg/export/otel/expirer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {

// WHEN it receives metrics
metrics <- []request.Span{
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
}

// THEN the metrics are exported
Expand All @@ -189,7 +189,7 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {
// AND WHEN it keeps receiving a subset of the initial metrics during the TTL
now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
}

// THEN THE metrics that have been received during the TTL period are still visible
Expand All @@ -203,7 +203,7 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {

now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
}

// makes sure that the records channel is emptied and any remaining
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {
// AND WHEN the metrics labels that disappeared are received again
now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
}

// THEN they are reported again, starting from zero in the case of counters
Expand Down Expand Up @@ -284,8 +284,8 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) {

// WHEN it receives metrics
metrics <- []request.Span{
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: "bar"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: svc.UID{Instance: "bar"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
}

// THEN the metrics are exported
Expand All @@ -308,7 +308,7 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) {
// AND WHEN it keeps receiving a subset of the initial metrics during the TTL
now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
}

// THEN THE metrics that have been received during the TTL period are still visible
Expand All @@ -322,7 +322,7 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) {

now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
}

// BUT not the metrics that haven't been received during that time.
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestAppMetricsExpiration_BySvcID(t *testing.T) {
// AND WHEN the metrics labels that disappeared are received again
now.Advance(2 * time.Minute)
metrics <- []request.Span{
{ServiceID: svc.ID{UID: "bar"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
{ServiceID: svc.ID{UID: svc.UID{Instance: "bar"}}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
}

// THEN they are reported again, starting from zero in the case of counters
Expand Down
6 changes: 3 additions & 3 deletions pkg/export/otel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,9 +709,9 @@ func otelHistogramConfig(metricName string, buckets []float64, useExponentialHis

func (mr *MetricsReporter) metricResourceAttributes(service *svc.ID) attribute.Set {
attrs := []attribute.KeyValue{
request.ServiceMetric(service.Name),
semconv.ServiceInstanceID(string(service.UID)),
semconv.ServiceNamespace(service.Namespace),
request.ServiceMetric(service.UID.Name),
semconv.ServiceInstanceID(service.UID.Instance),
semconv.ServiceNamespace(service.UID.Namespace),
semconv.TelemetrySDKLanguageKey.String(service.SDKLanguage.String()),
semconv.TelemetrySDKNameKey.String("beyla"),
request.SourceMetric("beyla"),
Expand Down
2 changes: 1 addition & 1 deletion pkg/export/otel/metrics_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func newProcMetricsExporter(
func getProcessResourceAttrs(hostID string, procID *process.ID) []attribute.KeyValue {
return append(
getResourceAttrs(hostID, procID.Service),
semconv.ServiceInstanceID(string(procID.UID)),
semconv.ServiceInstanceID(procID.UID.Instance),
attr2.ProcCommand.OTEL().String(procID.Command),
attr2.ProcOwner.OTEL().String(procID.User),
attr2.ProcParentPid.OTEL().String(strconv.Itoa(int(procID.ParentProcessID))),
Expand Down
8 changes: 4 additions & 4 deletions pkg/export/otel/metrics_proc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func TestProcMetrics_Aggregated(t *testing.T) {

// WHEN it receives process metrics
metrics <- []*process.Status{
{ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: "foo"},
{ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: svc.UID{Instance: "foo"}},
CPUUtilisationWait: 3, CPUUtilisationSystem: 2, CPUUtilisationUser: 1,
CPUTimeUserDelta: 30, CPUTimeWaitDelta: 20, CPUTimeSystemDelta: 10,
IOReadBytesDelta: 123, IOWriteBytesDelta: 456,
NetRcvBytesDelta: 11, NetTxBytesDelta: 22,
},
{ID: process.ID{Command: "bar", Service: &svc.ID{}, UID: "bar"},
{ID: process.ID{Command: "bar", Service: &svc.ID{}, UID: svc.UID{Instance: "bar"}},
CPUUtilisationWait: 31, CPUUtilisationSystem: 21, CPUUtilisationUser: 11,
CPUTimeUserDelta: 301, CPUTimeWaitDelta: 201, CPUTimeSystemDelta: 101,
IOReadBytesDelta: 321, IOWriteBytesDelta: 654,
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestProcMetrics_Aggregated(t *testing.T) {

// AND WHEN new metrics are received
metrics <- []*process.Status{
{ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: "foo"},
{ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: svc.UID{Instance: "foo"}},
CPUUtilisationWait: 4, CPUUtilisationSystem: 1, CPUUtilisationUser: 2,
CPUTimeUserDelta: 3, CPUTimeWaitDelta: 2, CPUTimeSystemDelta: 1,
IOReadBytesDelta: 1, IOWriteBytesDelta: 2,
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestProcMetrics_Disaggregated(t *testing.T) {

// WHEN it receives process metrics
metrics <- []*process.Status{
{ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: "foo"},
{ID: process.ID{Command: "foo", Service: &svc.ID{}, UID: svc.UID{Instance: "foo"}},
CPUUtilisationWait: 3, CPUUtilisationSystem: 2, CPUUtilisationUser: 1,
CPUTimeUserDelta: 30, CPUTimeWaitDelta: 20, CPUTimeSystemDelta: 10,
IOReadBytesDelta: 123, IOWriteBytesDelta: 456,
Expand Down
20 changes: 10 additions & 10 deletions pkg/export/otel/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,15 +492,15 @@ func TestAppMetrics_ByInstrumentation(t *testing.T) {
*/
// WHEN it receives metrics
metrics <- []request.Span{
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTPClient, Path: "/bar", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeGRPC, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeGRPCClient, Path: "/bar", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeSQLClient, Path: "SELECT", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeRedisClient, Method: "SET", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeRedisServer, Method: "GET", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeKafkaClient, Method: "publish", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeKafkaServer, Method: "process", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTPClient, Path: "/bar", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeGRPC, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeGRPCClient, Path: "/bar", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeSQLClient, Path: "SELECT", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeRedisClient, Method: "SET", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeRedisServer, Method: "GET", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeKafkaClient, Method: "publish", RequestStart: 150, End: 175},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeKafkaServer, Method: "process", RequestStart: 150, End: 175},
}

// Read the exported metrics, add +extraColl for HTTP size metrics
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestAppMetrics_ResourceAttributes(t *testing.T) {
go otelExporter(metrics)

metrics <- []request.Span{
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
{ServiceID: svc.ID{UID: svc.UID{Instance: "foo"}}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
}

res := readNChan(t, otlp.Records(), 1, timeout)
Expand Down
3 changes: 2 additions & 1 deletion pkg/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ func getRetrySettings(cfg TracesConfig) configretry.BackOffConfig {
}

func traceAppResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue {
if service.UID == "" {
// TODO: remove?
if service.UID == emptyUID {
return getAppResourceAttrs(hostID, service)
}

Expand Down
Loading

0 comments on commit 65111df

Please sign in to comment.