Skip to content

Commit 6e4a95d

Browse files
committed
monitor: add ps-style support to current tasks
Change-Id: I26a0e811081ddfaf8451f11429c138a7008e9dd6
1 parent d1b9556 commit 6e4a95d

File tree

8 files changed

+107
-14
lines changed

8 files changed

+107
-14
lines changed

group_disabled.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222

2323
func (g *MonitorGroup) Stats(cb func(name string, val float64)) {}
2424

25+
// Running is a no-op in this file: its empty body never invokes cb. It keeps
// the Running method present on *MonitorGroup.
// NOTE(review): presumably this is the variant compiled when monitoring is
// disabled (file is group_disabled.go) — confirm against the build tags.
func (g *MonitorGroup) Running(cb func(name string, current []*TaskCtx)) {}
26+
2527
func (g *MonitorGroup) Datapoints(reset bool, cb func(name string,
2628
data [][]float64, total uint64, clipped bool, fraction float64)) {
2729
}

group_enabled.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ import (
2020
"fmt"
2121
"strings"
2222

23-
"golang.org/x/net/context"
2423
"github.com/spacemonkeygo/errors"
2524
"github.com/spacemonkeygo/monitor/trace"
25+
"golang.org/x/net/context"
2626
)
2727

2828
// Stats conforms to the Monitor interface. Stats aggregates all statistics
@@ -41,6 +41,22 @@ func (g *MonitorGroup) Stats(cb func(name string, val float64)) {
4141
}
4242
}
4343

44+
// Running collects lists of all running tasks by name
45+
func (g *MonitorGroup) Running(cb func(name string, current []*TaskCtx)) {
46+
snapshot := g.monitors.Snapshot()
47+
for _, name := range sortedStringKeys(snapshot) {
48+
cache_val := snapshot[name]
49+
mon, ok := cache_val.(*TaskMonitor)
50+
if !ok {
51+
continue
52+
}
53+
current := mon.Running()
54+
if len(current) > 0 {
55+
cb(fmt.Sprintf("%s.%s", g.group_name, name), current)
56+
}
57+
}
58+
}
59+
4460
// Datapoints conforms to the DataCollection interface. Datapoints aggregates
4561
// all datasets attached to this group.
4662
func (g *MonitorGroup) Datapoints(reset bool, cb func(name string,
@@ -276,3 +292,5 @@ func (self *MonitorGroup) TracedTask(ctx *context.Context) func(*error) {
276292
}
277293
}
278294
}
295+
296+
// Compile-time assertion that *MonitorGroup implements RunningTasksCollector.
var _ RunningTasksCollector = (*MonitorGroup)(nil)

http.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,37 @@ package monitor
1717
import (
1818
"fmt"
1919
"net/http"
20+
"sort"
21+
"strings"
22+
"time"
2023
)
2124

25+
// durationSort adapts a slice of durations to sort.Interface, ordering the
// elements from shortest to longest.
type durationSort []time.Duration

// Len reports the number of durations in the slice.
func (ds durationSort) Len() int {
	return len(ds)
}

// Less reports whether the duration at index i is strictly shorter than the
// duration at index j.
func (ds durationSort) Less(i, j int) bool {
	return ds[j] > ds[i]
}

// Swap exchanges the durations at indexes i and j.
func (ds durationSort) Swap(i, j int) {
	ds[j], ds[i] = ds[i], ds[j]
}
30+
2231
// ServeHTTP dumps all of the MonitorStore's keys and values to the requester.
2332
// This method allows a MonitorStore to be registered as an HTTP handler.
2433
func (s *MonitorStore) ServeHTTP(w http.ResponseWriter, req *http.Request) {
2534
w.Header().Set("Content-Type", "text/plain")
35+
36+
if strings.HasSuffix(req.URL.Path, "running") {
37+
s.Running(func(name string, current []*TaskCtx) {
38+
fmt.Fprintf(w, "%s - %d tasks\n", name, len(current))
39+
durs := make([]time.Duration, 0, len(current))
40+
for _, task := range current {
41+
durs = append(durs, task.ElapsedTime())
42+
}
43+
sort.Sort(sort.Reverse(durationSort(durs)))
44+
for _, dur := range durs {
45+
fmt.Fprintf(w, "\t%s\n", dur)
46+
}
47+
})
48+
return
49+
}
50+
2651
s.Stats(func(name string, val float64) {
2752
fmt.Fprintf(w, "%s\t%f\n", name, val)
2853
})

monitor.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ type Monitor interface {
4141
Stats(cb func(name string, val float64))
4242
}
4343

44+
// RunningTasksCollector keeps track of tasks that are currently in process.
45+
type RunningTasksCollector interface {
46+
Running(cb func(name string, current []*TaskCtx))
47+
}
48+
4449
// DataCollection is the basic key/vector interface. Anything that implements
4550
// the DataCollection interface can be connected to the monitor system for
4651
// later processing.
@@ -75,6 +80,11 @@ func sortedStringKeys(snapshot map[interface{}]interface{}) []string {
7580
// Stats calls cb with all the statistics registered on the default store.
7681
func Stats(cb func(name string, val float64)) { DefaultStore.Stats(cb) }
7782

83+
// Running calls cb with lists of currently running tasks by name.
84+
func Running(cb func(name string, current []*TaskCtx)) {
85+
DefaultStore.Running(cb)
86+
}
87+
7888
// Datapoints calls cb with all the datasets registered on the default store.
7989
func Datapoints(reset bool, cb func(name string, data [][]float64, total uint64,
8090
clipped bool, fraction float64)) {

store.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,17 @@ func (s *MonitorStore) Stats(cb func(name string, val float64)) {
4343
}
4444
}
4545

46+
// Running collects lists of all running tasks by name
47+
func (s *MonitorStore) Running(cb func(name string, current []*TaskCtx)) {
48+
snapshot := s.groups.Snapshot()
49+
for _, name := range sortedStringKeys(snapshot) {
50+
cache_val := snapshot[name]
51+
if mon, ok := cache_val.(RunningTasksCollector); ok {
52+
mon.Running(cb)
53+
}
54+
}
55+
}
56+
4657
// Datapoints conforms to the DataCollection interface
4758
func (s *MonitorStore) Datapoints(reset bool, cb func(name string,
4859
data [][]float64, total uint64, clipped bool, fraction float64)) {
@@ -83,3 +94,5 @@ func (s *MonitorStore) GetMonitorsNamed(group_name string) *MonitorGroup {
8394
func (s *MonitorStore) GetMonitors() *MonitorGroup {
8495
return s.GetMonitorsNamed(PackageName())
8596
}
97+
98+
// Compile-time assertion that *MonitorStore implements RunningTasksCollector.
var _ RunningTasksCollector = (*MonitorStore)(nil)

tasks.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ package monitor
1616

1717
import (
1818
"sync"
19+
"time"
20+
21+
"github.com/spacemonkeygo/monotime"
1922
)
2023

2124
// TaskMonitor is a type for keeping track of tasks. A TaskMonitor will keep
@@ -39,14 +42,26 @@ type TaskMonitor struct {
3942
total_timing *IntValueMonitor
4043
errors map[string]uint64
4144
panics uint64
45+
running map[*TaskCtx]bool
4246
}
4347

4448
// NewTaskMonitor returns a new TaskMonitor. You probably want to create
4549
// a TaskMonitor using MonitorGroup.Task instead.
4650
func NewTaskMonitor() *TaskMonitor {
4751
return &TaskMonitor{
48-
errors: make(map[string]uint64),
4952
success_timing: NewIntValueMonitor(),
5053
error_timing: NewIntValueMonitor(),
51-
total_timing: NewIntValueMonitor()}
54+
total_timing: NewIntValueMonitor(),
55+
errors: make(map[string]uint64),
56+
running: make(map[*TaskCtx]bool)}
57+
}
58+
59+
// TaskCtx keeps track of a task as it is running.
60+
type TaskCtx struct {
61+
start time.Duration
62+
monitor *TaskMonitor
63+
}
64+
65+
func (t TaskCtx) ElapsedTime() time.Duration {
66+
return monotime.Monotonic() - t.start
5267
}

tasks_disabled.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ func (t *TaskMonitor) Stats(cb func(name string, val float64)) {}
2121
func (t *TaskMonitor) Start() func(*error) { return func(*error) {} }
2222
func (t *TaskMonitor) NewContext() *TaskCtx { return &TaskCtx{} }
2323

24-
type TaskCtx struct{}
25-
2624
func (c *TaskCtx) Finish(err_ref *error, rec interface{}) {}
25+
26+
// Running always returns nil in this file; no running tasks are reported.
// NOTE(review): presumably this is the variant compiled when monitoring is
// disabled (file is tasks_disabled.go) — confirm against the build tags.
func (t *TaskMonitor) Running() (rv []*TaskCtx) { return nil }

tasks_enabled.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package monitor
1919
import (
2020
"fmt"
2121
"sort"
22-
"time"
2322

2423
"github.com/spacemonkeygo/errors"
2524
"github.com/spacemonkeygo/monotime"
@@ -30,12 +29,6 @@ const (
3029
microsecondInNanoseconds = 1000
3130
)
3231

33-
// TaskCtx keeps track of a task as it is running.
34-
type TaskCtx struct {
35-
start time.Duration
36-
monitor *TaskMonitor
37-
}
38-
3932
// Start is a helper method for watching a task in a less error-prone way.
4033
// Managing a task context yourself is tricky to get right - recover only works
4134
// in deferred methods. Call out of a method that was deferred and it no longer
@@ -48,14 +41,16 @@ func (t *TaskMonitor) Start() func(*error) {
4841
// NewContext creates a new context that is watching a live task. See Start
4942
// or MonitorGroup.Task
5043
func (t *TaskMonitor) NewContext() *TaskCtx {
44+
c := &TaskCtx{start: monotime.Monotonic(), monitor: t}
5145
t.mtx.Lock()
5246
t.current += 1
5347
t.total_started += 1
5448
if t.current > t.highwater {
5549
t.highwater = t.current
5650
}
51+
t.running[c] = true
5752
t.mtx.Unlock()
58-
return &TaskCtx{start: monotime.Monotonic(), monitor: t}
53+
return c
5954
}
6055

6156
// Stats conforms to the Monitor interface
@@ -121,7 +116,7 @@ func (t *TaskMonitor) Stats(cb func(name string, val float64)) {
121116
// Finish will re-panic any recovered panics (provided it wasn't a nil panic)
122117
// after bookkeeping.
123118
func (c *TaskCtx) Finish(err_ref *error, rec interface{}) {
124-
duration_nanoseconds := int64(monotime.Monotonic() - c.start)
119+
duration_nanoseconds := int64(c.ElapsedTime())
125120
var error_name string
126121
var err error
127122
if err_ref != nil {
@@ -151,6 +146,7 @@ func (c *TaskCtx) Finish(err_ref *error, rec interface{}) {
151146
c.monitor.mtx.Lock()
152147
c.monitor.current -= 1
153148
c.monitor.total_completed += 1
149+
delete(c.monitor.running, c)
154150
if err != nil {
155151
c.monitor.errors[error_name] += 1
156152
if rec != nil {
@@ -170,3 +166,17 @@ func (c *TaskCtx) Finish(err_ref *error, rec interface{}) {
170166
panic(rec)
171167
}
172168
}
169+
170+
// Running returns a list of tasks that are currently running. Each TaskCtx
171+
// can tell how long it's been since the task was started, though keep in mind
172+
// that the task might finish between calling (*TaskMonitor).Running() and
173+
// (*TaskCtx).ElapsedTime()
174+
func (t *TaskMonitor) Running() (rv []*TaskCtx) {
175+
t.mtx.Lock()
176+
rv = make([]*TaskCtx, 0, len(t.running))
177+
for task_ctx := range t.running {
178+
rv = append(rv, task_ctx)
179+
}
180+
t.mtx.Unlock()
181+
return rv
182+
}

0 commit comments

Comments
 (0)