diff --git a/internal/impl/pure/buffer_memory.go b/internal/impl/pure/buffer_memory.go
index de0d504a5..e479867da 100644
--- a/internal/impl/pure/buffer_memory.go
+++ b/internal/impl/pure/buffer_memory.go
@@ -44,13 +44,21 @@ This buffer intentionally weakens the delivery guarantees of the pipeline and th
 
 ## Batching
 
-It is possible to batch up messages sent from this buffer using a [batch policy](/docs/configuration/batching#batch-policy).`).
+It is possible to batch up messages sent from this buffer using a [batch policy](/docs/configuration/batching#batch-policy).
+
+## Metrics
+
+- ` + "`buffer_active`" + ` Gauge metric tracking the current number of bytes in the buffer.
+- ` + "`buffer_spillover`" + ` Counter metric tracking the total number of bytes dropped because of spillover.
+
+`).
 		Field(service.NewIntField("limit").
 			Description(`The maximum buffer size (in bytes) to allow before applying backpressure upstream.`).
 			Default(524288000)).
 		Field(service.NewBoolField("spillover").
 			Description("Whether to drop incoming messages that will exceed the buffer limit.").
 			Advanced().
+			Version("1.13.0").
 			Default(false)).
 		Field(service.NewInternalField(bs))
 }
@@ -97,7 +105,7 @@ func newMemoryBufferFromConfig(conf *service.ParsedConfig, res *service.Resource
 		}
 	}
 
-	return newMemoryBuffer(limit, spilloverEnabled, batcher), nil
+	return newMemoryBuffer(limit, spilloverEnabled, batcher, res), nil
 }
 
 //------------------------------------------------------------------------------
@@ -118,14 +126,19 @@ type memoryBuffer struct {
 	closed           bool
 
 	batcher *service.Batcher
+
+	activeBytes    *service.MetricGauge
+	spilloverBytes *service.MetricCounter
 }
 
-func newMemoryBuffer(capacity int, spilloverEnabled bool, batcher *service.Batcher) *memoryBuffer {
+func newMemoryBuffer(capacity int, spilloverEnabled bool, batcher *service.Batcher, res *service.Resources) *memoryBuffer {
 	return &memoryBuffer{
 		cap:              capacity,
 		spilloverEnabled: spilloverEnabled,
 		cond:             sync.NewCond(&sync.Mutex{}),
 		batcher:          batcher,
+		activeBytes:      res.Metrics().NewGauge("buffer_active"),
+		spilloverBytes:   res.Metrics().NewCounter("buffer_spillover"),
 	}
 }
 
@@ -224,6 +237,7 @@ func (m *memoryBuffer) ReadBatch(ctx context.Context) (service.MessageBatch, ser
 		defer m.cond.L.Unlock()
 		if err == nil {
 			m.bytes -= outSize
+			m.activeBytes.Set(int64(m.bytes))
 		} else {
 			m.batches = append(batchSources, m.batches...)
 		}
@@ -262,6 +276,7 @@ func (m *memoryBuffer) WriteBatch(ctx context.Context, msgBatch service.MessageB
 
 	for (m.bytes + extraBytes) > m.cap {
 		if m.spilloverEnabled {
+			m.spilloverBytes.Incr(int64(extraBytes))
 			return component.ErrLimitReached
 		}
 
@@ -276,6 +291,7 @@ func (m *memoryBuffer) WriteBatch(ctx context.Context, msgBatch service.MessageB
 		size: extraBytes,
 	})
 	m.bytes += extraBytes
+	m.activeBytes.Set(int64(m.bytes))
 
 	m.cond.Broadcast()
 	return nil
diff --git a/website/docs/components/buffers/memory.md b/website/docs/components/buffers/memory.md
index acf835f4d..bd6d5ccb4 100644
--- a/website/docs/components/buffers/memory.md
+++ b/website/docs/components/buffers/memory.md
@@ -73,6 +73,13 @@ This buffer intentionally weakens the delivery guarantees of the pipeline and th
 
 It is possible to batch up messages sent from this buffer using a [batch policy](/docs/configuration/batching#batch-policy).
 
+## Metrics
+
+- `buffer_active` Gauge metric tracking the current number of bytes in the buffer.
+- `buffer_spillover` Counter metric tracking the total number of bytes dropped because of spillover.
+
+
+
 ## Fields
 
 ### `limit`
 
 The maximum buffer size (in bytes) to allow before applying backpressure upstream.
 
 Type: `int`
 Default: `524288000`
 
 ### `spillover`
 
 Whether to drop incoming messages that will exceed the buffer limit.
 
 Type: `bool`
 Default: `false`
+Requires version 1.13.0 or newer
 
 ### `batch_policy`
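For anyone reviewing this by hand, a quick way to watch both metrics move is a throwaway pipeline with a deliberately tiny `limit` and a slow consumer. This is a minimal sketch, not part of the change: the `generate` input, `sleep` processor, `drop` output, and Prometheus exporter are arbitrary choices for illustration, and the metric names an exporter exposes may carry extra prefixes or labels.

```yaml
# Throwaway config for eyeballing the new buffer metrics.
input:
  generate:
    interval: 1ms
    mapping: root.data = uuid_v4()

buffer:
  memory:
    limit: 4096      # Tiny limit so the buffer fills within seconds.
    spillover: true  # New in 1.13.0: reject writes that would exceed the limit.

output:
  drop: {}
  processors:
    - sleep:
        duration: 100ms  # Slow consumer so the buffer actually fills.

metrics:
  prometheus: {}
```

With `spillover: true`, `buffer_active` should plateau at or below `limit` while `buffer_spillover` climbs by the byte size of each batch rejected with `ErrLimitReached`; with `spillover: false` the write path blocks instead, so only `buffer_active` moves.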