Skip to content

Commit

Permalink
use prometheus /metrics endpoint instead of assuming that prometheus …
Browse files Browse the repository at this point in the history
…self-scrapes
  • Loading branch information
matt-deboer committed Apr 29, 2017
1 parent f5a507e commit c86c283
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 49 deletions.
90 changes: 77 additions & 13 deletions pkg/locator/locator.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package locator

import (
"bufio"
"context"
"fmt"
"io/ioutil"
"math/big"
"net/http"
"regexp"
"strings"
"time"

log "github.com/Sirupsen/logrus"

"github.com/prometheus/client_golang/api/prometheus"
"github.com/prometheus/common/model"
)

var (
Expand Down Expand Up @@ -64,31 +66,93 @@ func ToPrometheusClients(endpointURLs []string) ([]*PrometheusEndpoint, error) {
for _, endpoint := range endpointURLs {
addr := strings.Trim(endpoint, " ")
if len(addr) > 0 {
var uptime time.Duration
var queryAPI prometheus.QueryAPI
client, err := prometheus.New(prometheus.Config{
Address: addr,
})
if err == nil {
queryAPI := prometheus.NewQueryAPI(client)
result, err := queryAPI.Query(context.TODO(), "(time() - max(process_start_time_seconds{job=\"prometheus\"}))", time.Now())
// Scape the /metrics endpoint of the individual prometheus instance, since
// self-scaping of prometheus' own metrics might not be configured
if log.GetLevel() >= log.DebugLevel {
log.Debugf("Endpoint %v returned uptime result: %v", addr, result)
log.Debugf("Testing %s/metrics", addr)
}
if err == nil {
if vector, ok := result.(model.Vector); ok && len(vector) > 0 {
uptime := time.Duration(float64(result.(model.Vector)[0].Value)) * time.Second
endpoints = append(endpoints, &PrometheusEndpoint{QueryAPI: prometheus.NewQueryAPI(client), Address: addr, Uptime: uptime})
continue
} else {
log.Errorf("Endpoint %v returned unexpected uptime result: %v", addr, result)
err = fmt.Errorf("Unexpected uptime result: '%v'", result)
scraped, err := ScrapeMetric(addr, "process_start_time_seconds")
if err == nil && scraped != nil {
processStartTimeSeconds := scraped.Value
uptime = time.Duration(time.Now().UTC().Unix()-int64(processStartTimeSeconds)) * time.Second
if log.GetLevel() >= log.DebugLevel {
log.Debugf("Parsed current uptime for %s: %s", addr, uptime)
}
queryAPI = prometheus.NewQueryAPI(client)
_, err = queryAPI.Query(context.TODO(), "up", time.Now())
if err != nil && log.GetLevel() >= log.DebugLevel {
log.Debugf("Query 'up' returned error: %v", err)
}
}
}
endpoints = append(endpoints, &PrometheusEndpoint{Address: addr, Uptime: time.Duration(0), Error: err})

if err == nil {
endpoints = append(endpoints, &PrometheusEndpoint{QueryAPI: queryAPI, Address: addr, Uptime: uptime})
} else {
log.Errorf("Failed to resolve build_info and uptime for %v: %v", addr, err)
endpoints = append(endpoints, &PrometheusEndpoint{Address: addr, Uptime: time.Duration(0), Error: err})
}
}
}
if len(endpoints) == 0 {
return nil, fmt.Errorf("Unable to locate any potential endpoints")
}
return endpoints, nil
}

// LabeledValue represents a persed metric instance
type LabeledValue struct {
Name string
Labels string
Value float64
}

func (lv *LabeledValue) String() string {
return fmt.Sprintf("%s%s %f", lv.Name, lv.Labels, lv.Value)
}

// ScrapeMetric parses metrics in a simple fashion, returning
// the first instance of each metric for a given name; results may be unexpected
// for metrics with multiple instances
func ScrapeMetric(addr string, name string) (*LabeledValue, error) {

resp, err := http.Get(fmt.Sprintf("%s/metrics", addr))
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("%s/metrics returned %d", addr, resp.StatusCode)
}

defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "#") {
parts := strings.Split(line, " ")
nameParts := strings.Split(parts[0], "{")
if nameParts[0] == name {
f := new(big.Float)
_, err := fmt.Sscan(parts[1], f)
if err == nil {
v := &LabeledValue{Name: nameParts[0]}
v.Value, _ = f.Float64()
if len(nameParts) > 1 {
v.Labels = "{" + nameParts[1]
}
return v, nil
}
return nil, fmt.Errorf("Failed to parse value for metric %s", line)
}
}
}
return nil, nil
}
3 changes: 2 additions & 1 deletion pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func NewRouter(interval time.Duration, affinityOptions []AffinityOption,
interval: interval,
rewriter: noOpRewriter,
metrics: newMetrics(version.Name),
selection: &selector.Result{},
theConch: make(chan struct{}, 1),
}

Expand Down Expand Up @@ -104,7 +105,7 @@ func (r *Router) doSelection() {

result, err := r.selector.Select()

if result.Selection == nil || len(result.Selection) == 0 {
if len(result.Selection) == 0 {
if err != nil {
log.Errorf("Selector returned no valid selection, and error: %v", err)
if r.selection == nil {
Expand Down
30 changes: 15 additions & 15 deletions pkg/selector/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,38 @@ func NewSelector(locators []locator.Locator, strategyArgs ...string) (*Selector,
// Select performs selection of a/all viable prometheus endpoint target(s)
func (s *Selector) Select() (result *Result, err error) {

endpoints := make([]*locator.PrometheusEndpoint, 0, 3)
result = &Result{
Candidates: make([]*locator.PrometheusEndpoint, 0, 3),
}

for _, loc := range s.locators {
clients, err := loc.Endpoints()
endpoints, err := loc.Endpoints()
if err != nil {
if clients != nil && len(clients) > 0 {
endpoints = append(endpoints, clients...)
log.Warnf("Locator %v resolved the following endpoints: %v, with errors: %v", loc, clients, err)
if endpoints != nil && len(endpoints) > 0 {
result.Candidates = append(result.Candidates, endpoints...)
log.Warnf("Locator %v resolved the following endpoints: %v, with errors: %v", loc, endpoints, err)
} else {
log.Errorf("Locator %v failed to resolve endpoints: %v", loc, err)
}
} else {
endpoints = append(endpoints, clients...)
result.Candidates = append(result.Candidates, endpoints...)
if log.GetLevel() >= log.DebugLevel {
log.Debugf("Locator %v resolved the following endpoints: %v", loc, clients)
log.Debugf("Locator %v resolved the following endpoints: %v", loc, endpoints)
}
}
}
if len(endpoints) == 0 {
return nil, fmt.Errorf("No endpoints returned by any locators")
}

result = &Result{
Candidates: endpoints,
if len(result.Candidates) == 0 {
return result, fmt.Errorf("No endpoints returned by any locators")
}

err = s.Strategy.Select(endpoints)
err = s.Strategy.Select(result.Candidates)
if err != nil {
return result, err
}

result.Selection = make([]*url.URL, 0, len(endpoints))
for _, endpoint := range endpoints {
result.Selection = make([]*url.URL, 0, len(result.Candidates))
for _, endpoint := range result.Candidates {
if endpoint.Selected {
target, err := url.ParseRequestURI(endpoint.Address)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/selector/strategy/minimumhistory/minimumhistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *Selector) Description() string {

// ComparisonMetricName gets the name of the comparison metric/calculation used to make a selection
func (s *Selector) ComparisonMetricName() string {
return "prometheus_build_info"
return "up"
}

// NextIndex returns the index of the target that should be used to field the next request
Expand All @@ -65,7 +65,7 @@ func (s *Selector) Select(endpoints []*locator.PrometheusEndpoint) (err error) {
for _, endpoint := range endpoints {
endpoint.Selected = false
if endpoint.QueryAPI != nil {
value, err := endpoint.QueryAPI.Query(context.TODO(), "prometheus_build_info", time.Now().Add(-1*s.minimumHistory))
value, err := endpoint.QueryAPI.Query(context.TODO(), "max(up)", time.Now().Add(-1*s.minimumHistory))
if err != nil {
log.Errorf("Endpoint %v returned error: %v", endpoint, err)
} else {
Expand Down
22 changes: 7 additions & 15 deletions pkg/selector/strategy/singlemostdata/singlemostdata.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package singlemostdata

import (
"context"
"fmt"
"net/url"
"time"

log "github.com/Sirupsen/logrus"
"github.com/matt-deboer/mpp/pkg/locator"
"github.com/matt-deboer/mpp/pkg/selector"
"github.com/prometheus/common/model"
)

const (
Expand Down Expand Up @@ -60,24 +57,19 @@ func (s *Selector) Select(endpoints []*locator.PrometheusEndpoint) (err error) {
for i, endpoint := range endpoints {
endpoint.Selected = false
if endpoint.QueryAPI != nil {
value, err := endpoint.QueryAPI.Query(context.TODO(), comparisonMetricName, time.Now())
scraped, err := locator.ScrapeMetric(endpoint.Address, "prometheus_local_storage_ingested_samples_total")
if err != nil {
log.Errorf("Endpoint %v returned error: %v", endpoint, err)
endpoint.Error = err
} else {
if log.GetLevel() >= log.DebugLevel {
log.Debugf("Endpoint %v returned value: %v", endpoint, value)
log.Debugf("Endpoint %v returned value: %v", endpoint, scraped)
}
if value.Type() == model.ValVector {
sampleValue := int64(value.(model.Vector)[0].Value)
endpoint.ComparisonMetricValue = sampleValue
if sampleValue > mostData {
mostData = sampleValue
mostDataIndex = i
}
} else {
endpoint.Error = fmt.Errorf("Endpoint %v returned unexpected type: %v", endpoint, value.Type())
log.Error(endpoint.Error)
sampleValue := int64(scraped.Value)
endpoint.ComparisonMetricValue = sampleValue
if sampleValue > mostData {
mostData = sampleValue
mostDataIndex = i
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ func parseLocators(c *cli.Context) []locator.Locator {
locators = append(locators, locator.NewEndpointsFileLocator(endpointsFile))
}

if len(kubeNamespace) > 0 {
if len(kubeServiceName) == 0 && len(kubePodLabelSelector) == 0 {
argError(c, "Kubernetes locator requires one of either 'kube-service-name' or 'kube-pod-label-selector'")
if len(kubeServiceName) > 0 || len(kubePodLabelSelector) > 0 {
if len(kubeNamespace) == 0 {
argError(c, `--kube-namespace is required when using the kubernetes locator`)
}
kubePort := c.String("kube-port")
locator, err := kuberneteslocator.NewKubernetesLocator(kubeconfig,
Expand Down

0 comments on commit c86c283

Please sign in to comment.