Skip to content

Commit 96597f0

Browse files
perf: Make executor data uploads async internally (#2529)
1 parent 915cce5 commit 96597f0

File tree

2 files changed

+30
-14
lines changed

2 files changed

+30
-14
lines changed

bigframes/session/bq_caching_executor.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
import concurrent.futures
1718
import math
1819
import threading
1920
from typing import Literal, Mapping, Optional, Sequence, Tuple
@@ -514,13 +515,29 @@ def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode):
514515
Replace large local sources with the uploaded version of those datasources.
515516
"""
516517
# Step 1: Upload all previously un-uploaded data
518+
needs_upload = []
517519
for leaf in original_root.unique_nodes():
518520
if isinstance(leaf, nodes.ReadLocalNode):
519521
if (
520522
leaf.local_data_source.metadata.total_bytes
521523
> bigframes.constants.MAX_INLINE_BYTES
522524
):
523-
self._upload_local_data(leaf.local_data_source)
525+
needs_upload.append(leaf.local_data_source)
526+
527+
futures: dict[concurrent.futures.Future, local_data.ManagedArrowTable] = dict()
528+
for local_source in needs_upload:
529+
future = self.loader.read_data_async(
530+
local_source, bigframes.core.guid.generate_guid()
531+
)
532+
futures[future] = local_source
533+
try:
534+
for future in concurrent.futures.as_completed(futures.keys()):
535+
self.cache.cache_remote_replacement(futures[future], future.result())
536+
except Exception as e:
537+
# cancel all futures
538+
for future in futures:
539+
future.cancel()
540+
raise e
524541

525542
# Step 2: Replace local scans with remote scans
526543
def map_local_scans(node: nodes.BigFrameNode):
@@ -550,18 +567,6 @@ def map_local_scans(node: nodes.BigFrameNode):
550567

551568
return original_root.bottom_up(map_local_scans)
552569

553-
def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
554-
if self.cache.get_uploaded_local_data(local_table) is not None:
555-
return
556-
# Lock prevents concurrent repeated work, but slows things down.
557-
# Might be better as a queue and a worker thread
558-
with self._upload_lock:
559-
if self.cache.get_uploaded_local_data(local_table) is None:
560-
uploaded = self.loader.load_data_or_write_data(
561-
local_table, bigframes.core.guid.generate_guid()
562-
)
563-
self.cache.cache_remote_replacement(local_table, uploaded)
564-
565570
def _execute_plan_gbq(
566571
self,
567572
plan: nodes.BigFrameNode,

bigframes/session/loader.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,17 @@ def __init__(
300300
self._session = session
301301
self._clock = session_time.BigQuerySyncedClock(bqclient)
302302
self._clock.sync()
303+
self._threadpool = concurrent.futures.ThreadPoolExecutor(
304+
max_workers=1, thread_name_prefix="bigframes-loader"
305+
)
306+
307+
def read_data_async(
    self, local_data: local_data.ManagedArrowTable, offsets_col: str
) -> concurrent.futures.Future[bq_data.BigqueryDataSource]:
    """Schedule an upload of *local_data* on the loader's worker thread.

    The actual load/write is performed by ``_load_data_or_write_data`` on
    the loader's internal thread pool, so the caller is not blocked.

    Args:
        local_data: The managed local table to upload.
        offsets_col: Name to use for the generated offsets column.

    Returns:
        A future that resolves to the uploaded BigQuery data source.
    """
    # Fire-and-return: callers collect results via the returned future.
    return self._threadpool.submit(
        self._load_data_or_write_data, local_data, offsets_col
    )
303314

304315
def read_pandas(
305316
self,
@@ -350,7 +361,7 @@ def read_managed_data(
350361
session=self._session,
351362
)
352363

353-
def load_data_or_write_data(
364+
def _load_data_or_write_data(
354365
self,
355366
data: local_data.ManagedArrowTable,
356367
offsets_col: str,

0 commit comments

Comments (0)