Skip to content

Commit

Permalink
Merge branch 'release-v0.40' into jdb/2024-04-use-ref-uris-v0.40
Browse files Browse the repository at this point in the history
  • Loading branch information
jdbaldry authored May 20, 2024
2 parents f2e2bc3 + 6e3c0c2 commit d035906
Show file tree
Hide file tree
Showing 39 changed files with 487 additions and 241 deletions.
2 changes: 1 addition & 1 deletion .github/depcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ github_repos:
- github.com/google/dnsmasq_exporter v0.2.0
- github.com/ncabatoff/process-exporter v0.7.5
- github.com/prometheus/mysqld_exporter v0.13.0
- github.com/prometheus-community/postgres_exporter v0.10.0
- github.com/prometheus-community/postgres_exporter v0.15.0
- github.com/prometheus-community/windows_exporter v0.16.0
- github.com/percona/mongodb_exporter v0.20.7
- project: github.com/prometheus/prometheus
Expand Down
32 changes: 32 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,38 @@ This document contains a historical list of changes between releases. Only
changes that impact end-user behavior are listed; changes to documentation or
internal API changes are not present.

v0.40.5 (2024-05-15)
--------------------

### Breaking changes

- `prometheus.exporter.postgres` has been updated to the latest upstream
version which changes the set of exported metrics. The following metrics were
removed: `pg_stat_database_session_time`, `pg_stat_database_sessions`,
`pg_stat_database_sessions_abandoned`, `pg_stat_database_sessions_fatal`,
`pg_stat_database_sessions_killed`, `pg_stat_database_idle_in_transaction_time`,
`pg_stat_database_checksum_failures`, `pg_stat_database_checksum_last_failure`,
`pg_stat_database_active_time`. The following metrics were
renamed: `pg_stat_bgwriter_buffers_alloc`, `pg_stat_bgwriter_buffers_backend`,
`pg_stat_bgwriter_buffers_backend_fsync`, `pg_stat_bgwriter_buffers_checkpoint`,
`pg_stat_bgwriter_buffers_clean`, `pg_stat_bgwriter_checkpoint_sync_time`,
`pg_stat_bgwriter_checkpoint_write_time`, `pg_stat_bgwriter_checkpoints_req`,
`pg_stat_bgwriter_checkpoints_timed`, `pg_stat_bgwriter_maxwritten_clean`,
`pg_stat_bgwriter_stats_reset` - the new names include the `_total` suffix. (@thampiotr)

### Bugfixes

- Fix an issue where the azure exporter was not correctly gathering subscription scoped metrics when only one region was configured (@kgeckhart)

- Fixed an issue where creating a `prometheus.exporter.postgres` component with
multiple `data_source_names` would result in an error. (@thampiotr)

- Fix a bug with the logs pipeline in static mode which prevented it from shutting down cleanly.

### Other changes

- Updating SNMP exporter from v0.24.1 to v0.26.0.

v0.40.4 (2024-04-12)
--------------------

Expand Down
4 changes: 2 additions & 2 deletions component/common/net/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func (g *GRPCConfig) Into(c *dskit.Config) {
c.GRPCServerMaxConnectionAge = g.MaxConnectionAge
c.GRPCServerMaxConnectionAgeGrace = g.MaxConnectionAgeGrace
c.GRPCServerMaxConnectionIdle = g.MaxConnectionIdle
c.GPRCServerMaxRecvMsgSize = g.ServerMaxRecvMsg
c.GRPCServerMaxRecvMsgSize = g.ServerMaxRecvMsg
c.GRPCServerMaxSendMsgSize = g.ServerMaxSendMsg
c.GPRCServerMaxConcurrentStreams = g.ServerMaxConcurrentStreams
c.GRPCServerMaxConcurrentStreams = g.ServerMaxConcurrentStreams
}

// Convert converts the River-based ServerConfig into a dskit.Config object.
Expand Down
8 changes: 4 additions & 4 deletions component/common/net/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestConfig(t *testing.T) {
require.Equal(t, time.Second*30, config.ServerGracefulShutdownTimeout)

require.Equal(t, size4MB, config.GRPCServerMaxSendMsgSize)
require.Equal(t, size4MB, config.GPRCServerMaxRecvMsgSize)
require.Equal(t, size4MB, config.GRPCServerMaxRecvMsgSize)
},
},
"overriding defaults": {
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestConfig(t *testing.T) {
require.Equal(t, "0.0.0.0", config.GRPCListenAddress)
require.Equal(t, 10, config.GRPCServerMaxSendMsgSize)
// this should have the default applied
require.Equal(t, size4MB, config.GPRCServerMaxRecvMsgSize)
require.Equal(t, size4MB, config.GRPCServerMaxRecvMsgSize)

require.Equal(t, time.Minute, config.ServerGracefulShutdownTimeout)
},
Expand Down Expand Up @@ -141,9 +141,9 @@ func TestConfig(t *testing.T) {
require.Equal(t, 5*time.Minute, config.GRPCServerMaxConnectionAge)
require.Equal(t, 6*time.Minute, config.GRPCServerMaxConnectionAgeGrace)
require.Equal(t, 7*time.Minute, config.GRPCServerMaxConnectionIdle)
require.Equal(t, 5, config.GPRCServerMaxRecvMsgSize)
require.Equal(t, 5, config.GRPCServerMaxRecvMsgSize)
require.Equal(t, 6, config.GRPCServerMaxSendMsgSize)
require.Equal(t, uint(7), config.GPRCServerMaxConcurrentStreams)
require.Equal(t, uint(7), config.GRPCServerMaxConcurrentStreams)
},
},
}
Expand Down
6 changes: 6 additions & 0 deletions component/loki/process/metric/metricvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
return ok
}

func (c *metricVec) DeleteAll() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.metrics = map[model.Fingerprint]prometheus.Metric{}
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
Expand Down
5 changes: 5 additions & 0 deletions component/loki/process/stages/decolorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}

// Cleanup implements Stage.
func (*decolorizeStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions component/loki/process/stages/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,8 @@ func splitSource(s string) []string {
func (m *dropStage) Name() string {
return StageTypeDrop
}

// Cleanup implements Stage.
func (*dropStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions component/loki/process/stages/eventlogmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (m *eventLogMessageStage) Name() string {
return StageTypeEventLogMessage
}

// Cleanup implements Stage.
func (*eventLogMessageStage) Cleanup() {
// no-op
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
Expand Down
5 changes: 5 additions & 0 deletions component/loki/process/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func (c *cri) Name() string {
return StageTypeCRI
}

// Cleanup implements Stage.
func (*cri) Cleanup() {
// no-op
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)
Expand Down
5 changes: 5 additions & 0 deletions component/loki/process/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ func (g *geoIPStage) Name() string {
return StageTypeGeoIP
}

// Cleanup implements Stage.
func (*geoIPStage) Cleanup() {
// no-op
}

func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{}) {
var ip net.IP
if g.cfgs.Source != nil {
Expand Down
5 changes: 5 additions & 0 deletions component/loki/process/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
func (j *jsonStage) Name() string {
return StageTypeJSON
}

// Cleanup implements Stage.
func (*jsonStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions component/loki/process/stages/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func (m *limitStage) Name() string {
return StageTypeLimit
}

// Cleanup implements Stage.
func (*limitStage) Cleanup() {
// no-op
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return registerCounterVec(registerer, "loki_process", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
Expand Down
5 changes: 5 additions & 0 deletions component/loki/process/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
func (m *matcherStage) Name() string {
return StageTypeMatch
}

// Cleanup implements Stage.
func (*matcherStage) Cleanup() {
// no-op
}
31 changes: 29 additions & 2 deletions component/loki/process/stages/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func newMetricStage(logger log.Logger, config MetricsConfig, registry prometheus
return nil, fmt.Errorf("undefined stage type in '%v', exiting", cfg)
}
}
return toStage(&metricStage{
return &metricStage{
logger: logger,
cfg: config,
metrics: metrics,
}), nil
}, nil
}

// metricStage creates and updates prometheus metrics based on extracted pipeline data
Expand All @@ -118,6 +118,19 @@ type metricStage struct {
metrics map[string]cfgCollector
}

func (m *metricStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

for e := range in {
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
out <- e
}
}()
return out
}

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for name, cc := range m.metrics {
Expand Down Expand Up @@ -162,6 +175,20 @@ func (m *metricStage) Name() string {
return StageTypeMetric
}

// Cleanup implements Stage.
func (m *metricStage) Cleanup() {
for _, cfgCollector := range m.metrics {
switch vec := cfgCollector.collector.(type) {
case *metric.Counters:
vec.DeleteAll()
case *metric.Gauges:
vec.DeleteAll()
case *metric.Histograms:
vec.DeleteAll()
}
}
}

// recordCounter will update a counter metric
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
Expand Down
7 changes: 7 additions & 0 deletions component/loki/process/stages/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ func TestMetricsPipeline(t *testing.T) {
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

pl.Cleanup()

if err := testutil.GatherAndCompare(registry,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
}

func TestNegativeGauge(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions component/loki/process/stages/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
func (m *multilineStage) Name() string {
return StageTypeMultiline
}

// Cleanup implements Stage.
func (*multilineStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions component/loki/process/stages/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,8 @@ func (m *packStage) pack(e Entry) Entry {
func (m *packStage) Name() string {
return StageTypePack
}

// Cleanup implements Stage.
func (*packStage) Cleanup() {
// no-op
}
8 changes: 8 additions & 0 deletions component/loki/process/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ func (p *Pipeline) Name() string {
return StageTypePipeline
}

// Cleanup implements Stage.
func (p *Pipeline) Cleanup() {
for _, s := range p.stages {
s.Cleanup()
}
}

// Wrap implements EntryMiddleware
func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
handlerIn := make(chan loki.Entry)
Expand Down Expand Up @@ -180,6 +187,7 @@ func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
return loki.NewEntryHandler(handlerIn, func() {
once.Do(func() { close(handlerIn) })
wg.Wait()
p.Cleanup()
})
}

Expand Down
5 changes: 5 additions & 0 deletions component/loki/process/stages/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,8 @@ func (m *samplingStage) randomNumber() uint64 {
func (m *samplingStage) Name() string {
return StageTypeSampling
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
}
6 changes: 6 additions & 0 deletions component/loki/process/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Entry struct {
type Stage interface {
Name() string
Run(chan Entry) chan Entry
Cleanup()
}

func (entry *Entry) copy() *Entry {
Expand Down Expand Up @@ -237,3 +238,8 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh
}
return s, nil
}

// Cleanup implements Stage.
func (*stageProcessor) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions component/loki/process/stages/structured_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (s *structuredMetadataStage) Name() string {
return StageTypeStructuredMetadata
}

// Cleanup implements Stage.
func (*structuredMetadataStage) Cleanup() {
// no-op
}

func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *PushAPIServer) getRelabelRules() []*relabel.Config {
func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil)
req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest)
if err != nil {
level.Warn(s.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
Loading

0 comments on commit d035906

Please sign in to comment.