Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[36] initial support for tagged graphite metrics #128

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
run:
modules-download-mode: vendor

skip-dirs:
- vendor
- e2e
# Run only staticcheck for now. Additional linters will be enabled one-by-one.
linters:
enable:
- staticcheck
- goimports
disable-all: true
4 changes: 3 additions & 1 deletion e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestIssue61(t *testing.T) {
}
defer exporter.Process.Kill()

for i := 0; i < 10; i++ {
for i := 0; i < 20; i++ {
if i > 0 {
time.Sleep(1 * time.Second)
}
Expand Down Expand Up @@ -93,6 +93,8 @@ rspamd.spam_count 3 NOW`
t.Fatalf("write error: %v", err)
}

time.Sleep(5 * time.Second)

resp, err := http.Get("http://" + path.Join(webAddr, "metrics"))
if err != nil {
t.Fatalf("get error: %v", err)
Expand Down
2 changes: 2 additions & 0 deletions e2e/issue90_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func TestIssue90(t *testing.T) {
conn.Close()
}

time.Sleep(5 * time.Second)

resp, err := http.Get("http://" + path.Join(webAddr, "metrics"))
if err != nil {
t.Fatalf("get error: %v", err)
Expand Down
212 changes: 36 additions & 176 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,10 @@ import (
"bufio"
"bytes"
"fmt"
"io"
"math"
"net"
"net/http"
_ "net/http/pprof"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -38,6 +31,8 @@ import (
"github.com/prometheus/common/version"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/prometheus/graphite_exporter/pkg/collector"
)

var (
Expand All @@ -63,170 +58,20 @@ var (
Help: "How long in seconds a metric sample is valid for.",
},
)
invalidMetricChars = regexp.MustCompile("[^a-zA-Z0-9_:]")
tagErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "graphite_tag_parse_errors",
Help: "Total count of samples with invalid tags",
},
)
invalidMetrics = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "graphite_invalid_metrics",
Help: "Total count of metrics dropped due to mismatched label keys",
},
)
)

// graphiteSample is one parsed Graphite metric, held in the collector's
// sample map until it is scraped via Collect or expires.
type graphiteSample struct {
	OriginalName string               // raw Graphite metric name as received on the wire; map key in processSamples
	Name         string               // sanitized (or mapping-renamed) Prometheus metric name
	Labels       map[string]string    // labels produced by the mapper, if any
	Help         string               // help text emitted with the metric
	Value        float64              // parsed sample value
	Type         prometheus.ValueType // set to GaugeValue by processLine
	Timestamp    time.Time            // sample time; compared against sampleExpiry in Collect and processSamples
}

// String renders the sample in Go-syntax form, used by the debug log
// in processLine.
func (s graphiteSample) String() string {
	repr := fmt.Sprintf("%#v", s)
	return repr
}

// metricMapper is the subset of statsd_exporter's mapper.MetricMapper
// behavior that graphiteCollector depends on.
type metricMapper interface {
	// GetMapping resolves a Graphite metric name to a mapping plus labels;
	// the bool reports whether a mapping matched.
	GetMapping(string, mapper.MetricType) (*mapper.MetricMapping, prometheus.Labels, bool)
	// InitFromFile loads the mapping configuration from a file.
	InitFromFile(string, int, ...mapper.CacheOption) error
	// InitCache initializes the mapping cache without a config file.
	InitCache(int, ...mapper.CacheOption)
}

// graphiteCollector receives Graphite plaintext lines, converts them to
// samples via the configured mapper, and exposes them to Prometheus.
type graphiteCollector struct {
	samples     map[string]*graphiteSample // latest sample per original metric name
	mu          *sync.Mutex                // guards samples
	mapper      metricMapper               // name -> metric/label mapping (set by main)
	sampleCh    chan *graphiteSample       // processLine -> processSamples
	lineCh      chan string                // processReader -> processLines
	strictMatch bool                       // if true, drop lines with no mapping match (see processLine)
	logger      log.Logger
}

// newGraphiteCollector builds a collector wired to the global
// strictMatch flag and starts its line- and sample-processing
// goroutines.
func newGraphiteCollector(logger log.Logger) *graphiteCollector {
	collector := &graphiteCollector{
		lineCh:      make(chan string),
		sampleCh:    make(chan *graphiteSample),
		samples:     make(map[string]*graphiteSample),
		mu:          &sync.Mutex{},
		strictMatch: *strictMatch,
		logger:      logger,
	}
	go collector.processLines()
	go collector.processSamples()
	return collector
}

// processReader reads reader line by line and forwards each line to
// lineCh for asynchronous parsing by processLines. It returns when the
// reader is exhausted or fails.
func (c *graphiteCollector) processReader(reader io.Reader) {
	lineScanner := bufio.NewScanner(reader)
	for lineScanner.Scan() {
		c.lineCh <- lineScanner.Text()
	}
	// Surface scanner failures (e.g. a line exceeding the scanner's
	// buffer, or a read error) instead of dropping them silently.
	if err := lineScanner.Err(); err != nil {
		level.Info(c.logger).Log("msg", "Error reading input", "err", err)
	}
}

// processLines drains lineCh, parsing every received line. It returns
// when lineCh is closed.
func (c *graphiteCollector) processLines() {
	for {
		line, ok := <-c.lineCh
		if !ok {
			return
		}
		c.processLine(line)
	}
}

// processLine parses one Graphite plaintext line ("<name> <value>
// <timestamp>"), applies the metric mapping, and sends the resulting
// sample to sampleCh. Malformed lines are logged and dropped; lines
// matched by a drop mapping, or unmatched lines under strictMatch, are
// dropped silently.
func (c *graphiteCollector) processLine(line string) {
	line = strings.TrimSpace(line)
	level.Debug(c.logger).Log("msg", "Incoming line", "line", line)
	// Exactly three space-separated fields are required; multiple
	// consecutive spaces would therefore be rejected here.
	parts := strings.Split(line, " ")
	if len(parts) != 3 {
		level.Info(c.logger).Log("msg", "Invalid part count", "parts", len(parts), "line", line)
		return
	}
	originalName := parts[0]
	var name string
	mapping, labels, present := c.mapper.GetMapping(originalName, mapper.MetricTypeGauge)

	// A matched drop action, or no match in strict mode, discards the line.
	if (present && mapping.Action == mapper.ActionTypeDrop) || (!present && c.strictMatch) {
		return
	}

	// Sanitize the exported name: the mapped name when a mapping
	// matched, the original name otherwise.
	if present {
		name = invalidMetricChars.ReplaceAllString(mapping.Name, "_")
	} else {
		name = invalidMetricChars.ReplaceAllString(originalName, "_")
	}

	value, err := strconv.ParseFloat(parts[1], 64)
	if err != nil {
		level.Info(c.logger).Log("msg", "Invalid value", "line", line)
		return
	}
	timestamp, err := strconv.ParseFloat(parts[2], 64)
	if err != nil {
		level.Info(c.logger).Log("msg", "Invalid timestamp", "line", line)
		return
	}
	sample := graphiteSample{
		OriginalName: originalName,
		Name:         name,
		Value:        value,
		Labels:       labels,
		Type:         prometheus.GaugeValue,
		Help:         fmt.Sprintf("Graphite metric %s", name),
		// Split the float timestamp into whole seconds plus the
		// fractional part converted to nanoseconds.
		Timestamp: time.Unix(int64(timestamp), int64(math.Mod(timestamp, 1.0)*1e9)),
	}
	level.Debug(c.logger).Log("msg", "Processing sample", "sample", sample)
	// Record wall-clock time of the last successfully processed sample.
	lastProcessed.Set(float64(time.Now().UnixNano()) / 1e9)
	c.sampleCh <- &sample
}

// processSamples owns writes to the samples map: it stores incoming
// samples from sampleCh (keyed by original metric name, so newer
// samples replace older ones) and garbage-collects expired entries
// once a minute. It returns when sampleCh is closed.
func (c *graphiteCollector) processSamples() {
	// Keep the *Ticker (not just its channel) so it can be stopped when
	// this goroutine exits; the original discarded it and leaked the
	// ticker forever.
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case sample, ok := <-c.sampleCh:
			if sample == nil || !ok {
				return
			}
			c.mu.Lock()
			c.samples[sample.OriginalName] = sample
			c.mu.Unlock()
		case <-ticker.C:
			// Garbage collect expired samples.
			ageLimit := time.Now().Add(-*sampleExpiry)
			c.mu.Lock()
			for k, sample := range c.samples {
				if ageLimit.After(sample.Timestamp) {
					delete(c.samples, k)
				}
			}
			c.mu.Unlock()
		}
	}
}

// Collect implements prometheus.Collector. It emits the lastProcessed
// gauge plus one const metric per stored, not-yet-expired sample.
func (c graphiteCollector) Collect(ch chan<- prometheus.Metric) {
	ch <- lastProcessed

	// Snapshot the map under the lock so metric emission below does not
	// block the ingestion path.
	c.mu.Lock()
	snapshot := make([]*graphiteSample, 0, len(c.samples))
	for _, s := range c.samples {
		snapshot = append(snapshot, s)
	}
	c.mu.Unlock()

	expiredBefore := time.Now().Add(-*sampleExpiry)
	for _, s := range snapshot {
		if !expiredBefore.After(s.Timestamp) {
			ch <- prometheus.MustNewConstMetric(
				prometheus.NewDesc(s.Name, s.Help, []string{}, s.Labels),
				s.Type,
				s.Value,
			)
		}
	}
}

// Describe implements prometheus.Collector. Only the lastProcessed
// gauge is described up front; the per-sample descriptors are created
// dynamically in Collect.
func (c graphiteCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- lastProcessed.Desc()
}

// init registers the exporter's build/version information collector.
func init() {
	prometheus.MustRegister(version.NewCollector("graphite_exporter"))
}
Expand All @@ -235,16 +80,20 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string, logger log.Logger
if mapper.FSM == nil {
return fmt.Errorf("no FSM available to be dumped, possibly because the mapping contains regex patterns")
}

f, err := os.Create(dumpFilename)
if err != nil {
return err
}

level.Info(logger).Log("msg", "Start dumping FSM", "to", dumpFilename)

w := bufio.NewWriter(f)
mapper.FSM.DumpFSM(w)
w.Flush()
f.Close()
level.Info(logger).Log("msg", "Finish dumping FSM")

return nil
}

Expand All @@ -254,33 +103,37 @@ func main() {
kingpin.Version(version.Print("graphite_exporter"))
kingpin.HelpFlag.Short('h')
kingpin.Parse()

logger := promlog.New(promlogConfig)

prometheus.MustRegister(sampleExpiryMetric)
prometheus.MustRegister(tagErrors)
prometheus.MustRegister(invalidMetrics)
sampleExpiryMetric.Set(sampleExpiry.Seconds())

level.Info(logger).Log("msg", "Starting graphite_exporter", "version_info", version.Info())
level.Info(logger).Log("build_context", version.BuildContext())

http.Handle(*metricsPath, promhttp.Handler())
c := newGraphiteCollector(logger)

c := collector.NewGraphiteCollector(logger, *strictMatch, *sampleExpiry, tagErrors, lastProcessed, sampleExpiryMetric, invalidMetrics)
prometheus.MustRegister(c)

c.mapper = &mapper.MetricMapper{}
c.Mapper = &mapper.MetricMapper{}
cacheOption := mapper.WithCacheType(*cacheType)

if *mappingConfig != "" {
err := c.mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
err := c.Mapper.InitFromFile(*mappingConfig, *cacheSize, cacheOption)
if err != nil {
level.Error(logger).Log("msg", "Error loading metric mapping config", "err", err)
os.Exit(1)
}
} else {
c.mapper.InitCache(*cacheSize, cacheOption)
c.Mapper.InitCache(*cacheSize, cacheOption)
}

if *dumpFSMPath != "" {
err := dumpFSM(c.mapper.(*mapper.MetricMapper), *dumpFSMPath, logger)
err := dumpFSM(c.Mapper.(*mapper.MetricMapper), *dumpFSMPath, logger)
if err != nil {
level.Error(logger).Log("msg", "Error dumping FSM", "err", err)
os.Exit(1)
Expand All @@ -292,16 +145,18 @@ func main() {
level.Error(logger).Log("msg", "Error binding to TCP socket", "err", err)
os.Exit(1)
}

go func() {
for {
conn, err := tcpSock.Accept()
if err != nil {
level.Error(logger).Log("msg", "Error accepting TCP connection", "err", err)
continue
}

go func() {
defer conn.Close()
c.processReader(conn)
c.ProcessReader(conn)
}()
}
}()
Expand All @@ -311,21 +166,26 @@ func main() {
level.Error(logger).Log("msg", "Error resolving UDP address", "err", err)
os.Exit(1)
}

udpSock, err := net.ListenUDP("udp", udpAddress)
if err != nil {
level.Error(logger).Log("msg", "Error listening to UDP address", "err", err)
os.Exit(1)
}

go func() {
defer udpSock.Close()

for {
buf := make([]byte, 65536)

chars, srcAddress, err := udpSock.ReadFromUDP(buf)
if err != nil {
level.Error(logger).Log("msg", "Error reading UDP packet", "from", srcAddress, "err", err)
continue
}
go c.processReader(bytes.NewReader(buf[0:chars]))

go c.ProcessReader(bytes.NewReader(buf[0:chars]))
}
}()

Expand Down
Loading