Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 0845048

Browse files
committed
Merge branch 'main' into shuowei-job-history
2 parents d2dc5f6 + 96597f0 commit 0845048

File tree

2 files changed

+30
-14
lines changed

2 files changed

+30
-14
lines changed

bigframes/session/bq_caching_executor.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
import concurrent.futures
1718
import math
1819
import threading
1920
from typing import Literal, Mapping, Optional, Sequence, Tuple
@@ -514,13 +515,29 @@ def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode):
514515
Replace large local sources with the uploaded version of those datasources.
515516
"""
516517
# Step 1: Upload all previously un-uploaded data
518+
needs_upload = []
517519
for leaf in original_root.unique_nodes():
518520
if isinstance(leaf, nodes.ReadLocalNode):
519521
if (
520522
leaf.local_data_source.metadata.total_bytes
521523
> bigframes.constants.MAX_INLINE_BYTES
522524
):
523-
self._upload_local_data(leaf.local_data_source)
525+
needs_upload.append(leaf.local_data_source)
526+
527+
futures: dict[concurrent.futures.Future, local_data.ManagedArrowTable] = dict()
528+
for local_source in needs_upload:
529+
future = self.loader.read_data_async(
530+
local_source, bigframes.core.guid.generate_guid()
531+
)
532+
futures[future] = local_source
533+
try:
534+
for future in concurrent.futures.as_completed(futures.keys()):
535+
self.cache.cache_remote_replacement(futures[future], future.result())
536+
except Exception as e:
537+
# cancel all futures
538+
for future in futures:
539+
future.cancel()
540+
raise e
524541

525542
# Step 2: Replace local scans with remote scans
526543
def map_local_scans(node: nodes.BigFrameNode):
@@ -550,18 +567,6 @@ def map_local_scans(node: nodes.BigFrameNode):
550567

551568
return original_root.bottom_up(map_local_scans)
552569

553-
def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
554-
if self.cache.get_uploaded_local_data(local_table) is not None:
555-
return
556-
# Lock prevents concurrent repeated work, but slows things down.
557-
# Might be better as a queue and a worker thread
558-
with self._upload_lock:
559-
if self.cache.get_uploaded_local_data(local_table) is None:
560-
uploaded = self.loader.load_data_or_write_data(
561-
local_table, bigframes.core.guid.generate_guid()
562-
)
563-
self.cache.cache_remote_replacement(local_table, uploaded)
564-
565570
def _execute_plan_gbq(
566571
self,
567572
plan: nodes.BigFrameNode,

bigframes/session/loader.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,17 @@ def __init__(
302302
self._session = session
303303
self._clock = session_time.BigQuerySyncedClock(bqclient)
304304
self._clock.sync()
305+
self._threadpool = concurrent.futures.ThreadPoolExecutor(
306+
max_workers=1, thread_name_prefix="bigframes-loader"
307+
)
308+
309+
def read_data_async(
    self, local_data: local_data.ManagedArrowTable, offsets_col: str
) -> concurrent.futures.Future[bq_data.BigqueryDataSource]:
    """Schedule an upload of *local_data* on the loader's worker pool.

    Returns a Future that resolves to the uploaded BigQuery data source.
    """
    # Hand the blocking upload off to the executor so callers can fan
    # out several uploads and wait on the resulting futures together.
    return self._threadpool.submit(
        self._load_data_or_write_data, local_data, offsets_col
    )
305316

306317
def read_pandas(
307318
self,
@@ -352,7 +363,7 @@ def read_managed_data(
352363
session=self._session,
353364
)
354365

355-
def load_data_or_write_data(
366+
def _load_data_or_write_data(
356367
self,
357368
data: local_data.ManagedArrowTable,
358369
offsets_col: str,

0 commit comments

Comments
 (0)