-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
155 lines (132 loc) · 3.86 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package main
import (
"flag"
"fmt"
"net/http"
"sync"
"time"
"github.com/mercury200Hg/metrics-server-prometheus-exporter/exporter"
"github.com/mercury200Hg/metrics-server-prometheus-exporter/utils"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Job definition
type Job struct {
Type string
Sleep time.Duration
}
var (
types = []string{"pods", "nodes"}
workers = 0
iterations = 0
inflightCounterVec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "worker",
Subsystem: "jobs",
Name: "metrics_server_exporter_inflight_jobs",
Help: "Number of jobs in flight for metrics-server-exporter go routine",
},
[]string{"type"},
)
)
func initWorkers() {
flag.IntVar(&workers, "workers", 2, "Number of workers to use")
}
func rootHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "<h1>Metrics-Server-Exporter</h1><br><div>Please visit <a href='/metrics'>/metrics</a> to see metrics </div>")
}
func logRequestHandler(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Printf("%s %s %s", r.RemoteAddr, r.Method, r.URL)
handler.ServeHTTP(w, r)
})
}
func main() {
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
zerolog.ParseLevel("info")
initWorkers()
flag.Parse()
log.Info().Msg("Checking kube-api. Searching for config file/service-accounts...")
status := utils.CheckKubeAPI()
if status == false {
log.Error().Msg("Unable to verify kube config")
} else {
log.Error().Msg("Kube config verified successfully.")
}
prometheus.MustRegister(
exporter.PodMetricCPU,
exporter.PodMetricMemory,
exporter.NodeMetricCPU,
exporter.NodeMetricMemory,
)
// create a channel with a 100 Job buffer
jobsChannel := make(chan *Job, 100)
go startJobProcessor(jobsChannel)
go createJobs(jobsChannel)
log.Info().Msgf("Starting application on port: 9100")
handler := http.NewServeMux()
handler.HandleFunc("/", rootHandler)
handler.Handle("/metrics", logRequestHandler(promhttp.Handler()))
log.Fatal().Err(http.ListenAndServe(fmt.Sprintf(":9100"), handler))
}
// makeJob creates a new job in channel at rate of given sleep time
func makeJob(jobType string) *Job {
duration, _ := time.ParseDuration("30s")
return &Job{
Type: jobType,
Sleep: duration,
}
}
func startJobProcessor(jobs <-chan *Job) {
log.Info().Msgf("Starting %d workers", workers)
wait := sync.WaitGroup{}
wait.Add(workers)
// start given workers
for i := 0; i < workers; i++ {
go func(workerID int) {
// start the worker
startWorker(workerID, jobs)
wait.Done()
}(i)
}
wait.Wait()
}
func createJobs(jobs chan<- *Job) {
for {
// create jobs
for i := 0; i < len(types); i++ {
job := makeJob(types[i])
if i%2 == 0 {
inflightCounterVec.WithLabelValues(job.Type).Inc()
}
jobs <- job
}
// don't file up queue too quickly
generationTime, _ := time.ParseDuration("30s")
time.Sleep(generationTime)
}
}
// creates a worker that pulls job from job channel
func startWorker(workerID int, jobs <-chan *Job) {
for {
select {
// read from the job channel
case job := <-jobs:
startTime := time.Now()
if job.Type == "nodes" {
exporter.RecordNodeMetrics()
log.Info().Msgf("Scrape count:[%d], Worker:[%d]. Processed job for [%s] in %0.3f seconds", iterations, workerID, job.Type, time.Now().Sub(startTime).Seconds())
} else if job.Type == "pods" {
exporter.RecordPodMetrics()
log.Info().Msgf("Scrape count:[%d], Worker:[%d]. Processed job for [%s] in %0.3f seconds", iterations, workerID, job.Type, time.Now().Sub(startTime).Seconds())
// Increase the iteration count
iterations++
}
// Sleep to prevent excess load
log.Info().Msgf("Sleeping workers for %s seconds", job.Sleep.String())
time.Sleep(job.Sleep)
}
}
}