break out into packages
Signed-off-by: glightfoot <[email protected]>
glightfoot committed Jun 19, 2020
1 parent 9328f9a commit d17a563
Showing 7 changed files with 366 additions and 223 deletions.
5 changes: 4 additions & 1 deletion .golangci.yml
@@ -1,8 +1,11 @@
run:
  modules-download-mode: vendor

  skip-dirs:
    - vendor
    - e2e
# Run only staticcheck for now. Additional linters will be enabled one-by-one.
linters:
  enable:
    - staticcheck
    - goimports
  disable-all: true
219 changes: 39 additions & 180 deletions main.go
@@ -18,15 +18,10 @@ import (
"bytes"
"fmt"
"io"
"math"
"net"
"net/http"
_ "net/http/pprof"
"os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"

@@ -39,6 +34,10 @@ import (
"github.com/prometheus/common/version"
"github.com/prometheus/statsd_exporter/pkg/mapper"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/prometheus/graphite_exporter/pkg/graphitesample"
"github.com/prometheus/graphite_exporter/pkg/line"
"github.com/prometheus/graphite_exporter/pkg/metricmapper"
)

var (
@@ -64,102 +63,49 @@ var (
Help: "How long in seconds a metric sample is valid for.",
},
)
tagParseFailures = prometheus.NewCounter(
tagErrors = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "graphite_tag_parse_failures",
Name: "graphite_tag_parse_errors",
Help: "Total count of samples with invalid tags",
})
},
)
invalidMetrics = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "graphite_invalid_metrics",
Help: "Total count of metrics dropped due to mismatched label keys",
})
invalidMetricChars = regexp.MustCompile("[^a-zA-Z0-9_:]")

metricNameKeysIndex = newMetricNameAndKeys()
},
)
)

// metricNameAndKeys is a cache of metric names and the label keys previously used
type metricNameAndKeys struct {
mtx sync.Mutex
cache map[string]string
}

func newMetricNameAndKeys() *metricNameAndKeys {
x := metricNameAndKeys{
cache: make(map[string]string),
}
return &x
}

func keysFromLabels(labels prometheus.Labels) string {
labelKeys := make([]string, len(labels))
for k, _ := range labels {
labelKeys = append(labelKeys, k)
}
sort.Strings(labelKeys)
return strings.Join(labelKeys, ",")
}

// checkNameAndKeys returns true if metric has the same label keys or is new, false if not
func (c *metricNameAndKeys) checkNameAndKeys(name string, labels prometheus.Labels) bool {
c.mtx.Lock()
defer c.mtx.Unlock()
providedKeys := keysFromLabels(labels)
if keys, found := c.cache[name]; found {
return keys == providedKeys
}

c.cache[name] = providedKeys
return true
}

type graphiteSample struct {
OriginalName string
Name string
Labels prometheus.Labels
Help string
Value float64
Type prometheus.ValueType
Timestamp time.Time
}

func (s graphiteSample) String() string {
return fmt.Sprintf("%#v", s)
}

type metricMapper interface {
GetMapping(string, mapper.MetricType) (*mapper.MetricMapping, prometheus.Labels, bool)
InitFromFile(string, int, ...mapper.CacheOption) error
InitCache(int, ...mapper.CacheOption)
}

type graphiteCollector struct {
samples map[string]*graphiteSample
samples map[string]*graphitesample.GraphiteSample
mu *sync.Mutex
mapper metricMapper
sampleCh chan *graphiteSample
mapper metricmapper.MetricMapper
sampleCh chan *graphitesample.GraphiteSample
lineCh chan string
strictMatch bool
logger log.Logger
}

func newGraphiteCollector(logger log.Logger) *graphiteCollector {
c := &graphiteCollector{
sampleCh: make(chan *graphiteSample),
sampleCh: make(chan *graphitesample.GraphiteSample),
lineCh: make(chan string),
mu: &sync.Mutex{},
samples: map[string]*graphiteSample{},
samples: map[string]*graphitesample.GraphiteSample{},
strictMatch: *strictMatch,
logger: logger,
}

go c.processSamples()
go c.processLines()

return c
}

func (c *graphiteCollector) processReader(reader io.Reader) {
lineScanner := bufio.NewScanner(reader)

for {
if ok := lineScanner.Scan(); !ok {
break
@@ -169,113 +115,9 @@ func (c *graphiteCollector) processReader(reader io.Reader) {
}

func (c *graphiteCollector) processLines() {
for line := range c.lineCh {
c.processLine(line)
}
}

func parseMetricNameAndTags(name string, labels prometheus.Labels) (string, error) {
if strings.ContainsRune(name, ';') {
// name contains tags - parse tags and add to labels
if strings.Count(name, ";") != strings.Count(name, "=") {
tagParseFailures.Inc()
return name, fmt.Errorf("error parsing tags on %s", name)
}

parts := strings.Split(name, ";")
parsedName := parts[0]
tags := parts[1:]

for _, tag := range tags {
kv := strings.SplitN(tag, "=", 2)
if len(kv) != 2 {
// we may have added bad labels already...
tagParseFailures.Inc()
return name, fmt.Errorf("error parsing tags on %s", name)
}

k := kv[0]
v := kv[1]
labels[k] = v
}

return parsedName, nil
for l := range c.lineCh {
line.ProcessLine(l, c.mapper, c.sampleCh, c.strictMatch, tagErrors, lastProcessed, invalidMetrics, c.logger)
}

return name, nil
}

func (c *graphiteCollector) processLine(line string) {
line = strings.TrimSpace(line)
level.Debug(c.logger).Log("msg", "Incoming line", "line", line)
parts := strings.Split(line, " ")
if len(parts) != 3 {
level.Info(c.logger).Log("msg", "Invalid part count", "parts", len(parts), "line", line)
return
}
originalName := parts[0]
var name string
var err error
mapping, labels, mappingPresent := c.mapper.GetMapping(originalName, mapper.MetricTypeGauge)

if (mappingPresent && mapping.Action == mapper.ActionTypeDrop) || (!mappingPresent && c.strictMatch) {
return
}

if mappingPresent {
parsedLabels := make(prometheus.Labels)
_, err = parseMetricNameAndTags(originalName, parsedLabels)
if err != nil {
level.Info(c.logger).Log("msg", "Invalid tags", "line", line)
return
}

name = invalidMetricChars.ReplaceAllString(mapping.Name, "_")
// check to ensure the same tags are present
if validKeys := metricNameKeysIndex.checkNameAndKeys(name, parsedLabels); !validKeys {
level.Info(c.logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line)
invalidMetrics.Inc()
return
}
} else {
labels = make(prometheus.Labels)
name, err = parseMetricNameAndTags(originalName, labels)
if err != nil {
level.Info(c.logger).Log("msg", "Invalid tags", "line", line)
return
}
name = invalidMetricChars.ReplaceAllString(name, "_")
// check to ensure the same tags are present
if validKeys := metricNameKeysIndex.checkNameAndKeys(name, labels); !validKeys {
level.Info(c.logger).Log("msg", "Dropped because metric keys do not match previously used keys", "line", line)
invalidMetrics.Inc()
return
}
}

value, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
level.Info(c.logger).Log("msg", "Invalid value", "line", line)
return
}
timestamp, err := strconv.ParseFloat(parts[2], 64)
if err != nil {
level.Info(c.logger).Log("msg", "Invalid timestamp", "line", line)
return
}

sample := graphiteSample{
OriginalName: originalName,
Name: name,
Value: value,
Labels: labels,
Type: prometheus.GaugeValue,
Help: fmt.Sprintf("Graphite metric %s", name),
Timestamp: time.Unix(int64(timestamp), int64(math.Mod(timestamp, 1.0)*1e9)),
}
level.Debug(c.logger).Log("msg", "Processing sample", "sample", sample)
lastProcessed.Set(float64(time.Now().UnixNano()) / 1e9)
c.sampleCh <- &sample
}

func (c *graphiteCollector) processSamples() {
@@ -287,12 +129,14 @@ func (c *graphiteCollector) processSamples() {
if sample == nil || !ok {
return
}

c.mu.Lock()
c.samples[sample.OriginalName] = sample
c.mu.Unlock()
case <-ticker:
// Garbage collect expired samples.
ageLimit := time.Now().Add(-*sampleExpiry)

c.mu.Lock()
for k, sample := range c.samples {
if ageLimit.After(sample.Timestamp) {
@@ -309,13 +153,15 @@ func (c graphiteCollector) Collect(ch chan<- prometheus.Metric) {
ch <- lastProcessed

c.mu.Lock()
samples := make([]*graphiteSample, 0, len(c.samples))
samples := make([]*graphitesample.GraphiteSample, 0, len(c.samples))

for _, sample := range c.samples {
samples = append(samples, sample)
}
c.mu.Unlock()

ageLimit := time.Now().Add(-*sampleExpiry)

for _, sample := range samples {
if ageLimit.After(sample.Timestamp) {
continue
@@ -341,16 +187,20 @@ func dumpFSM(mapper *mapper.MetricMapper, dumpFilename string, logger log.Logger
if mapper.FSM == nil {
return fmt.Errorf("no FSM available to be dumped, possibly because the mapping contains regex patterns")
}

f, err := os.Create(dumpFilename)
if err != nil {
return err
}

level.Info(logger).Log("msg", "Start dumping FSM", "to", dumpFilename)

w := bufio.NewWriter(f)
mapper.FSM.DumpFSM(w)
w.Flush()
f.Close()
level.Info(logger).Log("msg", "Finish dumping FSM")

return nil
}

@@ -360,17 +210,19 @@ func main() {
kingpin.Version(version.Print("graphite_exporter"))
kingpin.HelpFlag.Short('h')
kingpin.Parse()

logger := promlog.New(promlogConfig)

prometheus.MustRegister(sampleExpiryMetric)
prometheus.MustRegister(tagParseFailures)
prometheus.MustRegister(tagErrors)
prometheus.MustRegister(invalidMetrics)
sampleExpiryMetric.Set(sampleExpiry.Seconds())

level.Info(logger).Log("msg", "Starting graphite_exporter", "version_info", version.Info())
level.Info(logger).Log("build_context", version.BuildContext())

http.Handle(*metricsPath, promhttp.Handler())

c := newGraphiteCollector(logger)
prometheus.MustRegister(c)

@@ -400,13 +252,15 @@ func main() {
level.Error(logger).Log("msg", "Error binding to TCP socket", "err", err)
os.Exit(1)
}

go func() {
for {
conn, err := tcpSock.Accept()
if err != nil {
level.Error(logger).Log("msg", "Error accepting TCP connection", "err", err)
continue
}

go func() {
defer conn.Close()
c.processReader(conn)
@@ -419,20 +273,25 @@ func main() {
level.Error(logger).Log("msg", "Error resolving UDP address", "err", err)
os.Exit(1)
}

udpSock, err := net.ListenUDP("udp", udpAddress)
if err != nil {
level.Error(logger).Log("msg", "Error listening to UDP address", "err", err)
os.Exit(1)
}

go func() {
defer udpSock.Close()

for {
buf := make([]byte, 65536)

chars, srcAddress, err := udpSock.ReadFromUDP(buf)
if err != nil {
level.Error(logger).Log("msg", "Error reading UDP packet", "from", srcAddress, "err", err)
continue
}

go c.processReader(bytes.NewReader(buf[0:chars]))
}
}()
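
The new pkg/graphitesample, pkg/line, and pkg/metricmapper packages imported above are among the changed files not expanded in this view. As a rough sketch only, inferred from the struct and interface removed from main.go in this diff (the exact file names and any additional helpers in the new packages are assumptions, not visible here), the extracted types presumably look something like the following. pkg/line presumably exports a ProcessLine function matching the new call site in processLines, built from the processLine and parseMetricNameAndTags code removed above.

// pkg/graphitesample/graphitesample.go (sketch, inferred from the struct removed from main.go)
package graphitesample

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// GraphiteSample is a single parsed Graphite datapoint, carrying the same
// fields as the graphiteSample struct deleted from main.go in this commit.
type GraphiteSample struct {
	OriginalName string
	Name         string
	Labels       prometheus.Labels
	Help         string
	Value        float64
	Type         prometheus.ValueType
	Timestamp    time.Time
}

// String formats the sample for debug logging, as the removed method did.
func (s GraphiteSample) String() string {
	return fmt.Sprintf("%#v", s)
}

// pkg/metricmapper/metricmapper.go (sketch, inferred from the interface removed from main.go)
package metricmapper

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/statsd_exporter/pkg/mapper"
)

// MetricMapper mirrors the unexported metricMapper interface deleted from main.go,
// so the collector can accept either the statsd_exporter mapper or a test double.
type MetricMapper interface {
	GetMapping(string, mapper.MetricType) (*mapper.MetricMapping, prometheus.Labels, bool)
	InitFromFile(string, int, ...mapper.CacheOption) error
	InitCache(int, ...mapper.CacheOption)
}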