Skip to content

Commit

Permalink
Add Flush to force aggregate in-memory buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
rakyll committed Sep 21, 2018
1 parent 572ae0b commit 728691b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 9 deletions.
36 changes: 27 additions & 9 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ type worker struct {
views map[string]*viewInternal
startTimes map[*viewInternal]time.Time

timer *time.Ticker
c chan command
quit, done chan bool
timer *time.Ticker
c chan command
quitCh, doneCh, flushCh chan struct{}
}

var defaultWorker *worker
Expand Down Expand Up @@ -107,6 +107,15 @@ func RetrieveData(viewName string) ([]*Row, error) {
return resp.rows, resp.err
}

// Flush force aggregates all buffered in-memory data.
// Otherwise, aggregation is going to happen at each period
// set by SetReportingPeriod.
//
// Flush is useful before program termination to avoid data loss.
func Flush() {
defaultWorker.flushAll()
}

func record(tags *tag.Map, ms interface{}) {
req := &recordReq{
tm: tags,
Expand Down Expand Up @@ -140,8 +149,9 @@ func newWorker() *worker {
startTimes: make(map[*viewInternal]time.Time),
timer: time.NewTicker(defaultReportingDuration),
c: make(chan command, 1024),
quit: make(chan bool),
done: make(chan bool),
quitCh: make(chan struct{}),
doneCh: make(chan struct{}),
flushCh: make(chan struct{}),
}
}

Expand All @@ -152,18 +162,26 @@ func (w *worker) start() {
cmd.handleCommand(w)
case <-w.timer.C:
w.reportUsage(time.Now())
case <-w.quit:
case <-w.flushCh:
w.reportUsage(time.Now())
w.doneCh <- struct{}{}
case <-w.quitCh:
w.timer.Stop()
close(w.c)
w.done <- true
w.doneCh <- struct{}{}
return
}
}
}

func (w *worker) flushAll() {
w.flushCh <- struct{}{}
<-w.doneCh
}

func (w *worker) stop() {
w.quit <- true
<-w.done
w.quitCh <- struct{}{}
<-w.doneCh
}

func (w *worker) getMeasureRef(name string) *measureRef {
Expand Down
30 changes: 30 additions & 0 deletions stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,36 @@ func TestUnregisterReportsUsage(t *testing.T) {
}
}

func TestFlush(t *testing.T) {
restart()

ctx := context.Background()

m := stats.Int64("measure", "desc", "unit")
SetReportingPeriod(time.Hour)

e := &vdExporter{}
RegisterExporter(e)

if err := Register(&View{Name: "count", Measure: m, Aggregation: Count()}); err != nil {
t.Fatal(err)
}

stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))

Flush()

e.Lock()
got := len(e.vds)
e.Unlock()

if got == 0 {
t.Errorf("got %v aggregations; want at least one", got)
}
}

type countExporter struct {
sync.Mutex
count int64
Expand Down

0 comments on commit 728691b

Please sign in to comment.