diff --git a/vsb/workloads/dataset.py b/vsb/workloads/dataset.py index de67c2f..65a1b8b 100644 --- a/vsb/workloads/dataset.py +++ b/vsb/workloads/dataset.py @@ -6,7 +6,6 @@ import json import pandas import pathlib -from pinecone.grpc import PineconeGRPC import pyarrow.dataset as ds from pyarrow.parquet import ParquetDataset, ParquetFile @@ -16,6 +15,9 @@ from vsb import logger from vsb.logging import ProgressIOWrapper +# Used by _download_dataset_files() to temporarily undo (and later re-apply) gevent monkeypatching. +import gevent.monkey + class Dataset: """ @@ -168,6 +170,20 @@ def setup_queries(self, query_limit=0): ) def _download_dataset_files(self): + # Unpatch all gevent monkeypatched modules; we use google cloud + # python libraries which will try to call stuff like socket and + # wait, and if they're monkeypatched, they'll fail with a LoopExit + # because the OS thread it runs in has no hub. + + # https://github.com/gevent/gevent/issues/1350#issuecomment-478630812 + + # Note that this does mean that this function will block in a non- + # gevent-friendly way. Ensure that it's called in a threadpool, or + # you may get heartbeat failures in distributed mode. + import threading + from importlib import reload + + reload(threading) with FileLock(self.cache / ".lock"): self.cache.mkdir(parents=True, exist_ok=True) logger.debug( ) client = Client.create_anonymous_client() bucket: Bucket = client.bucket(Dataset.gcs_bucket) + logger.debug( + f"_download_dataset_files(): threading={gevent.monkey.is_module_patched('threading')}" + ) blobs = [b for b in bucket.list_blobs(prefix=self.name + "/")] # Ignore directories (blobs ending in '/') as we don't explicilty need them # (non-empty directories will have their files downloaded @@ -230,6 +249,8 @@ def should_download(blob): # Clear the progress bar now we're done. vsb.progress.stop() vsb.progress = None + # Re-apply gevent monkeypatching.
+ gevent.monkey.patch_all() def _load_parquet_dataset(self, kind, limit=0): parquet_files = [f for f in (self.cache / self.name).glob(kind + "/*.parquet")]