Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
xando committed Jun 12, 2024
1 parent 7daa72a commit cba68ae
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies = [
"google-cloud-bigquery>=3,<5",
"google-cloud-bigquery-storage>=2,<3",
"pyarrow>=16,<17",
"tenacity"
]

[project.license]
Expand Down
34 changes: 19 additions & 15 deletions src/pyarrow/bigquery/write/upload.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
import tenacity

from google.cloud.bigquery_storage_v1 import types
from google.api_core.exceptions import Unknown
from google.api_core import retry

from . import pa_to_pb


def upload_data(stream, pa_table, protobuf_definition, offset):
local_offset = 0
for serialized_rows in pa_to_pb.serialize(pa_table, protobuf_definition):
proto_rows = types.ProtoRows()
proto_rows.serialized_rows.extend(serialized_rows)
@tenacity.retry(
stop=tenacity.stop_after_attempt(5),
retry=tenacity.retry_if_exception_type(Unknown))
def _send(stream, serialized_rows, offset):
proto_rows = types.ProtoRows()
proto_rows.serialized_rows.extend(serialized_rows)

proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows

request = types.AppendRowsRequest()
request.offset = offset + local_offset
request.proto_rows = proto_data
request = types.AppendRowsRequest()
request.offset = offset
request.proto_rows = proto_data

job = stream.append_rows_stream.send(request)
job.result(
retry=retry.Retry(predicate=retry.if_exception_type(Unknown))
)
stream.append_rows_stream.send(request).result()


def upload_data(stream, pa_table, protobuf_definition, offset):
local_offset = 0
for serialized_rows in pa_to_pb.serialize(pa_table, protobuf_definition):
_send(stream, serialized_rows, offset + local_offset)
local_offset += len(serialized_rows)

0 comments on commit cba68ae

Please sign in to comment.