Add metric label filtering #283

Open
wants to merge 1 commit into base: master
11 changes: 10 additions & 1 deletion README.md
@@ -67,14 +67,22 @@ The flag may be repeated to provide several sets of filters, in which case the m

#### File

The sidecar can also be provided with a configuration file. It allows to define static metric renames and to overwrite metric metadata which is usually provided by Prometheus. A configuration file should not be required for the majority of users.
The sidecar can also be provided with a configuration file. It allows you to define static metric renames, filter metric labels, and overwrite metric metadata that is usually provided by Prometheus. A configuration file should not be required for the majority of users.

```yaml
metric_renames:
- from: original_metric_name
  to: new_metric_name
# - ...

metric_label_filters:
- metric: "^too_many_labels.*"
  allow:
  - important_label
  - other_important_label
  # - ...
# - ...

static_metadata:
- metric: some_metric_name
  type: counter # or gauge/histogram
@@ -83,6 +91,7 @@ static_metadata:
# - ...
```

* Each `metric_label_filters` entry accepts a regular expression matched against `metric` names and a list of label names to `allow`; for matching metrics, labels that are not in the allow list are dropped (see the sketch after this list).
* All `static_metadata` entries must have `type` specified. This specifies the Stackdriver metric type and overrides the metric type chosen by the Prometheus client.
* If `value_type` is specified, it will override the default value type for counters and gauges. All Prometheus metrics have a default type of double.
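
For illustration only, here is a minimal, self-contained Go sketch of the label filtering semantics described above. The `LabelFilter` type mirrors the one added to `retrieval/series_cache.go` in this change; the `label` type and the `apply` helper are stand-ins invented for this example (the sidecar itself works with Prometheus `labels.Label` values inside its series cache).

```go
package main

import (
	"fmt"
	"regexp"
)

// label is a stand-in for Prometheus' labels.Label.
type label struct{ Name, Value string }

// LabelFilter mirrors the type introduced in retrieval/series_cache.go:
// series whose metric name matches Metric keep only the labels listed in Allow.
type LabelFilter struct {
	Metric *regexp.Regexp
	Allow  map[string]bool
}

// apply drops labels that are not in the allow list for matching metrics,
// filtering the slice in place, as the series cache does.
func apply(filters []LabelFilter, metricName string, lset []label) []label {
	for _, lf := range filters {
		if !lf.Metric.MatchString(metricName) {
			continue
		}
		kept := lset[:0]
		for _, l := range lset {
			if lf.Allow[l.Name] {
				kept = append(kept, l)
			}
		}
		lset = kept
	}
	return lset
}

func main() {
	filters := []LabelFilter{{
		Metric: regexp.MustCompile("^too_many_labels.*"),
		Allow:  map[string]bool{"important_label": true, "other_important_label": true},
	}}
	lset := []label{
		{"important_label", "a"},
		{"noisy_label", "b"},
		{"other_important_label", "c"},
	}
	fmt.Println(apply(filters, "too_many_labels_total", lset))
	// Output: [{important_label a} {other_important_label c}]
}
```

Because `Allow` is a map keyed by label name, each label check is a constant-time lookup, and a filter only applies to metrics whose name matches its configured regular expression.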

41 changes: 31 additions & 10 deletions cmd/stackdriver-prometheus-sidecar/main.go
@@ -25,6 +25,7 @@ import (
"os/signal"
"path"
"path/filepath"
"regexp"
"runtime"
"strings"
"syscall"
@@ -144,6 +145,11 @@ type metricRenamesConfig struct {
To string `json:"to"`
}

type metricLabelFiltersConfig struct {
Metric string `json:"metric"`
Allow []string `json:"allow"`
}

type staticMetadataConfig struct {
Metric string `json:"metric"`
Type string `json:"type"`
@@ -159,6 +165,7 @@ type aggregatedCountersConfig struct {

type fileConfig struct {
MetricRenames []metricRenamesConfig `json:"metric_renames"`
MetricLabelFilters []metricLabelFiltersConfig `json:"metric_label_filters"`
StaticMetadata []staticMetadataConfig `json:"static_metadata"`
AggregatedCounters []aggregatedCountersConfig `json:"aggregated_counters"`
}
@@ -182,6 +189,7 @@ type mainConfig struct {
Filtersets []string
Aggregations retrieval.CounterAggregatorConfig
MetricRenames map[string]string
MetricLabelFilters []retrieval.LabelFilter
StaticMetadata []*metadata.Entry
UseRestrictedIPs bool
manualResolver *manual.Resolver
@@ -269,7 +277,7 @@ func main() {

logger := promlog.New(&cfg.PromlogConfig)
if cfg.ConfigFilename != "" {
cfg.MetricRenames, cfg.StaticMetadata, cfg.Aggregations, err = parseConfigFile(cfg.ConfigFilename)
cfg.MetricRenames, cfg.MetricLabelFilters, cfg.StaticMetadata, cfg.Aggregations, err = parseConfigFile(cfg.ConfigFilename)
if err != nil {
msg := fmt.Sprintf("Parse config file %s", cfg.ConfigFilename)
level.Error(logger).Log("msg", msg, "err", err)
@@ -473,6 +481,7 @@ func main() {
tailer,
filtersets,
cfg.MetricRenames,
cfg.MetricLabelFilters,
retrieval.TargetsWithDiscoveredLabels(targetCache, labels.FromMap(staticLabels)),
metadataCache,
queueManager,
@@ -745,23 +754,35 @@ func fillMetadata(staticConfig *map[string]string) {
}
}

func parseConfigFile(filename string) (map[string]string, []*metadata.Entry, retrieval.CounterAggregatorConfig, error) {
func parseConfigFile(filename string) (map[string]string, []retrieval.LabelFilter, []*metadata.Entry, retrieval.CounterAggregatorConfig, error) {
b, err := ioutil.ReadFile(filename)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "reading file")
return nil, nil, nil, nil, errors.Wrap(err, "reading file")
}
var fc fileConfig
if err := yaml.Unmarshal(b, &fc); err != nil {
return nil, nil, nil, errors.Wrap(err, "invalid YAML")
return nil, nil, nil, nil, errors.Wrap(err, "invalid YAML")
}
return processFileConfig(fc)
}

func processFileConfig(fc fileConfig) (map[string]string, []*metadata.Entry, retrieval.CounterAggregatorConfig, error) {
func processFileConfig(fc fileConfig) (map[string]string, []retrieval.LabelFilter, []*metadata.Entry, retrieval.CounterAggregatorConfig, error) {
renameMapping := map[string]string{}
for _, r := range fc.MetricRenames {
renameMapping[r.From] = r.To
}
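// Each configured filter is converted up front: the metric name pattern is
// compiled once (regexp.MustCompile panics on an invalid pattern) and the
// allow list becomes a set so label lookups during filtering are constant time.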
labelFilters := []retrieval.LabelFilter{}
for _, lf := range fc.MetricLabelFilters {
allow := map[string]bool{}
for _, l := range lf.Allow {
allow[l] = true
}
filter := retrieval.LabelFilter{
Metric: regexp.MustCompile(lf.Metric),
Allow:  allow,
}
labelFilters = append(labelFilters, filter)
}
staticMetadata := []*metadata.Entry{}
for _, sm := range fc.StaticMetadata {
switch sm.Type {
@@ -771,7 +792,7 @@ func processFileConfig(fc fileConfig) (map[string]string, []*metadata.Entry, ret
case textparse.MetricTypeCounter, textparse.MetricTypeGauge, textparse.MetricTypeHistogram,
textparse.MetricTypeSummary, textparse.MetricTypeUnknown:
default:
return nil, nil, nil, errors.Errorf("invalid metric type %q", sm.Type)
return nil, nil, nil, nil, errors.Errorf("invalid metric type %q", sm.Type)
}
var valueType metric_pb.MetricDescriptor_ValueType
switch sm.ValueType {
@@ -782,7 +803,7 @@ func processFileConfig(fc fileConfig) (map[string]string, []*metadata.Entry, ret
case "":
valueType = metric_pb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED
default:
return nil, nil, nil, errors.Errorf("invalid value type %q", sm.ValueType)
return nil, nil, nil, nil, errors.Errorf("invalid value type %q", sm.ValueType)
}
staticMetadata = append(staticMetadata,
&metadata.Entry{Metric: sm.Metric, MetricType: textparse.MetricType(sm.Type), ValueType: valueType, Help: sm.Help})
@@ -791,17 +812,17 @@ func processFileConfig(fc fileConfig) (map[string]string, []*metadata.Entry, ret
aggregations := make(retrieval.CounterAggregatorConfig)
for _, c := range fc.AggregatedCounters {
if _, ok := aggregations[c.Metric]; ok {
return nil, nil, nil, errors.Errorf("duplicate counter aggregator metric %s", c.Metric)
return nil, nil, nil, nil, errors.Errorf("duplicate counter aggregator metric %s", c.Metric)
}
a := &retrieval.CounterAggregatorMetricConfig{Help: c.Help}
for _, f := range c.Filters {
matcher, err := promql.ParseMetricSelector(f)
if err != nil {
return nil, nil, nil, errors.Errorf("cannot parse metric selector '%s': %q", f, err)
return nil, nil, nil, nil, errors.Errorf("cannot parse metric selector '%s': %q", f, err)
}
a.Matchers = append(a.Matchers, matcher)
}
aggregations[c.Metric] = a
}
return renameMapping, staticMetadata, aggregations, nil
return renameMapping, labelFilters, staticMetadata, aggregations, nil
}
22 changes: 20 additions & 2 deletions cmd/stackdriver-prometheus-sidecar/main_test.go
@@ -19,6 +19,7 @@ import (
"net/http"
"os"
"os/exec"
"regexp"
"testing"
"time"

@@ -165,10 +166,14 @@ func TestProcessFileConfig(t *testing.T) {
}
return m
}
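// regexp.Regexp contains unexported fields that cmp cannot inspect, so
// compiled patterns are compared by their source expression instead.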
regexpComparer := cmp.Comparer(func(x, y regexp.Regexp) bool {
return x.String() == y.String()
})
for _, tt := range []struct {
name string
config fileConfig
renameMappings map[string]string
labelFilters []retrieval.LabelFilter
staticMetadata []*metadata.Entry
aggregations retrieval.CounterAggregatorConfig
err error
@@ -177,6 +182,7 @@
"empty",
fileConfig{},
map[string]string{},
[]retrieval.LabelFilter{},
[]*metadata.Entry{},
retrieval.CounterAggregatorConfig{},
nil,
@@ -187,6 +193,9 @@
MetricRenames: []metricRenamesConfig{
{From: "from", To: "to"},
},
MetricLabelFilters: []metricLabelFiltersConfig{
{Metric: "^too_many_labels.*", Allow: []string{"important_label", "other_important_label"}},
},
StaticMetadata: []staticMetadataConfig{
{Metric: "int64_counter", Type: "counter", ValueType: "int64", Help: "help1"},
{Metric: "double_gauge", Type: "gauge", ValueType: "double", Help: "help2"},
@@ -201,6 +210,12 @@
},
},
map[string]string{"from": "to"},
[]retrieval.LabelFilter{
{
Metric: regexp.MustCompile("^too_many_labels.*"),
Allow:  map[string]bool{"important_label": true, "other_important_label": true},
},
},
[]*metadata.Entry{
&metadata.Entry{Metric: "int64_counter", MetricType: textparse.MetricTypeCounter, ValueType: metric_pb.MetricDescriptor_INT64, Help: "help1"},
&metadata.Entry{Metric: "double_gauge", MetricType: textparse.MetricTypeGauge, ValueType: metric_pb.MetricDescriptor_DOUBLE, Help: "help2"},
@@ -222,15 +237,18 @@
fileConfig{
StaticMetadata: []staticMetadataConfig{{Metric: "int64_default", ValueType: "int64"}},
},
nil, nil, nil,
nil, nil, nil, nil,
errors.New("invalid metric type \"\""),
},
} {
t.Run(tt.name, func(t *testing.T) {
renameMappings, staticMetadata, aggregations, err := processFileConfig(tt.config)
renameMappings, labelFilters, staticMetadata, aggregations, err := processFileConfig(tt.config)
if diff := cmp.Diff(tt.renameMappings, renameMappings); diff != "" {
t.Errorf("renameMappings mismatch: %v", diff)
}
if diff := cmp.Diff(tt.labelFilters, labelFilters, regexpComparer); diff != "" {
t.Errorf("labelFilters mismatch: %v", diff)
}
if diff := cmp.Diff(tt.staticMetadata, staticMetadata); diff != "" {
t.Errorf("staticMetadata mismatch: %v", diff)
}
4 changes: 4 additions & 0 deletions retrieval/manager.go
@@ -82,6 +82,7 @@ func NewPrometheusReader(
tailer *tail.Tailer,
filtersets [][]*labels.Matcher,
metricRenames map[string]string,
labelFilters []LabelFilter,
targetGetter TargetGetter,
metadataGetter MetadataGetter,
appender Appender,
@@ -102,6 +103,7 @@
metadataGetter: metadataGetter,
progressSaveInterval: time.Minute,
metricRenames: metricRenames,
labelFilters: labelFilters,
metricsPrefix: metricsPrefix,
useGkeResource: useGkeResource,
counterAggregator: counterAggregator,
@@ -114,6 +116,7 @@ type PrometheusReader struct {
tailer *tail.Tailer
filtersets [][]*labels.Matcher
metricRenames map[string]string
labelFilters []LabelFilter
targetGetter TargetGetter
metadataGetter MetadataGetter
appender Appender
@@ -157,6 +160,7 @@ func (r *PrometheusReader) Run(ctx context.Context, startOffset int) error {
r.walDirectory,
r.filtersets,
r.metricRenames,
r.labelFilters,
r.targetGetter,
r.metadataGetter,
ResourceMappings,
4 changes: 2 additions & 2 deletions retrieval/manager_test.go
@@ -86,7 +86,7 @@ func TestReader_Progress(t *testing.T) {
}

aggr, _ := NewCounterAggregator(log.NewNopLogger(), new(CounterAggregatorConfig))
r := NewPrometheusReader(nil, dir, tailer, nil, nil, targetMap, metadataMap, &nopAppender{}, "", false, aggr)
r := NewPrometheusReader(nil, dir, tailer, nil, nil, nil, targetMap, metadataMap, &nopAppender{}, "", false, aggr)
r.progressSaveInterval = 200 * time.Millisecond

// Populate sample data
@@ -143,7 +143,7 @@
}

recorder := &nopAppender{}
r = NewPrometheusReader(nil, dir, tailer, nil, nil, targetMap, metadataMap, recorder, "", false, aggr)
r = NewPrometheusReader(nil, dir, tailer, nil, nil, nil, targetMap, metadataMap, recorder, "", false, aggr)
go r.Run(ctx, progressOffset)

// Wait for reader to process until the end.
23 changes: 22 additions & 1 deletion retrieval/series_cache.go
@@ -15,6 +15,7 @@ package retrieval

import (
"context"
"regexp"
"sync"
"time"

@@ -82,6 +83,7 @@ type seriesCache struct {
metricsPrefix string
useGkeResource bool
renames map[string]string
labelFilters []LabelFilter

// lastCheckpoint holds the index of the last checkpoint we garbage collected for.
// We don't have to redo garbage collection until a higher checkpoint appears.
@@ -137,6 +139,11 @@ type seriesCacheEntry struct {
tracker *counterTracker
}

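// LabelFilter restricts the labels kept for metrics whose name matches
// Metric: only label names present in Allow are retained.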
type LabelFilter struct {
Metric *regexp.Regexp
Allow map[string]bool
}

const refreshInterval = 3 * time.Minute

func (e *seriesCacheEntry) populated() bool {
@@ -152,6 +159,7 @@ func newSeriesCache(
dir string,
filtersets [][]*promlabels.Matcher,
renames map[string]string,
labelFilters []LabelFilter,
targets TargetGetter,
metadata MetadataGetter,
resourceMaps []ResourceMap,
@@ -174,6 +182,7 @@
metricsPrefix: metricsPrefix,
useGkeResource: useGkeResource,
renames: renames,
labelFilters: labelFilters,
counterAggregator: counterAggregator,
}
}
@@ -388,6 +397,19 @@ func (c *seriesCache) refresh(ctx context.Context, ref uint64) error {
break
}
}
// Apply label filters
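// Filters run before the too-many-labels check below, so an allow list can
// rescue a series that would otherwise be dropped for exceeding maxLabelCount.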
metricName := entry.lset.Get("__name__")
for _, lf := range c.labelFilters {
if lf.Metric.MatchString(metricName) {
filteredLabels := finalLabels[:0]
for _, l := range finalLabels {
if lf.Allow[l.Name] {
filteredLabels = append(filteredLabels, l)
}
}
finalLabels = filteredLabels
}
}
// Drop series with too many labels.
if len(finalLabels) > maxLabelCount {
ctx, _ = tag.New(ctx, tag.Insert(keyReason, "too_many_labels"))
@@ -397,7 +419,6 @@
}

var (
metricName = entry.lset.Get("__name__")
baseMetricName string
suffix string
job = entry.lset.Get("job")