diff --git a/.golangci.yml b/.golangci.yml index fd975b2..761908b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,8 +1,11 @@ run: modules-download-mode: vendor - + skip-dirs: + - vendor + - e2e # Run only staticcheck for now. Additional linters will be enabled one-by-one. linters: enable: - staticcheck + - goimports disable-all: true diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 25e6156..b1c93ed 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -46,7 +46,7 @@ func TestIssue61(t *testing.T) { } defer exporter.Process.Kill() - for i := 0; i < 10; i++ { + for i := 0; i < 20; i++ { if i > 0 { time.Sleep(1 * time.Second) } @@ -93,6 +93,8 @@ rspamd.spam_count 3 NOW` t.Fatalf("write error: %v", err) } + time.Sleep(5 * time.Second) + resp, err := http.Get("http://" + path.Join(webAddr, "metrics")) if err != nil { t.Fatalf("get error: %v", err) diff --git a/e2e/issue90_test.go b/e2e/issue90_test.go index 5aa16dd..5b6882d 100644 --- a/e2e/issue90_test.go +++ b/e2e/issue90_test.go @@ -83,6 +83,8 @@ func TestIssue90(t *testing.T) { conn.Close() } + time.Sleep(5 * time.Second) + resp, err := http.Get("http://" + path.Join(webAddr, "metrics")) if err != nil { t.Fatalf("get error: %v", err) diff --git a/main.go b/main.go index a324e2a..e4e85b0 100644 --- a/main.go +++ b/main.go @@ -17,17 +17,10 @@ import ( "bufio" "bytes" "fmt" - "io" - "math" "net" "net/http" _ "net/http/pprof" "os" - "regexp" - "strconv" - "strings" - "sync" - "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -38,6 +31,8 @@ import ( "github.com/prometheus/common/version" "github.com/prometheus/statsd_exporter/pkg/mapper" "gopkg.in/alecthomas/kingpin.v2" + + "github.com/prometheus/graphite_exporter/pkg/collector" ) var ( @@ -63,170 +58,20 @@ var ( Help: "How long in seconds a metric sample is valid for.", }, ) - invalidMetricChars = regexp.MustCompile("[^a-zA-Z0-9_:]") + tagErrors = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "graphite_tag_parse_errors", + Help: "Total count of samples with invalid tags", + }, + ) + invalidMetrics = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "graphite_invalid_metrics", + Help: "Total count of metrics dropped due to mismatched label keys", + }, + ) ) -type graphiteSample struct { - OriginalName string - Name string - Labels map[string]string - Help string - Value float64 - Type prometheus.ValueType - Timestamp time.Time -} - -func (s graphiteSample) String() string { - return fmt.Sprintf("%#v", s) -} - -type metricMapper interface { - GetMapping(string, mapper.MetricType) (*mapper.MetricMapping, prometheus.Labels, bool) - InitFromFile(string, int, ...mapper.CacheOption) error - InitCache(int, ...mapper.CacheOption) -} - -type graphiteCollector struct { - samples map[string]*graphiteSample - mu *sync.Mutex - mapper metricMapper - sampleCh chan *graphiteSample - lineCh chan string - strictMatch bool - logger log.Logger -} - -func newGraphiteCollector(logger log.Logger) *graphiteCollector { - c := &graphiteCollector{ - sampleCh: make(chan *graphiteSample), - lineCh: make(chan string), - mu: &sync.Mutex{}, - samples: map[string]*graphiteSample{}, - strictMatch: *strictMatch, - logger: logger, - } - go c.processSamples() - go c.processLines() - return c -} - -func (c *graphiteCollector) processReader(reader io.Reader) { - lineScanner := bufio.NewScanner(reader) - for { - if ok := lineScanner.Scan(); !ok { - break - } - c.lineCh <- lineScanner.Text() - } -} - -func (c *graphiteCollector) processLines() { - for line := range c.lineCh { - c.processLine(line) - } -} - -func (c *graphiteCollector) processLine(line string) { - line = strings.TrimSpace(line) - level.Debug(c.logger).Log("msg", "Incoming line", "line", line) - parts := strings.Split(line, " ") - if len(parts) != 3 { - level.Info(c.logger).Log("msg", "Invalid part count", "parts", len(parts), "line", line) - return - } - originalName := parts[0] - var name string - mapping, labels, present := c.mapper.GetMapping(originalName, mapper.MetricTypeGauge) - - if (present && mapping.Action == mapper.ActionTypeDrop) || (!present && c.strictMatch) { - return - } - - if present { - name = invalidMetricChars.ReplaceAllString(mapping.Name, "_") - } else { - name = invalidMetricChars.ReplaceAllString(originalName, "_") - } - - value, err := strconv.ParseFloat(parts[1], 64) - if err != nil { - level.Info(c.logger).Log("msg", "Invalid value", "line", line) - return - } - timestamp, err := strconv.ParseFloat(parts[2], 64) - if err != nil { - level.Info(c.logger).Log("msg", "Invalid timestamp", "line", line) - return - } - sample := graphiteSample{ - OriginalName: originalName, - Name: name, - Value: value, - Labels: labels, - Type: prometheus.GaugeValue, - Help: fmt.Sprintf("Graphite metric %s", name), - Timestamp: time.Unix(int64(timestamp), int64(math.Mod(timestamp, 1.0)*1e9)), - } - level.Debug(c.logger).Log("msg", "Processing sample", "sample", sample) - lastProcessed.Set(float64(time.Now().UnixNano()) / 1e9) - c.sampleCh <- &sample -} - -func (c *graphiteCollector) processSamples() { - ticker := time.NewTicker(time.Minute).C - - for { - select { - case sample, ok := <-c.sampleCh: - if sample == nil || !ok { - return - } - c.mu.Lock() - c.samples[sample.OriginalName] = sample - c.mu.Unlock() - case <-ticker: - // Garbage collect expired samples. - ageLimit := time.Now().Add(-*sampleExpiry) - c.mu.Lock() - for k, sample := range c.samples { - if ageLimit.After(sample.Timestamp) { - delete(c.samples, k) - } - } - c.mu.Unlock() - } - } -} - -// Collect implements prometheus.Collector. -func (c graphiteCollector) Collect(ch chan<- prometheus.Metric) { - ch <- lastProcessed - - c.mu.Lock() - samples := make([]*graphiteSample, 0, len(c.samples)) - for _, sample := range c.samples { - samples = append(samples, sample) - } - c.mu.Unlock() - - ageLimit := time.Now().Add(-*sampleExpiry) - for _, sample := range samples { - if ageLimit.After(sample.Timestamp) { - continue - } - ch <- prometheus.MustNewConstMetric( - prometheus.NewDesc(sample.Name, sample.Help, []string{}, sample.Labels), - sample.Type, - sample.Value, - ) - } -} - -// Describe implements prometheus.Collector. -func (c graphiteCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- lastProcessed.Desc() -} - func init() { prometheus.MustRegister(version.NewCollector("graphite_exporter")) } @@ -235,16 +80,20 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string, logger log.Logger if mapper.FSM == nil { return fmt.Errorf("no FSM available to be dumped, possibly because the mapping contains regex patterns") } + f, err := os.Create(dumpFilename) if err != nil { return err } + level.Info(logger).Log("msg", "Start dumping FSM", "to", dumpFilename) + w := bufio.NewWriter(f) mapper.FSM.DumpFSM(w) w.Flush() f.Close() level.Info(logger).Log("msg", "Finish dumping FSM") + return nil } @@ -254,33 +103,37 @@ func main() { kingpin.Version(version.Print("graphite_exporter")) kingpin.HelpFlag.Short('h') kingpin.Parse() + logger := promlog.New(promlogConfig) prometheus.MustRegister(sampleExpiryMetric) + prometheus.MustRegister(tagErrors) + prometheus.MustRegister(invalidMetrics) sampleExpiryMetric.Set(sampleExpiry.Seconds()) level.Info(logger).Log("msg", "Starting graphite_exporter", "version_info", version.Info()) level.Info(logger).Log("build_context", version.BuildContext()) http.Handle(*metricsPath, promhttp.Handler()) - c := newGraphiteCollector(logger) + + c := collector.NewGraphiteCollector(logger, *strictMatch, *sampleExpiry, tagErrors, lastProcessed, sampleExpiryMetric, invalidMetrics) prometheus.MustRegister(c) - c.mapper = &mapper.MetricMapper{} + c.Mapper = &mapper.MetricMapper{} cacheOption := mapper.WithCacheType(*cacheType) if *mappingConfig != "" { - err := c.mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption) + err := c.Mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption) if err != nil { level.Error(logger).Log("msg", "Error loading metric mapping config", "err", err) os.Exit(1) } } else { - c.mapper.InitCache(*cacheSize, cacheOption) + c.Mapper.InitCache(*cacheSize, cacheOption) } if *dumpFSMPath != "" { - err := dumpFSM(c.mapper.(*mapper.MetricMapper), *dumpFSMPath, logger) + err := dumpFSM(c.Mapper.(*mapper.MetricMapper), *dumpFSMPath, logger) if err != nil { level.Error(logger).Log("msg", "Error dumping FSM", "err", err) os.Exit(1) @@ -292,6 +145,7 @@ func main() { level.Error(logger).Log("msg", "Error binding to TCP socket", "err", err) os.Exit(1) } + go func() { for { conn, err := tcpSock.Accept() @@ -299,9 +153,10 @@ func main() { level.Error(logger).Log("msg", "Error accepting TCP connection", "err", err) continue } + go func() { defer conn.Close() - c.processReader(conn) + c.ProcessReader(conn) }() } }() @@ -311,21 +166,26 @@ func main() { level.Error(logger).Log("msg", "Error resolving UDP address", "err", err) os.Exit(1) } + udpSock, err := net.ListenUDP("udp", udpAddress) if err != nil { level.Error(logger).Log("msg", "Error listening to UDP address", "err", err) os.Exit(1) } + go func() { defer udpSock.Close() + for { buf := make([]byte, 65536) + chars, srcAddress, err := udpSock.ReadFromUDP(buf) if err != nil { level.Error(logger).Log("msg", "Error reading UDP packet", "from", srcAddress, "err", err) continue } - go c.processReader(bytes.NewReader(buf[0:chars])) + + go c.ProcessReader(bytes.NewReader(buf[0:chars])) } }() diff --git a/main_test.go b/main_test.go index 0f39e45..673df83 100644 --- a/main_test.go +++ b/main_test.go @@ -16,11 +16,15 @@ package main import ( "strings" "testing" + "time" "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/stretchr/testify/assert" + + "github.com/prometheus/graphite_exporter/pkg/collector" + "github.com/prometheus/graphite_exporter/pkg/line" ) type mockMapper struct { @@ -31,11 +35,9 @@ type mockMapper struct { } func (m *mockMapper) GetMapping(metricName string, metricType mapper.MetricType) (*mapper.MetricMapping, prometheus.Labels, bool) { - mapping := mapper.MetricMapping{Name: m.name, Action: m.action} return &mapping, m.labels, m.present - } func (m *mockMapper) InitFromFile(string, int, ...mapper.CacheOption) error { @@ -44,109 +46,144 @@ func (m *mockMapper) InitFromFile(string, int, ...mapper.CacheOption) error { func (m *mockMapper) InitCache(int, ...mapper.CacheOption) { } -func TestProcessLine(t *testing.T) { +func TestProcessLine(t *testing.T) { type testCase struct { - line string - name string - labels map[string]string - value float64 - present bool - willFail bool - action mapper.ActionType - strict bool + line string + name string + mappingLabels prometheus.Labels + parsedLabels prometheus.Labels + value float64 + mappingPresent bool + willFail bool + action mapper.ActionType + strict bool } testCases := []testCase{ { line: "my.simple.metric 9001 1534620625", name: "my_simple_metric", - labels: map[string]string{ + mappingLabels: prometheus.Labels{ "foo": "bar", "zip": "zot", "name": "alabel", }, - present: true, - value: float64(9001), + mappingPresent: true, + value: float64(9001), }, { line: "my.simple.metric.baz 9002 1534620625", name: "my_simple_metric", - labels: map[string]string{ + mappingLabels: prometheus.Labels{ "baz": "bat", }, - present: true, - value: float64(9002), + mappingPresent: true, + value: float64(9002), }, { - line: "my.nomap.metric 9001 1534620625", - name: "my_nomap_metric", - value: float64(9001), - present: false, + line: "my.nomap.metric 9001 1534620625", + name: "my_nomap_metric", + value: float64(9001), + parsedLabels: prometheus.Labels{}, + mappingPresent: false, }, { - line: "my.nomap.metric.novalue 9001 ", - name: "my_nomap_metric_novalue", - labels: nil, - value: float64(9001), - willFail: true, + line: "my.nomap.metric.novalue 9001 ", + name: "my_nomap_metric_novalue", + mappingLabels: prometheus.Labels{}, + value: float64(9001), + willFail: true, }, { - line: "my.mapped.metric.drop 55 1534620625", - name: "my_mapped_metric_drop", - present: true, - willFail: true, - action: mapper.ActionTypeDrop, + line: "my.mapped.metric.drop 55 1534620625", + name: "my_mapped_metric_drop", + mappingPresent: true, + willFail: true, + action: mapper.ActionTypeDrop, }, { - line: "my.mapped.strict.metric 55 1534620625", - name: "my_mapped_strict_metric", - value: float64(55), - present: true, - willFail: false, - strict: true, + line: "my.mapped.strict.metric 55 1534620625", + name: "my_mapped_strict_metric", + value: float64(55), + mappingLabels: prometheus.Labels{}, + mappingPresent: true, + willFail: false, + strict: true, }, { - line: "my.mapped.strict.metric.drop 55 1534620625", - name: "my_mapped_strict_metric_drop", - present: false, + line: "my.mapped.strict.metric.drop 55 1534620625", + name: "my_mapped_strict_metric_drop", + mappingPresent: false, + willFail: true, + strict: true, + }, + { + line: "my.simple.metric.with.tags;tag1=value1;tag2=value2 9002 1534620625", + name: "my_simple_metric_with_tags", + parsedLabels: prometheus.Labels{ + "tag1": "value1", + "tag2": "value2", + }, + mappingPresent: false, + value: float64(9002), + }, + { + // same tags, different values, should parse + line: "my.simple.metric.with.tags;tag1=value3;tag2=value4 9002 1534620625", + name: "my_simple_metric_with_tags", + parsedLabels: prometheus.Labels{ + "tag1": "value3", + "tag2": "value4", + }, + mappingPresent: false, + value: float64(9002), + }, + { + // new tags other than previously used, should drop + line: "my.simple.metric.with.tags;tag1=value1;tag3=value2 9002 1534620625", + name: "my_simple_metric_with_tags", willFail: true, - strict: true, }, } - c := newGraphiteCollector(log.NewNopLogger()) + // func NewGraphiteCollector(logger log.Logger, strictMatch bool, sampleExpiry time.Duration, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, sampleExpiryMetric prometheus.Gauge, invalidMetrics prometheus.Counter) *graphiteCollector { + c := collector.NewGraphiteCollector(log.NewNopLogger(), false, 5*time.Minute, tagErrors, lastProcessed, sampleExpiryMetric, invalidMetrics) for _, testCase := range testCases { - - if testCase.present { - c.mapper = &mockMapper{ + if testCase.mappingPresent { + c.Mapper = &mockMapper{ name: testCase.name, - labels: testCase.labels, + labels: testCase.mappingLabels, action: testCase.action, - present: testCase.present, + present: testCase.mappingPresent, } } else { - c.mapper = &mockMapper{ - present: testCase.present, + c.Mapper = &mockMapper{ + present: testCase.mappingPresent, } } - c.strictMatch = testCase.strict - c.processLine(testCase.line) - + c.StrictMatch = testCase.strict + line.ProcessLine(testCase.line, c.Mapper, c.SampleCh, c.StrictMatch, tagErrors, lastProcessed, invalidMetrics, c.Logger) } - c.sampleCh <- nil + c.SampleCh <- nil + for _, k := range testCases { originalName := strings.Split(k.line, " ")[0] - sample := c.samples[originalName] + sample := c.Samples[originalName] + if k.willFail { assert.Nil(t, sample, "Found %s", k.name) } else { if assert.NotNil(t, sample, "Missing %s", k.name) { assert.Equal(t, k.name, sample.Name) - assert.Equal(t, k.labels, sample.Labels) + if k.mappingPresent { + assert.Equal(t, k.mappingLabels, sample.Labels) + } else { + assert.Equal(t, k.parsedLabels, sample.Labels) + } assert.Equal(t, k.value, sample.Value) } } diff --git a/pkg/collector/collector.go b/pkg/collector/collector.go new file mode 100644 index 0000000..1ee6725 --- /dev/null +++ b/pkg/collector/collector.go @@ -0,0 +1,142 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "bufio" + "io" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/graphite_exporter/pkg/graphitesample" + "github.com/prometheus/graphite_exporter/pkg/line" + "github.com/prometheus/graphite_exporter/pkg/metricmapper" +) + +type graphiteCollector struct { + Mapper metricmapper.MetricMapper + Samples map[string]*graphitesample.GraphiteSample + mu *sync.Mutex + SampleCh chan *graphitesample.GraphiteSample + lineCh chan string + StrictMatch bool + sampleExpiry time.Duration + Logger log.Logger + tagErrors prometheus.Counter + lastProcessed prometheus.Gauge + sampleExpiryMetric prometheus.Gauge + invalidMetrics prometheus.Counter +} + +func NewGraphiteCollector(logger log.Logger, strictMatch bool, sampleExpiry time.Duration, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, sampleExpiryMetric prometheus.Gauge, invalidMetrics prometheus.Counter) *graphiteCollector { + c := &graphiteCollector{ + SampleCh: make(chan *graphitesample.GraphiteSample), + lineCh: make(chan string), + mu: &sync.Mutex{}, + Samples: map[string]*graphitesample.GraphiteSample{}, + StrictMatch: strictMatch, + sampleExpiry: sampleExpiry, + Logger: logger, + tagErrors: tagErrors, + lastProcessed: lastProcessed, + sampleExpiryMetric: sampleExpiryMetric, + invalidMetrics: invalidMetrics, + } + + go c.processSamples() + go c.processLines() + + return c +} + +func (c *graphiteCollector) ProcessReader(reader io.Reader) { + lineScanner := bufio.NewScanner(reader) + + for { + if ok := lineScanner.Scan(); !ok { + break + } + c.lineCh <- lineScanner.Text() + } +} + +func (c *graphiteCollector) processLines() { + for l := range c.lineCh { + line.ProcessLine(l, c.Mapper, c.SampleCh, c.StrictMatch, c.tagErrors, c.lastProcessed, c.invalidMetrics, c.Logger) + } +} + +func (c *graphiteCollector) processSamples() { + ticker := time.NewTicker(time.Minute).C + + for { + select { + case sample, ok := <-c.SampleCh: + if sample == nil || !ok { + return + } + + c.mu.Lock() + c.Samples[sample.OriginalName] = sample + c.mu.Unlock() + case <-ticker: + // Garbage collect expired Samples. + ageLimit := time.Now().Add(-c.sampleExpiry) + + c.mu.Lock() + for k, sample := range c.Samples { + if ageLimit.After(sample.Timestamp) { + delete(c.Samples, k) + } + } + c.mu.Unlock() + } + } +} + +// Collect implements prometheus.Collector. +func (c graphiteCollector) Collect(ch chan<- prometheus.Metric) { + ch <- c.lastProcessed + + c.mu.Lock() + level.Debug(c.Logger).Log("msg", "Samples length", "len", len(c.Samples)) + samples := make([]*graphitesample.GraphiteSample, 0, len(c.Samples)) + + for _, sample := range c.Samples { + samples = append(samples, sample) + } + c.mu.Unlock() + + ageLimit := time.Now().Add(-c.sampleExpiry) + + for _, sample := range samples { + if ageLimit.After(sample.Timestamp) { + continue + } + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(sample.Name, sample.Help, []string{}, sample.Labels), + sample.Type, + sample.Value, + ) + } +} + +// Describe implements prometheus.Collector. +func (c graphiteCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.lastProcessed.Desc() +} diff --git a/pkg/graphitesample/graphitesample.go b/pkg/graphitesample/graphitesample.go new file mode 100644 index 0000000..7f1d1d1 --- /dev/null +++ b/pkg/graphitesample/graphitesample.go @@ -0,0 +1,36 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package graphitesample + +import ( + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// GraphiteSample represents a graphite metric sample +type GraphiteSample struct { + OriginalName string + Name string + Labels prometheus.Labels + Help string + Value float64 + Type prometheus.ValueType + Timestamp time.Time +} + +func (s GraphiteSample) String() string { + return fmt.Sprintf("%#v", s) +} diff --git a/pkg/line/line.go b/pkg/line/line.go new file mode 100644 index 0000000..93c46c2 --- /dev/null +++ b/pkg/line/line.go @@ -0,0 +1,181 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package line + +import ( + "fmt" + "math" + "regexp" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/mapper" + + "github.com/prometheus/graphite_exporter/pkg/graphitesample" + "github.com/prometheus/graphite_exporter/pkg/metricmapper" +) + +var ( + invalidMetricChars = regexp.MustCompile("[^a-zA-Z0-9_:]") + metricNameKeysIndex = newMetricNameAndKeys() +) + +// metricNameAndKeys is a cache of metric names and the label keys previously used +type metricNameAndKeys struct { + mtx sync.Mutex + cache map[string]string +} + +func newMetricNameAndKeys() *metricNameAndKeys { + x := metricNameAndKeys{ + cache: make(map[string]string), + } + + return &x +} + +func keysFromLabels(labels prometheus.Labels) string { + labelKeys := make([]string, len(labels)) + for k := range labels { + labelKeys = append(labelKeys, k) + } + + sort.Strings(labelKeys) + + return strings.Join(labelKeys, ",") +} + +// checkNameAndKeys returns true if metric has the same label keys or is new, false if not +func (c *metricNameAndKeys) checkNameAndKeys(name string, labels prometheus.Labels) bool { + c.mtx.Lock() + defer c.mtx.Unlock() + + providedKeys := keysFromLabels(labels) + + if keys, found := c.cache[name]; found { + return keys == providedKeys + } + + c.cache[name] = providedKeys + + return true +} + +func parseMetricNameAndTags(name string, labels prometheus.Labels, tagErrors prometheus.Counter) (string, error) { + if strings.ContainsRune(name, ';') { + // name contains tags - parse tags and add to labels + if strings.Count(name, ";") != strings.Count(name, "=") { + tagErrors.Inc() + return name, fmt.Errorf("error parsing tags on %s", name) + } + + parts := strings.Split(name, ";") + parsedName := parts[0] + tags := parts[1:] + + for _, tag := range tags { + kv := strings.SplitN(tag, "=", 2) + if len(kv) != 2 { + // we may have added bad labels already... + tagErrors.Inc() + return name, fmt.Errorf("error parsing tags on %s", name) + } + + k := kv[0] + v := kv[1] + labels[k] = v + } + + return parsedName, nil + } + + return name, nil +} + +// ProcessLine takes a graphite metric line as a string, processes it into a GraphiteSample, and sends it to the sample channel +func ProcessLine(line string, metricmapper metricmapper.MetricMapper, sampleCh chan<- *graphitesample.GraphiteSample, strictMatch bool, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, invalidMetrics prometheus.Counter, logger log.Logger) { + line = strings.TrimSpace(line) + level.Debug(logger).Log("msg", "Incoming line", "line", line) + parts := strings.Split(line, " ") + + if len(parts) != 3 { + level.Info(logger).Log("msg", "Invalid part count", "parts", len(parts), "line", line) + return + } + + originalName := parts[0] + + labels := make(prometheus.Labels) + name, err := parseMetricNameAndTags(originalName, labels, tagErrors) + if err != nil { + level.Info(logger).Log("msg", "Invalid tags", "line", line) + return + } + + // check to ensure the same tags are present + if validKeys := metricNameKeysIndex.checkNameAndKeys(name, labels); !validKeys { + level.Info(logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line) + invalidMetrics.Inc() + + return + } + + mapping, mlabels, mappingPresent := metricmapper.GetMapping(name, mapper.MetricTypeGauge) + + if (mappingPresent && mapping.Action == mapper.ActionTypeDrop) || (!mappingPresent && strictMatch) { + return + } + + if mappingPresent { + name = invalidMetricChars.ReplaceAllString(mapping.Name, "_") + + // append labels from the mapping to those parsed, with mapping labels overriding + for k, v := range mlabels { + labels[k] = v + } + } else { + name = invalidMetricChars.ReplaceAllString(name, "_") + } + + value, err := strconv.ParseFloat(parts[1], 64) + if err != nil { + level.Info(logger).Log("msg", "Invalid value", "line", line) + return + } + + timestamp, err := strconv.ParseFloat(parts[2], 64) + if err != nil { + level.Info(logger).Log("msg", "Invalid timestamp", "line", line) + return + } + + sample := graphitesample.GraphiteSample{ + OriginalName: originalName, + Name: name, + Value: value, + Labels: labels, + Type: prometheus.GaugeValue, + Help: fmt.Sprintf("Graphite metric %s", name), + Timestamp: time.Unix(int64(timestamp), int64(math.Mod(timestamp, 1.0)*1e9)), + } + level.Debug(logger).Log("msg", "Processing sample", "sample", sample) + lastProcessed.Set(float64(time.Now().UnixNano()) / 1e9) + sampleCh <- &sample +} diff --git a/pkg/line/line_test.go b/pkg/line/line_test.go new file mode 100644 index 0000000..6299b22 --- /dev/null +++ b/pkg/line/line_test.go @@ -0,0 +1,66 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package line + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" +) + +func TestParseNameAndTags(t *testing.T) { + type testCase struct { + line string + parsedName string + labels prometheus.Labels + willFail bool + } + + testCases := []testCase{ + { + line: "my_simple_metric_with_tags;tag1=value1;tag2=value2", + parsedName: "my_simple_metric_with_tags", + labels: prometheus.Labels{ + "tag1": "value1", + "tag2": "value2", + }, + }, + { + line: "my_simple_metric_with_bad_tags;tag1=value1;tag2", + parsedName: "my_simple_metric_with_bad_tags;tag1=value1;tag2", + labels: prometheus.Labels{}, + willFail: true, + }, + } + + tagErrorsTest := prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "graphite_tag_parse_errors_test", + Help: "Total count of samples with invalid tags", + }, + ) + + for _, testCase := range testCases { + labels := prometheus.Labels{} + n, err := parseMetricNameAndTags(testCase.line, labels, tagErrorsTest) + + if !testCase.willFail { + assert.NoError(t, err, "Got unexpected error parsing %s", testCase.line) + } + + assert.Equal(t, testCase.parsedName, n) + assert.Equal(t, testCase.labels, labels) + } +} diff --git a/pkg/metricmapper/mapper.go b/pkg/metricmapper/mapper.go new file mode 100644 index 0000000..d5ff268 --- /dev/null +++ b/pkg/metricmapper/mapper.go @@ -0,0 +1,26 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricmapper + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/mapper" +) + +// MetricMapper is an interface for mapper methods +type MetricMapper interface { + GetMapping(string, mapper.MetricType) (*mapper.MetricMapping, prometheus.Labels, bool) + InitFromFile(string, int, ...mapper.CacheOption) error + InitCache(int, ...mapper.CacheOption) +}