Skip to content

Commit 8fb9793

Browse files
committed
review round 2 fix
1 parent 3f25a33 commit 8fb9793

File tree

4 files changed

+124
-142
lines changed

4 files changed

+124
-142
lines changed

monitoring/exporter/stackdriver/mock_check_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func mockAddToBundler(bndler *bundler.Bundler, item interface{}, _ int) error {
127127

128128
// newTestExp creates an exporter which saves error to errStorage. Caller should not set
129129
// opts.OnError.
130-
func newTestExp(t *testing.T, opts *Options) *Exporter {
130+
func newTestExp(t *testing.T, opts Options) *Exporter {
131131
opts.OnError = testOnError
132132
exp, err := NewExporter(ctx, opts)
133133
if err != nil {
@@ -140,7 +140,7 @@ func newTestExp(t *testing.T, opts *Options) *Exporter {
140140

141141
// newTestProjData creates a projectData object to test behavior of projectData.uploadRowData. Other
142142
// uses are not recommended. As newTestExp, all errors are saved to errStorage.
143-
func newTestProjData(t *testing.T, opts *Options) *projectData {
143+
func newTestProjData(t *testing.T, opts Options) *projectData {
144144
return newTestExp(t, opts).newProjectData(project1)
145145
}
146146

monitoring/exporter/stackdriver/project_data.go

Lines changed: 34 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@ package stackdriver
1717
import (
1818
"fmt"
1919

20-
"go.opencensus.io/tag"
2120
"google.golang.org/api/support/bundler"
22-
metricpb "google.golang.org/genproto/googleapis/api/metric"
2321
mpb "google.golang.org/genproto/googleapis/monitoring/v3"
2422
)
2523

26-
// MaxTimeSeriePerUpload is the maximum number of time series that stackdriver accepts. Only test
27-
// may change this value.
28-
var MaxTimeSeriesPerUpload = 200
24+
// MaxTimeSeriePerUpload is the maximum number of time series that's uploaded to the stackdriver
25+
// at once. Consumer may change this value, but note that stackdriver may reject upload request if
26+
// the number of time series is too large.
27+
var MaxTimeSeriesPerUpload = 100
2928

3029
// projectData contain per-project data in exporter. It should be created by newProjectData()
3130
type projectData struct {
@@ -36,121 +35,50 @@ type projectData struct {
3635
bndler *bundler.Bundler
3736
}
3837

39-
func (e *Exporter) newProjectData(projectID string) *projectData {
40-
pd := &projectData{
41-
parent: e,
42-
projectID: projectID,
43-
}
44-
45-
pd.bndler = newBundler((*RowData)(nil), pd.uploadRowData)
46-
// Set options for bundler if they are provided by users.
47-
if 0 < e.opts.BundleDelayThreshold {
48-
pd.bndler.DelayThreshold = e.opts.BundleDelayThreshold
49-
}
50-
if 0 < e.opts.BundleCountThreshold {
51-
pd.bndler.BundleCountThreshold = e.opts.BundleCountThreshold
52-
}
53-
return pd
54-
}
55-
5638
// uploadRowData is called by bundler to upload row data, and report any error happened meanwhile.
5739
func (pd *projectData) uploadRowData(bundle interface{}) {
5840
exp := pd.parent
5941
rds := bundle.([]*RowData)
6042

61-
// reqRds contains RowData objects those are uploaded to stackdriver at given iteration.
62-
// It's main usage is for error reporting. For actual uploading operation, we use req.
63-
// remainingRds are RowData that has not been processed at all.
64-
var reqRds, remainingRds []*RowData
65-
for ; len(rds) != 0; rds = remainingRds {
66-
var req *mpb.CreateTimeSeriesRequest
67-
req, reqRds, remainingRds = pd.makeReq(rds)
68-
if req == nil {
69-
// No need to perform RPC call for empty set of requests.
70-
continue
71-
}
72-
if err := createTimeSeries(exp.client, exp.ctx, req); err != nil {
73-
newErr := fmt.Errorf("RPC call to create time series failed for project %s: %v", pd.projectID, err)
74-
// We pass all row data not successfully uploaded.
75-
exp.onError(newErr, reqRds...)
76-
}
77-
}
78-
}
43+
// uploadTs contains TimeSeries objects that needs to be uploaded.
44+
var uploadTs []*mpb.TimeSeries = nil
45+
// uploadRds contains RowData objects corresponds to uploadTs. It's used for error reporting
46+
// when upload operation fails.
47+
var uploadRds []*RowData = nil
7948

80-
// makeReq creates a request that's suitable to be passed to create time series RPC call.
81-
//
82-
// reqRds contains rows those are contained in req. Main use of reqRds is to be returned to users if
83-
// creating time series failed. (We don't want users to investigate structure of timeseries.)
84-
// remainingRds contains rows those are not used at all in makeReq because of the length limitation
85-
// or request. Another call of makeReq() with remainigRds will handle (some) rows in them. When req
86-
// is nil, then there's nothing to request and reqRds will also contain nothing.
87-
//
88-
// Some rows in rds may fail while converting them to time series, and in that case makeReq() calls
89-
// exporter's onError() directly, not propagating errors to the caller.
90-
func (pd *projectData) makeReq(rds []*RowData) (req *mpb.CreateTimeSeriesRequest, reqRds, remainingRds []*RowData) {
91-
exp := pd.parent
92-
timeSeries := []*mpb.TimeSeries{}
93-
94-
var i int
95-
var rd *RowData
96-
for i, rd = range rds {
97-
pt := newPoint(rd.View, rd.Row, rd.Start, rd.End)
98-
if pt.Value == nil {
99-
err := fmt.Errorf("inconsistent data found in view %s", rd.View.Name)
100-
pd.parent.onError(err, rd)
101-
continue
102-
}
103-
resource, err := exp.makeResource(rd)
49+
for _, rd := range rds {
50+
ts, err := exp.makeTS(rd)
10451
if err != nil {
105-
newErr := fmt.Errorf("failed to construct resource of view %s: %v", rd.View.Name, err)
106-
pd.parent.onError(newErr, rd)
52+
exp.opts.OnError(err, rd)
10753
continue
10854
}
109-
110-
ts := &mpb.TimeSeries{
111-
Metric: &metricpb.Metric{
112-
Type: rd.View.Name,
113-
Labels: exp.makeLabels(rd.Row.Tags),
114-
},
115-
Resource: resource,
116-
Points: []*mpb.Point{pt},
117-
}
118-
// Growing timeseries and reqRds are done at same time.
119-
timeSeries = append(timeSeries, ts)
120-
reqRds = append(reqRds, rd)
121-
// Don't grow timeseries over the limit.
122-
if len(timeSeries) == MaxTimeSeriesPerUpload {
123-
break
55+
// Time series created. We update both uploadTs and uploadRds.
56+
uploadTs = append(uploadTs, ts)
57+
uploadRds = append(uploadRds, rd)
58+
if len(uploadTs) == MaxTimeSeriesPerUpload {
59+
pd.uploadTimeSeries(uploadTs, uploadRds)
60+
uploadTs = nil
61+
uploadRds = nil
12462
}
12563
}
126-
127-
// Since i is the last index processed, remainingRds should start from i+1.
128-
remainingRds = rds[i+1:]
129-
if len(timeSeries) == 0 {
130-
req = nil
131-
} else {
132-
req = &mpb.CreateTimeSeriesRequest{
133-
Name: fmt.Sprintf("projects/%s", pd.projectID),
134-
TimeSeries: timeSeries,
135-
}
64+
// Upload any remaining time series.
65+
if len(uploadTs) != 0 {
66+
pd.uploadTimeSeries(uploadTs, uploadRds)
13667
}
137-
return req, reqRds, remainingRds
13868
}
13969

140-
// makeLables constructs label that's ready for being uploaded to stackdriver.
141-
func (e *Exporter) makeLabels(tags []tag.Tag) map[string]string {
142-
opts := e.opts
143-
labels := make(map[string]string, len(opts.DefaultLabels)+len(tags))
144-
for key, val := range opts.DefaultLabels {
145-
labels[key] = val
146-
}
147-
// If there's overlap When combining exporter's default label and tags, values in tags win.
148-
for _, tag := range tags {
149-
labels[tag.Key.Name()] = tag.Value
70+
// uploadTimeSeries uploads timeSeries. ts and rds must contain matching data, and ts must not be
71+
// empty. When uploading fails, this function calls exporter's OnError() directly, not propagating
72+
// errors to the caller.
73+
func (pd *projectData) uploadTimeSeries(ts []*mpb.TimeSeries, rds []*RowData) {
74+
exp := pd.parent
75+
req := &mpb.CreateTimeSeriesRequest{
76+
Name: fmt.Sprintf("projects/%s", pd.projectID),
77+
TimeSeries: ts,
15078
}
151-
// Some labels are not for exporting.
152-
for _, key := range opts.UnexportedLabels {
153-
delete(labels, key)
79+
if err := createTimeSeries(exp.client, exp.ctx, req); err != nil {
80+
newErr := fmt.Errorf("RPC call to create time series failed for project %s: %v", pd.projectID, err)
81+
// We pass all row data not successfully uploaded.
82+
exp.opts.OnError(newErr, rds...)
15483
}
155-
return labels
15684
}

monitoring/exporter/stackdriver/stackdriver.go

Lines changed: 80 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,12 @@ import (
3636

3737
monitoring "cloud.google.com/go/monitoring/apiv3"
3838
"go.opencensus.io/stats/view"
39+
"go.opencensus.io/tag"
3940
"google.golang.org/api/option"
4041
"google.golang.org/api/support/bundler"
42+
metricpb "google.golang.org/genproto/googleapis/api/metric"
4143
mrpb "google.golang.org/genproto/googleapis/api/monitoredres"
44+
mpb "google.golang.org/genproto/googleapis/monitoring/v3"
4245
)
4346

4447
// Exporter is the exporter that can be registered to opencensus. An Exporter object must be
@@ -47,12 +50,7 @@ type Exporter struct {
4750
// TODO(lawrencechung): If possible, find a way to not storing ctx in the struct.
4851
ctx context.Context
4952
client *monitoring.MetricClient
50-
opts *Options
51-
52-
// copy of some option values which may be modified by exporter.
53-
getProjectID func(*RowData) (string, error)
54-
onError func(error, ...*RowData)
55-
makeResource func(*RowData) (*mrpb.MonitoredResource, error)
53+
opts Options
5654

5755
// mu protects access to projDataMap
5856
mu sync.Mutex
@@ -67,9 +65,14 @@ type Options struct {
6765
// RPC calls.
6866
ClientOptions []option.ClientOption
6967

70-
// options for bundles amortizing export requests. Note that a bundle is created for each
68+
// Options for bundles amortizing export requests. Note that a bundle is created for each
7169
// project. When not provided, default values in bundle package are used.
70+
71+
// BundleDelayThreshold determines the max amount of time the exporter can wait before
72+
// uploading data to the stackdriver.
7273
BundleDelayThreshold time.Duration
74+
// BundleCountThreshold determines how many RowData objects can be buffered before batch
75+
// uploading them to the backend.
7376
BundleCountThreshold int
7477

7578
// Callback functions provided by user.
@@ -85,8 +88,8 @@ type Options struct {
8588
GetProjectID func(*RowData) (projectID string, err error)
8689
// OnError is used to report any error happened while exporting view data fails. Whenever
8790
// this function is called, it's guaranteed that at least one row data is also passed to
88-
// OnError. Row data passed to OnError must not be modified. When OnError is not set, all
89-
// errors happened on exporting are ignored.
91+
// OnError. Row data passed to OnError must not be modified and OnError must be
92+
// non-blocking. When OnError is not set, all errors happened on exporting are ignored.
9093
OnError func(error, ...*RowData)
9194
// MakeResource creates monitored resource from RowData. It is guaranteed that only RowData
9295
// that passes GetProjectID will be given to this function. Though not recommended, error
@@ -129,8 +132,8 @@ func defaultMakeResource(rd *RowData) (*mrpb.MonitoredResource, error) {
129132
return &mrpb.MonitoredResource{Type: "global"}, nil
130133
}
131134

132-
// Following functions are wrapper of functions that may show non-deterministic behavior. Only tests
133-
// can modify these functions.
135+
// Following functions are wrapper of functions those will be mocked by tests. Only tests can modify
136+
// these functions.
134137
var (
135138
newMetricClient = monitoring.NewMetricClient
136139
createTimeSeries = (*monitoring.MetricClient).CreateTimeSeries
@@ -141,7 +144,7 @@ var (
141144
// NewExporter creates an Exporter object. Once a call to NewExporter is made, any fields in opts
142145
// must not be modified at all. ctx will also be used throughout entire exporter operation when
143146
// making RPC call.
144-
func NewExporter(ctx context.Context, opts *Options) (*Exporter, error) {
147+
func NewExporter(ctx context.Context, opts Options) (*Exporter, error) {
145148
client, err := newMetricClient(ctx, opts.ClientOptions...)
146149
if err != nil {
147150
return nil, fmt.Errorf("failed to create a metric client: %v", err)
@@ -154,19 +157,14 @@ func NewExporter(ctx context.Context, opts *Options) (*Exporter, error) {
154157
projDataMap: make(map[string]*projectData),
155158
}
156159

157-
// We don't want to modify user-supplied options, so save default options directly in
158-
// exporter.
159-
e.getProjectID = defaultGetProjectID
160-
if opts.GetProjectID != nil {
161-
e.getProjectID = opts.GetProjectID
160+
if e.opts.GetProjectID == nil {
161+
e.opts.GetProjectID = defaultGetProjectID
162162
}
163-
e.onError = defaultOnError
164-
if opts.OnError != nil {
165-
e.onError = opts.OnError
163+
if e.opts.OnError == nil {
164+
e.opts.OnError = defaultOnError
166165
}
167-
e.makeResource = defaultMakeResource
168-
if opts.MakeResource != nil {
169-
e.makeResource = opts.MakeResource
166+
if e.opts.MakeResource == nil {
167+
e.opts.MakeResource = defaultMakeResource
170168
}
171169

172170
return e, nil
@@ -201,12 +199,12 @@ var RowDataNotApplicableError = errors.New("row data is not applicable to the ex
201199

202200
// exportRowData exports a single row data.
203201
func (e *Exporter) exportRowData(rd *RowData) {
204-
projID, err := e.getProjectID(rd)
202+
projID, err := e.opts.GetProjectID(rd)
205203
if err != nil {
206204
// We ignore non-applicable RowData.
207205
if err != RowDataNotApplicableError {
208206
newErr := fmt.Errorf("failed to get project ID on row data with view %s: %v", rd.View.Name, err)
209-
e.onError(newErr, rd)
207+
e.opts.OnError(newErr, rd)
210208
}
211209
return
212210
}
@@ -217,7 +215,7 @@ func (e *Exporter) exportRowData(rd *RowData) {
217215
go pd.uploadRowData(rd)
218216
default:
219217
newErr := fmt.Errorf("failed to add row data with view %s to bundle for project %s: %v", rd.View.Name, projID, err)
220-
e.onError(newErr, rd)
218+
e.opts.OnError(newErr, rd)
221219
}
222220
}
223221

@@ -233,6 +231,23 @@ func (e *Exporter) getProjectData(projectID string) *projectData {
233231
return pd
234232
}
235233

234+
func (e *Exporter) newProjectData(projectID string) *projectData {
235+
pd := &projectData{
236+
parent: e,
237+
projectID: projectID,
238+
}
239+
240+
pd.bndler = newBundler((*RowData)(nil), pd.uploadRowData)
241+
// Set options for bundler if they are provided by users.
242+
if 0 < e.opts.BundleDelayThreshold {
243+
pd.bndler.DelayThreshold = e.opts.BundleDelayThreshold
244+
}
245+
if 0 < e.opts.BundleCountThreshold {
246+
pd.bndler.BundleCountThreshold = e.opts.BundleCountThreshold
247+
}
248+
return pd
249+
}
250+
236251
// Close flushes and closes the exporter. Close must be called after the exporter is unregistered
237252
// and no further calls to ExportView() are made. Once Close() is returned no further access to the
238253
// exporter is allowed in any way.
@@ -248,3 +263,42 @@ func (e *Exporter) Close() error {
248263
}
249264
return nil
250265
}
266+
267+
// makeTS constructs a time series from a row data.
268+
func (e *Exporter) makeTS(rd *RowData) (*mpb.TimeSeries, error) {
269+
pt := newPoint(rd.View, rd.Row, rd.Start, rd.End)
270+
if pt.Value == nil {
271+
return nil, fmt.Errorf("inconsistent data found in view %s", rd.View.Name)
272+
}
273+
resource, err := e.opts.MakeResource(rd)
274+
if err != nil {
275+
return nil, fmt.Errorf("failed to construct resource of view %s: %v", rd.View.Name, err)
276+
}
277+
ts := &mpb.TimeSeries{
278+
Metric: &metricpb.Metric{
279+
Type: rd.View.Name,
280+
Labels: e.makeLabels(rd.Row.Tags),
281+
},
282+
Resource: resource,
283+
Points: []*mpb.Point{pt},
284+
}
285+
return ts, nil
286+
}
287+
288+
// makeLables constructs label that's ready for being uploaded to stackdriver.
289+
func (e *Exporter) makeLabels(tags []tag.Tag) map[string]string {
290+
opts := e.opts
291+
labels := make(map[string]string, len(opts.DefaultLabels)+len(tags))
292+
for key, val := range opts.DefaultLabels {
293+
labels[key] = val
294+
}
295+
// If there's overlap When combining exporter's default label and tags, values in tags win.
296+
for _, tag := range tags {
297+
labels[tag.Key.Name()] = tag.Value
298+
}
299+
// Some labels are not for exporting.
300+
for _, key := range opts.UnexportedLabels {
301+
delete(labels, key)
302+
}
303+
return labels
304+
}

0 commit comments

Comments
 (0)