Skip to content

Commit

Permalink
fix bq iterator skipping records (#5403)
Browse files: browse the repository at this point in the history
* fix BigQuery (bq) iterator skipping records

* revert check to 5 seconds

* fix channel

* check the cancel reason of the limit context (limitCtx)

* info log for written file
  • Loading branch information
pjain1 authored and nishantmonu51 committed Aug 6, 2024
1 parent 3d77176 commit 93bff70
Showing 1 changed file with 46 additions and 11 deletions.
57 changes (46 additions & 11 deletions) in runtime/drivers/bigquery/sql_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"regexp"
"strings"
"sync/atomic"
"time"

"cloud.google.com/go/bigquery"
Expand Down Expand Up @@ -246,21 +247,40 @@ func (f *fileIterator) Next() ([]string, error) {
}
defer writer.Close()

ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
overLimit := atomic.Bool{}
limitCtx, cancel := context.WithCancel(f.ctx)
defer cancel()

go func() {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for {
select {
case <-limitCtx.Done():
return
case <-ticker.C:
fileInfo, err := os.Stat(fw.Name())
if err == nil { // ignore error
if fileInfo.Size() > f.limitInBytes {
overLimit.Store(true)
cancel()
}
}
}
}
}()

rows := int64(0)
// write arrow records to parquet file
for rdr.Next() {
select {
case <-f.ctx.Done():
return nil, f.ctx.Err()
case <-ticker.C:
fileInfo, err := os.Stat(fw.Name())
if err == nil { // ignore error
if fileInfo.Size() > f.limitInBytes {
return nil, drivers.ErrStorageLimitExceeded
}
case <-limitCtx.Done():
if overLimit.Load() {
return nil, drivers.ErrStorageLimitExceeded
}

return nil, limitCtx.Err()
default:
rec := rdr.Record()
f.progress.Observe(rec.NumRows(), drivers.ProgressUnitRecord)
Expand All @@ -270,6 +290,7 @@ func (f *fileIterator) Next() ([]string, error) {
if err := writer.WriteBuffered(rec); err != nil {
return nil, err
}
rows += rec.NumRows()
}
}
if rdr.Err() != nil {
Expand All @@ -278,11 +299,15 @@ func (f *fileIterator) Next() ([]string, error) {
writer.Close()
fw.Close()

if uint64(rows) < f.bqIter.TotalRows {
f.logger.Error("not all rows written to parquet file", zap.Int64("rows_written", rows), zap.Uint64("total_rows", f.bqIter.TotalRows), observability.ZapCtx(f.ctx))
}

fileInfo, err := os.Stat(fw.Name())
if err != nil {
return nil, err
}
f.logger.Debug("size of file", zap.String("size", datasize.ByteSize(fileInfo.Size()).HumanReadable()), observability.ZapCtx(f.ctx))
f.logger.Info("parquet file written", zap.String("size", datasize.ByteSize(fileInfo.Size()).HumanReadable()), zap.Int64("rows", rows), observability.ZapCtx(f.ctx))
return []string{fw.Name()}, nil
}

Expand Down Expand Up @@ -317,7 +342,7 @@ func (f *fileIterator) downloadAsJSONFile() (string, error) {
f.downloaded = true

init := false
rows := 0
rows := int64(0)
enc := json.NewEncoder(fw)
enc.SetEscapeHTML(false)
bigNumericFields := make([]string, 0)
Expand All @@ -331,6 +356,16 @@ func (f *fileIterator) downloadAsJSONFile() (string, error) {
if !init {
return "", drivers.ErrNoRows
}
fileInfo, err := os.Stat(fw.Name())
if err != nil {
return "", fmt.Errorf("bigquery: failed to poll json file size: %w", err)
}

if uint64(rows) < f.bqIter.TotalRows {
f.logger.Error("not all rows written to json file", zap.Int64("rows_written", rows), zap.Uint64("total_rows", f.bqIter.TotalRows), observability.ZapCtx(f.ctx))
}

f.logger.Info("json file written", zap.String("size", datasize.ByteSize(fileInfo.Size()).HumanReadable()), zap.Int64("rows", rows), observability.ZapCtx(f.ctx))
// all rows written successfully
return fw.Name(), nil
}
Expand Down

1 comment on commit 93bff70

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉 Published on https://ui.rilldata.com as production
🚀 Deployed on https://66b25feb4c76d62956b4c285--rill-ui.netlify.app

Please sign in to comment.