|
29 | 29 | from bigframes import exceptions as bfe |
30 | 30 | import bigframes.constants |
31 | 31 | import bigframes.core |
32 | | -from bigframes.core import bq_data, compile, rewrite |
| 32 | +from bigframes.core import bq_data, compile, local_data, rewrite |
33 | 33 | from bigframes.core.compile.sqlglot import sql as sg_sql |
34 | 34 | from bigframes.core.compile.sqlglot import sqlglot_ir |
35 | 35 | import bigframes.core.events |
@@ -525,19 +525,21 @@ def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode): |
525 | 525 | needs_upload.append(leaf.local_data_source) |
526 | 526 |
|
527 | 527 | futures = [] |
| 528 | + for local_source in needs_upload: |
| 529 | + future = self.loader.read_data_async( |
| 530 | + local_source, bigframes.core.guid.generate_guid() |
| 531 | + ) |
| 532 | + |
| 533 | + def cache_result( |
| 534 | + future: concurrent.futures.Future, |
| 535 | + local: local_data.ManagedArrowTable = local_source, |
| 536 | + ): |
| 537 | + self.cache.cache_remote_replacement(local, future.result()) |
| 538 | + |
| 539 | + future.add_done_callback(cache_result) |
| 540 | + futures.append(future) |
528 | 541 | try: |
529 | | - for local_source in needs_upload: |
530 | | - future = self.loader.read_data_async( |
531 | | - local_source, bigframes.core.guid.generate_guid() |
532 | | - ) |
533 | | - future.add_done_callback( |
534 | | - lambda f: self.cache.cache_remote_replacement( |
535 | | - local_source, f.result() |
536 | | - ) |
537 | | - ) |
538 | | - futures.append(future) |
539 | | - concurrent.futures.wait(futures) |
540 | | - for future in futures: |
| 542 | + for future in concurrent.futures.as_completed(futures): |
541 | 543 | future.result() |
542 | 544 | except Exception as e: |
543 | 545 | # cancel all futures |
|
0 commit comments