diff --git a/runtime/drivers/bigquery/sql_store.go b/runtime/drivers/bigquery/sql_store.go
index 3dc9d192361..b2bd7ba3bfe 100644
--- a/runtime/drivers/bigquery/sql_store.go
+++ b/runtime/drivers/bigquery/sql_store.go
@@ -10,6 +10,7 @@ import (
 	"os"
 	"regexp"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"cloud.google.com/go/bigquery"
@@ -246,21 +247,40 @@ func (f *fileIterator) Next() ([]string, error) {
 	}
 	defer writer.Close()
 
-	ticker := time.NewTicker(time.Second * 5)
-	defer ticker.Stop()
+	overLimit := atomic.Bool{}
+	limitCtx, cancel := context.WithCancel(f.ctx)
+	defer cancel()
+
+	go func() {
+		ticker := time.NewTicker(time.Second * 5)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-limitCtx.Done():
+				return
+			case <-ticker.C:
+				fileInfo, err := os.Stat(fw.Name())
+				if err == nil { // ignore error
+					if fileInfo.Size() > f.limitInBytes {
+						overLimit.Store(true)
+						cancel()
+					}
+				}
+			}
+		}
+	}()
+
+	rows := int64(0)
 	// write arrow records to parquet file
 	for rdr.Next() {
 		select {
 		case <-f.ctx.Done():
 			return nil, f.ctx.Err()
-		case <-ticker.C:
-			fileInfo, err := os.Stat(fw.Name())
-			if err == nil { // ignore error
-				if fileInfo.Size() > f.limitInBytes {
-					return nil, drivers.ErrStorageLimitExceeded
-				}
+		case <-limitCtx.Done():
+			if overLimit.Load() {
+				return nil, drivers.ErrStorageLimitExceeded
 			}
-
+			return nil, limitCtx.Err()
 		default:
 			rec := rdr.Record()
 			f.progress.Observe(rec.NumRows(), drivers.ProgressUnitRecord)
@@ -270,6 +290,7 @@ func (f *fileIterator) Next() ([]string, error) {
 			if err := writer.WriteBuffered(rec); err != nil {
 				return nil, err
 			}
+			rows += rec.NumRows()
 		}
 	}
 	if rdr.Err() != nil {
@@ -278,11 +299,15 @@ func (f *fileIterator) Next() ([]string, error) {
 	writer.Close()
 	fw.Close()
 
+	if uint64(rows) < f.bqIter.TotalRows {
+		f.logger.Error("not all rows written to parquet file", zap.Int64("rows_written", rows), zap.Uint64("total_rows", f.bqIter.TotalRows), observability.ZapCtx(f.ctx))
+	}
+
 	fileInfo, err := os.Stat(fw.Name())
 	if err != nil {
 		return nil, err
 	}
-	f.logger.Debug("size of file", zap.String("size", datasize.ByteSize(fileInfo.Size()).HumanReadable()), observability.ZapCtx(f.ctx))
+	f.logger.Info("parquet file written", zap.String("size", datasize.ByteSize(fileInfo.Size()).HumanReadable()), zap.Int64("rows", rows), observability.ZapCtx(f.ctx))
 
 	return []string{fw.Name()}, nil
 }
@@ -317,7 +342,7 @@ func (f *fileIterator) downloadAsJSONFile() (string, error) {
 	f.downloaded = true
 
 	init := false
-	rows := 0
+	rows := int64(0)
 	enc := json.NewEncoder(fw)
 	enc.SetEscapeHTML(false)
 	bigNumericFields := make([]string, 0)
@@ -331,6 +356,16 @@ func (f *fileIterator) downloadAsJSONFile() (string, error) {
 			if !init {
 				return "", drivers.ErrNoRows
 			}
+			fileInfo, err := os.Stat(fw.Name())
+			if err != nil {
+				return "", fmt.Errorf("bigquery: failed to poll json file size: %w", err)
+			}
+
+			if uint64(rows) < f.bqIter.TotalRows {
+				f.logger.Error("not all rows written to json file", zap.Int64("rows_written", rows), zap.Uint64("total_rows", f.bqIter.TotalRows), observability.ZapCtx(f.ctx))
+			}
+
+			f.logger.Info("json file written", zap.String("size", datasize.ByteSize(fileInfo.Size()).HumanReadable()), zap.Int64("rows", rows), observability.ZapCtx(f.ctx))
 			// all rows written successfully
 			return fw.Name(), nil
 		}
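
For context, here is a minimal standalone sketch of the watchdog pattern this diff introduces: a background goroutine polls the output file's size on a ticker and cancels a derived context when a limit is exceeded, so the hot write loop only has to select on that context. The names here (writeWithLimit, errStorageLimitExceeded, the dummy payload loop) are hypothetical stand-ins, not the driver's real API.

// Minimal sketch of the watchdog pattern, with hypothetical names;
// not the BigQuery driver's actual code.
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"sync/atomic"
	"time"
)

var errStorageLimitExceeded = errors.New("storage limit exceeded")

// writeWithLimit writes payload chunks to path until the parent context is
// cancelled or a background watchdog sees the file grow past limit bytes.
func writeWithLimit(ctx context.Context, path string, limit int64) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	var overLimit atomic.Bool
	limitCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Watchdog goroutine: polls the file size on a ticker, out of band of
	// the write loop, and cancels the derived context when the limit trips.
	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-limitCtx.Done():
				return
			case <-ticker.C:
				if info, err := os.Stat(path); err == nil && info.Size() > limit {
					overLimit.Store(true)
					cancel() // wakes the write loop immediately
				}
			}
		}
	}()

	chunk := make([]byte, 64*1024)
	for {
		select {
		case <-limitCtx.Done():
			// A tripped limit and an ordinary parent cancellation both
			// arrive as a closed Done channel; the atomic flag tells
			// them apart.
			if overLimit.Load() {
				return errStorageLimitExceeded
			}
			return limitCtx.Err()
		default:
			if _, err := f.Write(chunk); err != nil {
				return err
			}
			time.Sleep(time.Millisecond) // simulate record production
		}
	}
}

func main() {
	path := "limited.bin"
	defer os.Remove(path)
	fmt.Println(writeWithLimit(context.Background(), path, 1<<20))
	// prints: storage limit exceeded
}

The design choice mirrors the diff: moving os.Stat off the per-record path means writes never block on filesystem metadata, cancel() doubles as the wake-up signal so an exceeded limit is noticed at the next loop iteration rather than the next ticker check, and the atomic flag preserves the distinction between ErrStorageLimitExceeded and plain context cancellation.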