Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Lazily initialize the view package's default worker #1287

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions stats/view/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ type Exporter interface {
//
// Binaries can register exporters, libraries shouldn't register exporters.
func RegisterExporter(e Exporter) {
maybeInitDefaultWorker()
defaultWorker.RegisterExporter(e)
}

// UnregisterExporter unregisters an exporter.
func UnregisterExporter(e Exporter) {
maybeInitDefaultWorker()
defaultWorker.UnregisterExporter(e)
}
24 changes: 24 additions & 0 deletions stats/view/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package view

import "testing"

// Exporting functionality is tested in `worker_test.go`. These are a trivial
// set of tests to make sure that these package-level exports will correctly
// initialize defaultWorker if necessary.
func TestRegisterExporter(t *testing.T) {
stopAndClearDefaultWorker()

e := &countExporter{}
RegisterExporter(e)

if _, ok := defaultWorker.exporters[e]; !ok {
t.Errorf("exporter doesn't appear to be registered with the default worker")
}
}

func TestUnregisterExporter(t *testing.T) {
stopAndClearDefaultWorker()

e := &countExporter{}
UnregisterExporter(e)
}
29 changes: 21 additions & 8 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ import (
"go.opencensus.io/tag"
)

func init() {
defaultWorker = NewMeter().(*worker)
go defaultWorker.start()
internal.DefaultRecorder = record
internal.MeasurementRecorder = recordMeasurement
}

type measureRef struct {
measure string
views map[*viewInternal]struct{}
Expand Down Expand Up @@ -113,13 +106,28 @@ type Meter interface {

var _ Meter = (*worker)(nil)

var defaultWorker *worker
var (
defaultWorker *worker
defaultWorkerInit sync.Once
)

// Lazily initializes and starts the package's default worker. Should be invoked
// before any exported functions in this package that use defaultWorker.
func maybeInitDefaultWorker() {
defaultWorkerInit.Do(func() {
defaultWorker = NewMeter().(*worker)
go defaultWorker.start()
internal.DefaultRecorder = record
internal.MeasurementRecorder = recordMeasurement
})
}

var defaultReportingDuration = 10 * time.Second

// Find returns a registered view associated with this name.
// If no registered view is found, nil is returned.
func Find(name string) (v *View) {
maybeInitDefaultWorker()
return defaultWorker.Find(name)
}

Expand All @@ -138,6 +146,7 @@ func (w *worker) Find(name string) (v *View) {
// Register begins collecting data for the given views.
// Once a view is registered, it reports data to the registered exporters.
func Register(views ...*View) error {
maybeInitDefaultWorker()
return defaultWorker.Register(views...)
}

Expand All @@ -157,6 +166,7 @@ func (w *worker) Register(views ...*View) error {
// It is not necessary to unregister from views you expect to collect for the
// duration of your program execution.
func Unregister(views ...*View) {
maybeInitDefaultWorker()
defaultWorker.Unregister(views...)
}

Expand All @@ -180,6 +190,7 @@ func (w *worker) Unregister(views ...*View) {
// RetrieveData gets a snapshot of the data collected for the the view registered
// with the given name. It is intended for testing only.
func RetrieveData(viewName string) ([]*Row, error) {
maybeInitDefaultWorker()
return defaultWorker.RetrieveData(viewName)
}

Expand Down Expand Up @@ -229,11 +240,13 @@ func (w *worker) recordMeasurement(tags *tag.Map, ms []stats.Measurement, attach
// duration is. For example, the Stackdriver exporter recommends a value no
// lower than 1 minute. Consult each exporter per your needs.
func SetReportingPeriod(d time.Duration) {
maybeInitDefaultWorker()
defaultWorker.SetReportingPeriod(d)
}

// Stop stops the default worker.
func Stop() {
maybeInitDefaultWorker()
defaultWorker.Stop()
}

Expand Down
26 changes: 14 additions & 12 deletions stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func Test_Worker_ViewRegistration(t *testing.T) {

for _, tc := range tcs {
t.Run(tc.label, func(t *testing.T) {
restart()
stopAndClearDefaultWorker()

views := map[string]*View{
"v1ID": {
Expand Down Expand Up @@ -123,7 +123,7 @@ func Test_Worker_ViewRegistration(t *testing.T) {
}

func Test_Worker_MultiExport(t *testing.T) {
restart()
stopAndClearDefaultWorker()

// This test reports the same data for the default worker and a secondary
// worker, and ensures that the stats are kept independently.
Expand Down Expand Up @@ -239,7 +239,7 @@ func Test_Worker_MultiExport(t *testing.T) {
}

func Test_Worker_RecordFloat64(t *testing.T) {
restart()
stopAndClearDefaultWorker()

someError := errors.New("some error")
m := stats.Float64("Test_Worker_RecordFloat64/MF1", "desc MF1", "unit")
Expand Down Expand Up @@ -383,7 +383,7 @@ func TestReportUsage(t *testing.T) {
}

for _, tt := range tests {
restart()
stopAndClearDefaultWorker()
SetReportingPeriod(25 * time.Millisecond)

if err := Register(tt.view); err != nil {
Expand Down Expand Up @@ -436,7 +436,7 @@ func Test_SetReportingPeriodReqNeverBlocks(t *testing.T) {
}

func TestWorkerStarttime(t *testing.T) {
restart()
stopAndClearDefaultWorker()

ctx := context.Background()
m := stats.Int64("measure/TestWorkerStarttime", "desc", "unit")
Expand Down Expand Up @@ -487,7 +487,7 @@ func TestWorkerStarttime(t *testing.T) {
}

func TestUnregisterReportsUsage(t *testing.T) {
restart()
stopAndClearDefaultWorker()
ctx := context.Background()

m1 := stats.Int64("measure", "desc", "unit")
Expand Down Expand Up @@ -522,7 +522,7 @@ func TestUnregisterReportsUsage(t *testing.T) {
}

func TestWorkerRace(t *testing.T) {
restart()
stopAndClearDefaultWorker()
ctx := context.Background()

m1 := stats.Int64("measure", "desc", "unit")
Expand Down Expand Up @@ -638,11 +638,13 @@ func (e *vdExporter) ExportView(vd *Data) {
e.vds = append(e.vds, vd)
}

// restart stops the current processors and creates a new one.
func restart() {
defaultWorker.Stop()
defaultWorker = NewMeter().(*worker)
go defaultWorker.start()
// stopAndClearDefaultWorker stops defaultWorker's processors, clears it, and
// resets its sync.Once so that it can be lazily initialized again.
func stopAndClearDefaultWorker() {
if defaultWorker != nil {
defaultWorker.Stop()
defaultWorkerInit = sync.Once{}
}
}

// byTag implements sort.Interface for *metricdata.TimeSeries by Labels.
Expand Down