Skip to content

Commit

Permalink
Pipe peak num of bq uploader to reproxy.INFO
Browse files Browse the repository at this point in the history
This CL passes the peak number of BigQuery uploaders to the reproxy.INFO log.

This metric will help diagnose bottlenecks in the BigQuery upload process.

In general, for every build, even on a small 8-core machine, there are about 180 seconds at the end of the build during which the peak number of in-flight actions stays close to zero. In the future, we can dynamically adjust the number of BigQuery uploaders based on the number of in-flight actions to better utilize this available machine capacity.

Test: run a local chrome build, and check the records are saved in bq.
Bug: b/345015849
Change-Id: If2a0e4ec5368dea3db91a81b122db2af2ae98639
GitOrigin-RevId: d43da69ec0f75694868f0bdaf7213456cbf3e7a9
  • Loading branch information
ywmei-brt1 authored and copybara-github committed Jul 2, 2024
1 parent e52bc6f commit faf3ef9
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 11 deletions.
42 changes: 34 additions & 8 deletions internal/pkg/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,31 @@ func NewInserter(ctx context.Context, resourceSpec, defaultProject string, ts *o

// BQSpec defines which bigquery table the LogRecords will be saved.
type BQSpec struct {
	Err         atomic.Pointer[error]
	ProjectID   string
	TableSpec   string
	BatchSizeMB int
	Client      *bigquery.Inserter
	CleanUp     func() error
	Ctx         context.Context
	Cancel      context.CancelFunc
	// runningWorkers is the current number of in-flight upload goroutines.
	// peakWorkers is the highest value runningWorkers has reached since the
	// last call to GetPeakWorkers. Both must be accessed only via sync/atomic.
	runningWorkers int32
	peakWorkers    int32
}

// increaseWorkerNum records that one more upload worker is running and
// raises peakWorkers if the new running count exceeds the recorded peak.
//
// The original implementation read b.peakWorkers and b.runningWorkers with
// plain (non-atomic) loads and then stored the max back, which is a data
// race: two concurrent callers could interleave and overwrite a higher
// peak with a lower one. The compare-and-swap loop below makes the
// "raise peak to at least cur" update lock-free and race-free.
func (b *BQSpec) increaseWorkerNum() {
	cur := atomic.AddInt32(&b.runningWorkers, 1)
	for {
		peak := atomic.LoadInt32(&b.peakWorkers)
		if cur <= peak || atomic.CompareAndSwapInt32(&b.peakWorkers, peak, cur) {
			return
		}
	}
}

// decreaseWorkerNum atomically decrements the count of in-flight upload
// workers; called when an upload goroutine finishes. peakWorkers is left
// untouched so the high-water mark survives until GetPeakWorkers reads it.
func (b *BQSpec) decreaseWorkerNum() {
	atomic.AddInt32(&b.runningWorkers, -1)
}

// GetPeakWorkers returns the peak number of concurrently running upload
// workers observed since the previous call, and resets the counter to zero.
//
// atomic.SwapInt32 performs the read-and-reset as a single atomic
// operation. The original separate LoadInt32 + StoreInt32 had a window
// between the two calls in which a new peak recorded by a worker would be
// silently discarded by the Store. This also matches how the logger reads
// peakRunningActions (atomic.SwapInt32(&l.peakRunningActions, 0)).
func (b *BQSpec) GetPeakWorkers() int32 {
	return atomic.SwapInt32(&b.peakWorkers, 0)
}

// batch fetches LogRecords from a channel, groups and saves them in batches.
Expand Down Expand Up @@ -139,8 +156,10 @@ func upload(bqSpec *BQSpec, batches <-chan []*bigquerytranslator.Item, successfu
return
} else {
wg.Add(1)
bqSpec.increaseWorkerNum()
go func(bqSpec *BQSpec, items []*bigquerytranslator.Item, successful *int32, failed *int32) {
uploadWithRetry(bqSpec, items, successful, failed)
bqSpec.decreaseWorkerNum()
wg.Done()
}(bqSpec, items, successful, failed)
}
Expand Down Expand Up @@ -217,3 +236,10 @@ func LogRecordsToBigQuery(bqSpec *BQSpec, items <-chan *bigquerytranslator.Item,
bqSpec.CleanUp()
bqSpec.Cancel()
}

// max returns the larger of the two int32 values a and b.
func max(a, b int32) int32 {
	if b > a {
		return b
	}
	return a
}
10 changes: 7 additions & 3 deletions internal/pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ const (
)

const (
	// textDelimiter separates entries in the text-format log output.
	textDelimiter string = "\n\n\n"
	// Keys for the resource-usage samples periodically written to
	// reproxy.INFO (consumed by a time-series plotter).
	peakNumActions  string = "PEAK_NUM_ACTIONS"
	unixTime        string = "UNIX_TIME"
	peakNumBQWorker string = "PEAK_NUM_BQ_WORKER"
)

// Logger logs Records asynchronously into a file.
Expand Down Expand Up @@ -466,6 +467,9 @@ func (l *Logger) collectResourceUsageSamples(samples map[string]int64) {
}
samples[unixTime] = time.Now().Unix()
samples[peakNumActions] = int64(atomic.SwapInt32(&l.peakRunningActions, 0))
if l.bqSpec != nil {
samples[peakNumBQWorker] = int64(l.bqSpec.GetPeakWorkers())
}
// These log messages in reproxy.INFO are used for plotting the time series
// of resource usage by a plotter.
log.Infof("Resource Usage: %v", samples)
Expand Down

0 comments on commit faf3ef9

Please sign in to comment.