diff --git a/go.mod b/go.mod index 139157cd3..c867df5f5 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( golang.org/x/net v0.0.0-20190620200207-3b0461eec859 golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd // indirect golang.org/x/text v0.3.2 // indirect - google.golang.org/appengine v1.4.0 // indirect google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb // indirect google.golang.org/grpc v1.20.1 ) diff --git a/go.sum b/go.sum index ed2a1d844..01c02972c 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,7 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/plugin/runmetrics/producer.go b/plugin/runmetrics/producer.go index eb307fea9..7991b1167 100644 --- a/plugin/runmetrics/producer.go +++ b/plugin/runmetrics/producer.go @@ -2,11 +2,12 @@ package runmetrics import ( "errors" + "runtime" + "sync" + "go.opencensus.io/metric" "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricproducer" - "runtime" - "sync" ) type ( diff --git a/stats/view/aggregation_data.go b/stats/view/aggregation_data.go index d500e67f7..9cecc9e56 100644 --- a/stats/view/aggregation_data.go +++ b/stats/view/aggregation_data.go @@ -117,10 +117,10 @@ func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Po // N+1 buckets. 
type DistributionData struct { Count int64 // number of data points aggregated - Min float64 // minimum value in the distribution - Max float64 // max value in the distribution + Min float64 // minimum value in the distribution, value is non-monotonic + Max float64 // max value in the distribution, value is non-monotonic Mean float64 // mean of the distribution - SumOfSquaredDev float64 // sum of the squared deviation from the mean + SumOfSquaredDev float64 // sum of the squared deviation from the mean, non-monotonic exporters shouldn't use this CountPerBucket []int64 // number of occurrences per bucket // ExemplarsPerBucket is slice the same length as CountPerBucket containing // an exemplar for the associated bucket, or nil. diff --git a/stats/view/collector.go b/stats/view/collector.go index 8a6a2c0fd..99df7668c 100644 --- a/stats/view/collector.go +++ b/stats/view/collector.go @@ -16,6 +16,7 @@ package view import ( + "math" "sort" "time" @@ -56,6 +57,16 @@ func (c *collector) clearRows() { c.signatures = make(map[string]AggregationData) } +func (c *collector) resetValues() { + for _, ad := range c.signatures { + switch ad := ad.(type) { + case *DistributionData: + ad.Min = math.MaxFloat64 + ad.Max = math.SmallestNonzeroFloat64 + } + } +} + // encodeWithKeys encodes the map by using values // only associated with the keys provided. 
func encodeWithKeys(m *tag.Map, keys []tag.Key) []byte { diff --git a/stats/view/view.go b/stats/view/view.go index 293b54ecb..1531cbe1b 100644 --- a/stats/view/view.go +++ b/stats/view/view.go @@ -149,6 +149,12 @@ func (v *viewInternal) clearRows() { v.collector.clearRows() } +// resetValues resets values that need to be +// completely isolated for each bucket +func (v *viewInternal) resetValues() { + v.collector.resetValues() +} + func (v *viewInternal) collectedRows() []*Row { return v.collector.collectedRows(v.view.TagKeys) } diff --git a/stats/view/worker.go b/stats/view/worker.go index 2f3c018af..0a8ad03af 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -233,6 +233,7 @@ func (w *worker) reportView(v *viewInternal, now time.Time) { e.ExportView(viewData) } exportersMu.Unlock() + v.resetValues() } func (w *worker) reportUsage(now time.Time) { @@ -275,6 +276,7 @@ func (w *worker) Read() []*metricdata.Metric { metric := w.toMetric(v, now) if metric != nil { metrics = append(metrics, metric) + v.resetValues() } } return metrics diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go index 6c15d37ef..3e2b36c2e 100644 --- a/stats/view/worker_test.go +++ b/stats/view/worker_test.go @@ -18,6 +18,8 @@ package view import ( "context" "errors" + "math" + "runtime/debug" "sync" "testing" "time" @@ -261,36 +263,107 @@ func TestReportUsage(t *testing.T) { } for _, tt := range tests { - restart() - SetReportingPeriod(25 * time.Millisecond) + t.Run(tt.name, func(t *testing.T) { + restart() + SetReportingPeriod(25 * time.Millisecond) - if err := Register(tt.view); err != nil { - t.Fatalf("%v: cannot register: %v", tt.name, err) - } + if err := Register(tt.view); err != nil { + t.Fatalf("%v: cannot register: %v", tt.name, err) + } - e := &countExporter{} - RegisterExporter(e) + e := &countExporter{} + RegisterExporter(e) + defer UnregisterExporter(e) - stats.Record(ctx, m.M(1)) - stats.Record(ctx, m.M(1)) - stats.Record(ctx, m.M(1)) - 
stats.Record(ctx, m.M(1)) + stats.Record(ctx, m.M(1)) + stats.Record(ctx, m.M(1)) + stats.Record(ctx, m.M(1)) + stats.Record(ctx, m.M(1)) - time.Sleep(50 * time.Millisecond) + time.Sleep(50 * time.Millisecond) - stats.Record(ctx, m.M(1)) - stats.Record(ctx, m.M(1)) - stats.Record(ctx, m.M(1)) - stats.Record(ctx, m.M(1)) + stats.Record(ctx, m.M(1)) + stats.Record(ctx, m.M(1)) + stats.Record(ctx, m.M(1)) + stats.Record(ctx, m.M(1)) - time.Sleep(50 * time.Millisecond) + time.Sleep(50 * time.Millisecond) - e.Lock() - count := e.count - e.Unlock() - if got, want := count, tt.wantMaxCount; got > want { - t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want) - } + e.Lock() + count := e.count + e.Unlock() + if got, want := count, tt.wantMaxCount; got > want { + t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want) + } + }) + } +} + +func TestReportUsageMinMax(t *testing.T) { + ctx := context.Background() + + m := stats.Float64("measure", "desc", "unit") + + tests := []struct { + name string + view *View + data [][]float64 + wantMin float64 + wantMax float64 + }{ + { + name: "reset_data", + view: &View{Name: "const", Measure: m, Aggregation: Distribution(1, 4, 10, 12)}, + data: [][]float64{{2, 2, 2, 2}, {4, 4, 4, 1}}, + wantMin: 1, + wantMax: 4, + }, + { + name: "no_data", + view: &View{Name: "const", Measure: m, Aggregation: Distribution(1, 4, 10, 12)}, + wantMin: 0, + wantMax: 0, + }, + { + name: "constant_data", + view: &View{Name: "const", Measure: m, Aggregation: Distribution(1, 4, 10, 12)}, + data: [][]float64{{1, 1, 1, 1}, {1, 1, 1, 1}}, + wantMin: 1, + wantMax: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + restart() + SetReportingPeriod(25 * time.Millisecond) + + if err := Register(tt.view); err != nil { + t.Fatalf("%v: cannot register: %v", tt.name, err) + } + + e := &distributionExporter{} + RegisterExporter(e) + defer UnregisterExporter(e) + + for _, batch := range tt.data { + for _, val 
:= range batch { + stats.Record(ctx, m.M(val)) + } + time.Sleep(50 * time.Millisecond) + } + + e.Lock() + min := e.min + max := e.max + e.Unlock() + if got, want := min, tt.wantMin; got != want { + t.Errorf("%v: got min = %v; want %v", tt.name, got, want) + } + if got, want := max, tt.wantMax; got != want { + t.Errorf("%v: got max = %v; want %v", tt.name, got, want) + } + }) } } @@ -494,7 +567,11 @@ func (e *countExporter) ExportView(vd *Data) { if len(vd.Rows) == 0 { return } - d := vd.Rows[0].Data.(*CountData) + d, ok := vd.Rows[0].Data.(*CountData) + if !ok { + debug.PrintStack() + panic("expected data of type *CountData") + } e.Lock() defer e.Unlock() @@ -514,6 +591,28 @@ func (e *vdExporter) ExportView(vd *Data) { e.vds = append(e.vds, vd) } +type distributionExporter struct { + sync.Mutex + min float64 + max float64 +} + +func (e *distributionExporter) ExportView(vd *Data) { + if len(vd.Rows) == 0 { + return + } + d := vd.Rows[0].Data.(*DistributionData) + + e.Lock() + defer e.Unlock() + if d.Min != math.MaxFloat64 { + e.min = d.Min + } + if d.Max != math.SmallestNonzeroFloat64 { + e.max = d.Max + } +} + // restart stops the current processors and creates a new one. func restart() { defaultWorker.stop()