Skip to content

Commit f10b5d7

Browse files
committed
FEATURE: health revamp
1. Added a 'JsonPollingSink'. This sink is an in-memory aggregator. It exposes these aggregations via HTTP. 2. Added healthd. This is a daemon that polls the JsonPollingSink's and aggregates the results. 3. Added healthtop. This is a CLI utility that queries a healthd instance and displays top jobs and hosts.
1 parent 964ef5a commit f10b5d7

29 files changed

+2692
-181
lines changed

README.txt

Lines changed: 0 additions & 75 deletions
This file was deleted.

aggregator.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package health
2+
3+
import (
4+
"time"
5+
)
6+
7+
type aggregator struct {
8+
// How long is each aggregation interval. Eg, 1 minute
9+
intervalDuration time.Duration
10+
11+
// Retain controls how many metrics interval we keep. Eg, 5 minutes
12+
retain time.Duration
13+
14+
// maxIntervals is the maximum length of intervals.
15+
// It is retain / interval.
16+
maxIntervals int
17+
18+
// intervals is a slice of the retained intervals
19+
intervalAggregations []*IntervalAggregation
20+
}
21+
22+
func startAggregator(intervalDuration time.Duration, retain time.Duration, sink *JsonPollingSink) {
23+
cmdChan := sink.cmdChan
24+
doneChan := sink.doneChan
25+
intervalsChanChan := sink.intervalsChanChan
26+
ticker := time.Tick(1 * time.Second)
27+
28+
agg := newAggregator(intervalDuration, retain)
29+
30+
for {
31+
select {
32+
case <-doneChan:
33+
break
34+
case cmd := <-cmdChan:
35+
if cmd.Kind == cmdKindEvent {
36+
agg.EmitEvent(cmd.Job, cmd.Event)
37+
} else if cmd.Kind == cmdKindEventErr {
38+
agg.EmitEventErr(cmd.Job, cmd.Event, cmd.Err)
39+
} else if cmd.Kind == cmdKindTiming {
40+
agg.EmitTiming(cmd.Job, cmd.Event, cmd.Nanos)
41+
} else if cmd.Kind == cmdKindComplete {
42+
agg.EmitComplete(cmd.Job, cmd.Status, cmd.Nanos)
43+
}
44+
case <-ticker:
45+
agg.getIntervalAggregation() // this has the side effect of sliding the interval window if necessary.
46+
case intervalsChan := <-intervalsChanChan:
47+
intervalsChan <- agg.memorySafeIntervals()
48+
}
49+
}
50+
}
51+
52+
func newAggregator(intervalDuration time.Duration, retain time.Duration) *aggregator {
53+
maxIntervals := int(retain / intervalDuration)
54+
return &aggregator{
55+
intervalDuration: intervalDuration,
56+
retain: retain,
57+
maxIntervals: maxIntervals,
58+
intervalAggregations: make([]*IntervalAggregation, 0, maxIntervals),
59+
}
60+
}
61+
62+
func (a *aggregator) memorySafeIntervals() []*IntervalAggregation {
63+
ret := make([]*IntervalAggregation, 0, len(a.intervalAggregations))
64+
curAgg := a.getIntervalAggregation()
65+
66+
for _, intAgg := range a.intervalAggregations {
67+
if intAgg == curAgg {
68+
ret = append(ret, intAgg.Clone())
69+
} else {
70+
ret = append(ret, intAgg)
71+
}
72+
}
73+
74+
return ret
75+
}
76+
77+
func (a *aggregator) EmitEvent(job string, event string) {
78+
intAgg := a.getIntervalAggregation()
79+
intAgg.Events[event] = intAgg.Events[event] + 1
80+
jobAgg := intAgg.getJobAggregation(job)
81+
jobAgg.Events[event] = jobAgg.Events[event] + 1
82+
intAgg.SerialNumber++
83+
}
84+
85+
func (a *aggregator) EmitEventErr(job string, event string, inputErr error) {
86+
intAgg := a.getIntervalAggregation()
87+
errc := intAgg.getCounterErrs(event)
88+
errc.incrementAndAddError(inputErr)
89+
jobAgg := intAgg.getJobAggregation(job)
90+
jerrc := jobAgg.getCounterErrs(event)
91+
jerrc.incrementAndAddError(inputErr)
92+
intAgg.SerialNumber++
93+
}
94+
95+
func (a *aggregator) EmitTiming(job string, event string, nanos int64) {
96+
intAgg := a.getIntervalAggregation()
97+
t := intAgg.getTimers(event)
98+
t.ingest(nanos)
99+
jobAgg := intAgg.getJobAggregation(job)
100+
jt := jobAgg.getTimers(event)
101+
jt.ingest(nanos)
102+
intAgg.SerialNumber++
103+
}
104+
105+
func (a *aggregator) EmitComplete(job string, status CompletionStatus, nanos int64) {
106+
intAgg := a.getIntervalAggregation()
107+
jobAgg := intAgg.getJobAggregation(job)
108+
jobAgg.ingest(status, nanos)
109+
intAgg.SerialNumber++
110+
}
111+
112+
func (a *aggregator) getIntervalAggregation() *IntervalAggregation {
113+
intervalStart := now().Truncate(a.intervalDuration)
114+
115+
n := len(a.intervalAggregations)
116+
if n > 0 && a.intervalAggregations[n-1].IntervalStart == intervalStart {
117+
return a.intervalAggregations[n-1]
118+
}
119+
120+
return a.createIntervalAggregation(intervalStart)
121+
}
122+
123+
func (a *aggregator) createIntervalAggregation(interval time.Time) *IntervalAggregation {
124+
// Make new interval:
125+
current := NewIntervalAggregation(interval)
126+
127+
// If we've reached our max intervals, and we're going to shift everything down, then set the last one
128+
n := len(a.intervalAggregations)
129+
if n == a.maxIntervals {
130+
for i := 1; i < n; i++ {
131+
a.intervalAggregations[i-1] = a.intervalAggregations[i]
132+
}
133+
a.intervalAggregations[n-1] = current
134+
} else {
135+
a.intervalAggregations = append(a.intervalAggregations, current)
136+
}
137+
138+
return current
139+
}
140+
141+
var nowMock time.Time
142+
143+
func now() time.Time {
144+
if nowMock.IsZero() {
145+
return time.Now()
146+
}
147+
return nowMock
148+
}
149+
150+
func setNowMock(t string) {
151+
var err error
152+
nowMock, err = time.Parse(time.RFC3339, t)
153+
if err != nil {
154+
panic(err)
155+
}
156+
}
157+
158+
func resetNowMock() {
159+
nowMock = time.Time{}
160+
}

0 commit comments

Comments
 (0)