diff --git a/component/common/loki/client/client.go b/component/common/loki/client/client.go index 36839dbcffa3..964b6c98513b 100644 --- a/component/common/loki/client/client.go +++ b/component/common/loki/client/client.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/agent/component/common/loki" "github.com/grafana/agent/pkg/build" + "github.com/grafana/agent/pkg/util" lokiutil "github.com/grafana/loki/pkg/util" ) @@ -116,30 +117,20 @@ func NewMetrics(reg prometheus.Registerer) *Metrics { } if reg != nil { - m.encodedBytes = mustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec) - m.sentBytes = mustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec) - m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec) - m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec) - m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec) - m.mutatedEntries = mustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec) - m.mutatedBytes = mustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec) - m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec) - m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec) + m.encodedBytes = util.MustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec) + m.sentBytes = util.MustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec) + m.droppedBytes = util.MustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec) + m.sentEntries = util.MustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec) + m.droppedEntries = util.MustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec) + m.mutatedEntries = util.MustRegisterOrGet(reg, m.mutatedEntries).(*prometheus.CounterVec) + m.mutatedBytes = util.MustRegisterOrGet(reg, m.mutatedBytes).(*prometheus.CounterVec) + m.requestDuration = util.MustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec) + m.batchRetries = util.MustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec) } return &m } -func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector { - if err := reg.Register(c); err != nil { - if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - return are.ExistingCollector - } - panic(err) - } - return c -} - // Client pushes entries to Loki and can be stopped type Client interface { loki.EntryHandler diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index 290ddcc0f951..6683e9e5772b 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -70,6 +70,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr walWatcherMetrics := wal.NewWatcherMetrics(reg) walMarkerMetrics := internal.NewMarkerMetrics(reg) + queueClientMetrics := NewQueueClientMetrics(reg) if len(clientCfgs) == 0 { return nil, fmt.Errorf("at least one client config must be provided") @@ -98,7 +99,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr } markerHandler := internal.NewMarkerHandler(markerFileHandler, walCfg.MaxSegmentAge, logger, walMarkerMetrics.WithCurriedId(clientName)) - queue, err := NewQueue(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler) + queue, err := NewQueue(metrics, queueClientMetrics.CurryWithId(clientName), cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler) if err != nil { return nil, fmt.Errorf("error starting queue client: %w", err) } diff --git a/component/common/loki/client/metrics.go b/component/common/loki/client/metrics.go new file mode 100644 index 000000000000..6d32bf1ce459 --- /dev/null +++ b/component/common/loki/client/metrics.go @@ -0,0 +1,37 @@ +package client + +import ( + "github.com/grafana/agent/pkg/util" + "github.com/prometheus/client_golang/prometheus" +) + +type QueueClientMetrics struct { + lastReadTimestamp *prometheus.GaugeVec +} + +func NewQueueClientMetrics(reg prometheus.Registerer) *QueueClientMetrics { + m := &QueueClientMetrics{ + lastReadTimestamp: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "loki_write", + Name: "last_read_timestamp", + Help: "Latest timestamp read from the WAL", + }, + []string{"id"}, + ), + } + + if reg != nil { + m.lastReadTimestamp = util.MustRegisterOrGet(reg, m.lastReadTimestamp).(*prometheus.GaugeVec) + } + + return m +} + +func (m *QueueClientMetrics) CurryWithId(id string) *QueueClientMetrics { + return &QueueClientMetrics{ + lastReadTimestamp: m.lastReadTimestamp.MustCurryWith(map[string]string{ + "id": id, + }), + } +} diff --git a/component/common/loki/client/queue_client.go b/component/common/loki/client/queue_client.go index 458839db60dd..dc8ed469fba3 100644 --- a/component/common/loki/client/queue_client.go +++ b/component/common/loki/client/queue_client.go @@ -150,10 +150,11 @@ func (q *queue) closeNow() { // which allows it to be injected in the wal.Watcher as a destination where to write read series and entries. As the watcher // reads from the WAL, batches are created and dispatched onto a send queue when ready to be sent. type queueClient struct { - metrics *Metrics - logger log.Logger - cfg Config - client *http.Client + metrics *Metrics + qcMetrics *QueueClientMetrics + logger log.Logger + cfg Config + client *http.Client batches map[string]*batch batchesMtx sync.Mutex @@ -180,14 +181,14 @@ type queueClient struct { } // NewQueue creates a new queueClient. -func NewQueue(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) { +func NewQueue(metrics *Metrics, queueClientMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error) { if cfg.StreamLagLabels.String() != "" { return nil, fmt.Errorf("client config stream_lag_labels is deprecated and the associated metric has been removed, stream_lag_labels: %+v", cfg.StreamLagLabels.String()) } - return newQueueClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger, markerHandler) + return newQueueClient(metrics, queueClientMetrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger, markerHandler) } -func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) { +func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (*queueClient, error) { if cfg.URL.URL == nil { return nil, errors.New("client needs target URL") } @@ -198,6 +199,7 @@ func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, m logger: log.With(logger, "component", "client", "host", cfg.URL.Host), cfg: cfg, metrics: metrics, + qcMetrics: qcMetrics, drainTimeout: cfg.Queue.DrainTimeout, quit: make(chan struct{}), @@ -283,9 +285,13 @@ func (c *queueClient) AppendEntries(entries wal.RefEntries, segment int) error { c.seriesLock.RLock() l, ok := c.series[entries.Ref] c.seriesLock.RUnlock() + var maxSeenTimestamp int64 = -1 if ok { for _, e := range entries.Entries { c.appendSingleEntry(segment, l, e) + if e.Timestamp.Unix() > maxSeenTimestamp { + maxSeenTimestamp = e.Timestamp.Unix() + } } // count all enqueued appended entries as received from WAL c.markerHandler.UpdateReceivedData(segment, len(entries.Entries)) @@ -293,6 +299,11 @@ func (c *queueClient) AppendEntries(entries wal.RefEntries, segment int) error { // TODO(thepalbi): Add metric here level.Debug(c.logger).Log("msg", "series for entry not found") } + + // It's safe to assume that upon an AppendEntries call, there will always be at least + // one entry. + c.qcMetrics.lastReadTimestamp.WithLabelValues().Set(float64(maxSeenTimestamp)) + return nil } diff --git a/component/common/loki/client/queue_client_test.go b/component/common/loki/client/queue_client_test.go index a23804d44634..cf59f49e1b7a 100644 --- a/component/common/loki/client/queue_client_test.go +++ b/component/common/loki/client/queue_client_test.go @@ -135,8 +135,7 @@ func TestQueueClient(t *testing.T) { logger := log.NewLogfmtLogger(os.Stdout) - m := NewMetrics(reg) - qc, err := NewQueue(m, cfg, 0, 0, false, logger, nilMarkerHandler{}) + qc, err := NewQueue(NewMetrics(reg), NewQueueClientMetrics(reg).CurryWithId("test"), cfg, 0, 0, false, logger, nilMarkerHandler{}) require.NoError(t, err) //labels := model.LabelSet{"app": "test"} @@ -281,8 +280,7 @@ func runQueueClientBenchCase(b *testing.B, bc testCase, mhFactory func(t *testin logger := log.NewLogfmtLogger(os.Stdout) - m := NewMetrics(reg) - qc, err := NewQueue(m, cfg, 0, 0, false, logger, mhFactory(b)) + qc, err := NewQueue(NewMetrics(reg), NewQueueClientMetrics(reg).CurryWithId("test"), cfg, 0, 0, false, logger, mhFactory(b)) require.NoError(b, err) //labels := model.LabelSet{"app": "test"} diff --git a/component/common/loki/wal/watcher_metrics.go b/component/common/loki/wal/watcher_metrics.go index 4064f8b22aac..ce8052fd442d 100644 --- a/component/common/loki/wal/watcher_metrics.go +++ b/component/common/loki/wal/watcher_metrics.go @@ -1,6 +1,9 @@ package wal -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/grafana/agent/pkg/util" + "github.com/prometheus/client_golang/prometheus" +) type WatcherMetrics struct { recordsRead *prometheus.CounterVec @@ -80,23 +83,13 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { } if reg != nil { - m.recordsRead = mustRegisterOrGet(reg, m.recordsRead).(*prometheus.CounterVec) - m.recordDecodeFails = mustRegisterOrGet(reg, m.recordDecodeFails).(*prometheus.CounterVec) - m.droppedWriteNotifications = mustRegisterOrGet(reg, m.droppedWriteNotifications).(*prometheus.CounterVec) - m.segmentRead = mustRegisterOrGet(reg, m.segmentRead).(*prometheus.CounterVec) - m.currentSegment = mustRegisterOrGet(reg, m.currentSegment).(*prometheus.GaugeVec) - m.watchersRunning = mustRegisterOrGet(reg, m.watchersRunning).(*prometheus.GaugeVec) + m.recordsRead = util.MustRegisterOrGet(reg, m.recordsRead).(*prometheus.CounterVec) + m.recordDecodeFails = util.MustRegisterOrGet(reg, m.recordDecodeFails).(*prometheus.CounterVec) + m.droppedWriteNotifications = util.MustRegisterOrGet(reg, m.droppedWriteNotifications).(*prometheus.CounterVec) + m.segmentRead = util.MustRegisterOrGet(reg, m.segmentRead).(*prometheus.CounterVec) + m.currentSegment = util.MustRegisterOrGet(reg, m.currentSegment).(*prometheus.GaugeVec) + m.watchersRunning = util.MustRegisterOrGet(reg, m.watchersRunning).(*prometheus.GaugeVec) } return m } - -func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector { - if err := reg.Register(c); err != nil { - if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - return are.ExistingCollector - } - panic(err) - } - return c -} diff --git a/component/common/loki/wal/writer.go b/component/common/loki/wal/writer.go index 929199529c5d..e71773d944d6 100644 --- a/component/common/loki/wal/writer.go +++ b/component/common/loki/wal/writer.go @@ -59,6 +59,7 @@ type Writer struct { reclaimedOldSegmentsSpaceCounter *prometheus.CounterVec lastReclaimedSegment *prometheus.GaugeVec + lastWrittenTimestamp *prometheus.GaugeVec closeCleaner chan struct{} } @@ -96,10 +97,17 @@ func NewWriter(walCfg Config, logger log.Logger, reg prometheus.Registerer) (*Wr Name: "last_reclaimed_segment", Help: "Last reclaimed segment number", }, []string{}) + wrt.lastWrittenTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "loki_write", + Subsystem: "wal_writer", + Name: "last_written_timestamp", + Help: "Latest timestamp that was written to the WAL", + }, []string{}) if reg != nil { _ = reg.Register(wrt.reclaimedOldSegmentsSpaceCounter) _ = reg.Register(wrt.lastReclaimedSegment) + _ = reg.Register(wrt.lastWrittenTimestamp) } wrt.start(walCfg.MaxSegmentAge) @@ -118,6 +126,9 @@ func (wrt *Writer) start(maxSegmentAge time.Duration) { continue } + // emit metric with latest written timestamp, to be able to track delay from writer to watcher + wrt.lastWrittenTimestamp.WithLabelValues().Set(float64(e.Timestamp.Unix())) + wrt.writeSubscribersLock.RLock() for _, s := range wrt.writeSubscribers { s.NotifyWrite() diff --git a/pkg/util/metrics.go b/pkg/util/metrics.go new file mode 100644 index 000000000000..850b535fe8fb --- /dev/null +++ b/pkg/util/metrics.go @@ -0,0 +1,16 @@ +package util + +import "github.com/prometheus/client_golang/prometheus" + +// MustRegisterOrGet will attempt to register the supplied collector into the register. If it's already +// registered, it will return that one. +// In case that the register procedure fails with something other than already registered, this will panic. +func MustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector { + if err := reg.Register(c); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + return are.ExistingCollector + } + panic(err) + } + return c +}