-
Notifications
You must be signed in to change notification settings - Fork 46
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #3 from weeco/master
Use prometheus collector interface (bigger refactoring)
- Loading branch information
Showing
17 changed files
with
885 additions
and
704 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
# build image
FROM golang:1.12-alpine as builder
RUN apk update && apk add git ca-certificates

WORKDIR /app
COPY . .

# Static binary (CGO disabled) so it can run in a `scratch` image.
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -installsuffix cgo -o /go/bin/kube-eagle

# executable image
FROM scratch
COPY --from=builder /go/bin/kube-eagle /go/bin/kube-eagle

ENV VERSION 1.1.0
ENTRYPOINT ["/go/bin/kube-eagle"]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
package collector | ||
|
||
import ( | ||
"fmt" | ||
"github.com/google-cloud-tools/kube-eagle/kubernetes" | ||
"github.com/google-cloud-tools/kube-eagle/options" | ||
"github.com/prometheus/client_golang/prometheus" | ||
log "github.com/sirupsen/logrus" | ||
"sync" | ||
"time" | ||
) | ||
|
||
type collectorFactoryFunc = func(opts *options.Options) (Collector, error) | ||
|
||
var ( | ||
kubernetesClient *kubernetes.Client | ||
scrapeDurationDesc *prometheus.Desc | ||
scrapeSuccessDesc *prometheus.Desc | ||
factoriesByCollectorName = make(map[string]collectorFactoryFunc) | ||
) | ||
|
||
// registerCollector adds a collector to the registry so that it's Update() method will be called every time | ||
// the metrics endpoint is triggered | ||
func registerCollector(collectorName string, collectorFactory collectorFactoryFunc) { | ||
log.Debugf("Registering collector '%s'", collectorName) | ||
factoriesByCollectorName[collectorName] = collectorFactory | ||
} | ||
|
||
// KubeEagleCollector implements the prometheus collector interface | ||
type KubeEagleCollector struct { | ||
CollectorByName map[string]Collector | ||
} | ||
|
||
// NewKubeEagleCollector creates a new KubeEagle collector which can be considered as manager of multiple collectors | ||
func NewKubeEagleCollector(opts *options.Options) (*KubeEagleCollector, error) { | ||
// Create registered collectors by executing it's collector factory function | ||
collectorByName := make(map[string]Collector) | ||
for collectorName, factory := range factoriesByCollectorName { | ||
log.Debugf("Creating collector '%s'", collectorName) | ||
collector, err := factory(opts) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to create collector '%s': '%s'", collectorName, err) | ||
} | ||
collectorByName[collectorName] = collector | ||
} | ||
|
||
var err error | ||
kubernetesClient, err = kubernetes.NewClient(opts) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to initialize kubernetes client: '%v'", err) | ||
} | ||
|
||
scrapeDurationDesc = prometheus.NewDesc( | ||
prometheus.BuildFQName(opts.Namespace, "scrape", "collector_duration_seconds"), | ||
"Kube Eagle: Duration of a collector scrape.", | ||
[]string{"collector"}, | ||
nil, | ||
) | ||
scrapeSuccessDesc = prometheus.NewDesc( | ||
prometheus.BuildFQName(opts.Namespace, "scrape", "collector_success"), | ||
"Kube Eagle: Whether a collector succeeded.", | ||
[]string{"collector"}, | ||
nil, | ||
) | ||
|
||
return &KubeEagleCollector{CollectorByName: collectorByName}, nil | ||
} | ||
|
||
// Describe implements the prometheus.Collector interface | ||
func (k KubeEagleCollector) Describe(ch chan<- *prometheus.Desc) { | ||
ch <- scrapeDurationDesc | ||
ch <- scrapeSuccessDesc | ||
} | ||
|
||
// Collect implements the prometheus.Collector interface | ||
func (k KubeEagleCollector) Collect(ch chan<- prometheus.Metric) { | ||
wg := sync.WaitGroup{} | ||
|
||
// Run all collectors concurrently and add meta information about that (such as request duration and error/success count) | ||
for name, collector := range k.CollectorByName { | ||
wg.Add(1) | ||
go func(wg *sync.WaitGroup, collectorName string, c Collector) { | ||
defer wg.Done() | ||
begin := time.Now() | ||
err := c.updateMetrics(ch) | ||
duration := time.Since(begin) | ||
|
||
var isSuccess float64 | ||
if err != nil { | ||
log.Errorf("Collector '%s' failed after %fs: %s", collectorName, duration.Seconds(), err) | ||
isSuccess = 0 | ||
} else { | ||
log.Debugf("Collector '%s' succeeded after %fs.", collectorName, duration.Seconds()) | ||
isSuccess = 1 | ||
} | ||
ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), collectorName) | ||
ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, isSuccess, collectorName) | ||
}(&wg, name, collector) | ||
} | ||
wg.Wait() | ||
} | ||
|
||
// IsHealthy returns a bool which indicates whether the collector is working properly or not | ||
func (k KubeEagleCollector) IsHealthy() bool { | ||
return kubernetesClient.IsHealthy() | ||
} | ||
|
||
// Collector is an interface which has to be implemented for each collector which wants to expose metrics | ||
type Collector interface { | ||
updateMetrics(ch chan<- prometheus.Metric) error | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,197 @@ | ||
package collector | ||
|
||
import ( | ||
"github.com/google-cloud-tools/kube-eagle/options" | ||
"github.com/prometheus/client_golang/prometheus" | ||
log "github.com/sirupsen/logrus" | ||
corev1 "k8s.io/api/core/v1" | ||
v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" | ||
"sync" | ||
) | ||
|
||
type containerResourcesCollector struct { | ||
// Resource limits | ||
limitCPUCoresDesc *prometheus.Desc | ||
limitMemoryBytesDesc *prometheus.Desc | ||
|
||
// Resource requests | ||
requestCPUCoresDesc *prometheus.Desc | ||
requestMemoryBytesDesc *prometheus.Desc | ||
|
||
// Resource usage | ||
usageCPUCoresDesc *prometheus.Desc | ||
usageMemoryBytesDesc *prometheus.Desc | ||
} | ||
|
||
func init() { | ||
registerCollector("container_resources", newContainerResourcesCollector) | ||
} | ||
|
||
func newContainerResourcesCollector(opts *options.Options) (Collector, error) { | ||
subsystem := "pod_container_resource" | ||
labels := []string{"pod", "container", "qos", "phase", "namespace"} | ||
|
||
return &containerResourcesCollector{ | ||
// Prometheus metrics | ||
// Resource limits | ||
limitCPUCoresDesc: prometheus.NewDesc( | ||
prometheus.BuildFQName(opts.Namespace, subsystem, "limits_cpu_cores"), | ||
"The container's CPU limit in Kubernetes", | ||
labels, | ||
prometheus.Labels{}, | ||
), | ||
limitMemoryBytesDesc: prometheus.NewDesc( | ||
prometheus.BuildFQName(opts.Namespace, subsystem, "limits_memory_bytes"), | ||
"The container's RAM limit in Kubernetes", | ||
labels, | ||
prometheus.Labels{}, | ||
), | ||
// Resource requests | ||
requestCPUCoresDesc: prometheus.NewDesc( | ||
prometheus.BuildFQName(opts.Namespace, subsystem, "requests_cpu_cores"), | ||
"The container's requested CPU resources in Kubernetes", | ||
labels, | ||
prometheus.Labels{}, | ||
), | ||
requestMemoryBytesDesc: prometheus.NewDesc( | ||
prometheus.BuildFQName(opts.Namespace, subsystem, "requests_memory_bytes"), | ||
"The container's requested RAM resources in Kubernetes", | ||
labels, | ||
prometheus.Labels{}, | ||
), | ||
// Resource usage | ||
usageCPUCoresDesc: prometheus.NewDesc( | ||
prometheus.BuildFQName(opts.Namespace, subsystem, "usage_cpu_cores"), | ||
"CPU usage in number of cores", | ||
labels, | ||
prometheus.Labels{}, | ||
), | ||
usageMemoryBytesDesc: prometheus.NewDesc( | ||
prometheus.BuildFQName(opts.Namespace, subsystem, "usage_memory_bytes"), | ||
"RAM usage in bytes", | ||
labels, | ||
prometheus.Labels{}, | ||
), | ||
}, nil | ||
} | ||
|
||
func (c *containerResourcesCollector) updateMetrics(ch chan<- prometheus.Metric) error { | ||
log.Debug("Collecting container metrics") | ||
|
||
var wg sync.WaitGroup | ||
var podList *corev1.PodList | ||
var podListError error | ||
var podMetricses *v1beta1.PodMetricsList | ||
var podMetricsesError error | ||
|
||
// Get pod list | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
podList, podListError = kubernetesClient.PodList() | ||
}() | ||
|
||
// Get node resource usage metrics | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
podMetricses, podMetricsesError = kubernetesClient.PodMetricses() | ||
}() | ||
|
||
wg.Wait() | ||
if podListError != nil { | ||
log.Warn("Failed to get podList from Kubernetes", podListError) | ||
return podListError | ||
} | ||
if podMetricsesError != nil { | ||
log.Warn("Failed to get podMetricses from Kubernetes", podMetricsesError) | ||
return podMetricsesError | ||
} | ||
|
||
containerMetricses := buildEnrichedContainerMetricses(podList, podMetricses) | ||
|
||
for _, containerMetrics := range containerMetricses { | ||
cm := *containerMetrics | ||
log.Debugf("Test") | ||
labelValues := []string{cm.Pod, cm.Container, cm.Qos, cm.Phase, cm.Namespace} | ||
ch <- prometheus.MustNewConstMetric(c.requestCPUCoresDesc, prometheus.GaugeValue, cm.RequestCPUCores, labelValues...) | ||
ch <- prometheus.MustNewConstMetric(c.requestMemoryBytesDesc, prometheus.GaugeValue, cm.RequestMemoryBytes, labelValues...) | ||
ch <- prometheus.MustNewConstMetric(c.limitCPUCoresDesc, prometheus.GaugeValue, cm.LimitCPUCores, labelValues...) | ||
ch <- prometheus.MustNewConstMetric(c.limitMemoryBytesDesc, prometheus.GaugeValue, cm.LimitMemoryBytes, labelValues...) | ||
ch <- prometheus.MustNewConstMetric(c.usageCPUCoresDesc, prometheus.GaugeValue, cm.UsageCPUCores, labelValues...) | ||
ch <- prometheus.MustNewConstMetric(c.usageMemoryBytesDesc, prometheus.GaugeValue, cm.UsageMemoryBytes, labelValues...) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// enrichedContainerMetricses is the join of a container's spec data (from the
// pod list) and its live usage data (from the metrics API) for one container.
type enrichedContainerMetricses struct {
	// Identity / metadata used as prometheus label values.
	Node      string
	Pod       string
	Container string
	Qos       string
	Phase     string
	Namespace string

	// Metric values, all converted to cores / bytes as float64.
	RequestCPUCores    float64
	RequestMemoryBytes float64
	LimitCPUCores      float64
	LimitMemoryBytes   float64
	UsageCPUCores      float64
	UsageMemoryBytes   float64
}
|
||
// buildEnrichedContainerMetricses merges the container metrics from two requests (podList request and podMetrics request) into | ||
// one, so that we can expose valuable metadata (such as a nodename) as prometheus labels which is just present | ||
// in one of the both responses. | ||
func buildEnrichedContainerMetricses(podList *corev1.PodList, podMetricses *v1beta1.PodMetricsList) []*enrichedContainerMetricses { | ||
// Group container metricses by pod name | ||
containerMetricsesByPod := make(map[string]map[string]v1beta1.ContainerMetrics) | ||
for _, pm := range podMetricses.Items { | ||
containerMetricses := make(map[string]v1beta1.ContainerMetrics) | ||
for _, c := range pm.Containers { | ||
containerMetricses[c.Name] = c | ||
} | ||
containerMetricsesByPod[pm.Name] = containerMetricses | ||
} | ||
|
||
var containerMetricses []*enrichedContainerMetricses | ||
for _, podInfo := range podList.Items { | ||
containers := append(podInfo.Spec.Containers, podInfo.Spec.InitContainers...) | ||
|
||
for _, containerInfo := range containers { | ||
qos := string(podInfo.Status.QOSClass) | ||
|
||
// Resources requested | ||
requestCPUCores := float64(containerInfo.Resources.Requests.Cpu().MilliValue()) / 1000 | ||
requestMemoryBytes := float64(containerInfo.Resources.Requests.Memory().MilliValue()) / 1000 | ||
|
||
// Resources limit | ||
limitCPUCores := float64(containerInfo.Resources.Limits.Cpu().MilliValue()) / 1000 | ||
limitMemoryBytes := float64(containerInfo.Resources.Limits.Memory().MilliValue()) / 1000 | ||
|
||
// Resources usage | ||
containerUsageMetrics := containerMetricsesByPod[podInfo.Name][containerInfo.Name] | ||
usageCPUCores := float64(containerUsageMetrics.Usage.Cpu().MilliValue()) / 1000 | ||
usageMemoryBytes := float64(containerUsageMetrics.Usage.Memory().MilliValue()) / 1000 | ||
|
||
nodeName := podInfo.Spec.NodeName | ||
metric := &enrichedContainerMetricses{ | ||
Node: nodeName, | ||
Container: containerInfo.Name, | ||
Pod: podInfo.Name, | ||
Qos: qos, | ||
Phase: string(podInfo.Status.Phase), | ||
Namespace: podInfo.Namespace, | ||
RequestCPUCores: requestCPUCores, | ||
RequestMemoryBytes: requestMemoryBytes, | ||
LimitCPUCores: limitCPUCores, | ||
LimitMemoryBytes: limitMemoryBytes, | ||
UsageCPUCores: usageCPUCores, | ||
UsageMemoryBytes: usageMemoryBytes, | ||
} | ||
containerMetricses = append(containerMetricses, metric) | ||
} | ||
} | ||
|
||
return containerMetricses | ||
} |
Oops, something went wrong.