Skip to content

Commit

Permalink
Distributor shim: add test verifying receiver works (including metrics)
Browse files Browse the repository at this point in the history
  • Loading branch information
yvrhdn committed Dec 19, 2024
1 parent 0aecee5 commit 3d9bb0b
Show file tree
Hide file tree
Showing 24 changed files with 1,856 additions and 261 deletions.
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,19 @@ require (
go.opentelemetry.io/collector/config/configtls v1.18.0
go.opentelemetry.io/collector/exporter v0.102.1
go.opentelemetry.io/collector/exporter/otlpexporter v0.102.1
go.opentelemetry.io/collector/exporter/otlphttpexporter v0.102.1
go.opentelemetry.io/collector/extension v0.102.1
go.opentelemetry.io/collector/otelcol v0.102.1
go.opentelemetry.io/collector/pdata/testdata v0.102.1
go.opentelemetry.io/collector/processor v0.102.1
go.opentelemetry.io/collector/receiver v0.102.1
go.opentelemetry.io/collector/receiver/otlpreceiver v0.102.1
go.opentelemetry.io/contrib/exporters/autoexport v0.53.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0
go.opentelemetry.io/otel/exporters/prometheus v0.50.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
go.opentelemetry.io/proto/otlp v1.3.1
golang.org/x/net v0.31.0
golang.org/x/oauth2 v0.21.0
Expand Down Expand Up @@ -310,13 +314,11 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.4.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect
go.opentelemetry.io/otel/log v0.4.0 // indirect
go.opentelemetry.io/otel/sdk/log v0.4.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/mod v0.19.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,8 @@ go.opentelemetry.io/collector/exporter v0.102.1 h1:4VURYgBNJscxfMhZWitzcwA1cig5a
go.opentelemetry.io/collector/exporter v0.102.1/go.mod h1:1pmNxvrvvbWDW6PiGObICdj0eOSGV4Fzwpm5QA1GU54=
go.opentelemetry.io/collector/exporter/otlpexporter v0.102.1 h1:bOXE7u1iy0SKwH2mnVyIMKkvFIR9bn9iIm1Cf/CJlZU=
go.opentelemetry.io/collector/exporter/otlpexporter v0.102.1/go.mod h1:4ya6xaUYvcXq9MQW0TbsR4QWkOJI02d/2Vt8plwdozA=
go.opentelemetry.io/collector/exporter/otlphttpexporter v0.102.1 h1:9TaxHrkVtEdssDAHqV5yU9PARkFph7CvfLqC1wS6m+c=
go.opentelemetry.io/collector/exporter/otlphttpexporter v0.102.1/go.mod h1:auKlkLfuUriyZ2CmV2dudJaVGB7ycZ+tTpypy2JNFEc=
go.opentelemetry.io/collector/extension v0.102.1 h1:gAvE3w15q+Vv0Tj100jzcDpeMTyc8dAiemHRtJbspLg=
go.opentelemetry.io/collector/extension v0.102.1/go.mod h1:XBxUOXjZpwYLZYOK5u3GWlbBTOKmzStY5eU1R/aXkIo=
go.opentelemetry.io/collector/extension/auth v0.102.1 h1:GP6oBmpFJjxuVruPb9X40bdf6PNu9779i8anxa+wW6U=
Expand Down
2 changes: 1 addition & 1 deletion modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
cfgReceivers = defaultReceivers
}

receivers, err := receiver.New(cfgReceivers, d, middleware, cfg.RetryAfterOnResourceExhausted, loggingLevel)
receivers, err := receiver.New(cfgReceivers, d, middleware, cfg.RetryAfterOnResourceExhausted, loggingLevel, reg)
if err != nil {
return nil, err
}
Expand Down
304 changes: 49 additions & 255 deletions modules/distributor/receiver/metrics_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,184 +22,82 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/metric/noop"
)

var (
// Compile-time check this implements the OpenTelemetry API.

_ metric.MeterProvider = MeterProvider{}
_ metric.Meter = Meter{}
_ metric.Observer = Observer{}
_ metric.Registration = Registration{}
_ metric.Int64Counter = Int64Counter{}
_ metric.Float64Counter = Float64Counter{}
_ metric.Int64UpDownCounter = Int64UpDownCounter{}
_ metric.Float64UpDownCounter = Float64UpDownCounter{}
_ metric.Int64Histogram = Int64Histogram{}
_ metric.Float64Histogram = Float64Histogram{}
_ metric.Int64Gauge = Int64Gauge{}
_ metric.Float64Gauge = Float64Gauge{}
_ metric.Int64ObservableCounter = Int64ObservableCounter{}
_ metric.Float64ObservableCounter = Float64ObservableCounter{}
_ metric.Int64ObservableGauge = Int64ObservableGauge{}
_ metric.Float64ObservableGauge = Float64ObservableGauge{}
_ metric.Int64ObservableUpDownCounter = Int64ObservableUpDownCounter{}
_ metric.Float64ObservableUpDownCounter = Float64ObservableUpDownCounter{}
_ metric.Int64Observer = Int64Observer{}
_ metric.Float64Observer = Float64Observer{}
_ metric.MeterProvider = &MeterProvider{}
_ metric.Meter = &Meter{}
_ metric.Int64Counter = &Int64Counter{}
)

var (
receiverAcceptedSpans = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "receiver_accepted_spans",
Help: "Number of spans successfully pushed into the pipeline.",
}, []string{"receiver", "transport"})
receiverRefusedSpans = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "receiver_refused_spans",
Help: "Number of spans that could not be pushed into the pipeline.",
}, []string{"receiver", "transport"})
)
type metrics struct {
receiverAcceptedSpans *prometheus.CounterVec
receiverRefusedSpans *prometheus.CounterVec
}

func newMetrics(reg prometheus.Registerer) *metrics {
return &metrics{
receiverAcceptedSpans: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "receiver_accepted_spans",
Help: "Number of spans successfully pushed into the pipeline.",
}, []string{"receiver", "transport"}),
receiverRefusedSpans: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "receiver_refused_spans",
Help: "Number of spans that could not be pushed into the pipeline.",
}, []string{"receiver", "transport"}),
}
}

// MeterProvider is an OpenTelemetry No-Op MeterProvider.
type MeterProvider struct{ embedded.MeterProvider }
type MeterProvider struct {
embedded.MeterProvider
metrics *metrics
}

// NewMeterProvider returns a MeterProvider that does not record any telemetry.
func NewMeterProvider() MeterProvider {
return MeterProvider{}
func NewMeterProvider(reg prometheus.Registerer) *MeterProvider {
return &MeterProvider{
metrics: newMetrics(reg),
}
}

// Meter returns an OpenTelemetry Meter that does not record any telemetry.
func (MeterProvider) Meter(string, ...metric.MeterOption) metric.Meter {
return Meter{}
func (mp *MeterProvider) Meter(string, ...metric.MeterOption) metric.Meter {
return &Meter{
metrics: mp.metrics,
}
}

// Meter is an OpenTelemetry No-Op Meter.
type Meter struct{ embedded.Meter }
type Meter struct {
// embed the noop Meter, this provides noop for all methods we don't implement ourselves
noop.Meter
metrics *metrics
}

// Int64Counter returns a Counter used to record int64 measurements that
// produces no telemetry.
func (Meter) Int64Counter(name string, _ ...metric.Int64CounterOption) (metric.Int64Counter, error) {
// Int64Counter returns a Counter used to record int64 measurements
func (m *Meter) Int64Counter(name string, _ ...metric.Int64CounterOption) (metric.Int64Counter, error) {
switch name {
case "receiver_accepted_spans", "receiver_refused_spans":
return Int64Counter{Name: name}, nil
return &Int64Counter{Name: name, metrics: m.metrics}, nil
default:
return Int64Counter{}, nil
return noop.Int64Counter{}, nil
}
}

// Int64UpDownCounter returns an UpDownCounter used to record int64
// measurements that produces no telemetry.
func (Meter) Int64UpDownCounter(string, ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
return Int64UpDownCounter{}, nil
}

// Int64Histogram returns a Histogram used to record int64 measurements that
// produces no telemetry.
func (Meter) Int64Histogram(string, ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
return Int64Histogram{}, nil
}

// Int64Gauge returns a Gauge used to record int64 measurements that
// produces no telemetry.
func (Meter) Int64Gauge(string, ...metric.Int64GaugeOption) (metric.Int64Gauge, error) {
return Int64Gauge{}, nil
}

// Int64ObservableCounter returns an ObservableCounter used to record int64
// measurements that produces no telemetry.
func (Meter) Int64ObservableCounter(string, ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
return Int64ObservableCounter{}, nil
}

// Int64ObservableUpDownCounter returns an ObservableUpDownCounter used to
// record int64 measurements that produces no telemetry.
func (Meter) Int64ObservableUpDownCounter(string, ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
return Int64ObservableUpDownCounter{}, nil
}

// Int64ObservableGauge returns an ObservableGauge used to record int64
// measurements that produces no telemetry.
func (Meter) Int64ObservableGauge(string, ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
return Int64ObservableGauge{}, nil
}

// Float64Counter returns a Counter used to record int64 measurements that
// produces no telemetry.
func (Meter) Float64Counter(string, ...metric.Float64CounterOption) (metric.Float64Counter, error) {
return Float64Counter{}, nil
}

// Float64UpDownCounter returns an UpDownCounter used to record int64
// measurements that produces no telemetry.
func (Meter) Float64UpDownCounter(string, ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
return Float64UpDownCounter{}, nil
}

// Float64Histogram returns a Histogram used to record int64 measurements that
// produces no telemetry.
func (Meter) Float64Histogram(string, ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
return Float64Histogram{}, nil
}

// Float64Gauge returns a Gauge used to record float64 measurements that
// produces no telemetry.
func (Meter) Float64Gauge(string, ...metric.Float64GaugeOption) (metric.Float64Gauge, error) {
return Float64Gauge{}, nil
}

// Float64ObservableCounter returns an ObservableCounter used to record int64
// measurements that produces no telemetry.
func (Meter) Float64ObservableCounter(string, ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
return Float64ObservableCounter{}, nil
}

// Float64ObservableUpDownCounter returns an ObservableUpDownCounter used to
// record int64 measurements that produces no telemetry.
func (Meter) Float64ObservableUpDownCounter(string, ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
return Float64ObservableUpDownCounter{}, nil
}

// Float64ObservableGauge returns an ObservableGauge used to record int64
// measurements that produces no telemetry.
func (Meter) Float64ObservableGauge(string, ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
return Float64ObservableGauge{}, nil
}

// RegisterCallback performs no operation.
func (Meter) RegisterCallback(metric.Callback, ...metric.Observable) (metric.Registration, error) {
return Registration{}, nil
}

// Observer acts as a recorder of measurements for multiple instruments in a
// Callback, it performing no operation.
type Observer struct{ embedded.Observer }

// ObserveFloat64 performs no operation.
func (Observer) ObserveFloat64(metric.Float64Observable, float64, ...metric.ObserveOption) {
}

// ObserveInt64 performs no operation.
func (Observer) ObserveInt64(metric.Int64Observable, int64, ...metric.ObserveOption) {
}

// Registration is the registration of a Callback with a No-Op Meter.
type Registration struct{ embedded.Registration }

// Unregister unregisters the Callback the Registration represents with the
// No-Op Meter. This will always return nil because the No-Op Meter performs no
// operation, including hold any record of registrations.
func (Registration) Unregister() error { return nil }

// Int64Counter is an OpenTelemetry Counter used to record int64 measurements.
// It produces no telemetry.
type Int64Counter struct {
embedded.Int64Counter
Name string
Name string
metrics *metrics
}

func (r Int64Counter) Add(_ context.Context, value int64, options ...metric.AddOption) {
func (r *Int64Counter) Add(_ context.Context, value int64, options ...metric.AddOption) {
// don't do anything for metrics that we don't care
if r.Name == "" {
return
Expand All @@ -221,112 +119,8 @@ func (r Int64Counter) Add(_ context.Context, value int64, options ...metric.AddO

switch r.Name {
case "receiver_accepted_spans":
receiverAcceptedSpans.WithLabelValues(receiver, transport).Add(float64(value))
r.metrics.receiverAcceptedSpans.WithLabelValues(receiver, transport).Add(float64(value))
case "receiver_refused_spans":
receiverRefusedSpans.WithLabelValues(receiver, transport).Add(float64(value))
r.metrics.receiverRefusedSpans.WithLabelValues(receiver, transport).Add(float64(value))
}
}

// Float64Counter is an OpenTelemetry Counter used to record float64
// measurements. It produces no telemetry.
type Float64Counter struct{ embedded.Float64Counter }

// Add performs no operation.
func (Float64Counter) Add(context.Context, float64, ...metric.AddOption) {}

// Int64UpDownCounter is an OpenTelemetry UpDownCounter used to record int64
// measurements. It produces no telemetry.
type Int64UpDownCounter struct{ embedded.Int64UpDownCounter }

// Add performs no operation.
func (Int64UpDownCounter) Add(context.Context, int64, ...metric.AddOption) {}

// Float64UpDownCounter is an OpenTelemetry UpDownCounter used to record
// float64 measurements. It produces no telemetry.
type Float64UpDownCounter struct{ embedded.Float64UpDownCounter }

// Add performs no operation.
func (Float64UpDownCounter) Add(context.Context, float64, ...metric.AddOption) {}

// Int64Histogram is an OpenTelemetry Histogram used to record int64
// measurements. It produces no telemetry.
type Int64Histogram struct{ embedded.Int64Histogram }

// Record performs no operation.
func (Int64Histogram) Record(context.Context, int64, ...metric.RecordOption) {}

// Float64Histogram is an OpenTelemetry Histogram used to record float64
// measurements. It produces no telemetry.
type Float64Histogram struct{ embedded.Float64Histogram }

// Record performs no operation.
func (Float64Histogram) Record(context.Context, float64, ...metric.RecordOption) {}

// Int64Gauge is an OpenTelemetry Gauge used to record instantaneous int64
// measurements. It produces no telemetry.
type Int64Gauge struct{ embedded.Int64Gauge }

// Record performs no operation.
func (Int64Gauge) Record(context.Context, int64, ...metric.RecordOption) {}

// Float64Gauge is an OpenTelemetry Gauge used to record instantaneous float64
// measurements. It produces no telemetry.
type Float64Gauge struct{ embedded.Float64Gauge }

// Record performs no operation.
func (Float64Gauge) Record(context.Context, float64, ...metric.RecordOption) {}

// Int64ObservableCounter is an OpenTelemetry ObservableCounter used to record
// int64 measurements. It produces no telemetry.
type Int64ObservableCounter struct {
metric.Int64Observable
embedded.Int64ObservableCounter
}

// Float64ObservableCounter is an OpenTelemetry ObservableCounter used to record
// float64 measurements. It produces no telemetry.
type Float64ObservableCounter struct {
metric.Float64Observable
embedded.Float64ObservableCounter
}

// Int64ObservableGauge is an OpenTelemetry ObservableGauge used to record
// int64 measurements. It produces no telemetry.
type Int64ObservableGauge struct {
metric.Int64Observable
embedded.Int64ObservableGauge
}

// Float64ObservableGauge is an OpenTelemetry ObservableGauge used to record
// float64 measurements. It produces no telemetry.
type Float64ObservableGauge struct {
metric.Float64Observable
embedded.Float64ObservableGauge
}

// Int64ObservableUpDownCounter is an OpenTelemetry ObservableUpDownCounter
// used to record int64 measurements. It produces no telemetry.
type Int64ObservableUpDownCounter struct {
metric.Int64Observable
embedded.Int64ObservableUpDownCounter
}

// Float64ObservableUpDownCounter is an OpenTelemetry ObservableUpDownCounter
// used to record float64 measurements. It produces no telemetry.
type Float64ObservableUpDownCounter struct {
metric.Float64Observable
embedded.Float64ObservableUpDownCounter
}

// Int64Observer is a recorder of int64 measurements that performs no operation.
type Int64Observer struct{ embedded.Int64Observer }

// Observe performs no operation.
func (Int64Observer) Observe(int64, ...metric.ObserveOption) {}

// Float64Observer is a recorder of float64 measurements that performs no
// operation.
type Float64Observer struct{ embedded.Float64Observer }

// Observe performs no operation.
func (Float64Observer) Observe(float64, ...metric.ObserveOption) {}
Loading

0 comments on commit 3d9bb0b

Please sign in to comment.