Skip to content

Commit

Permalink
Unpatch threading for dataset downloading
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanzxu committed Aug 14, 2024
1 parent 96c1fe1 commit e3b50c0
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion vsb/workloads/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import json
import pandas
import pathlib
from pinecone.grpc import PineconeGRPC
import pyarrow.dataset as ds
from pyarrow.parquet import ParquetDataset, ParquetFile

Expand All @@ -16,6 +15,9 @@
from vsb import logger
from vsb.logging import ProgressIOWrapper

# remove
import gevent.monkey


class Dataset:
"""
Expand Down Expand Up @@ -168,13 +170,30 @@ def setup_queries(self, query_limit=0):
)

def _download_dataset_files(self):
# Unpatch gevent's monkeypatched threading module (by reloading it); we use
# the Google Cloud Python libraries, which call into modules such as socket
# and wait, and if those are monkeypatched the calls fail with a LoopExit
# because the OS thread they run in has no gevent hub.

# https://github.com/gevent/gevent/issues/1350#issuecomment-478630812

# Note that this does mean that this function will block in a non-
# gevent-friendly way. Ensure that it's called in a threadpool, or
# you may get heartbeat failures in distributed mode.
import threading
from importlib import reload

reload(threading)
with FileLock(self.cache / ".lock"):
self.cache.mkdir(parents=True, exist_ok=True)
logger.debug(
f"Checking for existence of dataset '{self.name}' in dataset cache '{self.cache}'"
)
client = Client.create_anonymous_client()
bucket: Bucket = client.bucket(Dataset.gcs_bucket)
logger.debug(
f"_download_dataset_files(): threading={gevent.monkey.is_module_patched('threading')}"
)
blobs = [b for b in bucket.list_blobs(prefix=self.name + "/")]
# Ignore directories (blobs ending in '/') as we don't explicitly need them
# (non-empty directories will have their files downloaded
Expand Down Expand Up @@ -230,6 +249,8 @@ def should_download(blob):
# Clear the progress bar now we're done.
vsb.progress.stop()
vsb.progress = None
# Re-apply gevent monkeypatching.
gevent.monkey.patch_all()

def _load_parquet_dataset(self, kind, limit=0):
parquet_files = [f for f in (self.cache / self.name).glob(kind + "/*.parquet")]
Expand Down

0 comments on commit e3b50c0

Please sign in to comment.