From 1380fae97d1e9d7e7d96593154699df45bfb1b7d Mon Sep 17 00:00:00 2001
From: Marwan Sulaiman
Date: Wed, 30 Oct 2019 02:21:31 -0400
Subject: [PATCH 1/7] clearRows on every ExportView

---
 go.mod               | 1 -
 go.sum               | 1 +
 stats/view/worker.go | 1 +
 3 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/go.mod b/go.mod
index 139157cd3..c867df5f5 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,6 @@ require (
 	golang.org/x/net v0.0.0-20190620200207-3b0461eec859
 	golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd // indirect
 	golang.org/x/text v0.3.2 // indirect
-	google.golang.org/appengine v1.4.0 // indirect
 	google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb // indirect
 	google.golang.org/grpc v1.20.1
 )
diff --git a/go.sum b/go.sum
index ed2a1d844..01c02972c 100644
--- a/go.sum
+++ b/go.sum
@@ -67,6 +67,7 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
 google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/stats/view/worker.go b/stats/view/worker.go
index 2f3c018af..9af33e15a 100644
--- a/stats/view/worker.go
+++ b/stats/view/worker.go
@@ -233,6 +233,7 @@ func (w *worker) reportView(v *viewInternal, now time.Time) {
 		e.ExportView(viewData)
 	}
 	exportersMu.Unlock()
+	v.clearRows()
 }
 
 func (w *worker) reportUsage(now time.Time) {

From 15a8df57314d03556ddc7c9098ace816a5595e68 Mon Sep 17 00:00:00 2001
From: Marwan Sulaiman
Date: Thu, 14 Nov 2019 18:40:37 -0500
Subject: [PATCH 2/7] do not reset everything

---
 stats/view/collector.go | 13 +++++++++++++
 stats/view/view.go      |  6 ++++++
 stats/view/worker.go    |  2 +-
 3 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/stats/view/collector.go b/stats/view/collector.go
index 8a6a2c0fd..0b158a2d7 100644
--- a/stats/view/collector.go
+++ b/stats/view/collector.go
@@ -16,6 +16,7 @@
 package view
 
 import (
+	"math"
 	"sort"
 	"time"
 
@@ -56,6 +57,18 @@ func (c *collector) clearRows() {
 	c.signatures = make(map[string]AggregationData)
 }
 
+func (c *collector) resetValues() {
+	for _, ad := range c.signatures {
+		switch ad := ad.(type) {
+		case *DistributionData:
+			ad.Min = math.MaxFloat64
+			ad.Max = math.SmallestNonzeroFloat64
+			ad.Mean = 0
+			ad.SumOfSquaredDev = 0
+		}
+	}
+}
+
 // encodeWithKeys encodes the map by using values
 // only associated with the keys provided.
 func encodeWithKeys(m *tag.Map, keys []tag.Key) []byte {
diff --git a/stats/view/view.go b/stats/view/view.go
index 293b54ecb..1531cbe1b 100644
--- a/stats/view/view.go
+++ b/stats/view/view.go
@@ -149,6 +149,12 @@ func (v *viewInternal) clearRows() {
 	v.collector.clearRows()
 }
 
+// resetValues resets values that need to be
+// completely isolated for each bucket
+func (v *viewInternal) resetValues() {
+	v.collector.resetValues()
+}
+
 func (v *viewInternal) collectedRows() []*Row {
 	return v.collector.collectedRows(v.view.TagKeys)
 }
diff --git a/stats/view/worker.go b/stats/view/worker.go
index 9af33e15a..52daaba1f 100644
--- a/stats/view/worker.go
+++ b/stats/view/worker.go
@@ -233,7 +233,7 @@ func (w *worker) reportView(v *viewInternal, now time.Time) {
 		e.ExportView(viewData)
 	}
 	exportersMu.Unlock()
-	v.clearRows()
+	v.resetValues()
 }
 
 func (w *worker) reportUsage(now time.Time) {

From 2be56c019057bacaa3456bb0d05ae10bb181eb0b Mon Sep 17 00:00:00 2001
From: Marwan Sulaiman
Date: Thu, 14 Nov 2019 20:04:57 -0500
Subject: [PATCH 3/7] reset on config

---
 stats/view/collector.go       | 13 -------------
 stats/view/view.go            |  6 ------
 stats/view/worker.go          | 16 +++++++++++++++-
 stats/view/worker_commands.go | 10 ++++++++++
 4 files changed, 25 insertions(+), 20 deletions(-)

diff --git a/stats/view/collector.go b/stats/view/collector.go
index 0b158a2d7..8a6a2c0fd 100644
--- a/stats/view/collector.go
+++ b/stats/view/collector.go
@@ -16,7 +16,6 @@
 package view
 
 import (
-	"math"
 	"sort"
 	"time"
 
@@ -57,18 +56,6 @@ func (c *collector) clearRows() {
 	c.signatures = make(map[string]AggregationData)
 }
 
-func (c *collector) resetValues() {
-	for _, ad := range c.signatures {
-		switch ad := ad.(type) {
-		case *DistributionData:
-			ad.Min = math.MaxFloat64
-			ad.Max = math.SmallestNonzeroFloat64
-			ad.Mean = 0
-			ad.SumOfSquaredDev = 0
-		}
-	}
-}
-
 // encodeWithKeys encodes the map by using values
 // only associated with the keys provided.
 func encodeWithKeys(m *tag.Map, keys []tag.Key) []byte {
diff --git a/stats/view/view.go b/stats/view/view.go
index 1531cbe1b..293b54ecb 100644
--- a/stats/view/view.go
+++ b/stats/view/view.go
@@ -149,12 +149,6 @@ func (v *viewInternal) clearRows() {
 	v.collector.clearRows()
 }
 
-// resetValues resets values that need to be
-// completely isolated for each bucket
-func (v *viewInternal) resetValues() {
-	v.collector.resetValues()
-}
-
 func (v *viewInternal) collectedRows() []*Row {
 	return v.collector.collectedRows(v.view.TagKeys)
 }
diff --git a/stats/view/worker.go b/stats/view/worker.go
index 52daaba1f..1fb706698 100644
--- a/stats/view/worker.go
+++ b/stats/view/worker.go
@@ -47,6 +47,7 @@ type worker struct {
 	c          chan command
 	quit, done chan bool
 	mu         sync.RWMutex
+	reset      bool
 }
 
 var defaultWorker *worker
@@ -134,6 +135,17 @@ func SetReportingPeriod(d time.Duration) {
 	<-req.c // don't return until the timer is set to the new duration.
 }
 
+// DisableMonotonicity will reset the data for all cumulative
+// metrics right after every reporting period.
+func DisableMonotonicity(b bool) {
+	req := &disableMonotonicityReq{
+		b: b,
+		c: make(chan bool),
+	}
+	defaultWorker.c <- req
+	<-req.c
+}
+
 func newWorker() *worker {
 	return &worker{
 		measures:   make(map[string]*measureRef),
@@ -233,7 +245,9 @@ func (w *worker) reportView(v *viewInternal, now time.Time) {
 		e.ExportView(viewData)
 	}
 	exportersMu.Unlock()
-	v.resetValues()
+	if w.reset {
+		v.clearRows()
+	}
 }
 
 func (w *worker) reportUsage(now time.Time) {
diff --git a/stats/view/worker_commands.go b/stats/view/worker_commands.go
index 0267e179a..e94db1f88 100644
--- a/stats/view/worker_commands.go
+++ b/stats/view/worker_commands.go
@@ -184,3 +184,13 @@ func (cmd *setReportingPeriodReq) handleCommand(w *worker) {
 	}
 	cmd.c <- true
 }
+
+type disableMonotonicityReq struct {
+	b bool
+	c chan bool
+}
+
+func (cmd *disableMonotonicityReq) handleCommand(w *worker) {
+	w.reset = cmd.b
+	cmd.c <- true
+}

From 80c9ce637214d9b2d3fb2e6715892dfeb0e28bdf Mon Sep 17 00:00:00 2001
From: Marwan Sulaiman
Date: Fri, 15 Nov 2019 21:11:49 -0500
Subject: [PATCH 4/7] Revert "reset on config"

This reverts commit 2be56c019057bacaa3456bb0d05ae10bb181eb0b.
---
 stats/view/collector.go       | 13 +++++++++++++
 stats/view/view.go            |  6 ++++++
 stats/view/worker.go          | 16 +---------------
 stats/view/worker_commands.go | 10 ----------
 4 files changed, 20 insertions(+), 25 deletions(-)

diff --git a/stats/view/collector.go b/stats/view/collector.go
index 8a6a2c0fd..0b158a2d7 100644
--- a/stats/view/collector.go
+++ b/stats/view/collector.go
@@ -16,6 +16,7 @@
 package view
 
 import (
+	"math"
 	"sort"
 	"time"
 
@@ -56,6 +57,18 @@ func (c *collector) clearRows() {
 	c.signatures = make(map[string]AggregationData)
 }
 
+func (c *collector) resetValues() {
+	for _, ad := range c.signatures {
+		switch ad := ad.(type) {
+		case *DistributionData:
+			ad.Min = math.MaxFloat64
+			ad.Max = math.SmallestNonzeroFloat64
+			ad.Mean = 0
+			ad.SumOfSquaredDev = 0
+		}
+	}
+}
+
 // encodeWithKeys encodes the map by using values
 // only associated with the keys provided.
 func encodeWithKeys(m *tag.Map, keys []tag.Key) []byte {
diff --git a/stats/view/view.go b/stats/view/view.go
index 293b54ecb..1531cbe1b 100644
--- a/stats/view/view.go
+++ b/stats/view/view.go
@@ -149,6 +149,12 @@ func (v *viewInternal) clearRows() {
 	v.collector.clearRows()
 }
 
+// resetValues resets values that need to be
+// completely isolated for each bucket
+func (v *viewInternal) resetValues() {
+	v.collector.resetValues()
+}
+
 func (v *viewInternal) collectedRows() []*Row {
 	return v.collector.collectedRows(v.view.TagKeys)
 }
diff --git a/stats/view/worker.go b/stats/view/worker.go
index 1fb706698..52daaba1f 100644
--- a/stats/view/worker.go
+++ b/stats/view/worker.go
@@ -47,7 +47,6 @@ type worker struct {
 	c          chan command
 	quit, done chan bool
 	mu         sync.RWMutex
-	reset      bool
 }
 
 var defaultWorker *worker
@@ -135,17 +134,6 @@ func SetReportingPeriod(d time.Duration) {
 	<-req.c // don't return until the timer is set to the new duration.
 }
 
-// DisableMonotonicity will reset the data for all cumulative
-// metrics right after every reporting period.
-func DisableMonotonicity(b bool) {
-	req := &disableMonotonicityReq{
-		b: b,
-		c: make(chan bool),
-	}
-	defaultWorker.c <- req
-	<-req.c
-}
-
 func newWorker() *worker {
 	return &worker{
 		measures:   make(map[string]*measureRef),
@@ -245,9 +233,7 @@ func (w *worker) reportView(v *viewInternal, now time.Time) {
 		e.ExportView(viewData)
 	}
 	exportersMu.Unlock()
-	if w.reset {
-		v.clearRows()
-	}
+	v.resetValues()
 }
 
 func (w *worker) reportUsage(now time.Time) {
diff --git a/stats/view/worker_commands.go b/stats/view/worker_commands.go
index e94db1f88..0267e179a 100644
--- a/stats/view/worker_commands.go
+++ b/stats/view/worker_commands.go
@@ -184,13 +184,3 @@ func (cmd *setReportingPeriodReq) handleCommand(w *worker) {
 	}
 	cmd.c <- true
 }
-
-type disableMonotonicityReq struct {
-	b bool
-	c chan bool
-}
-
-func (cmd *disableMonotonicityReq) handleCommand(w *worker) {
-	w.reset = cmd.b
-	cmd.c <- true
-}

From 878d62cef9d220702135cae2f92af480bf2c4fbd Mon Sep 17 00:00:00 2001
From: Marwan Sulaiman
Date: Fri, 15 Nov 2019 21:14:39 -0500
Subject: [PATCH 5/7] reset only min/max

---
 stats/view/collector.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/stats/view/collector.go b/stats/view/collector.go
index 0b158a2d7..99df7668c 100644
--- a/stats/view/collector.go
+++ b/stats/view/collector.go
@@ -63,8 +63,6 @@ func (c *collector) resetValues() {
 		case *DistributionData:
 			ad.Min = math.MaxFloat64
 			ad.Max = math.SmallestNonzeroFloat64
-			ad.Mean = 0
-			ad.SumOfSquaredDev = 0
 		}
 	}
 }

From 7fae1399b9a07d8b03225ca48b465df8dfda70b5 Mon Sep 17 00:00:00 2001
From: Marwan Sulaiman
Date: Mon, 18 Nov 2019 21:54:28 -0500
Subject: [PATCH 6/7] sort imports

---
 plugin/runmetrics/producer.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/plugin/runmetrics/producer.go b/plugin/runmetrics/producer.go
index eb307fea9..7991b1167 100644
--- a/plugin/runmetrics/producer.go
+++ b/plugin/runmetrics/producer.go
@@ -2,11 +2,12 @@ package runmetrics
 
 import (
 	"errors"
+	"runtime"
+	"sync"
+
 	"go.opencensus.io/metric"
 	"go.opencensus.io/metric/metricdata"
 	"go.opencensus.io/metric/metricproducer"
-	"runtime"
-	"sync"
 )
 
 type (

From 92ea5dc9a76f654c15101273083737c767974835 Mon Sep 17 00:00:00 2001
From: Marwan Sulaiman
Date: Mon, 18 Nov 2019 23:20:53 -0500
Subject: [PATCH 7/7] pr comments + add tests

---
 stats/view/aggregation_data.go |   6 +-
 stats/view/worker.go           |   1 +
 stats/view/worker_test.go      | 147 +++++++++++++++++++++++++++------
 3 files changed, 127 insertions(+), 27 deletions(-)

diff --git a/stats/view/aggregation_data.go b/stats/view/aggregation_data.go
index d500e67f7..9cecc9e56 100644
--- a/stats/view/aggregation_data.go
+++ b/stats/view/aggregation_data.go
@@ -117,10 +117,10 @@ func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Po
 // N+1 buckets.
 type DistributionData struct {
 	Count           int64   // number of data points aggregated
-	Min             float64 // minimum value in the distribution
-	Max             float64 // max value in the distribution
+	Min             float64 // minimum value in the distribution, value is non-monotonic
+	Max             float64 // max value in the distribution, value is non-monotonic
+	Mean            float64 // mean of the distribution
-	SumOfSquaredDev float64 // sum of the squared deviation from the mean
+	SumOfSquaredDev float64 // sum of the squared deviation from the mean; non-monotonic, exporters shouldn't use this
 	CountPerBucket  []int64 // number of occurrences per bucket
 	// ExemplarsPerBucket is slice the same length as CountPerBucket containing
 	// an exemplar for the associated bucket, or nil.
diff --git a/stats/view/worker.go b/stats/view/worker.go
index 52daaba1f..0a8ad03af 100644
--- a/stats/view/worker.go
+++ b/stats/view/worker.go
@@ -276,6 +276,7 @@ func (w *worker) Read() []*metricdata.Metric {
 		metric := w.toMetric(v, now)
 		if metric != nil {
 			metrics = append(metrics, metric)
+			v.resetValues()
 		}
 	}
 	return metrics
diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go
index 6c15d37ef..3e2b36c2e 100644
--- a/stats/view/worker_test.go
+++ b/stats/view/worker_test.go
@@ -18,6 +18,8 @@ package view
 import (
 	"context"
 	"errors"
+	"math"
+	"runtime/debug"
 	"sync"
 	"testing"
 	"time"
@@ -261,36 +263,107 @@ func TestReportUsage(t *testing.T) {
 	}
 
 	for _, tt := range tests {
-		restart()
-		SetReportingPeriod(25 * time.Millisecond)
+		t.Run(tt.name, func(t *testing.T) {
+			restart()
+			SetReportingPeriod(25 * time.Millisecond)
 
-		if err := Register(tt.view); err != nil {
-			t.Fatalf("%v: cannot register: %v", tt.name, err)
-		}
+			if err := Register(tt.view); err != nil {
+				t.Fatalf("%v: cannot register: %v", tt.name, err)
+			}
 
-		e := &countExporter{}
-		RegisterExporter(e)
+			e := &countExporter{}
+			RegisterExporter(e)
+			defer UnregisterExporter(e)
 
-		stats.Record(ctx, m.M(1))
-		stats.Record(ctx, m.M(1))
-		stats.Record(ctx, m.M(1))
-		stats.Record(ctx, m.M(1))
+			stats.Record(ctx, m.M(1))
+			stats.Record(ctx, m.M(1))
+			stats.Record(ctx, m.M(1))
+			stats.Record(ctx, m.M(1))
 
-		time.Sleep(50 * time.Millisecond)
+			time.Sleep(50 * time.Millisecond)
 
-		stats.Record(ctx, m.M(1))
-		stats.Record(ctx, m.M(1))
-		stats.Record(ctx, m.M(1))
-		stats.Record(ctx, m.M(1))
+			stats.Record(ctx, m.M(1))
+			stats.Record(ctx, m.M(1))
+			stats.Record(ctx, m.M(1))
+			stats.Record(ctx, m.M(1))
 
-		time.Sleep(50 * time.Millisecond)
+			time.Sleep(50 * time.Millisecond)
 
-		e.Lock()
-		count := e.count
-		e.Unlock()
-		if got, want := count, tt.wantMaxCount; got > want {
-			t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want)
-		}
+			e.Lock()
+			count := e.count
+			e.Unlock()
+			if got, want := count, tt.wantMaxCount; got > want {
+				t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want)
+			}
+		})
 	}
 }
+
+func TestReportUsageMinMax(t *testing.T) {
+	ctx := context.Background()
+
+	m := stats.Float64("measure", "desc", "unit")
+
+	tests := []struct {
+		name    string
+		view    *View
+		data    [][]float64
+		wantMin float64
+		wantMax float64
+	}{
+		{
+			name:    "reset_data",
+			view:    &View{Name: "const", Measure: m, Aggregation: Distribution(1, 4, 10, 12)},
+			data:    [][]float64{{2, 2, 2, 2}, {4, 4, 4, 1}},
+			wantMin: 1,
+			wantMax: 4,
+		},
+		{
+			name:    "no_data",
+			view:    &View{Name: "const", Measure: m, Aggregation: Distribution(1, 4, 10, 12)},
+			wantMin: 0,
+			wantMax: 0,
+		},
+		{
+			name:    "constant_data",
+			view:    &View{Name: "const", Measure: m, Aggregation: Distribution(1, 4, 10, 12)},
+			data:    [][]float64{{1, 1, 1, 1}, {1, 1, 1, 1}},
+			wantMin: 1,
+			wantMax: 1,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			restart()
+			SetReportingPeriod(25 * time.Millisecond)
+
+			if err := Register(tt.view); err != nil {
+				t.Fatalf("%v: cannot register: %v", tt.name, err)
+			}
+
+			e := &distributionExporter{}
+			RegisterExporter(e)
+			defer UnregisterExporter(e)
+
+			for _, batch := range tt.data {
+				for _, val := range batch {
+					stats.Record(ctx, m.M(val))
+				}
+				time.Sleep(50 * time.Millisecond)
+			}
+
+			e.Lock()
+			min := e.min
+			max := e.max
+			e.Unlock()
+			if got, want := min, tt.wantMin; got != want {
+				t.Errorf("%v: got min = %v; want %v", tt.name, got, want)
+			}
+			if got, want := max, tt.wantMax; got != want {
+				t.Errorf("%v: got max = %v; want %v", tt.name, got, want)
+			}
+		})
+	}
+}
@@ -494,7 +567,11 @@ func (e *countExporter) ExportView(vd *Data) {
 	if len(vd.Rows) == 0 {
 		return
 	}
-	d := vd.Rows[0].Data.(*CountData)
+	d, ok := vd.Rows[0].Data.(*CountData)
+	if !ok {
+		debug.PrintStack()
+		panic("BYE")
+	}
 
 	e.Lock()
 	defer e.Unlock()
@@ -514,6 +591,28 @@ func (e *vdExporter) ExportView(vd *Data) {
 	e.vds = append(e.vds, vd)
 }
 
+type distributionExporter struct {
+	sync.Mutex
+	min float64
+	max float64
+}
+
+func (e *distributionExporter) ExportView(vd *Data) {
+	if len(vd.Rows) == 0 {
+		return
+	}
+	d := vd.Rows[0].Data.(*DistributionData)
+
+	e.Lock()
+	defer e.Unlock()
+	if d.Min != math.MaxFloat64 {
+		e.min = d.Min
+	}
+	if d.Max != math.SmallestNonzeroFloat64 {
+		e.max = d.Max
+	}
+}
+
 // restart stops the current processors and creates a new one.
 func restart() {
 	defaultWorker.stop()
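Note (editorial sketch, not part of the patches above): with these changes applied, an exporter that reads DistributionData directly can observe Min and Max still holding their reset sentinels (math.MaxFloat64 and math.SmallestNonzeroFloat64) when a reporting period recorded no samples, which is exactly what the distributionExporter test helper guards against. A minimal exporter in the same spirit is sketched below; the sentinelAwareExporter type and the log output are illustrative only, while the go.opencensus.io/stats/view types and calls are the library's real API.

package exporters

import (
	"log"
	"math"

	"go.opencensus.io/stats/view"
)

// sentinelAwareExporter is a hypothetical exporter that skips Min/Max
// for rows whose reporting period recorded no samples, i.e. whose fields
// still hold the sentinel values written by resetValues.
type sentinelAwareExporter struct{}

func (e *sentinelAwareExporter) ExportView(vd *view.Data) {
	for _, row := range vd.Rows {
		dist, ok := row.Data.(*view.DistributionData)
		if !ok {
			continue // only distribution aggregations carry Min/Max
		}
		if dist.Min == math.MaxFloat64 || dist.Max == math.SmallestNonzeroFloat64 {
			continue // nothing was recorded since the last reset
		}
		log.Printf("%s: min=%v max=%v count=%d", vd.View.Name, dist.Min, dist.Max, dist.Count)
	}
}

Such an exporter would be registered the usual way, with view.RegisterExporter(&sentinelAwareExporter{}).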