diff --git a/changelogs/8.15.asciidoc b/changelogs/8.15.asciidoc index 2c2eae9120b..d0dfd40f761 100644 --- a/changelogs/8.15.asciidoc +++ b/changelogs/8.15.asciidoc @@ -33,5 +33,5 @@ https://github.com/elastic/apm-server/compare/v8.14.2\...v8.15.0[View commits] - Upgraded bundled APM Java agent attacher CLI to version 1.50.0 {pull}13326[13326] - Enable Kibana curated UIs to work with hostmetrics from OpenTelemetry's https://pkg.go.dev/go.opentelemetry.io/collector/receiver/hostmetricsreceiver[hostmetricsreceiver] {pull}13196[13196] - Add require data stream to bulk index requests {pull}13398[13398] -- Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514] +- Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514] {pull}13653[13653] - Add mapping for OpenTelemetry attribute `messaging.destination.name` to derive `service.target` correctly {pull}13472[13472] diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 2071e11f84b..da7327920a5 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -292,7 +292,7 @@ func (s *Runner) Run(ctx context.Context) error { } } - instrumentation, err := instrumentation.New(s.rawConfig, "apm-server", version.Version) + instrumentation, err := newInstrumentation(s.rawConfig) if err != nil { return err } @@ -537,6 +537,57 @@ func (s *Runner) Run(ctx context.Context) error { return errors.Join(result, closeErr) } +// newInstrumentation is a thin wrapper around libbeat instrumentation that +// sets missing tracer configuration from elastic agent. +func newInstrumentation(rawConfig *agentconfig.C) (instrumentation.Instrumentation, error) { + // This config struct contains missing fields from elastic agent APMConfig + // https://github.com/elastic/elastic-agent/blob/main/internal/pkg/core/monitoring/config/config.go#L127 + // that are not directly handled by libbeat instrumentation below. + // + // Note that original config keys were additionally marshalled by + // https://github.com/elastic/elastic-agent/blob/main/pkg/component/runtime/apm_config_mapper.go#L18 + // that's why the keys are different from the original APMConfig struct. + var apmCfg struct { + GlobalLabels string `config:"globallabels"` + TLS struct { + SkipVerify bool `config:"skipverify"` + ServerCertificate string `config:"servercert"` + ServerCA string `config:"serverca"` + } `config:"tls"` + } + cfg, err := rawConfig.Child("instrumentation", -1) + if err != nil { + // Fallback to instrumentation.New if the configs are not present. + return instrumentation.New(rawConfig, "apm-server", version.Version) + } + if err := cfg.Unpack(&apmCfg); err != nil { + return nil, err + } + const ( + envVerifyServerCert = "ELASTIC_APM_VERIFY_SERVER_CERT" + envServerCert = "ELASTIC_APM_SERVER_CERT" + envCACert = "ELASTIC_APM_SERVER_CA_CERT_FILE" + envGlobalLabels = "ELASTIC_APM_GLOBAL_LABELS" + ) + if apmCfg.TLS.SkipVerify { + os.Setenv(envVerifyServerCert, "false") + defer os.Unsetenv(envVerifyServerCert) + } + if apmCfg.TLS.ServerCertificate != "" { + os.Setenv(envServerCert, apmCfg.TLS.ServerCertificate) + defer os.Unsetenv(envServerCert) + } + if apmCfg.TLS.ServerCA != "" { + os.Setenv(envCACert, apmCfg.TLS.ServerCA) + defer os.Unsetenv(envCACert) + } + if len(apmCfg.GlobalLabels) > 0 { + os.Setenv(envGlobalLabels, apmCfg.GlobalLabels) + defer os.Unsetenv(envGlobalLabels) + } + return instrumentation.New(rawConfig, "apm-server", version.Version) +} + func maxConcurrentDecoders(memLimitGB float64) uint { // Allow 128 concurrent decoders for each 1GB memory, limited to at most 2048. const max = 2048 diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go index 3e911b35d8b..b510a89b066 100644 --- a/internal/beater/beater_test.go +++ b/internal/beater/beater_test.go @@ -18,11 +18,15 @@ package beater import ( + "compress/zlib" "context" "encoding/json" + "encoding/pem" "fmt" "net/http" "net/http/httptest" + "os" + "path/filepath" "testing" "time" @@ -227,3 +231,43 @@ func TestRunnerNewDocappenderConfig(t *testing.T) { }) } } + +func TestNewInstrumentation(t *testing.T) { + labels := make(chan map[string]string, 1) + defer close(labels) + s := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/intake/v2/events" { + var b struct { + Metadata struct { + Labels map[string]string `json:"labels"` + } `json:"metadata"` + } + zr, _ := zlib.NewReader(r.Body) + _ = json.NewDecoder(zr).Decode(&b) + labels <- b.Metadata.Labels + } + w.WriteHeader(http.StatusOK) + })) + defer s.Close() + certPath := filepath.Join(t.TempDir(), "cert.pem") + f, err := os.Create(certPath) + assert.NoError(t, err) + err = pem.Encode(f, &pem.Block{Type: "CERTIFICATE", Bytes: s.Certificate().Raw}) + assert.NoError(t, err) + cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "instrumentation": map[string]interface{}{ + "enabled": true, + "hosts": []string{s.URL}, + "tls": map[string]interface{}{ + "servercert": certPath, + }, + "globallabels": "k1=val,k2=new val", + }, + }) + i, err := newInstrumentation(cfg) + require.NoError(t, err) + tracer := i.Tracer() + tracer.StartTransaction("name", "type").End() + tracer.Flush(nil) + assert.Equal(t, map[string]string{"k1": "val", "k2": "new val"}, <-labels) +}