From aa78e0de2ca55a772c3237124b597503548fceec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Wed, 25 Jun 2025 16:49:52 +0200 Subject: [PATCH 1/3] chore: remove toolchain directive from go.mod MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IDK where it was there to begin with. The [go module spec][1] says: > The toolchain directive only has an effect when the module is the main > module and the default toolchain’s version is less than the suggested > toolchain’s version. So removing this should not have any impact. [1]: https://go.dev/ref/mod#go-mod-file-toolchain --- go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.mod b/go.mod index fe8b6ae..1db5981 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/DataDog/go-runtime-metrics-internal go 1.21 -toolchain go1.22.3 - require github.com/stretchr/testify v1.9.0 require ( From ca40190dd2d69ec624e6d805cd098a6e18467a77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Wed, 25 Jun 2025 16:49:52 +0200 Subject: [PATCH 2/3] chore: upgrade go.mod go directive to 1.23 This brings us inline with the minimal go version required by dd-trace-go and also allows using `cmp.Or` which is introduced by a later commit in this PR. --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 1db5981..46a2cb4 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/DataDog/go-runtime-metrics-internal -go 1.21 +go 1.23 require github.com/stretchr/testify v1.9.0 From 7914529110f8ac903562fb8f93cabc7571f30e72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Wed, 25 Jun 2025 16:49:52 +0200 Subject: [PATCH 3/3] feat: Add new Emitter API to start and stop metrics --- pkg/runtimemetrics/runtime_metrics.go | 140 ++++++++++++--------- pkg/runtimemetrics/runtime_metrics_test.go | 48 ++++--- pkg/runtimemetrics/tags.go | 17 +-- pkg/runtimemetrics/tags_test.go | 12 +- 4 files changed, 112 insertions(+), 105 deletions(-) diff --git a/pkg/runtimemetrics/runtime_metrics.go b/pkg/runtimemetrics/runtime_metrics.go index 1d351f5..0aef287 100644 --- a/pkg/runtimemetrics/runtime_metrics.go +++ b/pkg/runtimemetrics/runtime_metrics.go @@ -2,7 +2,7 @@ package runtimemetrics import ( - "errors" + "cmp" "fmt" "log/slog" "math" @@ -13,48 +13,63 @@ import ( "time" ) -// pollFrequency is the frequency at which we poll runtime/metrics and report -// them to statsd. The statsd client aggregates this data, usually over a 2s -// window [1], and so does the agent, usually over a 10s window [2]. -// -// Our goal is to submit one data point per aggregation window, using the -// CountWithTimestamp / GaugeWithTimestamp APIs for submitting precisely aligned -// metrics, to enable comparing them with one another. -// -// [1] https://github.com/DataDog/datadog-go/blob/e612112c8bb396b33ad5d9edd645d289b07d0e40/statsd/options.go/#L23 -// [2] https://docs.datadoghq.com/developers/dogstatsd/data_aggregation/#how-is-aggregation-performed-with-the-dogstatsd-server -var pollFrequency = 10 * time.Second - -var unknownMetricLogOnce, unsupportedKindLogOnce sync.Once - -// mu protects the variables below -var mu sync.Mutex -var enabled bool - -// NOTE: The Start method below is intentionally minimal for now. We probably want to think about -// this API a bit more before we publish it in dd-trace-go. I.e. do we want to make the -// pollFrequency configurable (higher resolution at the cost of higher overhead on the agent and -// statsd library)? Do we want to support multiple instances? We probably also want a (flushing?) -// stop method. - -// Start starts reporting runtime/metrics to the given statsd client. -func Start(statsd partialStatsdClientInterface, logger *slog.Logger) error { - mu.Lock() - defer mu.Unlock() - - if enabled { - // We could support multiple instances, but the use cases for it are not - // clear, so for now let's consider this to be a misconfiguration. - return errors.New("runtimemetrics has already been started") +// Options are the options for the runtime metrics emitter. +type Options struct { + // Logger is used to log errors. Defaults to slog.Default() if nil. + Logger *slog.Logger + // Tags are added to all metrics. + Tags []string + // Period is the period at which we poll runtime/metrics and report + // them to statsd. Defaults to 10s. + // + // The statsd client aggregates this data, usually over a 2s window [1], and + // so does the agent, usually over a 10s window [2]. + // + // We submit one data point per aggregation window, using the + // CountWithTimestamp / GaugeWithTimestamp APIs for submitting precisely + // aligned metrics, to enable comparing them with one another. + // + // [1] https://github.com/DataDog/datadog-go/blob/e612112c8bb396b33ad5d9edd645d289b07d0e40/statsd/options.go/#L23 + // [2] https://docs.datadoghq.com/developers/dogstatsd/data_aggregation/#how-is-aggregation-performed-with-the-dogstatsd-server + Period time.Duration +} + +// NewEmitter creates a new runtime metrics emitter and starts it. +func NewEmitter(statsd partialStatsdClientInterface, opts *Options) *Emitter { + if opts == nil { + opts = &Options{} + } + e := &Emitter{ + statsd: statsd, + logger: cmp.Or(opts.Logger, slog.Default()), + tags: opts.Tags, + stop: make(chan struct{}), + period: cmp.Or(opts.Period, 10*time.Second), } + go e.emit() + return e +} +// Emitter submits runtime/metrics to statsd on a regular interval. +type Emitter struct { + statsd partialStatsdClientInterface + logger *slog.Logger + tags []string + period time.Duration + + stop chan struct{} +} + +// emit emits runtime/metrics to statsd on a regular interval. +func (e *Emitter) emit() { descs := metrics.All() - rms := newRuntimeMetricStore(descs, statsd, logger) + tags := append(getBaseTags(), e.tags...) + rms := newRuntimeMetricStore(descs, e.statsd, e.logger, tags) // TODO: Go services experiencing high scheduling latency might see a // large variance for the period in between rms.report calls. This might // cause spikes in cumulative metric reporting. Should we try to correct - // for this by measuring the actual reporting time delta and - // extrapolating our numbers? + // for this by measuring the actual reporting time delta to adjust + // the numbers? // // Another challenge is that some metrics only update after GC mark // termination, see [1][2]. This means that it's likely that the rate of @@ -63,20 +78,25 @@ func Start(statsd partialStatsdClientInterface, logger *slog.Logger) error { // // [1] https://github.com/golang/go/blob/go1.21.3/src/runtime/mstats.go#L939 // [2] https://github.com/golang/go/issues/59749 - go func() { - for range time.Tick(pollFrequency) { + tick := time.Tick(e.period) + for { + select { + case <-e.stop: + return + case <-tick: rms.report() } - }() - enabled = true - return nil + } } -func SetBaseTags(tags []string) { - muTags.Lock() - defer muTags.Unlock() - - rootBaseTags = tags +// Stop stops the emitter. It is idempotent. +func (e *Emitter) Stop() { + select { + case <-e.stop: + return + default: + close(e.stop) + } } type runtimeMetric struct { @@ -89,10 +109,12 @@ type runtimeMetric struct { // the map key is the name of the metric in runtime/metrics type runtimeMetricStore struct { - metrics map[string]*runtimeMetric - statsd partialStatsdClientInterface - logger *slog.Logger - baseTags []string + metrics map[string]*runtimeMetric + statsd partialStatsdClientInterface + logger *slog.Logger + baseTags []string + unknownMetricLogOnce *sync.Once + unsupportedKindLogOnce *sync.Once } // partialStatsdClientInterface is the subset of statsd.ClientInterface that is @@ -106,12 +128,14 @@ type partialStatsdClientInterface interface { DistributionSamples(name string, values []float64, tags []string, rate float64) error } -func newRuntimeMetricStore(descs []metrics.Description, statsdClient partialStatsdClientInterface, logger *slog.Logger) runtimeMetricStore { +func newRuntimeMetricStore(descs []metrics.Description, statsdClient partialStatsdClientInterface, logger *slog.Logger, tags []string) runtimeMetricStore { rms := runtimeMetricStore{ - metrics: map[string]*runtimeMetric{}, - statsd: statsdClient, - logger: logger, - baseTags: getBaseTags(), + metrics: map[string]*runtimeMetric{}, + statsd: statsdClient, + logger: logger, + baseTags: tags, + unknownMetricLogOnce: &sync.Once{}, + unsupportedKindLogOnce: &sync.Once{}, } for _, d := range descs { @@ -269,7 +293,7 @@ func (rms runtimeMetricStore) report() { case metrics.KindBad: // This should never happen because all metrics are supported // by construction. - unknownMetricLogOnce.Do(func() { + rms.unknownMetricLogOnce.Do(func() { rms.logger.Error("runtimemetrics: encountered an unknown metric, this should never happen and might indicate a bug", slog.Attr{Key: "metric_name", Value: slog.StringValue(name)}) }) default: @@ -277,7 +301,7 @@ func (rms runtimeMetricStore) report() { // // The safest thing to do here is to simply log it somewhere once // as something to look into, but ignore it for now. - unsupportedKindLogOnce.Do(func() { + rms.unsupportedKindLogOnce.Do(func() { rms.logger.Error("runtimemetrics: unsupported metric kind, support for that kind should be added in pkg/runtimemetrics", slog.Attr{Key: "metric_name", Value: slog.StringValue(name)}, slog.Attr{Key: "kind", Value: slog.AnyValue(rm.currentValue.Kind())}, diff --git a/pkg/runtimemetrics/runtime_metrics_test.go b/pkg/runtimemetrics/runtime_metrics_test.go index 6eb8746..0c12c54 100644 --- a/pkg/runtimemetrics/runtime_metrics_test.go +++ b/pkg/runtimemetrics/runtime_metrics_test.go @@ -15,33 +15,29 @@ import ( "github.com/stretchr/testify/require" ) -func TestStart(t *testing.T) { - cleanup := func() { - mu.Lock() - enabled = false - mu.Unlock() - } +func TestEmitter(t *testing.T) { + // TODO: Use testing/synctest in go1.25 for this in the future. + t.Run("should emit metrics", func(t *testing.T) { + // Start the emitter and wait until some metrics are submitted. + statsd := &statsdClientMock{} + emitter := NewEmitter(statsd, &Options{Logger: slog.Default(), Period: 1 * time.Millisecond}) + require.NotNil(t, emitter) + require.Eventually(t, func() bool { return len(statsd.gaugeCall) > 0 }, time.Second, 1*time.Millisecond) + calls := statsd.gaugeCall - t.Run("start returns an error when called successively", func(t *testing.T) { - t.Cleanup(cleanup) - err := Start(&statsdClientMock{}, slog.Default()) - assert.NoError(t, err) + // After Stop, no more metrics should be submitted. + emitter.Stop() + time.Sleep(10 * time.Millisecond) + require.Equal(t, len(calls), len(statsd.gaugeCall)) - err = Start(&statsdClientMock{}, slog.Default()) - assert.Error(t, err) + // Stop should be idempotent. + emitter.Stop() }) - t.Run("should not race with other start calls", func(t *testing.T) { - t.Cleanup(cleanup) - wg := sync.WaitGroup{} - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - Start(&statsdClientMock{}, slog.Default()) - wg.Done() - }() - } - wg.Wait() + t.Run("should not panic on nil options", func(t *testing.T) { + emitter := NewEmitter(&statsdClientMock{}, nil) + require.NotNil(t, emitter) + emitter.Stop() }) } @@ -187,7 +183,7 @@ func TestSmoke(t *testing.T) { // Initialize store for all metrics with a mocked statsd client. descs := metrics.All() mock := &statsdClientMock{} - rms := newRuntimeMetricStore(descs, mock, slog.Default()) + rms := newRuntimeMetricStore(descs, mock, slog.Default(), []string{}) // This poulates most runtime/metrics. runtime.GC() @@ -215,7 +211,7 @@ func BenchmarkReport(b *testing.B) { // Initialize store for all metrics with a mocked statsd client. descs := metrics.All() mock := &statsdClientMock{Discard: true} - rms := newRuntimeMetricStore(descs, mock, slog.Default()) + rms := newRuntimeMetricStore(descs, mock, slog.Default(), []string{}) // Benchmark report method b.ReportAllocs() @@ -232,7 +228,7 @@ func BenchmarkReport(b *testing.B) { func reportMetric(name string, kind metrics.ValueKind) (*statsdClientMock, runtimeMetricStore) { desc := metricDesc(name, kind) mock := &statsdClientMock{} - rms := newRuntimeMetricStore([]metrics.Description{desc}, mock, slog.Default()) + rms := newRuntimeMetricStore([]metrics.Description{desc}, mock, slog.Default(), []string{}) // Populate Metrics. Test implicitly expect this to be the only GC cycle to happen before report is finished. runtime.GC() rms.report() diff --git a/pkg/runtimemetrics/tags.go b/pkg/runtimemetrics/tags.go index 16b100d..9e34094 100644 --- a/pkg/runtimemetrics/tags.go +++ b/pkg/runtimemetrics/tags.go @@ -5,18 +5,13 @@ import ( "math" "runtime" "runtime/metrics" - "sync" ) -const gogcMetricName = "/gc/gogc:percent" -const gomemlimitMetricName = "/gc/gomemlimit:bytes" -const gomaxProcsMetricName = "/sched/gomaxprocs:threads" - -// muTags protects rootBaseTags -var muTags sync.Mutex -var rootBaseTags []string - func getBaseTags() []string { + const gogcMetricName = "/gc/gogc:percent" + const gomemlimitMetricName = "/gc/gomemlimit:bytes" + const gomaxProcsMetricName = "/sched/gomaxprocs:threads" + samples := []metrics.Sample{ {Name: gogcMetricName}, {Name: gomemlimitMetricName}, @@ -55,10 +50,6 @@ func getBaseTags() []string { } } - muTags.Lock() - baseTags = append(baseTags, rootBaseTags...) - muTags.Unlock() - return baseTags } diff --git a/pkg/runtimemetrics/tags_test.go b/pkg/runtimemetrics/tags_test.go index d2534c1..a38d923 100644 --- a/pkg/runtimemetrics/tags_test.go +++ b/pkg/runtimemetrics/tags_test.go @@ -57,8 +57,7 @@ func TestGetBaseTags(t *testing.T) { old := debug.SetGCPercent(tt.gogc) defer debug.SetGCPercent(old) - tags := getBaseTags() - assertTagValue(t, "gogc", tt.expected, tags) + assertTagValue(t, "gogc", tt.expected, getBaseTags()) }) } @@ -89,8 +88,7 @@ func TestGetBaseTags(t *testing.T) { old := debug.SetMemoryLimit(tt.gomemlimit) defer debug.SetMemoryLimit(old) - tags := getBaseTags() - assertTagValue(t, "gomemlimit", tt.expected, tags) + assertTagValue(t, "gomemlimit", tt.expected, getBaseTags()) }) } @@ -98,13 +96,11 @@ func TestGetBaseTags(t *testing.T) { old := runtime.GOMAXPROCS(42) defer runtime.GOMAXPROCS(old) - tags := getBaseTags() - assertTagValue(t, "gomaxprocs", "42", tags) + assertTagValue(t, "gomaxprocs", "42", getBaseTags()) }) t.Run("should return the correct goversion", func(t *testing.T) { - tags := getBaseTags() - assertTagValue(t, "goversion", runtime.Version(), tags) + assertTagValue(t, "goversion", runtime.Version(), getBaseTags()) }) }