From 9328f9acd1efea73ba720d6122a4566495afe1b9 Mon Sep 17 00:00:00 2001 From: glightfoot Date: Thu, 18 Jun 2020 16:30:37 -0400 Subject: [PATCH 1/4] initial support for tagged graphite metrics Signed-off-by: glightfoot --- main.go | 118 ++++++++++++++++++++++++++++++++++++-- main_test.go | 156 +++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 226 insertions(+), 48 deletions(-) diff --git a/main.go b/main.go index a324e2a..e20c1c0 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ import ( _ "net/http/pprof" "os" "regexp" + "sort" "strconv" "strings" "sync" @@ -63,13 +64,60 @@ var ( Help: "How long in seconds a metric sample is valid for.", }, ) + tagParseFailures = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "graphite_tag_parse_failures", + Help: "Total count of samples with invalid tags", + }) + invalidMetrics = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "graphite_invalid_metrics", + Help: "Total count of metrics dropped due to mismatched label keys", + }) invalidMetricChars = regexp.MustCompile("[^a-zA-Z0-9_:]") + + metricNameKeysIndex = newMetricNameAndKeys() ) +// metricNameAndKeys is a cache of metric names and the label keys previously used +type metricNameAndKeys struct { + mtx sync.Mutex + cache map[string]string +} + +func newMetricNameAndKeys() *metricNameAndKeys { + x := metricNameAndKeys{ + cache: make(map[string]string), + } + return &x +} + +func keysFromLabels(labels prometheus.Labels) string { + labelKeys := make([]string, len(labels)) + for k, _ := range labels { + labelKeys = append(labelKeys, k) + } + sort.Strings(labelKeys) + return strings.Join(labelKeys, ",") +} + +// checkNameAndKeys returns true if metric has the same label keys or is new, false if not +func (c *metricNameAndKeys) checkNameAndKeys(name string, labels prometheus.Labels) bool { + c.mtx.Lock() + defer c.mtx.Unlock() + providedKeys := keysFromLabels(labels) + if keys, found := c.cache[name]; found { + return keys == providedKeys + } + + c.cache[name] = providedKeys + return true +} + type graphiteSample struct { OriginalName string Name string - Labels map[string]string + Labels prometheus.Labels Help string Value float64 Type prometheus.ValueType @@ -126,6 +174,37 @@ func (c *graphiteCollector) processLines() { } } +func parseMetricNameAndTags(name string, labels prometheus.Labels) (string, error) { + if strings.ContainsRune(name, ';') { + // name contains tags - parse tags and add to labels + if strings.Count(name, ";") != strings.Count(name, "=") { + tagParseFailures.Inc() + return name, fmt.Errorf("error parsing tags on %s", name) + } + + parts := strings.Split(name, ";") + parsedName := parts[0] + tags := parts[1:] + + for _, tag := range tags { + kv := strings.SplitN(tag, "=", 2) + if len(kv) != 2 { + // we may have added bad labels already... + tagParseFailures.Inc() + return name, fmt.Errorf("error parsing tags on %s", name) + } + + k := kv[0] + v := kv[1] + labels[k] = v + } + + return parsedName, nil + } + + return name, nil +} + func (c *graphiteCollector) processLine(line string) { line = strings.TrimSpace(line) level.Debug(c.logger).Log("msg", "Incoming line", "line", line) @@ -136,16 +215,42 @@ func (c *graphiteCollector) processLine(line string) { } originalName := parts[0] var name string - mapping, labels, present := c.mapper.GetMapping(originalName, mapper.MetricTypeGauge) + var err error + mapping, labels, mappingPresent := c.mapper.GetMapping(originalName, mapper.MetricTypeGauge) - if (present && mapping.Action == mapper.ActionTypeDrop) || (!present && c.strictMatch) { + if (mappingPresent && mapping.Action == mapper.ActionTypeDrop) || (!mappingPresent && c.strictMatch) { return } - if present { + if mappingPresent { + parsedLabels := make(prometheus.Labels) + _, err = parseMetricNameAndTags(originalName, parsedLabels) + if err != nil { + level.Info(c.logger).Log("msg", "Invalid tags", "line", line) + return + } + name = invalidMetricChars.ReplaceAllString(mapping.Name, "_") + // check to ensure the same tags are present + if validKeys := metricNameKeysIndex.checkNameAndKeys(name, parsedLabels); !validKeys { + level.Info(c.logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line) + invalidMetrics.Inc() + return + } } else { - name = invalidMetricChars.ReplaceAllString(originalName, "_") + labels = make(prometheus.Labels) + name, err = parseMetricNameAndTags(originalName, labels) + if err != nil { + level.Info(c.logger).Log("msg", "Invalid tags", "line", line) + return + } + name = invalidMetricChars.ReplaceAllString(name, "_") + // check to ensure the same tags are present + if validKeys := metricNameKeysIndex.checkNameAndKeys(name, labels); !validKeys { + level.Info(c.logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line) + invalidMetrics.Inc() + return + } } value, err := strconv.ParseFloat(parts[1], 64) @@ -158,6 +263,7 @@ func (c *graphiteCollector) processLine(line string) { level.Info(c.logger).Log("msg", "Invalid timestamp", "line", line) return } + sample := graphiteSample{ OriginalName: originalName, Name: name, @@ -257,6 +363,8 @@ func main() { logger := promlog.New(promlogConfig) prometheus.MustRegister(sampleExpiryMetric) + prometheus.MustRegister(tagParseFailures) + prometheus.MustRegister(invalidMetrics) sampleExpiryMetric.Set(sampleExpiry.Seconds()) level.Info(logger).Log("msg", "Starting graphite_exporter", "version_info", version.Info()) diff --git a/main_test.go b/main_test.go index 0f39e45..4d4e909 100644 --- a/main_test.go +++ b/main_test.go @@ -44,74 +44,140 @@ func (m *mockMapper) InitFromFile(string, int, ...mapper.CacheOption) error { func (m *mockMapper) InitCache(int, ...mapper.CacheOption) { } + +func TestParseNameAndTags(t *testing.T) { + type testCase struct { + line string + parsedName string + labels prometheus.Labels + willFail bool + } + + testCases := []testCase{ + { + line: "my_simple_metric_with_tags;tag1=value1;tag2=value2", + parsedName: "my_simple_metric_with_tags", + labels: prometheus.Labels{ + "tag1": "value1", + "tag2": "value2", + }, + }, + { + line: "my_simple_metric_with_bad_tags;tag1=value1;tag2", + parsedName: "my_simple_metric_with_bad_tags;tag1=value1;tag2", + labels: prometheus.Labels{}, + willFail: true, + }, + } + + for _, testCase := range testCases { + labels := prometheus.Labels{} + n, err := parseMetricNameAndTags(testCase.line, labels) + if !testCase.willFail { + assert.NoError(t, err, "Got unexpected error parsing %s", testCase.line) + } + assert.Equal(t, testCase.parsedName, n) + assert.Equal(t, testCase.labels, labels) + } +} + func TestProcessLine(t *testing.T) { type testCase struct { - line string - name string - labels map[string]string - value float64 - present bool - willFail bool - action mapper.ActionType - strict bool + line string + name string + mappingLabels prometheus.Labels + parsedLabels prometheus.Labels + value float64 + mappingPresent bool + willFail bool + action mapper.ActionType + strict bool } testCases := []testCase{ { line: "my.simple.metric 9001 1534620625", name: "my_simple_metric", - labels: map[string]string{ + mappingLabels: prometheus.Labels{ "foo": "bar", "zip": "zot", "name": "alabel", }, - present: true, - value: float64(9001), + mappingPresent: true, + value: float64(9001), }, { line: "my.simple.metric.baz 9002 1534620625", name: "my_simple_metric", - labels: map[string]string{ + mappingLabels: prometheus.Labels{ "baz": "bat", }, - present: true, - value: float64(9002), + mappingPresent: true, + value: float64(9002), }, { - line: "my.nomap.metric 9001 1534620625", - name: "my_nomap_metric", - value: float64(9001), - present: false, + line: "my.nomap.metric 9001 1534620625", + name: "my_nomap_metric", + value: float64(9001), + parsedLabels: prometheus.Labels{}, + mappingPresent: false, }, { - line: "my.nomap.metric.novalue 9001 ", - name: "my_nomap_metric_novalue", - labels: nil, - value: float64(9001), - willFail: true, + line: "my.nomap.metric.novalue 9001 ", + name: "my_nomap_metric_novalue", + mappingLabels: nil, + value: float64(9001), + willFail: true, }, { - line: "my.mapped.metric.drop 55 1534620625", - name: "my_mapped_metric_drop", - present: true, - willFail: true, - action: mapper.ActionTypeDrop, + line: "my.mapped.metric.drop 55 1534620625", + name: "my_mapped_metric_drop", + mappingPresent: true, + willFail: true, + action: mapper.ActionTypeDrop, + }, + { + line: "my.mapped.strict.metric 55 1534620625", + name: "my_mapped_strict_metric", + value: float64(55), + mappingPresent: true, + willFail: false, + strict: true, }, { - line: "my.mapped.strict.metric 55 1534620625", - name: "my_mapped_strict_metric", - value: float64(55), - present: true, - willFail: false, - strict: true, + line: "my.mapped.strict.metric.drop 55 1534620625", + name: "my_mapped_strict_metric_drop", + mappingPresent: false, + willFail: true, + strict: true, + }, + { + line: "my.simple.metric.with.tags;tag1=value1;tag2=value2 9002 1534620625", + name: "my_simple_metric_with_tags", + parsedLabels: prometheus.Labels{ + "tag1": "value1", + "tag2": "value2", + }, + mappingPresent: false, + value: float64(9002), + }, + { + // same tags, different values, should parse + line: "my.simple.metric.with.tags;tag1=value3;tag2=value4 9002 1534620625", + name: "my_simple_metric_with_tags", + parsedLabels: prometheus.Labels{ + "tag1": "value3", + "tag2": "value4", + }, + mappingPresent: false, + value: float64(9002), }, { - line: "my.mapped.strict.metric.drop 55 1534620625", - name: "my_mapped_strict_metric_drop", - present: false, + // new tags other than previously used, should drop + line: "my.simple.metric.with.tags;tag1=value1;tag3=value2 9002 1534620625", + name: "my_simple_metric_with_tags", willFail: true, - strict: true, }, } @@ -119,16 +185,16 @@ func TestProcessLine(t *testing.T) { for _, testCase := range testCases { - if testCase.present { + if testCase.mappingPresent { c.mapper = &mockMapper{ name: testCase.name, - labels: testCase.labels, + labels: testCase.mappingLabels, action: testCase.action, - present: testCase.present, + present: testCase.mappingPresent, } } else { c.mapper = &mockMapper{ - present: testCase.present, + present: testCase.mappingPresent, } } @@ -146,7 +212,11 @@ func TestProcessLine(t *testing.T) { } else { if assert.NotNil(t, sample, "Missing %s", k.name) { assert.Equal(t, k.name, sample.Name) - assert.Equal(t, k.labels, sample.Labels) + if k.mappingPresent { + assert.Equal(t, k.mappingLabels, sample.Labels) + } else { + assert.Equal(t, k.parsedLabels, sample.Labels) + } assert.Equal(t, k.value, sample.Value) } } From d17a56300ad87ca7ab19b94b04d566271015439b Mon Sep 17 00:00:00 2001 From: glightfoot Date: Fri, 19 Jun 2020 15:42:34 -0400 Subject: [PATCH 2/4] break out into packages Signed-off-by: glightfoot --- .golangci.yml | 5 +- main.go | 219 +++++---------------------- main_test.go | 47 +----- pkg/graphitesample/graphitesample.go | 36 +++++ pkg/line/line.go | 190 +++++++++++++++++++++++ pkg/line/line_test.go | 66 ++++++++ pkg/metricmapper/mapper.go | 26 ++++ 7 files changed, 366 insertions(+), 223 deletions(-) create mode 100644 pkg/graphitesample/graphitesample.go create mode 100644 pkg/line/line.go create mode 100644 pkg/line/line_test.go create mode 100644 pkg/metricmapper/mapper.go diff --git a/.golangci.yml b/.golangci.yml index fd975b2..761908b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,8 +1,11 @@ run: modules-download-mode: vendor - + skip-dirs: + - vendor + - e2e # Run only staticcheck for now. Additional linters will be enabled one-by-one. linters: enable: - staticcheck + - goimports disable-all: true diff --git a/main.go b/main.go index e20c1c0..c198e77 100644 --- a/main.go +++ b/main.go @@ -18,15 +18,10 @@ import ( "bytes" "fmt" "io" - "math" "net" "net/http" _ "net/http/pprof" "os" - "regexp" - "sort" - "strconv" - "strings" "sync" "time" @@ -39,6 +34,10 @@ import ( "github.com/prometheus/common/version" "github.com/prometheus/statsd_exporter/pkg/mapper" "gopkg.in/alecthomas/kingpin.v2" + + "github.com/prometheus/graphite_exporter/pkg/graphitesample" + "github.com/prometheus/graphite_exporter/pkg/line" + "github.com/prometheus/graphite_exporter/pkg/metricmapper" ) var ( @@ -64,81 +63,25 @@ var ( Help: "How long in seconds a metric sample is valid for.", }, ) - tagParseFailures = prometheus.NewCounter( + tagErrors = prometheus.NewCounter( prometheus.CounterOpts{ - Name: "graphite_tag_parse_failures", + Name: "graphite_tag_parse_errors", Help: "Total count of samples with invalid tags", - }) + }, + ) invalidMetrics = prometheus.NewCounter( prometheus.CounterOpts{ Name: "graphite_invalid_metrics", Help: "Total count of metrics dropped due to mismatched label keys", - }) - invalidMetricChars = regexp.MustCompile("[^a-zA-Z0-9_:]") - - metricNameKeysIndex = newMetricNameAndKeys() + }, + ) ) -// metricNameAndKeys is a cache of metric names and the label keys previously used -type metricNameAndKeys struct { - mtx sync.Mutex - cache map[string]string -} - -func newMetricNameAndKeys() *metricNameAndKeys { - x := metricNameAndKeys{ - cache: make(map[string]string), - } - return &x -} - -func keysFromLabels(labels prometheus.Labels) string { - labelKeys := make([]string, len(labels)) - for k, _ := range labels { - labelKeys = append(labelKeys, k) - } - sort.Strings(labelKeys) - return strings.Join(labelKeys, ",") -} - -// checkNameAndKeys returns true if metric has the same label keys or is new, false if not -func (c *metricNameAndKeys) checkNameAndKeys(name string, labels prometheus.Labels) bool { - c.mtx.Lock() - defer c.mtx.Unlock() - providedKeys := keysFromLabels(labels) - if keys, found := c.cache[name]; found { - return keys == providedKeys - } - - c.cache[name] = providedKeys - return true -} - -type graphiteSample struct { - OriginalName string - Name string - Labels prometheus.Labels - Help string - Value float64 - Type prometheus.ValueType - Timestamp time.Time -} - -func (s graphiteSample) String() string { - return fmt.Sprintf("%#v", s) -} - -type metricMapper interface { - GetMapping(string, mapper.MetricType) (*mapper.MetricMapping, prometheus.Labels, bool) - InitFromFile(string, int, ...mapper.CacheOption) error - InitCache(int, ...mapper.CacheOption) -} - type graphiteCollector struct { - samples map[string]*graphiteSample + samples map[string]*graphitesample.GraphiteSample mu *sync.Mutex - mapper metricMapper - sampleCh chan *graphiteSample + mapper metricmapper.MetricMapper + sampleCh chan *graphitesample.GraphiteSample lineCh chan string strictMatch bool logger log.Logger @@ -146,20 +89,23 @@ type graphiteCollector struct { func newGraphiteCollector(logger log.Logger) *graphiteCollector { c := &graphiteCollector{ - sampleCh: make(chan *graphiteSample), + sampleCh: make(chan *graphitesample.GraphiteSample), lineCh: make(chan string), mu: &sync.Mutex{}, - samples: map[string]*graphiteSample{}, + samples: map[string]*graphitesample.GraphiteSample{}, strictMatch: *strictMatch, logger: logger, } + go c.processSamples() go c.processLines() + return c } func (c *graphiteCollector) processReader(reader io.Reader) { lineScanner := bufio.NewScanner(reader) + for { if ok := lineScanner.Scan(); !ok { break @@ -169,113 +115,9 @@ func (c *graphiteCollector) processReader(reader io.Reader) { } func (c *graphiteCollector) processLines() { - for line := range c.lineCh { - c.processLine(line) - } -} - -func parseMetricNameAndTags(name string, labels prometheus.Labels) (string, error) { - if strings.ContainsRune(name, ';') { - // name contains tags - parse tags and add to labels - if strings.Count(name, ";") != strings.Count(name, "=") { - tagParseFailures.Inc() - return name, fmt.Errorf("error parsing tags on %s", name) - } - - parts := strings.Split(name, ";") - parsedName := parts[0] - tags := parts[1:] - - for _, tag := range tags { - kv := strings.SplitN(tag, "=", 2) - if len(kv) != 2 { - // we may have added bad labels already... - tagParseFailures.Inc() - return name, fmt.Errorf("error parsing tags on %s", name) - } - - k := kv[0] - v := kv[1] - labels[k] = v - } - - return parsedName, nil + for l := range c.lineCh { + line.ProcessLine(l, c.mapper, c.sampleCh, c.strictMatch, tagErrors, lastProcessed, invalidMetrics, c.logger) } - - return name, nil -} - -func (c *graphiteCollector) processLine(line string) { - line = strings.TrimSpace(line) - level.Debug(c.logger).Log("msg", "Incoming line", "line", line) - parts := strings.Split(line, " ") - if len(parts) != 3 { - level.Info(c.logger).Log("msg", "Invalid part count", "parts", len(parts), "line", line) - return - } - originalName := parts[0] - var name string - var err error - mapping, labels, mappingPresent := c.mapper.GetMapping(originalName, mapper.MetricTypeGauge) - - if (mappingPresent && mapping.Action == mapper.ActionTypeDrop) || (!mappingPresent && c.strictMatch) { - return - } - - if mappingPresent { - parsedLabels := make(prometheus.Labels) - _, err = parseMetricNameAndTags(originalName, parsedLabels) - if err != nil { - level.Info(c.logger).Log("msg", "Invalid tags", "line", line) - return - } - - name = invalidMetricChars.ReplaceAllString(mapping.Name, "_") - // check to ensure the same tags are present - if validKeys := metricNameKeysIndex.checkNameAndKeys(name, parsedLabels); !validKeys { - level.Info(c.logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line) - invalidMetrics.Inc() - return - } - } else { - labels = make(prometheus.Labels) - name, err = parseMetricNameAndTags(originalName, labels) - if err != nil { - level.Info(c.logger).Log("msg", "Invalid tags", "line", line) - return - } - name = invalidMetricChars.ReplaceAllString(name, "_") - // check to ensure the same tags are present - if validKeys := metricNameKeysIndex.checkNameAndKeys(name, labels); !validKeys { - level.Info(c.logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line) - invalidMetrics.Inc() - return - } - } - - value, err := strconv.ParseFloat(parts[1], 64) - if err != nil { - level.Info(c.logger).Log("msg", "Invalid value", "line", line) - return - } - timestamp, err := strconv.ParseFloat(parts[2], 64) - if err != nil { - level.Info(c.logger).Log("msg", "Invalid timestamp", "line", line) - return - } - - sample := graphiteSample{ - OriginalName: originalName, - Name: name, - Value: value, - Labels: labels, - Type: prometheus.GaugeValue, - Help: fmt.Sprintf("Graphite metric %s", name), - Timestamp: time.Unix(int64(timestamp), int64(math.Mod(timestamp, 1.0)*1e9)), - } - level.Debug(c.logger).Log("msg", "Processing sample", "sample", sample) - lastProcessed.Set(float64(time.Now().UnixNano()) / 1e9) - c.sampleCh <- &sample } func (c *graphiteCollector) processSamples() { @@ -287,12 +129,14 @@ func (c *graphiteCollector) processSamples() { if sample == nil || !ok { return } + c.mu.Lock() c.samples[sample.OriginalName] = sample c.mu.Unlock() case <-ticker: // Garbage collect expired samples. ageLimit := time.Now().Add(-*sampleExpiry) + c.mu.Lock() for k, sample := range c.samples { if ageLimit.After(sample.Timestamp) { @@ -309,13 +153,15 @@ func (c graphiteCollector) Collect(ch chan<- prometheus.Metric) { ch <- lastProcessed c.mu.Lock() - samples := make([]*graphiteSample, 0, len(c.samples)) + samples := make([]*graphitesample.GraphiteSample, 0, len(c.samples)) + for _, sample := range c.samples { samples = append(samples, sample) } c.mu.Unlock() ageLimit := time.Now().Add(-*sampleExpiry) + for _, sample := range samples { if ageLimit.After(sample.Timestamp) { continue @@ -341,16 +187,20 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string, logger log.Logger if mapper.FSM == nil { return fmt.Errorf("no FSM available to be dumped, possibly because the mapping contains regex patterns") } + f, err := os.Create(dumpFilename) if err != nil { return err } + level.Info(logger).Log("msg", "Start dumping FSM", "to", dumpFilename) + w := bufio.NewWriter(f) mapper.FSM.DumpFSM(w) w.Flush() f.Close() level.Info(logger).Log("msg", "Finish dumping FSM") + return nil } @@ -360,10 +210,11 @@ func main() { kingpin.Version(version.Print("graphite_exporter")) kingpin.HelpFlag.Short('h') kingpin.Parse() + logger := promlog.New(promlogConfig) prometheus.MustRegister(sampleExpiryMetric) - prometheus.MustRegister(tagParseFailures) + prometheus.MustRegister(tagErrors) prometheus.MustRegister(invalidMetrics) sampleExpiryMetric.Set(sampleExpiry.Seconds()) @@ -371,6 +222,7 @@ func main() { level.Info(logger).Log("build_context", version.BuildContext()) http.Handle(*metricsPath, promhttp.Handler()) + c := newGraphiteCollector(logger) prometheus.MustRegister(c) @@ -400,6 +252,7 @@ func main() { level.Error(logger).Log("msg", "Error binding to TCP socket", "err", err) os.Exit(1) } + go func() { for { conn, err := tcpSock.Accept() @@ -407,6 +260,7 @@ func main() { level.Error(logger).Log("msg", "Error accepting TCP connection", "err", err) continue } + go func() { defer conn.Close() c.processReader(conn) @@ -419,20 +273,25 @@ func main() { level.Error(logger).Log("msg", "Error resolving UDP address", "err", err) os.Exit(1) } + udpSock, err := net.ListenUDP("udp", udpAddress) if err != nil { level.Error(logger).Log("msg", "Error listening to UDP address", "err", err) os.Exit(1) } + go func() { defer udpSock.Close() + for { buf := make([]byte, 65536) + chars, srcAddress, err := udpSock.ReadFromUDP(buf) if err != nil { level.Error(logger).Log("msg", "Error reading UDP packet", "from", srcAddress, "err", err) continue } + go c.processReader(bytes.NewReader(buf[0:chars])) } }() diff --git a/main_test.go b/main_test.go index 4d4e909..28fa2fd 100644 --- a/main_test.go +++ b/main_test.go @@ -21,6 +21,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/stretchr/testify/assert" + + "github.com/prometheus/graphite_exporter/pkg/line" ) type mockMapper struct { @@ -31,11 +33,9 @@ type mockMapper struct { } func (m *mockMapper) GetMapping(metricName string, metricType mapper.MetricType) (*mapper.MetricMapping, prometheus.Labels, bool) { - mapping := mapper.MetricMapping{Name: m.name, Action: m.action} return &mapping, m.labels, m.present - } func (m *mockMapper) InitFromFile(string, int, ...mapper.CacheOption) error { @@ -45,44 +45,7 @@ func (m *mockMapper) InitCache(int, ...mapper.CacheOption) { } -func TestParseNameAndTags(t *testing.T) { - type testCase struct { - line string - parsedName string - labels prometheus.Labels - willFail bool - } - - testCases := []testCase{ - { - line: "my_simple_metric_with_tags;tag1=value1;tag2=value2", - parsedName: "my_simple_metric_with_tags", - labels: prometheus.Labels{ - "tag1": "value1", - "tag2": "value2", - }, - }, - { - line: "my_simple_metric_with_bad_tags;tag1=value1;tag2", - parsedName: "my_simple_metric_with_bad_tags;tag1=value1;tag2", - labels: prometheus.Labels{}, - willFail: true, - }, - } - - for _, testCase := range testCases { - labels := prometheus.Labels{} - n, err := parseMetricNameAndTags(testCase.line, labels) - if !testCase.willFail { - assert.NoError(t, err, "Got unexpected error parsing %s", testCase.line) - } - assert.Equal(t, testCase.parsedName, n) - assert.Equal(t, testCase.labels, labels) - } -} - func TestProcessLine(t *testing.T) { - type testCase struct { line string name string @@ -184,7 +147,6 @@ func TestProcessLine(t *testing.T) { c := newGraphiteCollector(log.NewNopLogger()) for _, testCase := range testCases { - if testCase.mappingPresent { c.mapper = &mockMapper{ name: testCase.name, @@ -199,14 +161,15 @@ func TestProcessLine(t *testing.T) { } c.strictMatch = testCase.strict - c.processLine(testCase.line) - + line.ProcessLine(testCase.line, c.mapper, c.sampleCh, c.strictMatch, tagErrors, lastProcessed, invalidMetrics, c.logger) } c.sampleCh <- nil + for _, k := range testCases { originalName := strings.Split(k.line, " ")[0] sample := c.samples[originalName] + if k.willFail { assert.Nil(t, sample, "Found %s", k.name) } else { diff --git a/pkg/graphitesample/graphitesample.go b/pkg/graphitesample/graphitesample.go new file mode 100644 index 0000000..7f1d1d1 --- /dev/null +++ b/pkg/graphitesample/graphitesample.go @@ -0,0 +1,36 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package graphitesample + +import ( + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// GraphiteSample represents a graphite metric sample +type GraphiteSample struct { + OriginalName string + Name string + Labels prometheus.Labels + Help string + Value float64 + Type prometheus.ValueType + Timestamp time.Time +} + +func (s GraphiteSample) String() string { + return fmt.Sprintf("%#v", s) +} diff --git a/pkg/line/line.go b/pkg/line/line.go new file mode 100644 index 0000000..99749d0 --- /dev/null +++ b/pkg/line/line.go @@ -0,0 +1,190 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package line + +import ( + "fmt" + "math" + "regexp" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/mapper" + + "github.com/prometheus/graphite_exporter/pkg/graphitesample" + "github.com/prometheus/graphite_exporter/pkg/metricmapper" +) + +var ( + invalidMetricChars = regexp.MustCompile("[^a-zA-Z0-9_:]") + metricNameKeysIndex = newMetricNameAndKeys() +) + +// metricNameAndKeys is a cache of metric names and the label keys previously used +type metricNameAndKeys struct { + mtx sync.Mutex + cache map[string]string +} + +func newMetricNameAndKeys() *metricNameAndKeys { + x := metricNameAndKeys{ + cache: make(map[string]string), + } + + return &x +} + +func keysFromLabels(labels prometheus.Labels) string { + labelKeys := make([]string, len(labels)) + for k := range labels { + labelKeys = append(labelKeys, k) + } + + sort.Strings(labelKeys) + + return strings.Join(labelKeys, ",") +} + +// checkNameAndKeys returns true if metric has the same label keys or is new, false if not +func (c *metricNameAndKeys) checkNameAndKeys(name string, labels prometheus.Labels) bool { + c.mtx.Lock() + defer c.mtx.Unlock() + + providedKeys := keysFromLabels(labels) + + if keys, found := c.cache[name]; found { + return keys == providedKeys + } + + c.cache[name] = providedKeys + + return true +} + +func parseMetricNameAndTags(name string, labels prometheus.Labels, tagErrors prometheus.Counter) (string, error) { + if strings.ContainsRune(name, ';') { + // name contains tags - parse tags and add to labels + if strings.Count(name, ";") != strings.Count(name, "=") { + tagErrors.Inc() + return name, fmt.Errorf("error parsing tags on %s", name) + } + + parts := strings.Split(name, ";") + parsedName := parts[0] + tags := parts[1:] + + for _, tag := range tags { + kv := strings.SplitN(tag, "=", 2) + if len(kv) != 2 { + // we may have added bad labels already... + tagErrors.Inc() + return name, fmt.Errorf("error parsing tags on %s", name) + } + + k := kv[0] + v := kv[1] + labels[k] = v + } + + return parsedName, nil + } + + return name, nil +} + +// ProcessLine takes a graphite metric line as a string, processes it into a GraphiteSample, and sends it to the sample channel +func ProcessLine(line string, metricmapper metricmapper.MetricMapper, sampleCh chan *graphitesample.GraphiteSample, strictMatch bool, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, invalidMetrics prometheus.Counter, logger log.Logger) { + line = strings.TrimSpace(line) + level.Debug(logger).Log("msg", "Incoming line", "line", line) + parts := strings.Split(line, " ") + + if len(parts) != 3 { + level.Info(logger).Log("msg", "Invalid part count", "parts", len(parts), "line", line) + return + } + + originalName := parts[0] + + var name string + var err error + + mapping, labels, mappingPresent := metricmapper.GetMapping(originalName, mapper.MetricTypeGauge) + + if (mappingPresent && mapping.Action == mapper.ActionTypeDrop) || (!mappingPresent && strictMatch) { + return + } + + if mappingPresent { + parsedLabels := make(prometheus.Labels) + + if _, err = parseMetricNameAndTags(originalName, parsedLabels, tagErrors); err != nil { + level.Info(logger).Log("msg", "Invalid tags", "line", line) + return + } + + name = invalidMetricChars.ReplaceAllString(mapping.Name, "_") + // check to ensure the same tags are present + if validKeys := metricNameKeysIndex.checkNameAndKeys(name, parsedLabels); !validKeys { + level.Info(logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line) + invalidMetrics.Inc() + + return + } + } else { + labels = make(prometheus.Labels) + name, err = parseMetricNameAndTags(originalName, labels, tagErrors) + if err != nil { + level.Info(logger).Log("msg", "Invalid tags", "line", line) + return + } + name = invalidMetricChars.ReplaceAllString(name, "_") + // check to ensure the same tags are present + if validKeys := metricNameKeysIndex.checkNameAndKeys(name, labels); !validKeys { + level.Info(logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line) + invalidMetrics.Inc() + return + } + } + + value, err := strconv.ParseFloat(parts[1], 64) + if err != nil { + level.Info(logger).Log("msg", "Invalid value", "line", line) + return + } + + timestamp, err := strconv.ParseFloat(parts[2], 64) + if err != nil { + level.Info(logger).Log("msg", "Invalid timestamp", "line", line) + return + } + + sample := graphitesample.GraphiteSample{ + OriginalName: originalName, + Name: name, + Value: value, + Labels: labels, + Type: prometheus.GaugeValue, + Help: fmt.Sprintf("Graphite metric %s", name), + Timestamp: time.Unix(int64(timestamp), int64(math.Mod(timestamp, 1.0)*1e9)), + } + level.Debug(logger).Log("msg", "Processing sample", "sample", sample) + lastProcessed.Set(float64(time.Now().UnixNano()) / 1e9) + sampleCh <- &sample +} diff --git a/pkg/line/line_test.go b/pkg/line/line_test.go new file mode 100644 index 0000000..6299b22 --- /dev/null +++ b/pkg/line/line_test.go @@ -0,0 +1,66 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package line + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" +) + +func TestParseNameAndTags(t *testing.T) { + type testCase struct { + line string + parsedName string + labels prometheus.Labels + willFail bool + } + + testCases := []testCase{ + { + line: "my_simple_metric_with_tags;tag1=value1;tag2=value2", + parsedName: "my_simple_metric_with_tags", + labels: prometheus.Labels{ + "tag1": "value1", + "tag2": "value2", + }, + }, + { + line: "my_simple_metric_with_bad_tags;tag1=value1;tag2", + parsedName: "my_simple_metric_with_bad_tags;tag1=value1;tag2", + labels: prometheus.Labels{}, + willFail: true, + }, + } + + tagErrorsTest := prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "graphite_tag_parse_errors_test", + Help: "Total count of samples with invalid tags", + }, + ) + + for _, testCase := range testCases { + labels := prometheus.Labels{} + n, err := parseMetricNameAndTags(testCase.line, labels, tagErrorsTest) + + if !testCase.willFail { + assert.NoError(t, err, "Got unexpected error parsing %s", testCase.line) + } + + assert.Equal(t, testCase.parsedName, n) + assert.Equal(t, testCase.labels, labels) + } +} diff --git a/pkg/metricmapper/mapper.go b/pkg/metricmapper/mapper.go new file mode 100644 index 0000000..d5ff268 --- /dev/null +++ b/pkg/metricmapper/mapper.go @@ -0,0 +1,26 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricmapper + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/mapper" +) + +// MetricMapper is an interface for mapper methods +type MetricMapper interface { + GetMapping(string, mapper.MetricType) (*mapper.MetricMapping, prometheus.Labels, bool) + InitFromFile(string, int, ...mapper.CacheOption) error + InitCache(int, ...mapper.CacheOption) +} From f6262ec7913c561093bb09baebc6d93aca493790 Mon Sep 17 00:00:00 2001 From: glightfoot Date: Fri, 19 Jun 2020 15:55:36 -0400 Subject: [PATCH 3/4] move label parsing to before getting the mapping, mapping labels override parsed labels Signed-off-by: glightfoot --- main_test.go | 3 ++- pkg/line/line.go | 45 ++++++++++++++++++--------------------------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/main_test.go b/main_test.go index 28fa2fd..fa171e5 100644 --- a/main_test.go +++ b/main_test.go @@ -89,7 +89,7 @@ func TestProcessLine(t *testing.T) { { line: "my.nomap.metric.novalue 9001 ", name: "my_nomap_metric_novalue", - mappingLabels: nil, + mappingLabels: prometheus.Labels{}, value: float64(9001), willFail: true, }, @@ -104,6 +104,7 @@ func TestProcessLine(t *testing.T) { line: "my.mapped.strict.metric 55 1534620625", name: "my_mapped_strict_metric", value: float64(55), + mappingLabels: prometheus.Labels{}, mappingPresent: true, willFail: false, strict: true, diff --git a/pkg/line/line.go b/pkg/line/line.go index 99749d0..122fefa 100644 --- a/pkg/line/line.go +++ b/pkg/line/line.go @@ -122,45 +122,36 @@ func ProcessLine(line string, metricmapper metricmapper.MetricMapper, sampleCh c originalName := parts[0] - var name string - var err error + labels := make(prometheus.Labels) + name, err := parseMetricNameAndTags(originalName, labels, tagErrors) + if err != nil { + level.Info(logger).Log("msg", "Invalid tags", "line", line) + return + } - mapping, labels, mappingPresent := metricmapper.GetMapping(originalName, mapper.MetricTypeGauge) + // check to ensure the same tags are present + if validKeys := metricNameKeysIndex.checkNameAndKeys(name, labels); !validKeys { + level.Info(logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line) + invalidMetrics.Inc() - if (mappingPresent && mapping.Action == mapper.ActionTypeDrop) || (!mappingPresent && strictMatch) { return } - if mappingPresent { - parsedLabels := make(prometheus.Labels) + mapping, mlabels, mappingPresent := metricmapper.GetMapping(name, mapper.MetricTypeGauge) - if _, err = parseMetricNameAndTags(originalName, parsedLabels, tagErrors); err != nil { - level.Info(logger).Log("msg", "Invalid tags", "line", line) - return - } + if (mappingPresent && mapping.Action == mapper.ActionTypeDrop) || (!mappingPresent && strictMatch) { + return + } + if mappingPresent { name = invalidMetricChars.ReplaceAllString(mapping.Name, "_") - // check to ensure the same tags are present - if validKeys := metricNameKeysIndex.checkNameAndKeys(name, parsedLabels); !validKeys { - level.Info(logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line) - invalidMetrics.Inc() - return + // append labels from the mapping to those parsed, with mapping labels overriding + for k, v := range mlabels { + labels[k] = v } } else { - labels = make(prometheus.Labels) - name, err = parseMetricNameAndTags(originalName, labels, tagErrors) - if err != nil { - level.Info(logger).Log("msg", "Invalid tags", "line", line) - return - } name = invalidMetricChars.ReplaceAllString(name, "_") - // check to ensure the same tags are present - if validKeys := metricNameKeysIndex.checkNameAndKeys(name, labels); !validKeys { - level.Info(logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line) - invalidMetrics.Inc() - return - } } value, err := strconv.ParseFloat(parts[1], 64) From dfccf08d698ae0967a2c3ee52766ec14b22dd443 Mon Sep 17 00:00:00 2001 From: glightfoot Date: Mon, 22 Jun 2020 14:01:46 -0400 Subject: [PATCH 4/4] add sleeps before getting metrics for e2e tests, move collector to package Signed-off-by: glightfoot --- e2e/e2e_test.go | 4 +- e2e/issue90_test.go | 2 + main.go | 123 +++----------------------------- main_test.go | 17 +++-- pkg/collector/collector.go | 142 +++++++++++++++++++++++++++++++++++++ pkg/line/line.go | 2 +- 6 files changed, 166 insertions(+), 124 deletions(-) create mode 100644 pkg/collector/collector.go diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 25e6156..b1c93ed 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -46,7 +46,7 @@ func TestIssue61(t *testing.T) { } defer exporter.Process.Kill() - for i := 0; i < 10; i++ { + for i := 0; i < 20; i++ { if i > 0 { time.Sleep(1 * time.Second) } @@ -93,6 +93,8 @@ rspamd.spam_count 3 NOW` t.Fatalf("write error: %v", err) } + time.Sleep(5 * time.Second) + resp, err := http.Get("http://" + path.Join(webAddr, "metrics")) if err != nil { t.Fatalf("get error: %v", err) diff --git a/e2e/issue90_test.go b/e2e/issue90_test.go index 5aa16dd..5b6882d 100644 --- a/e2e/issue90_test.go +++ b/e2e/issue90_test.go @@ -83,6 +83,8 @@ func TestIssue90(t *testing.T) { conn.Close() } + time.Sleep(5 * time.Second) + resp, err := http.Get("http://" + path.Join(webAddr, "metrics")) if err != nil { t.Fatalf("get error: %v", err) diff --git a/main.go b/main.go index c198e77..e4e85b0 100644 --- a/main.go +++ b/main.go @@ -17,13 +17,10 @@ import ( "bufio" "bytes" "fmt" - "io" "net" "net/http" _ "net/http/pprof" "os" - "sync" - "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -35,9 +32,7 @@ import ( "github.com/prometheus/statsd_exporter/pkg/mapper" "gopkg.in/alecthomas/kingpin.v2" - "github.com/prometheus/graphite_exporter/pkg/graphitesample" - "github.com/prometheus/graphite_exporter/pkg/line" - "github.com/prometheus/graphite_exporter/pkg/metricmapper" + "github.com/prometheus/graphite_exporter/pkg/collector" ) var ( @@ -77,108 +72,6 @@ var ( ) ) -type graphiteCollector struct { - samples map[string]*graphitesample.GraphiteSample - mu *sync.Mutex - mapper metricmapper.MetricMapper - sampleCh chan *graphitesample.GraphiteSample - lineCh chan string - strictMatch bool - logger log.Logger -} - -func newGraphiteCollector(logger log.Logger) *graphiteCollector { - c := &graphiteCollector{ - sampleCh: make(chan *graphitesample.GraphiteSample), - lineCh: make(chan string), - mu: &sync.Mutex{}, - samples: map[string]*graphitesample.GraphiteSample{}, - strictMatch: *strictMatch, - logger: logger, - } - - go c.processSamples() - go c.processLines() - - return c -} - -func (c *graphiteCollector) processReader(reader io.Reader) { - lineScanner := bufio.NewScanner(reader) - - for { - if ok := lineScanner.Scan(); !ok { - break - } - c.lineCh <- lineScanner.Text() - } -} - -func (c *graphiteCollector) processLines() { - for l := range c.lineCh { - line.ProcessLine(l, c.mapper, c.sampleCh, c.strictMatch, tagErrors, lastProcessed, invalidMetrics, c.logger) - } -} - -func (c *graphiteCollector) processSamples() { - ticker := time.NewTicker(time.Minute).C - - for { - select { - case sample, ok := <-c.sampleCh: - if sample == nil || !ok { - return - } - - c.mu.Lock() - c.samples[sample.OriginalName] = sample - c.mu.Unlock() - case <-ticker: - // Garbage collect expired samples. - ageLimit := time.Now().Add(-*sampleExpiry) - - c.mu.Lock() - for k, sample := range c.samples { - if ageLimit.After(sample.Timestamp) { - delete(c.samples, k) - } - } - c.mu.Unlock() - } - } -} - -// Collect implements prometheus.Collector. -func (c graphiteCollector) Collect(ch chan<- prometheus.Metric) { - ch <- lastProcessed - - c.mu.Lock() - samples := make([]*graphitesample.GraphiteSample, 0, len(c.samples)) - - for _, sample := range c.samples { - samples = append(samples, sample) - } - c.mu.Unlock() - - ageLimit := time.Now().Add(-*sampleExpiry) - - for _, sample := range samples { - if ageLimit.After(sample.Timestamp) { - continue - } - ch <- prometheus.MustNewConstMetric( - prometheus.NewDesc(sample.Name, sample.Help, []string{}, sample.Labels), - sample.Type, - sample.Value, - ) - } -} - -// Describe implements prometheus.Collector. -func (c graphiteCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- lastProcessed.Desc() -} - func init() { prometheus.MustRegister(version.NewCollector("graphite_exporter")) } @@ -223,24 +116,24 @@ func main() { http.Handle(*metricsPath, promhttp.Handler()) - c := newGraphiteCollector(logger) + c := collector.NewGraphiteCollector(logger, *strictMatch, *sampleExpiry, tagErrors, lastProcessed, sampleExpiryMetric, invalidMetrics) prometheus.MustRegister(c) - c.mapper = &mapper.MetricMapper{} + c.Mapper = &mapper.MetricMapper{} cacheOption := mapper.WithCacheType(*cacheType) if *mappingConfig != "" { - err := c.mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption) + err := c.Mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption) if err != nil { level.Error(logger).Log("msg", "Error loading metric mapping config", "err", err) os.Exit(1) } } else { - c.mapper.InitCache(*cacheSize, cacheOption) + c.Mapper.InitCache(*cacheSize, cacheOption) } if *dumpFSMPath != "" { - err := dumpFSM(c.mapper.(*mapper.MetricMapper), *dumpFSMPath, logger) + err := dumpFSM(c.Mapper.(*mapper.MetricMapper), *dumpFSMPath, logger) if err != nil { level.Error(logger).Log("msg", "Error dumping FSM", "err", err) os.Exit(1) @@ -263,7 +156,7 @@ func main() { go func() { defer conn.Close() - c.processReader(conn) + c.ProcessReader(conn) }() } }() @@ -292,7 +185,7 @@ func main() { continue } - go c.processReader(bytes.NewReader(buf[0:chars])) + go c.ProcessReader(bytes.NewReader(buf[0:chars])) } }() diff --git a/main_test.go b/main_test.go index fa171e5..673df83 100644 --- a/main_test.go +++ b/main_test.go @@ -16,12 +16,14 @@ package main import ( "strings" "testing" + "time" "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/stretchr/testify/assert" + "github.com/prometheus/graphite_exporter/pkg/collector" "github.com/prometheus/graphite_exporter/pkg/line" ) @@ -145,31 +147,32 @@ func TestProcessLine(t *testing.T) { }, } - c := newGraphiteCollector(log.NewNopLogger()) + // func NewGraphiteCollector(logger log.Logger, strictMatch bool, sampleExpiry time.Duration, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, sampleExpiryMetric prometheus.Gauge, invalidMetrics prometheus.Counter) *graphiteCollector { + c := collector.NewGraphiteCollector(log.NewNopLogger(), false, 5*time.Minute, tagErrors, lastProcessed, sampleExpiryMetric, invalidMetrics) for _, testCase := range testCases { if testCase.mappingPresent { - c.mapper = &mockMapper{ + c.Mapper = &mockMapper{ name: testCase.name, labels: testCase.mappingLabels, action: testCase.action, present: testCase.mappingPresent, } } else { - c.mapper = &mockMapper{ + c.Mapper = &mockMapper{ present: testCase.mappingPresent, } } - c.strictMatch = testCase.strict - line.ProcessLine(testCase.line, c.mapper, c.sampleCh, c.strictMatch, tagErrors, lastProcessed, invalidMetrics, c.logger) + c.StrictMatch = testCase.strict + line.ProcessLine(testCase.line, c.Mapper, c.SampleCh, c.StrictMatch, tagErrors, lastProcessed, invalidMetrics, c.Logger) } - c.sampleCh <- nil + c.SampleCh <- nil for _, k := range testCases { originalName := strings.Split(k.line, " ")[0] - sample := c.samples[originalName] + sample := c.Samples[originalName] if k.willFail { assert.Nil(t, sample, "Found %s", k.name) diff --git a/pkg/collector/collector.go b/pkg/collector/collector.go new file mode 100644 index 0000000..1ee6725 --- /dev/null +++ b/pkg/collector/collector.go @@ -0,0 +1,142 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "bufio" + "io" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/graphite_exporter/pkg/graphitesample" + "github.com/prometheus/graphite_exporter/pkg/line" + "github.com/prometheus/graphite_exporter/pkg/metricmapper" +) + +type graphiteCollector struct { + Mapper metricmapper.MetricMapper + Samples map[string]*graphitesample.GraphiteSample + mu *sync.Mutex + SampleCh chan *graphitesample.GraphiteSample + lineCh chan string + StrictMatch bool + sampleExpiry time.Duration + Logger log.Logger + tagErrors prometheus.Counter + lastProcessed prometheus.Gauge + sampleExpiryMetric prometheus.Gauge + invalidMetrics prometheus.Counter +} + +func NewGraphiteCollector(logger log.Logger, strictMatch bool, sampleExpiry time.Duration, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, sampleExpiryMetric prometheus.Gauge, invalidMetrics prometheus.Counter) *graphiteCollector { + c := &graphiteCollector{ + SampleCh: make(chan *graphitesample.GraphiteSample), + lineCh: make(chan string), + mu: &sync.Mutex{}, + Samples: map[string]*graphitesample.GraphiteSample{}, + StrictMatch: strictMatch, + sampleExpiry: sampleExpiry, + Logger: logger, + tagErrors: tagErrors, + lastProcessed: lastProcessed, + sampleExpiryMetric: sampleExpiryMetric, + invalidMetrics: invalidMetrics, + } + + go c.processSamples() + go c.processLines() + + return c +} + +func (c *graphiteCollector) ProcessReader(reader io.Reader) { + lineScanner := bufio.NewScanner(reader) + + for { + if ok := lineScanner.Scan(); !ok { + break + } + c.lineCh <- lineScanner.Text() + } +} + +func (c *graphiteCollector) processLines() { + for l := range c.lineCh { + line.ProcessLine(l, c.Mapper, c.SampleCh, c.StrictMatch, c.tagErrors, c.lastProcessed, c.invalidMetrics, c.Logger) + } +} + +func (c *graphiteCollector) processSamples() { + ticker := time.NewTicker(time.Minute).C + + for { + select { + case sample, ok := <-c.SampleCh: + if sample == nil || !ok { + return + } + + c.mu.Lock() + c.Samples[sample.OriginalName] = sample + c.mu.Unlock() + case <-ticker: + // Garbage collect expired Samples. + ageLimit := time.Now().Add(-c.sampleExpiry) + + c.mu.Lock() + for k, sample := range c.Samples { + if ageLimit.After(sample.Timestamp) { + delete(c.Samples, k) + } + } + c.mu.Unlock() + } + } +} + +// Collect implements prometheus.Collector. +func (c graphiteCollector) Collect(ch chan<- prometheus.Metric) { + ch <- c.lastProcessed + + c.mu.Lock() + level.Debug(c.Logger).Log("msg", "Samples length", "len", len(c.Samples)) + samples := make([]*graphitesample.GraphiteSample, 0, len(c.Samples)) + + for _, sample := range c.Samples { + samples = append(samples, sample) + } + c.mu.Unlock() + + ageLimit := time.Now().Add(-c.sampleExpiry) + + for _, sample := range samples { + if ageLimit.After(sample.Timestamp) { + continue + } + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc(sample.Name, sample.Help, []string{}, sample.Labels), + sample.Type, + sample.Value, + ) + } +} + +// Describe implements prometheus.Collector. +func (c graphiteCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.lastProcessed.Desc() +} diff --git a/pkg/line/line.go b/pkg/line/line.go index 122fefa..93c46c2 100644 --- a/pkg/line/line.go +++ b/pkg/line/line.go @@ -110,7 +110,7 @@ func parseMetricNameAndTags(name string, labels prometheus.Labels, tagErrors pro } // ProcessLine takes a graphite metric line as a string, processes it into a GraphiteSample, and sends it to the sample channel -func ProcessLine(line string, metricmapper metricmapper.MetricMapper, sampleCh chan *graphitesample.GraphiteSample, strictMatch bool, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, invalidMetrics prometheus.Counter, logger log.Logger) { +func ProcessLine(line string, metricmapper metricmapper.MetricMapper, sampleCh chan<- *graphitesample.GraphiteSample, strictMatch bool, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, invalidMetrics prometheus.Counter, logger log.Logger) { line = strings.TrimSpace(line) level.Debug(logger).Log("msg", "Incoming line", "line", line) parts := strings.Split(line, " ")