From 2a0f5153acbb55ba92a30e91563acd734e80d7f9 Mon Sep 17 00:00:00 2001 From: Jonathan Xu Date: Wed, 14 Aug 2024 16:07:13 -0400 Subject: [PATCH] Unpatch threading for dataset downloading --- vsb/workloads/dataset.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/vsb/workloads/dataset.py b/vsb/workloads/dataset.py index de67c2f..9132293 100644 --- a/vsb/workloads/dataset.py +++ b/vsb/workloads/dataset.py @@ -6,7 +6,6 @@ import json import pandas import pathlib -from pinecone.grpc import PineconeGRPC import pyarrow.dataset as ds from pyarrow.parquet import ParquetDataset, ParquetFile @@ -16,6 +15,8 @@ from vsb import logger from vsb.logging import ProgressIOWrapper +import gevent.monkey + class Dataset: """ @@ -168,6 +169,20 @@ def setup_queries(self, query_limit=0): ) def _download_dataset_files(self): + # Unpatch all gevent monkeypatched modules; we use google cloud + # python libraries which will try to call stuff like socket and + # wait, and if they're monkeypatched, they'll fail with a LoopExit + # because the OS thread it runs in has no hub. + + # https://github.com/gevent/gevent/issues/1350#issuecomment-478630812 + + # Note that this does mean that this function will block in a non- + # gevent-friendly way. Ensure that it's called in a threadpool, or + # you may get heartbeat failures in distributed mode. + import threading + from importlib import reload + + reload(threading) with FileLock(self.cache / ".lock"): self.cache.mkdir(parents=True, exist_ok=True) logger.debug( @@ -175,6 +190,9 @@ def _download_dataset_files(self): ) client = Client.create_anonymous_client() bucket: Bucket = client.bucket(Dataset.gcs_bucket) + logger.debug( + f"_download_dataset_files(): threading={gevent.monkey.is_module_patched('threading')}" + ) blobs = [b for b in bucket.list_blobs(prefix=self.name + "/")] # Ignore directories (blobs ending in '/') as we don't explicilty need them # (non-empty directories will have their files downloaded @@ -230,6 +248,8 @@ def should_download(blob): # Clear the progress bar now we're done. vsb.progress.stop() vsb.progress = None + # Re-apply gevent monkeypatching. + gevent.monkey.patch_all() def _load_parquet_dataset(self, kind, limit=0): parquet_files = [f for f in (self.cache / self.name).glob(kind + "/*.parquet")]