Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

stats/view: implement Flush #922

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions stats/view/view_measure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

func TestMeasureFloat64AndInt64(t *testing.T) {
restart()

// Recording through both a Float64Measure and Int64Measure with the
// same name should work.

Expand Down
27 changes: 25 additions & 2 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package view

import (
"fmt"
"sync"
"time"

"go.opencensus.io/stats"
Expand All @@ -43,6 +44,8 @@ type worker struct {
timer *time.Ticker
c chan command
quit, done chan bool
flushCh chan bool
quitOnce sync.Once
}

var defaultWorker *worker
Expand Down Expand Up @@ -142,6 +145,21 @@ func newWorker() *worker {
c: make(chan command, 1024),
quit: make(chan bool),
done: make(chan bool),
flushCh: make(chan bool),
}
}

// Flush reports all collected points regardless
// of the time reporting period or buffering.
func Flush() {
select {
case <-defaultWorker.quit:
// If this channel is closed i.e. we quit, do nothing.
return
default: // Otherwise we can proceed with flushing.
req := &flushReq{c: make(chan bool)}
defaultWorker.c <- req
<-req.c // don't return until the flush is complete.
}
}

Expand All @@ -162,8 +180,13 @@ func (w *worker) start() {
}

func (w *worker) stop() {
w.quit <- true
<-w.done
w.quitOnce.Do(func() {
// Close w.quit so that any operations that need
// to check if we've stopped/quit will immediately
// select on w.quit.
close(w.quit)
<-w.done
})
}

func (w *worker) getMeasureRef(name string) *measureRef {
Expand Down
11 changes: 11 additions & 0 deletions stats/view/worker_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ func (cmd *registerViewReq) handleCommand(w *worker) {
}
}

// flushReq is the command to flush all recorded
// data regardless of time period and buffering.
type flushReq struct {
c chan bool
}

func (fr *flushReq) handleCommand(w *worker) {
w.reportUsage(time.Now())
fr.c <- true
}

// unregisterFromViewReq is the command to unregister to a view. Has no
// impact on the data collection for client that are pulling data from the
// library.
Expand Down
62 changes: 62 additions & 0 deletions stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,68 @@ func TestUnregisterReportsUsage(t *testing.T) {
}
}

func TestFlush(t *testing.T) {
restart()
ctx := context.Background()

SetReportingPeriod(time.Hour)

m1 := stats.Int64("measure", "desc", "unit")
view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
m2 := stats.Int64("measure2", "desc", "unit")
view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}

if err := Register(view1, view2); err != nil {
t.Fatalf("cannot register: %v", err)
}

e := &countExporter{}
RegisterExporter(e)

// Irrespective of the reporting period, with Flush
// all the recorded points should be reported. Hence we'll
// set an arbitrarily large period of 1 hr.
SetReportingPeriod(time.Hour)

stats.Record(ctx, m1.M(1))
stats.Record(ctx, m2.M(3))
stats.Record(ctx, m2.M(1))

<-time.After(40 * time.Millisecond)
Flush()
<-time.After(40 * time.Millisecond)

e.Lock()
got := e.totalCount
e.Unlock()
want := int64(3) // Number of wanted data points
if got != want {
t.Errorf("Count data\nGot: %d\nWant: %v", got, want)
}
}

func TestFlush_afterStopDoesnotBlock(t *testing.T) {
restart()

doneCh := make(chan bool)
go func() {
defer close(doneCh)

for i := 0; i < 10; i++ {
Flush()
defaultWorker.stop()
Flush()
}
}()

select {
case <-time.After(300 * time.Microsecond): // Arbitrary duration that's considered "long"
t.Fatal("Flush + stop goroutine did not return on time")
case <-doneCh:
// returned ASAP so okay
}
}

type countExporter struct {
sync.Mutex
count int64
Expand Down