diff --git a/CHANGELOG.md b/CHANGELOG.md index 827e29b14e..232df6b1bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ Main (unreleased) - Add json format support for log export via faro receiver (@ravishankar15) +- Add livedebugging support for `prometheus.remote_write` (@ravishankar15) + v1.6.0-rc.1 ----------------- diff --git a/docs/sources/troubleshoot/debug.md b/docs/sources/troubleshoot/debug.md index f57c8a1542..4ff505f7be 100644 --- a/docs/sources/troubleshoot/debug.md +++ b/docs/sources/troubleshoot/debug.md @@ -109,6 +109,7 @@ Supported components: * `loki.secretfilter` * `otelcol.processor.*` * `otelcol.receiver.*` +* `prometheus.remote_write` * `prometheus.relabel` * `discovery.*` {{< /admonition >}} diff --git a/internal/component/prometheus/remotewrite/remote_write.go b/internal/component/prometheus/remotewrite/remote_write.go index c9b26475db..144a742c6d 100644 --- a/internal/component/prometheus/remotewrite/remote_write.go +++ b/internal/component/prometheus/remotewrite/remote_write.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/service/livedebugging" "github.com/grafana/alloy/internal/static/metrics/wal" "github.com/grafana/alloy/internal/useragent" "github.com/prometheus/prometheus/model/exemplar" @@ -62,6 +63,8 @@ type Component struct { cfg Arguments receiver *prometheus.Interceptor + + debugDataPublisher livedebugging.DebugDataPublisher } // New creates a new prometheus.remote_write component. @@ -92,13 +95,20 @@ func New(o component.Options, c Arguments) (*Component, error) { } ls := service.(labelstore.LabelStore) + debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName) + if err != nil { + return nil, err + } + res := &Component{ - log: o.Logger, - opts: o, - walStore: walStorage, - remoteStore: remoteStore, - storage: storage.NewFanout(o.Logger, walStorage, remoteStore), + log: o.Logger, + opts: o, + walStore: walStorage, + remoteStore: remoteStore, + storage: storage.NewFanout(o.Logger, walStorage, remoteStore), + debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), } + componentID := livedebugging.ComponentID(res.opts.ID) res.receiver = prometheus.NewInterceptor( res.storage, ls, @@ -119,6 +129,9 @@ func New(o component.Options, c Arguments) (*Component, error) { if localID == 0 { ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } + if res.debugDataPublisher.IsActive(componentID) { + res.debugDataPublisher.Publish(componentID, fmt.Sprintf("ts=%d, labels=%s, value=%f", t, l, v)) + } return globalRef, nextErr }), prometheus.WithHistogramHook(func(globalRef storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) { @@ -131,6 +144,17 @@ func New(o component.Options, c Arguments) (*Component, error) { if localID == 0 { ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } + if res.debugDataPublisher.IsActive(componentID) { + var data string + if h != nil { + data = fmt.Sprintf("ts=%d, labels=%s, histogram=%s", t, l, h.String()) + } else if fh != nil { + data = fmt.Sprintf("ts=%d, labels=%s, float_histogram=%s", t, l, fh.String()) + } else { + data = fmt.Sprintf("ts=%d, labels=%s, no_value", t, l) + } + res.debugDataPublisher.Publish(componentID, data) + } return globalRef, nextErr }), prometheus.WithMetadataHook(func(globalRef storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) { @@ -143,6 +167,9 @@ func New(o component.Options, c Arguments) (*Component, error) { if localID == 0 { ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } + if res.debugDataPublisher.IsActive(componentID) { + res.debugDataPublisher.Publish(componentID, fmt.Sprintf("labels=%s, type=%s, unit=%s, help=%s", l, m.Type, m.Unit, m.Help)) + } return globalRef, nextErr }), prometheus.WithExemplarHook(func(globalRef storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) { @@ -155,6 +182,9 @@ func New(o component.Options, c Arguments) (*Component, error) { if localID == 0 { ls.GetOrAddLink(res.opts.ID, uint64(newRef), l) } + if res.debugDataPublisher.IsActive(componentID) { + res.debugDataPublisher.Publish(componentID, fmt.Sprintf("ts=%d, labels=%s, exemplar_labels=%s, value=%f", e.Ts, l, e.Labels, e.Value)) + } return globalRef, nextErr }), ) @@ -172,6 +202,7 @@ func New(o component.Options, c Arguments) (*Component, error) { func startTime() (int64, error) { return 0, nil } var _ component.Component = (*Component)(nil) +var _ component.LiveDebugging = (*Component)(nil) // Run implements Component. func (c *Component) Run(ctx context.Context) error { @@ -275,3 +306,5 @@ func (c *Component) Update(newConfig component.Arguments) error { c.cfg = cfg return nil } + +func (c *Component) LiveDebugging(_ int) {}