fix: Add missing tenant label to some loki.write metrics #2297

Open · wants to merge 7 commits into base: main
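This PR extends the remaining `loki_write_*` client metrics (`encoded_bytes_total`, `sent_bytes_total`, `sent_entries_total`, `request_duration_seconds`) with the `tenant` label that the dropped/mutated/retry metrics already carry, making the send-path metrics attributable per tenant.

A minimal, self-contained sketch of what the new label set looks like when registered and incremented — the `HostLabel`/`TenantLabel` constants and the example host/tenant values below are stand-ins for illustration, not taken from this PR:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Assumed stand-ins for the component's label constants.
const (
	HostLabel   = "host"
	TenantLabel = "tenant"
)

func main() {
	reg := prometheus.NewRegistry()

	// After this PR, the counter is declared with both labels.
	sentEntries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "loki_write_sent_entries_total",
		Help: "Number of log entries sent to the ingester.",
	}, []string{HostLabel, TenantLabel})
	reg.MustRegister(sentEntries)

	// Every increment now supplies a tenant value; an unset tenant ID
	// shows up as tenant="", as in the updated test expectations.
	sentEntries.WithLabelValues("loki.example:3100", "tenant-default").Add(2)
	sentEntries.WithLabelValues("loki.example:3100", "").Add(3)

	families, _ := reg.Gather()
	for _, mf := range families {
		for _, m := range mf.GetMetric() {
			fmt.Println(mf.GetName(), m.GetLabel(), m.GetCounter().GetValue())
		}
	}
}
```

Since `WithLabelValues` requires one value per declared label, every call site in the diff below now passes the tenant ID alongside the host.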
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -59,6 +59,8 @@ Main (unreleased)

- Update `prometheus.write.queue` library for performance increases in cpu. (@mattdurham)

- Add `tenant` label to remaining `loki_write_.+` metrics. (@towolf)

### Bugfixes

- Fixed issue with automemlimit logging bad messages and trying to access cgroup on non-linux builds (@dehaansa)
29 changes: 9 additions & 20 deletions internal/component/common/loki/client/client.go
@@ -60,7 +60,6 @@ type Metrics struct {
mutatedBytes *prometheus.CounterVec
requestDuration *prometheus.HistogramVec
batchRetries *prometheus.CounterVec
countersWithHost []*prometheus.CounterVec
countersWithHostTenant []*prometheus.CounterVec
countersWithHostTenantReason []*prometheus.CounterVec
}
@@ -71,19 +70,19 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "loki_write_encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
}, []string{HostLabel})
}, []string{HostLabel, TenantLabel})
m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "loki_write_sent_bytes_total",
Help: "Number of bytes sent.",
}, []string{HostLabel})
}, []string{HostLabel, TenantLabel})
m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "loki_write_dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{HostLabel, TenantLabel, ReasonLabel})
m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "loki_write_sent_entries_total",
Help: "Number of log entries sent to the ingester.",
}, []string{HostLabel})
}, []string{HostLabel, TenantLabel})
m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "loki_write_dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
@@ -99,18 +98,14 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "loki_write_request_duration_seconds",
Help: "Duration of send requests.",
}, []string{"status_code", HostLabel})
}, []string{"status_code", HostLabel, TenantLabel})
m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "loki_write_batch_retries_total",
Help: "Number of times batches has had to be retried.",
}, []string{HostLabel, TenantLabel})

m.countersWithHost = []*prometheus.CounterVec{
m.encodedBytes, m.sentBytes, m.sentEntries,
}

m.countersWithHostTenant = []*prometheus.CounterVec{
m.batchRetries,
m.batchRetries, m.encodedBytes, m.sentBytes, m.sentEntries,
}

m.countersWithHostTenantReason = []*prometheus.CounterVec{
@@ -210,12 +205,6 @@ func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLin

c.client.Timeout = cfg.Timeout

// Initialize counters to 0 so the metrics are exported before the first
// occurrence of incrementing to avoid missing metrics.
for _, counter := range c.metrics.countersWithHost {
counter.WithLabelValues(c.cfg.URL.Host).Add(0)
}

c.wg.Add(1)
go c.run()
return c, nil
@@ -357,7 +346,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
return
}
bufBytes := float64(len(buf))
c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)

backoff := backoff.New(c.ctx, c.cfg.BackoffConfig)
var status int
@@ -366,7 +355,7 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
// send uses `timeout` internally, so `context.Background` is good enough.
status, err = c.send(context.Background(), tenantID, buf)

c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())
c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host, tenantID).Observe(time.Since(start).Seconds())

// Immediately drop rate limited batches to avoid HOL blocking for other tenants not experiencing throttling
if c.cfg.DropRateLimitedBatches && batchIsRateLimited(status) {
@@ -377,8 +366,8 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
}

if err == nil {
c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)
c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host, tenantID).Add(float64(entriesCount))

return
}
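Note that the hunks above also drop the host-only zero-initialization loop (and the `countersWithHost` slice backing it). With `tenant` now part of the declared label set, `WithLabelValues(c.cfg.URL.Host)` alone would no longer match the label cardinality, and the tenant ID isn't known until a batch is sent, so these series presumably first appear on the first send for a given tenant rather than at client construction.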
20 changes: 10 additions & 10 deletions internal/component/common/loki/client/client_test.go
@@ -83,7 +83,7 @@ func TestClient_Handle(t *testing.T) {
expectedMetrics: `
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
# TYPE loki_write_sent_entries_total counter
loki_write_sent_entries_total{host="__HOST__"} 3.0
loki_write_sent_entries_total{host="__HOST__",tenant=""} 3.0
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE loki_write_dropped_entries_total counter
loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
@@ -121,7 +121,7 @@ func TestClient_Handle(t *testing.T) {
expectedMetrics: `
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
# TYPE loki_write_sent_entries_total counter
loki_write_sent_entries_total{host="__HOST__"} 2.0
loki_write_sent_entries_total{host="__HOST__",tenant=""} 2.0
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE loki_write_dropped_entries_total counter
loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
@@ -166,7 +166,7 @@ func TestClient_Handle(t *testing.T) {
expectedMetrics: `
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
# TYPE loki_write_sent_entries_total counter
loki_write_sent_entries_total{host="__HOST__"} 3.0
loki_write_sent_entries_total{host="__HOST__",tenant=""} 3.0
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE loki_write_dropped_entries_total counter
loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
@@ -208,7 +208,7 @@ func TestClient_Handle(t *testing.T) {
expectedMetrics: `
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
# TYPE loki_write_sent_entries_total counter
loki_write_sent_entries_total{host="__HOST__"} 2.0
loki_write_sent_entries_total{host="__HOST__",tenant=""} 2.0
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE loki_write_dropped_entries_total counter
loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant=""} 0
@@ -270,7 +270,7 @@ func TestClient_Handle(t *testing.T) {
loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
# TYPE loki_write_sent_entries_total counter
loki_write_sent_entries_total{host="__HOST__"} 0
loki_write_sent_entries_total{host="__HOST__",tenant=""} 0
`,
},
"do not retry send a batch in case the server responds with a 4xx": {
@@ -306,7 +306,7 @@ func TestClient_Handle(t *testing.T) {
loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
# TYPE loki_write_sent_entries_total counter
loki_write_sent_entries_total{host="__HOST__"} 0
loki_write_sent_entries_total{host="__HOST__",tenant=""} 0
`,
},
"do retry sending a batch in case the server responds with a 429": {
@@ -350,7 +350,7 @@ func TestClient_Handle(t *testing.T) {
loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
# TYPE loki_write_sent_entries_total counter
loki_write_sent_entries_total{host="__HOST__"} 0
loki_write_sent_entries_total{host="__HOST__",tenant=""} 0
`,
},
"do not retry in case of 429 when client is configured to drop rate limited batches": {
@@ -387,7 +387,7 @@ func TestClient_Handle(t *testing.T) {
loki_write_mutated_bytes_total{host="__HOST__",reason="stream_limited",tenant=""} 0
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
# TYPE loki_write_sent_entries_total counter
loki_write_sent_entries_total{host="__HOST__"} 0
loki_write_sent_entries_total{host="__HOST__",tenant=""} 0
`,
},
"batch log entries together honoring the client tenant ID": {
@@ -406,7 +406,7 @@ func TestClient_Handle(t *testing.T) {
expectedMetrics: `
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
# TYPE loki_write_sent_entries_total counter
loki_write_sent_entries_total{host="__HOST__"} 2.0
loki_write_sent_entries_total{host="__HOST__",tenant="tenant-default"} 2.0
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE loki_write_dropped_entries_total counter
loki_write_dropped_entries_total{host="__HOST__", reason="ingester_error", tenant="tenant-default"} 0
@@ -451,7 +451,7 @@ func TestClient_Handle(t *testing.T) {
expectedMetrics: `
# HELP loki_write_sent_entries_total Number of log entries sent to the ingester.
# TYPE loki_write_sent_entries_total counter
loki_write_sent_entries_total{host="__HOST__"} 4.0
loki_write_sent_entries_total{host="__HOST__",tenant=""} 4.0
# HELP loki_write_dropped_entries_total Number of log entries dropped because failed to be sent to the ingester after all retries.
# TYPE loki_write_dropped_entries_total counter
loki_write_dropped_entries_total{host="__HOST__",reason="ingester_error",tenant="tenant-1"} 0
14 changes: 4 additions & 10 deletions internal/component/common/loki/client/queue_client.go
@@ -233,12 +233,6 @@ func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config,

c.client.Timeout = cfg.Timeout

// Initialize counters to 0 so the metrics are exported before the first
// occurrence of incrementing to avoid missing metrics.
for _, counter := range c.metrics.countersWithHost {
counter.WithLabelValues(c.cfg.URL.Host).Add(0)
}

c.wg.Add(1)
go c.runSendOldBatches()
return c, nil
@@ -456,7 +450,7 @@ func (c *queueClient) sendBatch(ctx context.Context, tenantID string, batch *bat
return
}
bufBytes := float64(len(buf))
c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)

backoff := backoff.New(c.ctx, c.cfg.BackoffConfig)
var status int
@@ -465,7 +459,7 @@ func (c *queueClient) sendBatch(ctx context.Context, tenantID string, batch *bat
// send uses `timeout` internally, so `context.Background` is good enough.
status, err = c.send(ctx, tenantID, buf)

c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())
c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host, tenantID).Observe(time.Since(start).Seconds())

// Immediately drop rate limited batches to avoid HOL blocking for other tenants not experiencing throttling
if c.cfg.DropRateLimitedBatches && batchIsRateLimited(status) {
@@ -476,8 +470,8 @@ func (c *queueClient) sendBatch(ctx context.Context, tenantID string, batch *bat
}

if err == nil {
c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host, tenantID).Add(bufBytes)
c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host, tenantID).Add(float64(entriesCount))

return
}