From d0af7bd3b2a7a5f85564b70b0d4402bc731ceff0 Mon Sep 17 00:00:00 2001 From: glightfoot Date: Mon, 22 Jun 2020 14:01:46 -0400 Subject: [PATCH] add sleeps before getting metrics for e2e tests Signed-off-by: glightfoot --- e2e/e2e_test.go | 4 +- e2e/issue90_test.go | 2 + main.go | 123 +++----------------------------------------- main_test.go | 17 +++--- pkg/line/line.go | 2 +- 5 files changed, 24 insertions(+), 124 deletions(-) diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index 25e6156..b1c93ed 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -46,7 +46,7 @@ func TestIssue61(t *testing.T) { } defer exporter.Process.Kill() - for i := 0; i < 10; i++ { + for i := 0; i < 20; i++ { if i > 0 { time.Sleep(1 * time.Second) } @@ -93,6 +93,8 @@ rspamd.spam_count 3 NOW` t.Fatalf("write error: %v", err) } + time.Sleep(5 * time.Second) + resp, err := http.Get("http://" + path.Join(webAddr, "metrics")) if err != nil { t.Fatalf("get error: %v", err) diff --git a/e2e/issue90_test.go b/e2e/issue90_test.go index 5aa16dd..5b6882d 100644 --- a/e2e/issue90_test.go +++ b/e2e/issue90_test.go @@ -83,6 +83,8 @@ func TestIssue90(t *testing.T) { conn.Close() } + time.Sleep(5 * time.Second) + resp, err := http.Get("http://" + path.Join(webAddr, "metrics")) if err != nil { t.Fatalf("get error: %v", err) diff --git a/main.go b/main.go index c198e77..e4e85b0 100644 --- a/main.go +++ b/main.go @@ -17,13 +17,10 @@ import ( "bufio" "bytes" "fmt" - "io" "net" "net/http" _ "net/http/pprof" "os" - "sync" - "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -35,9 +32,7 @@ import ( "github.com/prometheus/statsd_exporter/pkg/mapper" "gopkg.in/alecthomas/kingpin.v2" - "github.com/prometheus/graphite_exporter/pkg/graphitesample" - "github.com/prometheus/graphite_exporter/pkg/line" - "github.com/prometheus/graphite_exporter/pkg/metricmapper" + "github.com/prometheus/graphite_exporter/pkg/collector" ) var ( @@ -77,108 +72,6 @@ var ( ) ) -type graphiteCollector struct { - samples map[string]*graphitesample.GraphiteSample - mu *sync.Mutex - mapper metricmapper.MetricMapper - sampleCh chan *graphitesample.GraphiteSample - lineCh chan string - strictMatch bool - logger log.Logger -} - -func newGraphiteCollector(logger log.Logger) *graphiteCollector { - c := &graphiteCollector{ - sampleCh: make(chan *graphitesample.GraphiteSample), - lineCh: make(chan string), - mu: &sync.Mutex{}, - samples: map[string]*graphitesample.GraphiteSample{}, - strictMatch: *strictMatch, - logger: logger, - } - - go c.processSamples() - go c.processLines() - - return c -} - -func (c *graphiteCollector) processReader(reader io.Reader) { - lineScanner := bufio.NewScanner(reader) - - for { - if ok := lineScanner.Scan(); !ok { - break - } - c.lineCh <- lineScanner.Text() - } -} - -func (c *graphiteCollector) processLines() { - for l := range c.lineCh { - line.ProcessLine(l, c.mapper, c.sampleCh, c.strictMatch, tagErrors, lastProcessed, invalidMetrics, c.logger) - } -} - -func (c *graphiteCollector) processSamples() { - ticker := time.NewTicker(time.Minute).C - - for { - select { - case sample, ok := <-c.sampleCh: - if sample == nil || !ok { - return - } - - c.mu.Lock() - c.samples[sample.OriginalName] = sample - c.mu.Unlock() - case <-ticker: - // Garbage collect expired samples. - ageLimit := time.Now().Add(-*sampleExpiry) - - c.mu.Lock() - for k, sample := range c.samples { - if ageLimit.After(sample.Timestamp) { - delete(c.samples, k) - } - } - c.mu.Unlock() - } - } -} - -// Collect implements prometheus.Collector. -func (c graphiteCollector) Collect(ch chan<- prometheus.Metric) { - ch <- lastProcessed - - c.mu.Lock() - samples := make([]*graphitesample.GraphiteSample, 0, len(c.samples)) - - for _, sample := range c.samples { - samples = append(samples, sample) - } - c.mu.Unlock() - - ageLimit := time.Now().Add(-*sampleExpiry) - - for _, sample := range samples { - if ageLimit.After(sample.Timestamp) { - continue - } - ch <- prometheus.MustNewConstMetric( - prometheus.NewDesc(sample.Name, sample.Help, []string{}, sample.Labels), - sample.Type, - sample.Value, - ) - } -} - -// Describe implements prometheus.Collector. -func (c graphiteCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- lastProcessed.Desc() -} - func init() { prometheus.MustRegister(version.NewCollector("graphite_exporter")) } @@ -223,24 +116,24 @@ func main() { http.Handle(*metricsPath, promhttp.Handler()) - c := newGraphiteCollector(logger) + c := collector.NewGraphiteCollector(logger, *strictMatch, *sampleExpiry, tagErrors, lastProcessed, sampleExpiryMetric, invalidMetrics) prometheus.MustRegister(c) - c.mapper = &mapper.MetricMapper{} + c.Mapper = &mapper.MetricMapper{} cacheOption := mapper.WithCacheType(*cacheType) if *mappingConfig != "" { - err := c.mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption) + err := c.Mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption) if err != nil { level.Error(logger).Log("msg", "Error loading metric mapping config", "err", err) os.Exit(1) } } else { - c.mapper.InitCache(*cacheSize, cacheOption) + c.Mapper.InitCache(*cacheSize, cacheOption) } if *dumpFSMPath != "" { - err := dumpFSM(c.mapper.(*mapper.MetricMapper), *dumpFSMPath, logger) + err := dumpFSM(c.Mapper.(*mapper.MetricMapper), *dumpFSMPath, logger) if err != nil { level.Error(logger).Log("msg", "Error dumping FSM", "err", err) os.Exit(1) @@ -263,7 +156,7 @@ func main() { go func() { defer conn.Close() - c.processReader(conn) + c.ProcessReader(conn) }() } }() @@ -292,7 +185,7 @@ func main() { continue } - go c.processReader(bytes.NewReader(buf[0:chars])) + go c.ProcessReader(bytes.NewReader(buf[0:chars])) } }() diff --git a/main_test.go b/main_test.go index fa171e5..673df83 100644 --- a/main_test.go +++ b/main_test.go @@ -16,12 +16,14 @@ package main import ( "strings" "testing" + "time" "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/statsd_exporter/pkg/mapper" "github.com/stretchr/testify/assert" + "github.com/prometheus/graphite_exporter/pkg/collector" "github.com/prometheus/graphite_exporter/pkg/line" ) @@ -145,31 +147,32 @@ func TestProcessLine(t *testing.T) { }, } - c := newGraphiteCollector(log.NewNopLogger()) + // func NewGraphiteCollector(logger log.Logger, strictMatch bool, sampleExpiry time.Duration, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, sampleExpiryMetric prometheus.Gauge, invalidMetrics prometheus.Counter) *graphiteCollector { + c := collector.NewGraphiteCollector(log.NewNopLogger(), false, 5*time.Minute, tagErrors, lastProcessed, sampleExpiryMetric, invalidMetrics) for _, testCase := range testCases { if testCase.mappingPresent { - c.mapper = &mockMapper{ + c.Mapper = &mockMapper{ name: testCase.name, labels: testCase.mappingLabels, action: testCase.action, present: testCase.mappingPresent, } } else { - c.mapper = &mockMapper{ + c.Mapper = &mockMapper{ present: testCase.mappingPresent, } } - c.strictMatch = testCase.strict - line.ProcessLine(testCase.line, c.mapper, c.sampleCh, c.strictMatch, tagErrors, lastProcessed, invalidMetrics, c.logger) + c.StrictMatch = testCase.strict + line.ProcessLine(testCase.line, c.Mapper, c.SampleCh, c.StrictMatch, tagErrors, lastProcessed, invalidMetrics, c.Logger) } - c.sampleCh <- nil + c.SampleCh <- nil for _, k := range testCases { originalName := strings.Split(k.line, " ")[0] - sample := c.samples[originalName] + sample := c.Samples[originalName] if k.willFail { assert.Nil(t, sample, "Found %s", k.name) diff --git a/pkg/line/line.go b/pkg/line/line.go index 122fefa..93c46c2 100644 --- a/pkg/line/line.go +++ b/pkg/line/line.go @@ -110,7 +110,7 @@ func parseMetricNameAndTags(name string, labels prometheus.Labels, tagErrors pro } // ProcessLine takes a graphite metric line as a string, processes it into a GraphiteSample, and sends it to the sample channel -func ProcessLine(line string, metricmapper metricmapper.MetricMapper, sampleCh chan *graphitesample.GraphiteSample, strictMatch bool, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, invalidMetrics prometheus.Counter, logger log.Logger) { +func ProcessLine(line string, metricmapper metricmapper.MetricMapper, sampleCh chan<- *graphitesample.GraphiteSample, strictMatch bool, tagErrors prometheus.Counter, lastProcessed prometheus.Gauge, invalidMetrics prometheus.Counter, logger log.Logger) { line = strings.TrimSpace(line) level.Debug(logger).Log("msg", "Incoming line", "line", line) parts := strings.Split(line, " ")