diff --git a/stats/view/worker.go b/stats/view/worker.go index ce2f86ab6..fef7bf513 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -194,26 +194,30 @@ func (w *worker) tryRegisterView(v *View) (*viewInternal, error) { return vi, nil } +func (w *worker) reportView(v *viewInternal, now time.Time) { + if !v.isSubscribed() { + return + } + rows := v.collectedRows() + _, ok := w.startTimes[v] + if !ok { + w.startTimes[v] = now + } + viewData := &Data{ + View: v.view, + Start: w.startTimes[v], + End: time.Now(), + Rows: rows, + } + exportersMu.Lock() + for e := range exporters { + e.ExportView(viewData) + } + exportersMu.Unlock() +} + func (w *worker) reportUsage(now time.Time) { for _, v := range w.views { - if !v.isSubscribed() { - continue - } - rows := v.collectedRows() - _, ok := w.startTimes[v] - if !ok { - w.startTimes[v] = now - } - viewData := &Data{ - View: v.view, - Start: w.startTimes[v], - End: time.Now(), - Rows: rows, - } - exportersMu.Lock() - for e := range exporters { - e.ExportView(viewData) - } - exportersMu.Unlock() + w.reportView(v, now) } } diff --git a/stats/view/worker_commands.go b/stats/view/worker_commands.go index d0dd00ce7..06c3c5464 100644 --- a/stats/view/worker_commands.go +++ b/stats/view/worker_commands.go @@ -88,6 +88,9 @@ func (cmd *unregisterFromViewReq) handleCommand(w *worker) { continue } + // Report pending data for this view before removing it. + w.reportView(vi, time.Now()) + vi.unsubscribe() if !vi.isSubscribed() { // this was the last subscription and view is not collecting anymore. diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go index de9e6180b..d43014648 100644 --- a/stats/view/worker_test.go +++ b/stats/view/worker_test.go @@ -362,9 +362,45 @@ func TestWorkerStarttime(t *testing.T) { e.Unlock() } +func TestUnregisterReportsUsage(t *testing.T) { + restart() + ctx := context.Background() + + m1 := stats.Int64("measure", "desc", "unit") + view1 := &View{Name: "count", Measure: m1, Aggregation: Count()} + m2 := stats.Int64("measure2", "desc", "unit") + view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()} + + SetReportingPeriod(time.Hour) + + if err := Register(view1, view2); err != nil { + t.Fatalf("cannot register: %v", err) + } + + e := &countExporter{} + RegisterExporter(e) + + stats.Record(ctx, m1.M(1)) + stats.Record(ctx, m2.M(1)) + stats.Record(ctx, m2.M(1)) + + Unregister(view2) + + // Unregister should only flush view2, so expect the count of 2. + want := int64(2) + + e.Lock() + got := e.totalCount + e.Unlock() + if got != want { + t.Errorf("got count data = %v; want %v", got, want) + } +} + type countExporter struct { sync.Mutex - count int64 + count int64 + totalCount int64 } func (e *countExporter) ExportView(vd *Data) { @@ -376,6 +412,7 @@ func (e *countExporter) ExportView(vd *Data) { e.Lock() defer e.Unlock() e.count = d.Value + e.totalCount += d.Value } type vdExporter struct {