feat: Add new Emitter API to start and stop metrics #18

Open · wants to merge 3 commits into main

4 changes: 1 addition & 3 deletions go.mod
@@ -1,8 +1,6 @@
module github.com/DataDog/go-runtime-metrics-internal

go 1.21

toolchain go1.22.3
go 1.23

require github.com/stretchr/testify v1.9.0

140 changes: 82 additions & 58 deletions pkg/runtimemetrics/runtime_metrics.go
@@ -2,7 +2,7 @@
package runtimemetrics

import (
"errors"
"cmp"
"fmt"
"log/slog"
"math"
@@ -13,48 +13,63 @@ import (
"time"
)

// pollFrequency is the frequency at which we poll runtime/metrics and report
// them to statsd. The statsd client aggregates this data, usually over a 2s
// window [1], and so does the agent, usually over a 10s window [2].
//
// Our goal is to submit one data point per aggregation window, using the
// CountWithTimestamp / GaugeWithTimestamp APIs for submitting precisely aligned
// metrics, to enable comparing them with one another.
//
// [1] https://github.com/DataDog/datadog-go/blob/e612112c8bb396b33ad5d9edd645d289b07d0e40/statsd/options.go/#L23
// [2] https://docs.datadoghq.com/developers/dogstatsd/data_aggregation/#how-is-aggregation-performed-with-the-dogstatsd-server
var pollFrequency = 10 * time.Second

var unknownMetricLogOnce, unsupportedKindLogOnce sync.Once

// mu protects the variables below
var mu sync.Mutex
var enabled bool

// NOTE: The Start method below is intentionally minimal for now. We probably want to think about
// this API a bit more before we publish it in dd-trace-go. I.e. do we want to make the
// pollFrequency configurable (higher resolution at the cost of higher overhead on the agent and
// statsd library)? Do we want to support multiple instances? We probably also want a (flushing?)
// stop method.

// Start starts reporting runtime/metrics to the given statsd client.
func Start(statsd partialStatsdClientInterface, logger *slog.Logger) error {
mu.Lock()
defer mu.Unlock()

if enabled {
// We could support multiple instances, but the use cases for it are not
// clear, so for now let's consider this to be a misconfiguration.
return errors.New("runtimemetrics has already been started")
// Options are the options for the runtime metrics emitter.
type Options struct {
// Logger is used to log errors. Defaults to slog.Default() if nil.
Logger *slog.Logger
// Tags are added to all metrics.
Tags []string
// Period is the period at which we poll runtime/metrics and report
// them to statsd. Defaults to 10s.
//
// The statsd client aggregates this data, usually over a 2s window [1], and
// so does the agent, usually over a 10s window [2].
//
// We submit one data point per aggregation window, using the
// CountWithTimestamp / GaugeWithTimestamp APIs for submitting precisely
// aligned metrics, to enable comparing them with one another.
//
// [1] https://github.com/DataDog/datadog-go/blob/e612112c8bb396b33ad5d9edd645d289b07d0e40/statsd/options.go/#L23
// [2] https://docs.datadoghq.com/developers/dogstatsd/data_aggregation/#how-is-aggregation-performed-with-the-dogstatsd-server
Period time.Duration
}

// NewEmitter creates a new runtime metrics emitter and starts it.
func NewEmitter(statsd partialStatsdClientInterface, opts *Options) *Emitter {
if opts == nil {
opts = &Options{}
}
e := &Emitter{
statsd: statsd,
logger: cmp.Or(opts.Logger, slog.Default()),
tags: opts.Tags,
stop: make(chan struct{}),
period: cmp.Or(opts.Period, 10*time.Second),
}
go e.emit()
return e
}

// Emitter submits runtime/metrics to statsd on a regular interval.
type Emitter struct {
statsd partialStatsdClientInterface
logger *slog.Logger
tags []string
period time.Duration

stop chan struct{}
}

// emit emits runtime/metrics to statsd on a regular interval.
func (e *Emitter) emit() {
descs := metrics.All()
rms := newRuntimeMetricStore(descs, statsd, logger)
tags := append(getBaseTags(), e.tags...)
rms := newRuntimeMetricStore(descs, e.statsd, e.logger, tags)
// TODO: Go services experiencing high scheduling latency might see a
// large variance for the period in between rms.report calls. This might
// cause spikes in cumulative metric reporting. Should we try to correct
// for this by measuring the actual reporting time delta and
// extrapolating our numbers?
// for this by measuring the actual reporting time delta to adjust
// the numbers?
//
// Another challenge is that some metrics only update after GC mark
// termination, see [1][2]. This means that it's likely that the rate of
@@ -63,20 +78,25 @@ func Start(statsd partialStatsdClientInterface, logger *slog.Logger) error {
//
// [1] https://github.com/golang/go/blob/go1.21.3/src/runtime/mstats.go#L939
// [2] https://github.com/golang/go/issues/59749
go func() {
for range time.Tick(pollFrequency) {
tick := time.Tick(e.period)
for {
select {
case <-e.stop:
return
case <-tick:
rms.report()
}
}()
enabled = true
return nil
}
}

func SetBaseTags(tags []string) {
muTags.Lock()
defer muTags.Unlock()

rootBaseTags = tags
// Stop stops the emitter. It is idempotent.
func (e *Emitter) Stop() {
select {
case <-e.stop:
return
default:
close(e.stop)
}
}

type runtimeMetric struct {
@@ -89,10 +109,12 @@

// the map key is the name of the metric in runtime/metrics
type runtimeMetricStore struct {
metrics map[string]*runtimeMetric
statsd partialStatsdClientInterface
logger *slog.Logger
baseTags []string
metrics map[string]*runtimeMetric
statsd partialStatsdClientInterface
logger *slog.Logger
baseTags []string
unknownMetricLogOnce *sync.Once
unsupportedKindLogOnce *sync.Once
}

Review comment (Member, Author): todo: follow-up PR - refactor this into the Emitter type

// partialStatsdClientInterface is the subset of statsd.ClientInterface that is
@@ -106,12 +128,14 @@ type partialStatsdClientInterface interface {
DistributionSamples(name string, values []float64, tags []string, rate float64) error
}

func newRuntimeMetricStore(descs []metrics.Description, statsdClient partialStatsdClientInterface, logger *slog.Logger) runtimeMetricStore {
func newRuntimeMetricStore(descs []metrics.Description, statsdClient partialStatsdClientInterface, logger *slog.Logger, tags []string) runtimeMetricStore {
rms := runtimeMetricStore{
metrics: map[string]*runtimeMetric{},
statsd: statsdClient,
logger: logger,
baseTags: getBaseTags(),
metrics: map[string]*runtimeMetric{},
statsd: statsdClient,
logger: logger,
baseTags: tags,
unknownMetricLogOnce: &sync.Once{},
unsupportedKindLogOnce: &sync.Once{},
}

for _, d := range descs {
@@ -269,15 +293,15 @@ func (rms runtimeMetricStore) report() {
case metrics.KindBad:
// This should never happen because all metrics are supported
// by construction.
unknownMetricLogOnce.Do(func() {
rms.unknownMetricLogOnce.Do(func() {
rms.logger.Error("runtimemetrics: encountered an unknown metric, this should never happen and might indicate a bug", slog.Attr{Key: "metric_name", Value: slog.StringValue(name)})
})
default:
// This may happen as new metric kinds get added.
//
// The safest thing to do here is to simply log it somewhere once
// as something to look into, but ignore it for now.
unsupportedKindLogOnce.Do(func() {
rms.unsupportedKindLogOnce.Do(func() {
rms.logger.Error("runtimemetrics: unsupported metric kind, support for that kind should be added in pkg/runtimemetrics",
slog.Attr{Key: "metric_name", Value: slog.StringValue(name)},
slog.Attr{Key: "kind", Value: slog.AnyValue(rm.currentValue.Kind())},
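
For reference (not part of the diff): a minimal usage sketch of the new API, written as if it lived alongside this package's tests so it can reuse the existing statsdClientMock. Only NewEmitter, Options, and Stop come from this change; the example function name and tag value are illustrative.

```go
package runtimemetrics

import (
	"log/slog"
	"time"
)

// ExampleNewEmitter sketches the intended call pattern: construct an emitter,
// let it report on its period, and stop it when done.
func ExampleNewEmitter() {
	emitter := NewEmitter(&statsdClientMock{}, &Options{
		Logger: slog.Default(),           // defaults to slog.Default() when nil
		Tags:   []string{"service:demo"}, // appended to the base tags (gogc, gomaxprocs, ...)
		Period: 10 * time.Second,         // defaults to 10s when zero
	})
	defer emitter.Stop() // idempotent: a second Stop is a no-op
	// ... the application runs here; runtime/metrics are reported every Period ...
}
```
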
48 changes: 22 additions & 26 deletions pkg/runtimemetrics/runtime_metrics_test.go
@@ -15,33 +15,29 @@ import (
"github.com/stretchr/testify/require"
)

func TestStart(t *testing.T) {
cleanup := func() {
mu.Lock()
enabled = false
mu.Unlock()
}
func TestEmitter(t *testing.T) {
// TODO: Use testing/synctest in go1.25 for this in the future.
t.Run("should emit metrics", func(t *testing.T) {
// Start the emitter and wait until some metrics are submitted.
statsd := &statsdClientMock{}
emitter := NewEmitter(statsd, &Options{Logger: slog.Default(), Period: 1 * time.Millisecond})
require.NotNil(t, emitter)
require.Eventually(t, func() bool { return len(statsd.gaugeCall) > 0 }, time.Second, 1*time.Millisecond)
calls := statsd.gaugeCall

t.Run("start returns an error when called successively", func(t *testing.T) {
t.Cleanup(cleanup)
err := Start(&statsdClientMock{}, slog.Default())
assert.NoError(t, err)
// After Stop, no more metrics should be submitted.
emitter.Stop()
time.Sleep(10 * time.Millisecond)
require.Equal(t, len(calls), len(statsd.gaugeCall))

err = Start(&statsdClientMock{}, slog.Default())
assert.Error(t, err)
// Stop should be idempotent.
emitter.Stop()
})

t.Run("should not race with other start calls", func(t *testing.T) {
t.Cleanup(cleanup)
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
Start(&statsdClientMock{}, slog.Default())
wg.Done()
}()
}
wg.Wait()
t.Run("should not panic on nil options", func(t *testing.T) {
emitter := NewEmitter(&statsdClientMock{}, nil)
require.NotNil(t, emitter)
emitter.Stop()
})
}

@@ -187,7 +183,7 @@ func TestSmoke(t *testing.T) {
// Initialize store for all metrics with a mocked statsd client.
descs := metrics.All()
mock := &statsdClientMock{}
rms := newRuntimeMetricStore(descs, mock, slog.Default())
rms := newRuntimeMetricStore(descs, mock, slog.Default(), []string{})

// This populates most runtime/metrics.
runtime.GC()
@@ -215,7 +211,7 @@ func BenchmarkReport(b *testing.B) {
// Initialize store for all metrics with a mocked statsd client.
descs := metrics.All()
mock := &statsdClientMock{Discard: true}
rms := newRuntimeMetricStore(descs, mock, slog.Default())
rms := newRuntimeMetricStore(descs, mock, slog.Default(), []string{})

// Benchmark report method
b.ReportAllocs()
@@ -232,7 +228,7 @@
func reportMetric(name string, kind metrics.ValueKind) (*statsdClientMock, runtimeMetricStore) {
desc := metricDesc(name, kind)
mock := &statsdClientMock{}
rms := newRuntimeMetricStore([]metrics.Description{desc}, mock, slog.Default())
rms := newRuntimeMetricStore([]metrics.Description{desc}, mock, slog.Default(), []string{})
// Populate metrics. Tests implicitly expect this to be the only GC cycle to happen before report is finished.
runtime.GC()
rms.report()
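
Aside (not part of the diff): the assertions above depend on statsdClientMock recording gauge submissions in its gaugeCall slice and dropping them when Discard is set. The real mock is defined elsewhere in this package's test files; the sketch below only illustrates the shape those assertions assume, and any detail beyond the gaugeCall and Discard names referenced by the tests is a guess.

```go
package runtimemetrics

import (
	"sync"
	"time"
)

// gaugeCallSketch mirrors the information the assertions read back; the real
// mock may record more (counts, distributions, ...).
type gaugeCallSketch struct {
	name      string
	value     float64
	tags      []string
	timestamp time.Time
}

// statsdClientSketch is a simplified, assumed stand-in for statsdClientMock.
type statsdClientSketch struct {
	mu        sync.Mutex
	Discard   bool              // drop calls instead of recording them (as in BenchmarkReport)
	gaugeCall []gaugeCallSketch // read by TestEmitter via len(statsd.gaugeCall)
}

// GaugeWithTimestamp records a gauge submission unless Discard is set. The
// signature is assumed to match the datadog-go ClientInterface method of the
// same name.
func (s *statsdClientSketch) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, ts time.Time) error {
	if s.Discard {
		return nil
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.gaugeCall = append(s.gaugeCall, gaugeCallSketch{name: name, value: value, tags: tags, timestamp: ts})
	return nil
}
```
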
17 changes: 4 additions & 13 deletions pkg/runtimemetrics/tags.go
@@ -5,18 +5,13 @@ import (
"math"
"runtime"
"runtime/metrics"
"sync"
)

const gogcMetricName = "/gc/gogc:percent"
const gomemlimitMetricName = "/gc/gomemlimit:bytes"
const gomaxProcsMetricName = "/sched/gomaxprocs:threads"

// muTags protects rootBaseTags
var muTags sync.Mutex
var rootBaseTags []string

func getBaseTags() []string {
const gogcMetricName = "/gc/gogc:percent"
const gomemlimitMetricName = "/gc/gomemlimit:bytes"
const gomaxProcsMetricName = "/sched/gomaxprocs:threads"

samples := []metrics.Sample{
{Name: gogcMetricName},
{Name: gomemlimitMetricName},
@@ -55,10 +50,6 @@
}
}

muTags.Lock()
baseTags = append(baseTags, rootBaseTags...)
muTags.Unlock()

return baseTags
}

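Aside (not part of the diff): with SetBaseTags and the rootBaseTags global gone, per-process tags now travel with each Emitter via Options.Tags and are appended to the getBaseTags output in emit. A sketch of the replacement call pattern, written as if inside this package; the helper name and tag value are illustrative.

```go
package runtimemetrics

// Before (removed in this PR), process tags were configured through
// package-level state guarded by muTags:
//
//	SetBaseTags([]string{"service:demo"})
//	Start(client, slog.Default())
//
// After, tags are scoped to a single emitter instance:
func startWithServiceTag(client partialStatsdClientInterface) *Emitter {
	return NewEmitter(client, &Options{
		Tags: []string{"service:demo"}, // joins the gogc/gomemlimit/gomaxprocs/goversion base tags
	})
}
```
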
12 changes: 4 additions & 8 deletions pkg/runtimemetrics/tags_test.go
@@ -57,8 +57,7 @@ func TestGetBaseTags(t *testing.T) {
old := debug.SetGCPercent(tt.gogc)
defer debug.SetGCPercent(old)

tags := getBaseTags()
assertTagValue(t, "gogc", tt.expected, tags)
assertTagValue(t, "gogc", tt.expected, getBaseTags())
})
}

@@ -89,22 +88,19 @@ func TestGetBaseTags(t *testing.T) {
old := debug.SetMemoryLimit(tt.gomemlimit)
defer debug.SetMemoryLimit(old)

tags := getBaseTags()
assertTagValue(t, "gomemlimit", tt.expected, tags)
assertTagValue(t, "gomemlimit", tt.expected, getBaseTags())
})
}

t.Run("should return the correct value for a specific gomaxprocs value", func(t *testing.T) {
old := runtime.GOMAXPROCS(42)
defer runtime.GOMAXPROCS(old)

tags := getBaseTags()
assertTagValue(t, "gomaxprocs", "42", tags)
assertTagValue(t, "gomaxprocs", "42", getBaseTags())
})

t.Run("should return the correct goversion", func(t *testing.T) {
tags := getBaseTags()
assertTagValue(t, "goversion", runtime.Version(), tags)
assertTagValue(t, "goversion", runtime.Version(), getBaseTags())
})
}
