OTEL process attributes moved to resource-level (#1003)
* Generify ReporterPool key

* Move process.Status ID to its own field

* Move OTEL Process attributes to resource-level

* removed comment

* Add host name to resource attributes

* fix unit tests
mariomac authored Jul 10, 2024
1 parent 683b97e commit 224e973
Showing 12 changed files with 221 additions and 165 deletions.
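For context on the headline change (an editorial sketch, not part of the commit): previously, process identity travelled on each exported data point as metric attributes; with this commit, the OTEL exporters place it on the per-process Resource instead, while the Prometheus path keeps reporting these fields as plain metric attributes (see the comment added in attr_defs.go below). The process.* keys in the snippet are illustrative placeholders, not Beyla's exact attribute names.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/resource"
	semconv "go.opentelemetry.io/otel/semconv/v1.19.0"
)

func main() {
	// Illustrative process-identity attributes (keys are placeholders).
	procAttrs := []attribute.KeyValue{
		attribute.String("process.command", "nginx"),
		attribute.String("process.owner", "www-data"),
	}

	// Before: process identity attached to every data point as metric attributes.
	perPoint := attribute.NewSet(procAttrs...)

	// After: process identity attached once, to the OTLP Resource of the
	// per-process MeterProvider (Prometheus export keeps using metric attributes).
	res := resource.NewWithAttributes(semconv.SchemaURL,
		append([]attribute.KeyValue{semconv.ServiceName("my-service")}, procAttrs...)...,
	)

	fmt.Println("metric attributes:", perPoint.Len(), "resource attributes:", res.Len())
}
```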
21 changes: 13 additions & 8 deletions pkg/internal/export/attributes/attr_defs.go
@@ -159,26 +159,31 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup {
},
}

var processAttributes = AttrReportGroup{
SubGroups: []*AttrReportGroup{&appKubeAttributes, &hostAttributes},
// TODO: attributes below are resource-level, but in App O11y we don't treat processes as resources,
// but as applications. Let's first consider how to match processes and Applications before marking this spec
// as stable
// the following attributes are only reported as metric attributes in Prometheus,
// as the OTEL standard defines them as resource attributes.
var promProcessAttributes = AttrReportGroup{
Disabled: !promEnabled,
Attributes: map[attr.Name]Default{
attr.ProcCommand: true,
attr.ProcCPUState: true,
attr.ProcOwner: true,
attr.ProcParentPid: true,
attr.ProcPid: true,
attr.ProcDiskIODir: true,
attr.ProcNetIODir: true,
attr.ProcCommandLine: false,
attr.ProcCommandArgs: false,
attr.ProcExecName: false,
attr.ProcExecPath: false,
},
}

var processAttributes = AttrReportGroup{
SubGroups: []*AttrReportGroup{&appKubeAttributes, &hostAttributes, &promProcessAttributes},
Attributes: map[attr.Name]Default{
attr.ProcCPUState: true,
attr.ProcDiskIODir: true,
attr.ProcNetIODir: true,
},
}

var messagingAttributes = AttrReportGroup{
SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes},
Attributes: map[attr.Name]Default{
52 changes: 32 additions & 20 deletions pkg/internal/export/otel/common.go
@@ -17,7 +17,6 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.19.0"
"google.golang.org/grpc/credentials"

@@ -60,17 +59,23 @@ var DefaultBuckets = Buckets{
RequestSizeHistogram: []float64{0, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192},
}

func getResourceAttrs(service *svc.ID) *resource.Resource {
func getAppResourceAttrs(service *svc.ID) []attribute.KeyValue {
return append(getResourceAttrs(service),
semconv.ServiceInstanceID(service.Instance),
)
}

func getResourceAttrs(service *svc.ID) []attribute.KeyValue {
attrs := []attribute.KeyValue{
semconv.ServiceName(service.Name),
semconv.ServiceInstanceID(service.Instance),
// SpanMetrics requires an extra attribute besides service name
// to generate the traces_target_info metric,
// so the service is visible in the ServicesList
// This attribute also allows the App O11y plugin to show this app as a Go application.
semconv.TelemetrySDKLanguageKey.String(service.SDKLanguage.String()),
// We set the SDK name to Beyla, so we can distinguish Beyla-generated metrics from those of other SDKs
semconv.TelemetrySDKNameKey.String("beyla"),
semconv.HostName(service.HostName),
}

if service.Namespace != "" {
@@ -81,17 +86,18 @@ func getResourceAttrs(service *svc.ID) *resource.Resource {
attrs = append(attrs, k.OTEL().String(v))
}

return resource.NewWithAttributes(semconv.SchemaURL, attrs...)
return attrs
}

// ReporterPool keeps an LRU cache of different OTEL reporters given a service name.
type ReporterPool[T any] struct {
type ReporterPool[K uidGetter, T any] struct {
pool *simplelru.LRU[svc.UID, *expirable[T]]

itemConstructor func(*svc.ID) (T, error)
itemConstructor func(getter K) (T, error)

lastReporter *expirable[T]
lastService *svc.ID
lastReporter *expirable[T]
lastService uidGetter
lastServiceUID svc.UID

// TODO: use cacheable clock for efficiency
clock expire.Clock
@@ -106,23 +112,27 @@ type expirable[T any] struct {
value T
}

type uidGetter interface {
GetUID() svc.UID
}

// NewReporterPool creates a ReporterPool instance given a cache length,
// an eviction callback to be invoked each time an element is removed
// from the cache, and a constructor function that will specify how to
// instantiate the generic OTEL metrics/traces reporter.
func NewReporterPool[T any](
func NewReporterPool[K uidGetter, T any](
cacheLen int,
ttl time.Duration,
clock expire.Clock,
callback simplelru.EvictCallback[svc.UID, *expirable[T]],
itemConstructor func(id *svc.ID) (T, error),
) ReporterPool[T] {
itemConstructor func(id K) (T, error),
) ReporterPool[K, T] {
pool, err := simplelru.NewLRU[svc.UID, *expirable[T]](cacheLen, callback)
if err != nil {
// should never happen: bug!
panic(err)
}
return ReporterPool[T]{
return ReporterPool[K, T]{
pool: pool,
itemConstructor: itemConstructor,
ttl: ttl,
@@ -133,7 +143,7 @@ func NewReporterPool[T any](

// For retrieves the associated item for the given service name, or
// creates a new one if it does not exist
func (rp *ReporterPool[T]) For(service *svc.ID) (T, error) {
func (rp *ReporterPool[K, T]) For(service K) (T, error) {
rp.expireOldReporters()
// optimization: do not query the resources' cache if the
// previously processed span belongs to the same service name
@@ -142,12 +152,14 @@ func (rp *ReporterPool[T]) For(service *svc.ID) (T, error) {
// only a single instrumented process.
// In multi-process tracing, this is likely to happen as most
// tracers group traces belonging to the same service in the same slice.
if rp.lastService == nil || service.UID != rp.lastService.UID {
lm, err := rp.get(service)
svcUID := service.GetUID()
if rp.lastServiceUID == "" || svcUID != rp.lastService.GetUID() {
lm, err := rp.get(svcUID, service)
if err != nil {
var t T
return t, err
}
rp.lastServiceUID = svcUID
rp.lastService = service
rp.lastReporter = lm
}
@@ -159,7 +171,7 @@

// expireOldReporters will remove the metrics reporters that haven't been accessed
// during the last TTL period
func (rp *ReporterPool[T]) expireOldReporters() {
func (rp *ReporterPool[K, T]) expireOldReporters() {
now := rp.clock()
if now.Sub(rp.lastExpiration) < rp.ttl {
return
@@ -174,16 +186,16 @@ func (rp *ReporterPool[T]) expireOldReporters() {
}
}

func (rp *ReporterPool[T]) get(service *svc.ID) (*expirable[T], error) {
if e, ok := rp.pool.Get(service.UID); ok {
func (rp *ReporterPool[K, T]) get(uid svc.UID, service K) (*expirable[T], error) {
if e, ok := rp.pool.Get(uid); ok {
return e, nil
}
m, err := rp.itemConstructor(service)
if err != nil {
return nil, fmt.Errorf("creating resource for service %q: %w", service, err)
return nil, fmt.Errorf("creating resource for service %v: %w", service, err)
}
e := &expirable[T]{value: m}
rp.pool.Add(service.UID, e)
rp.pool.Add(uid, e)
return e, nil
}
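The ReporterPool changes above make the cache key generic: any type implementing GetUID() can index the pool, so the app-level exporters keep keying by *svc.ID while the process-metrics exporter keys by *process.ID. Below is a minimal, self-contained sketch of that pattern, with a plain map standing in for the LRU/TTL machinery and simplified stand-in types (UID, procID) instead of Beyla's:

```go
package main

import "fmt"

// UID identifies a cached reporter (in Beyla, svc.UID).
type UID string

// uidGetter is the new key constraint: anything that can yield a stable UID
// (e.g. *svc.ID for app metrics, *process.ID for process metrics).
type uidGetter interface {
	GetUID() UID
}

// ReporterPool is a trimmed-down stand-in for Beyla's LRU/TTL-backed pool.
type ReporterPool[K uidGetter, T any] struct {
	items           map[UID]T
	itemConstructor func(K) (T, error)
}

func NewReporterPool[K uidGetter, T any](ctor func(K) (T, error)) *ReporterPool[K, T] {
	return &ReporterPool[K, T]{items: map[UID]T{}, itemConstructor: ctor}
}

// For returns the reporter cached under the key's UID, creating it on first use.
func (rp *ReporterPool[K, T]) For(key K) (T, error) {
	uid := key.GetUID()
	if it, ok := rp.items[uid]; ok {
		return it, nil
	}
	it, err := rp.itemConstructor(key)
	if err != nil {
		var zero T
		return zero, err
	}
	rp.items[uid] = it
	return it, nil
}

// procID stands in for process.ID: a process-scoped key type.
type procID struct {
	uid     UID
	command string
}

func (p *procID) GetUID() UID { return p.uid }

func main() {
	// One reporter per process UID; the constructor only runs on a cache miss.
	pool := NewReporterPool(func(p *procID) (string, error) {
		return "reporter-for-" + p.command, nil
	})
	r, _ := pool.For(&procID{uid: "1234", command: "nginx"})
	fmt.Println(r)
}
```

Keying the cache by UID while still passing the full key to the constructor mirrors the diff's get(uid, service) split: lookups stay cheap, and the constructor keeps access to everything it needs to build resource attributes.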

7 changes: 4 additions & 3 deletions pkg/internal/export/otel/metrics.go
@@ -18,6 +18,7 @@ import (
instrument "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.19.0"

"github.com/grafana/beyla/pkg/internal/export/attributes"
@@ -162,7 +163,7 @@ type MetricsReporter struct {
cfg *MetricsConfig
attributes *attributes.AttrSelector
exporter metric.Exporter
reporters ReporterPool[*Metrics]
reporters ReporterPool[*svc.ID, *Metrics]
is instrumentations.InstrumentationSelection

// user-selected fields for each of the reported metrics
@@ -276,7 +277,7 @@ func newMetricsReporter(
request.SpanOTELGetters, mr.attributes.For(attributes.MessagingProcessDuration))
}

mr.reporters = NewReporterPool(cfg.ReportersCacheLen, cfg.TTL, timeNow,
mr.reporters = NewReporterPool[*svc.ID, *Metrics](cfg.ReportersCacheLen, cfg.TTL, timeNow,
func(id svc.UID, v *expirable[*Metrics]) {
if mr.cfg.SpanMetricsEnabled() {
attrOpt := instrument.WithAttributeSet(mr.metricResourceAttributes(v.value.service))
@@ -517,7 +518,7 @@ func (mr *MetricsReporter) setupGraphMeters(m *Metrics, meter instrument.Meter)
func (mr *MetricsReporter) newMetricSet(service *svc.ID) (*Metrics, error) {
mlog := mlog().With("service", service)
mlog.Debug("creating new Metrics reporter")
resources := getResourceAttrs(service)
resources := resource.NewWithAttributes(semconv.SchemaURL, getAppResourceAttrs(service)...)

opts := []metric.Option{
metric.WithResource(resources),
34 changes: 25 additions & 9 deletions pkg/internal/export/otel/metrics_proc.go
@@ -5,11 +5,14 @@ import (
"fmt"
"log/slog"
"slices"
"strconv"

"github.com/mariomac/pipes/pipe"
"go.opentelemetry.io/otel/attribute"
metric2 "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.19.0"

"github.com/grafana/beyla/pkg/internal/export/attributes"
attr2 "github.com/grafana/beyla/pkg/internal/export/attributes/names"
@@ -52,7 +55,7 @@ type procMetricsExporter struct {
clock *expire.CachedClock

exporter metric.Exporter
reporters ReporterPool[*procMetrics]
reporters ReporterPool[*process.ID, *procMetrics]

log *slog.Logger

@@ -76,7 +79,6 @@

type procMetrics struct {
ctx context.Context
service *svc.ID
provider *metric.MeterProvider

// don't forget to add the cleanup code in cleanupAllMetricsInstances function
@@ -165,7 +167,7 @@ func newProcMetricsExporter(
mr.netObserver = netAggregatedObserver
}

mr.reporters = NewReporterPool[*procMetrics](cfg.Metrics.ReportersCacheLen, cfg.Metrics.TTL, timeNow,
mr.reporters = NewReporterPool[*process.ID, *procMetrics](cfg.Metrics.ReportersCacheLen, cfg.Metrics.TTL, timeNow,
func(id svc.UID, v *expirable[*procMetrics]) {
llog := log.With("service", id)
llog.Debug("evicting metrics reporter from cache")
@@ -186,10 +188,25 @@ func newProcMetricsExporter(
return mr.Do, nil
}

func (me *procMetricsExporter) newMetricSet(service *svc.ID) (*procMetrics, error) {
log := me.log.With("service", service)
func getProcessResourceAttrs(procID *process.ID) []attribute.KeyValue {
return append(
getAppResourceAttrs(procID.Service),
semconv.ServiceInstanceID(string(procID.UID)),
attr2.ProcCommand.OTEL().String(procID.Command),
attr2.ProcOwner.OTEL().String(procID.User),
attr2.ProcParentPid.OTEL().String(strconv.Itoa(int(procID.ParentProcessID))),
attr2.ProcPid.OTEL().String(strconv.Itoa(int(procID.ProcessID))),
attr2.ProcCommandLine.OTEL().String(procID.CommandLine),
attr2.ProcCommandArgs.OTEL().StringSlice(procID.CommandArgs),
attr2.ProcExecName.OTEL().String(procID.ExecName),
attr2.ProcExecPath.OTEL().String(procID.ExecPath),
)
}

func (me *procMetricsExporter) newMetricSet(procID *process.ID) (*procMetrics, error) {
log := me.log.With("service", procID.Service, "processID", procID.UID)
log.Debug("creating new Metrics exporter")
resources := getResourceAttrs(service)
resources := resource.NewWithAttributes(semconv.SchemaURL, getProcessResourceAttrs(procID)...)
opts := []metric.Option{
metric.WithResource(resources),
metric.WithReader(metric.NewPeriodicReader(me.exporter,
@@ -198,7 +215,6 @@ func (me *procMetricsExporter) newMetricSet(service *svc.ID) (*procMetrics, error) {

m := procMetrics{
ctx: me.ctx,
service: service,
provider: metric.NewMeterProvider(opts...),
}

@@ -285,10 +301,10 @@ func (me *procMetricsExporter) Do(in <-chan []*process.Status) {
for i := range in {
me.clock.Update()
for _, s := range i {
reporter, err := me.reporters.For(s.Service)
reporter, err := me.reporters.For(&s.ID)
if err != nil {
me.log.Error("unexpected error creating OTEL resource. Ignoring metric",
err, "service", s.Service)
err, "service", s.ID.Service)
continue
}
me.observeMetric(reporter, s)
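Putting the pieces together, the process-metrics exporter now builds one MeterProvider per process, whose Resource carries the identity attributes produced by getProcessResourceAttrs. The sketch below shows that wiring with the stock OpenTelemetry Go SDK; the stdout exporter, the process.* keys, and the export interval are assumptions made for the example, not Beyla's actual configuration:

```go
package main

import (
	"context"
	"log"
	"strconv"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/resource"
	semconv "go.opentelemetry.io/otel/semconv/v1.19.0"
)

// processResource builds a process-scoped Resource, mirroring the role of
// getProcessResourceAttrs. The process.* keys are illustrative; Beyla derives
// them from its attribute-name catalogue.
func processResource(serviceName, instanceID, command, owner string, pid, ppid int) *resource.Resource {
	return resource.NewWithAttributes(semconv.SchemaURL,
		semconv.ServiceName(serviceName),
		semconv.ServiceInstanceID(instanceID),
		attribute.String("process.command", command),
		attribute.String("process.owner", owner),
		attribute.String("process.pid", strconv.Itoa(pid)),
		attribute.String("process.parent_pid", strconv.Itoa(ppid)),
	)
}

func main() {
	exp, err := stdoutmetric.New()
	if err != nil {
		log.Fatal(err)
	}

	// One MeterProvider per observed process: its Resource, not each data
	// point, carries the process identity.
	provider := metric.NewMeterProvider(
		metric.WithResource(processResource("my-service", "1234", "nginx", "www-data", 1234, 1)),
		metric.WithReader(metric.NewPeriodicReader(exp, metric.WithInterval(5*time.Second))),
	)
	defer func() { _ = provider.Shutdown(context.Background()) }()

	counter, err := provider.Meter("example").Int64Counter("process.events")
	if err != nil {
		log.Fatal(err)
	}
	counter.Add(context.Background(), 1)
}
```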
