From faf3ef9586496532e50b2d6498bd928ceb5bbbff Mon Sep 17 00:00:00 2001 From: Yaowen Mei Date: Sat, 22 Jun 2024 16:29:23 +0000 Subject: [PATCH] Pipe peak num of bq uploader to reproxy.INFO This CL passes the peak number of BigQuery uploaders to the reproxy.INFO log. This metric will help diagnose bottlenecks in the BigQuery upload process. In general, for every build, even on a small 8-core machine, there are about 180 seconds at the end of the build during which the peak number of actions is consistently almost zero. In the future, we can dynamically adjust the number of BigQuery uploaders based on the in-flight actions to better utilize this available machine capacity. Test: run a local Chrome build, and check that the records are saved in BigQuery. Bug: b/345015849 Change-Id: If2a0e4ec5368dea3db91a81b122db2af2ae98639 GitOrigin-RevId: d43da69ec0f75694868f0bdaf7213456cbf3e7a9 --- internal/pkg/bigquery/bigquery.go | 42 +++++++++++++++++++++++++------ internal/pkg/logger/logger.go | 10 +++++--- 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/internal/pkg/bigquery/bigquery.go b/internal/pkg/bigquery/bigquery.go index 7546f47b..300f8031 100644 --- a/internal/pkg/bigquery/bigquery.go +++ b/internal/pkg/bigquery/bigquery.go @@ -79,14 +79,31 @@ func NewInserter(ctx context.Context, resourceSpec, defaultProject string, ts *o // BQSpec defines which bigquery table the LogRecords will be saved. 
type BQSpec struct { - Err atomic.Pointer[error] - ProjectID string - TableSpec string - BatchSizeMB int - Client *bigquery.Inserter - CleanUp func() error - Ctx context.Context - Cancel context.CancelFunc + Err atomic.Pointer[error] + ProjectID string + TableSpec string + BatchSizeMB int + Client *bigquery.Inserter + CleanUp func() error + Ctx context.Context + Cancel context.CancelFunc + runningWorkers int32 + peakWorkers int32 +} + +func (b *BQSpec) increaseWorkerNum() { + atomic.AddInt32(&b.runningWorkers, 1) + atomic.StoreInt32(&b.peakWorkers, max(b.peakWorkers, b.runningWorkers)) +} + +func (b *BQSpec) decreaseWorkerNum() { + atomic.AddInt32(&b.runningWorkers, -1) +} + +func (b *BQSpec) GetPeakWorkers() int32 { + res := atomic.LoadInt32(&b.peakWorkers) + atomic.StoreInt32(&b.peakWorkers, 0) + return res } // batch fetches LogRecords from a channel, groups and saves them in batches. @@ -139,8 +156,10 @@ func upload(bqSpec *BQSpec, batches <-chan []*bigquerytranslator.Item, successfu return } else { wg.Add(1) + bqSpec.increaseWorkerNum() go func(bqSpec *BQSpec, items []*bigquerytranslator.Item, successful *int32, failed *int32) { uploadWithRetry(bqSpec, items, successful, failed) + bqSpec.decreaseWorkerNum() wg.Done() }(bqSpec, items, successful, failed) } @@ -217,3 +236,10 @@ func LogRecordsToBigQuery(bqSpec *BQSpec, items <-chan *bigquerytranslator.Item, bqSpec.CleanUp() bqSpec.Cancel() } + +func max(a, b int32) int32 { + if a > b { + return a + } + return b +} diff --git a/internal/pkg/logger/logger.go b/internal/pkg/logger/logger.go index 8e402427..2b98149c 100644 --- a/internal/pkg/logger/logger.go +++ b/internal/pkg/logger/logger.go @@ -68,9 +68,10 @@ const ( ) const ( - textDelimiter string = "\n\n\n" - peakNumActions string = "PEAK_NUM_ACTIONS" - unixTime string = "UNIX_TIME" + textDelimiter string = "\n\n\n" + peakNumActions string = "PEAK_NUM_ACTIONS" + unixTime string = "UNIX_TIME" + peakNumBQWorker string = "PEAK_NUM_BQ_WORKER" ) // Logger logs 
Records asynchronously into a file. @@ -466,6 +467,9 @@ func (l *Logger) collectResourceUsageSamples(samples map[string]int64) { } samples[unixTime] = time.Now().Unix() samples[peakNumActions] = int64(atomic.SwapInt32(&l.peakRunningActions, 0)) + if l.bqSpec != nil { + samples[peakNumBQWorker] = int64(l.bqSpec.GetPeakWorkers()) + } // These log messages in reproxy.INFO are used for plotting the time series // of resource usage by a plotter. log.Infof("Resource Usage: %v", samples)