Skip to content

Commit 1e0e834

Browse files
committed
FEATURE: gauges
Gauges let you measure the current value of a number that can go up or down over time. They typically have last-win semantics in terms of multiple values being submitted in periods. One common use-case for gauges is periodically sampling the current amount of memory used or the number of active goroutines.
1 parent 1c60aae commit 1e0e834

22 files changed

+274
-19
lines changed

README.md

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ There are five types of completion statuses:
9797
* **ValidationError** - Your code was fine, but the user passed in bad inputs, and so the job wasn't completed successfully.
9898
* **Junk** - The job wasn't completed successfully, but not really because of an Error or ValidationError. For instance, maybe there's just a 404 (not found) or 401 (unauthorized) request to your app. This status code might not apply to all apps.
9999

100-
### Events, Timings, and Errors
100+
### Events, Timings, Gauges, and Errors
101101

102-
Within jobs, you can emit events, timings, and errors. The first argument of each of these methods is supposed to be a *key*. Camel case with dots is good because it works with other metrics stores like StatsD. Each method has a basic version as well as a version that accepts keys/values.
102+
Within jobs, you can emit events, timings, gauges, and errors. The first argument of each of these methods is supposed to be a *key*. Camel case with dots is good because it works with other metrics stores like StatsD. Each method has a basic version as well as a version that accepts keys/values.
103103

104104
#### Events
105105

@@ -140,7 +140,25 @@ job.TimingKv("fetch_user", time.Since(startTime).Nanoseconds(),
140140
```
141141

142142
* For the StatsD sink, we'll send it to StatsD as a timing.
143-
* The JSON polling will compute a summary of your timings: min, max, avg, stddev, count, sum.
143+
* The JSON polling sink will compute a summary of your timings: min, max, avg, stddev, count, sum.
144+
145+
#### Gauges
146+
147+
```go
148+
// Gauges:
149+
job.Gauge("num_goroutines", numRunningGoroutines())
150+
151+
// Gauges also support keys/values:
152+
job.GaugeKv("num_goroutines", numRunningGoroutines(),
153+
health.Kvs{"dispatcher": dispatcherStatus()})
154+
```
155+
156+
* For the WriterSink, a gauge is just like logging to a file:
157+
```
158+
[2014-12-17T20:36:24.136663759Z]: job:/api/v2/user_stories event:num_goroutines gauge:17 kvs:[request-id:F8a8bQOWmRpO6ky dispatcher:running]
159+
```
160+
161+
* For the StatsD sink, we'll send it to StatsD as a gauge.
144162

145163
#### Errors
146164

@@ -228,6 +246,7 @@ type Sink interface {
228246
EmitEvent(job string, event string, kvs map[string]string)
229247
EmitEventErr(job string, event string, err error, kvs map[string]string)
230248
EmitTiming(job string, event string, nanoseconds int64, kvs map[string]string)
249+
EmitGauge(job string, event string, value float64, kvs map[string]string)
231250
EmitComplete(job string, status CompletionStatus, nanoseconds int64, kvs map[string]string)
232251
}
233252
```
@@ -361,7 +380,6 @@ Request for contributions:
361380

362381
health core:
363382

364-
* A gauge metric type
365383
* A way to do fine-grained histograms with variable binning.
366384

367385
healthd & healthtop

aggregator.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ AGGREGATE_LOOP:
4040
agg.EmitEventErr(cmd.Job, cmd.Event, cmd.Err)
4141
} else if cmd.Kind == cmdKindTiming {
4242
agg.EmitTiming(cmd.Job, cmd.Event, cmd.Nanos)
43+
} else if cmd.Kind == cmdKindGauge {
44+
agg.EmitGauge(cmd.Job, cmd.Event, cmd.Value)
4345
} else if cmd.Kind == cmdKindComplete {
4446
agg.EmitComplete(cmd.Job, cmd.Status, cmd.Nanos)
4547
}
@@ -104,6 +106,14 @@ func (a *aggregator) EmitTiming(job string, event string, nanos int64) {
104106
intAgg.SerialNumber++
105107
}
106108

109+
func (a *aggregator) EmitGauge(job string, event string, value float64) {
110+
intAgg := a.getIntervalAggregation()
111+
intAgg.Gauges[event] = value
112+
jobAgg := intAgg.getJobAggregation(job)
113+
jobAgg.Gauges[event] = value
114+
intAgg.SerialNumber++
115+
}
116+
107117
func (a *aggregator) EmitComplete(job string, status CompletionStatus, nanos int64) {
108118
intAgg := a.getIntervalAggregation()
109119
jobAgg := intAgg.getJobAggregation(job)

aggregator_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,43 @@ func TestEmitTiming(t *testing.T) {
164164
assert.EqualValues(t, 9, tAgg.NanosMax)
165165
}
166166

167+
func TestEmitGauge(t *testing.T) {
168+
setNowMock("2011-09-09T23:36:13Z")
169+
defer resetNowMock()
170+
a := newAggregator(time.Minute, time.Minute*5)
171+
a.EmitGauge("foo", "bar", 100)
172+
173+
assert.Equal(t, 1, len(a.intervalAggregations))
174+
175+
intAgg := a.intervalAggregations[0]
176+
assert.NotNil(t, intAgg.Gauges)
177+
assert.EqualValues(t, 1, intAgg.SerialNumber)
178+
v, ok := intAgg.Gauges["bar"]
179+
assert.True(t, ok)
180+
assert.Equal(t, 100.0, v)
181+
182+
assert.NotNil(t, intAgg.Jobs)
183+
jobAgg := intAgg.Jobs["foo"]
184+
assert.NotNil(t, jobAgg)
185+
assert.NotNil(t, jobAgg.Gauges)
186+
v, ok = intAgg.Gauges["bar"]
187+
assert.True(t, ok)
188+
assert.Equal(t, 100.0, v)
189+
190+
// Another gauge:
191+
a.EmitGauge("baz", "bar", 3.14) // note: diff job
192+
193+
intAgg = a.intervalAggregations[0]
194+
v, ok = intAgg.Gauges["bar"]
195+
assert.True(t, ok)
196+
assert.Equal(t, 3.14, v)
197+
198+
jobAgg = intAgg.Jobs["baz"]
199+
v, ok = intAgg.Gauges["bar"]
200+
assert.True(t, ok)
201+
assert.Equal(t, 3.14, v)
202+
}
203+
167204
func TestEmitComplete(t *testing.T) {
168205
setNowMock("2011-09-09T23:36:13Z")
169206
defer resetNowMock()

error_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ func (s *testSink) EmitEventErr(job string, event string, inputErr error, kvs ma
3232
last.WasRaw = true
3333
}
3434
}
35-
func (s *testSink) EmitTiming(job string, event string, nanos int64, kvs map[string]string) {}
35+
func (s *testSink) EmitTiming(job string, event string, nanos int64, kvs map[string]string) {}
36+
func (s *testSink) EmitGauge(job string, event string, value float64, kvs map[string]string) {}
3637
func (s *testSink) EmitComplete(job string, status CompletionStatus, nanos int64, kvs map[string]string) {
3738
}
3839

health.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ type EventReceiver interface {
1515
EventErrKv(eventName string, err error, kvs map[string]string) error
1616
Timing(eventName string, nanoseconds int64)
1717
TimingKv(eventName string, nanoseconds int64, kvs map[string]string)
18+
Gauge(eventName string, value float64)
19+
GaugeKv(eventName string, value float64, kvs map[string]string)
1820
}
1921

2022
type Stream struct {
@@ -57,6 +59,7 @@ type Sink interface {
5759
EmitEventErr(job string, event string, err error, kvs map[string]string)
5860
EmitTiming(job string, event string, nanoseconds int64, kvs map[string]string)
5961
EmitComplete(job string, status CompletionStatus, nanoseconds int64, kvs map[string]string)
62+
EmitGauge(job string, event string, value float64, kvs map[string]string)
6063
}
6164

6265
func NewStream() *Stream {
@@ -146,6 +149,20 @@ func (j *Job) TimingKv(eventName string, nanoseconds int64, kvs map[string]strin
146149
}
147150
}
148151

152+
func (j *Job) Gauge(eventName string, value float64) {
153+
allKvs := j.mergedKeyValues(nil)
154+
for _, sink := range j.Stream.Sinks {
155+
sink.EmitGauge(j.JobName, eventName, value, allKvs)
156+
}
157+
}
158+
159+
func (j *Job) GaugeKv(eventName string, value float64, kvs map[string]string) {
160+
allKvs := j.mergedKeyValues(kvs)
161+
for _, sink := range j.Stream.Sinks {
162+
sink.EmitGauge(j.JobName, eventName, value, allKvs)
163+
}
164+
}
165+
149166
func (j *Job) Complete(status CompletionStatus) {
150167
allKvs := j.mergedKeyValues(nil)
151168
for _, sink := range j.Stream.Sinks {

interval_aggregation.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type IntervalAggregation struct {
2424

2525
type aggregationMaps struct {
2626
Timers map[string]*TimerAggregation `json:"timers"`
27+
Gauges map[string]float64 `json:"gauges"`
2728
Events map[string]int64 `json:"events"`
2829
EventErrs map[string]*ErrorCounter `json:"event_errs"`
2930
}
@@ -67,6 +68,7 @@ func NewIntervalAggregation(intervalStart time.Time) *IntervalAggregation {
6768

6869
func (am *aggregationMaps) initAggregationMaps() {
6970
am.Timers = make(map[string]*TimerAggregation)
71+
am.Gauges = make(map[string]float64)
7072
am.Events = make(map[string]int64)
7173
am.EventErrs = make(map[string]*ErrorCounter)
7274
}

interval_aggregation_clone.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ func (am *aggregationMaps) Clone() *aggregationMaps {
2424
dup.Events[k] = v
2525
}
2626

27+
for k, v := range am.Gauges {
28+
dup.Gauges[k] = v
29+
}
30+
2731
for k, v := range am.Timers {
2832
dup.Timers[k] = v.Clone()
2933
}

interval_aggregation_clone_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func TestClone(t *testing.T) {
2727
a.EmitEvent("foo", "bar")
2828
a.EmitTiming("foo", "bar", 100)
2929
a.EmitEventErr("foo", "bar", fmt.Errorf("hi"))
30+
a.EmitGauge("foo", "bar", 3.14)
3031
a.EmitComplete("foo", Error, 99)
3132

3233
assert.Equal(t, 301, len(intAgg.Jobs))
@@ -51,11 +52,15 @@ func assertAggregationData(t *testing.T, intAgg *IntervalAggregation) {
5152
assert.Equal(t, 300, len(intAgg.Jobs))
5253
assert.Equal(t, 1200, len(intAgg.Events))
5354
assert.Equal(t, 1200, len(intAgg.Timers))
55+
assert.Equal(t, 1200, len(intAgg.Gauges))
5456
assert.Equal(t, 1200, len(intAgg.EventErrs))
5557

5658
// Spot-check events:
5759
assert.EqualValues(t, 1, intAgg.Events["event0"])
5860

61+
// Spot check gauges:
62+
assert.EqualValues(t, 3.14, intAgg.Gauges["gauge0"])
63+
5964
// Spot-check timings:
6065
assert.EqualValues(t, 1, intAgg.Timers["timing0"].Count)
6166
assert.EqualValues(t, 12, intAgg.Timers["timing0"].NanosSum)
@@ -70,6 +75,8 @@ func assertAggregationData(t *testing.T, intAgg *IntervalAggregation) {
7075
assert.EqualValues(t, 0, job.CountError)
7176
assert.EqualValues(t, 1, job.Events["event0"])
7277
assert.EqualValues(t, 0, job.Events["event4"])
78+
assert.EqualValues(t, 3.14, job.Gauges["gauge0"])
79+
assert.EqualValues(t, 0.0, job.Gauges["gauge4"])
7380
assert.EqualValues(t, 1, job.Timers["timing0"].Count)
7481
assert.EqualValues(t, 12, job.Timers["timing0"].NanosSum)
7582
assert.EqualValues(t, 1, job.EventErrs["err0"].Count)
@@ -90,6 +97,7 @@ func aggregatorWithData() *aggregator {
9097
// We want 300 jobs
9198
// Each job will have 5 events, but we want 1200 events total
9299
// Each job will have 5 timers, but we want 1200 timers total
100+
// Each job will have 5 gauges, but we want 1200 gauges total
93101
// Each job will have 5 errs, but we want 1200 errs total
94102
// Given this 300/1200 dichotomy,
95103
// - the first job will have 4 events, the next job 4 events, etc.
@@ -109,6 +117,11 @@ func aggregatorWithData() *aggregator {
109117
timings = append(timings, fmt.Sprintf("timing%d", i))
110118
}
111119

120+
gauges := []string{}
121+
for i := 0; i < 1200; i++ {
122+
gauges = append(gauges, fmt.Sprintf("gauge%d", i))
123+
}
124+
112125
eventErrs := []eventErr{}
113126
for i := 0; i < 1200; i++ {
114127
eventErrs = append(eventErrs, eventErr{
@@ -141,6 +154,14 @@ func aggregatorWithData() *aggregator {
141154
}
142155
}
143156

157+
cur = 0
158+
for _, j := range jobs {
159+
for i := 0; i < 4; i++ {
160+
a.EmitGauge(j, gauges[cur], 3.14)
161+
cur++
162+
}
163+
}
164+
144165
for _, j := range jobs {
145166
a.EmitComplete(j, Success, 12)
146167
}

interval_aggregation_merge.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ func (intoAm *aggregationMaps) merge(fromAm *aggregationMaps) {
4343
intoAm.Events[k] += v
4444
}
4545

46+
for k, v := range fromAm.Gauges {
47+
intoAm.Gauges[k] = v
48+
}
49+
4650
for k, v := range fromAm.Timers {
4751
if existingTimer, ok := intoAm.Timers[k]; ok {
4852
existingTimer.merge(v)

interval_aggregation_merge_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,25 @@ func TestMerge(t *testing.T) {
3030
a2 := aggregatorWithData()
3131
intAgg2 := a2.intervalAggregations[0]
3232

33+
// Modify a gauge:
34+
a2.EmitGauge("job0", "gauge1", 5.5)
35+
3336
intAgg.Merge(intAgg2)
3437

3538
// same number of events:
3639
assert.Equal(t, 300, len(intAgg.Jobs))
3740
assert.Equal(t, 1200, len(intAgg.Events))
3841
assert.Equal(t, 1200, len(intAgg.Timers))
42+
assert.Equal(t, 1200, len(intAgg.Gauges))
3943
assert.Equal(t, 1200, len(intAgg.EventErrs))
4044

4145
// Spot-check events:
4246
assert.EqualValues(t, 2, intAgg.Events["event0"])
4347

48+
// Spot-check gauges:
49+
assert.EqualValues(t, 3.14, intAgg.Gauges["gauge0"])
50+
assert.EqualValues(t, 5.5, intAgg.Gauges["gauge1"]) // 5.5 takes precedence over 3.14 (argument to merge takes precedence.)
51+
4452
// Spot-check timings:
4553
assert.EqualValues(t, 2, intAgg.Timers["timing0"].Count)
4654
assert.EqualValues(t, 24, intAgg.Timers["timing0"].NanosSum)
@@ -55,6 +63,7 @@ func TestMerge(t *testing.T) {
5563
assert.EqualValues(t, 0, job.CountError)
5664
assert.EqualValues(t, 2, job.Events["event0"])
5765
assert.EqualValues(t, 0, job.Events["event4"])
66+
assert.EqualValues(t, 3.14, job.Gauges["gauge0"])
5867
assert.EqualValues(t, 2, job.Timers["timing0"].Count)
5968
assert.EqualValues(t, 24, job.Timers["timing0"].NanosSum)
6069
assert.EqualValues(t, 2, job.EventErrs["err0"].Count)

0 commit comments

Comments
 (0)