[wip] unpatch threading for dataset downloading #223

Draft · wants to merge 1 commit into base: main
vsb/workloads/dataset.py (22 changes: 21 additions & 1 deletion)
@@ -6,7 +6,6 @@
 import json
 import pandas
 import pathlib
-from pinecone.grpc import PineconeGRPC
 import pyarrow.dataset as ds
 from pyarrow.parquet import ParquetDataset, ParquetFile

@@ -16,6 +15,8 @@
 from vsb import logger
 from vsb.logging import ProgressIOWrapper

+import gevent.monkey
+

 class Dataset:
     """
@@ -168,13 +169,30 @@ def setup_queries(self, query_limit=0):
         )

     def _download_dataset_files(self):
+        # Unpatch all gevent monkeypatched modules; we use google cloud
+        # python libraries which will try to call stuff like socket and
+        # wait, and if they're monkeypatched, they'll fail with a LoopExit
+        # because the OS thread it runs in has no hub.
+
+        # https://github.com/gevent/gevent/issues/1350#issuecomment-478630812
+
+        # Note that this does mean that this function will block in a non-
+        # gevent-friendly way. Ensure that it's called in a threadpool, or
+        # you may get heartbeat failures in distributed mode.
+        import threading
+        from importlib import reload
+
+        reload(threading)
         with FileLock(self.cache / ".lock"):
             self.cache.mkdir(parents=True, exist_ok=True)
             logger.debug(
                 f"Checking for existence of dataset '{self.name}' in dataset cache '{self.cache}'"
             )
             client = Client.create_anonymous_client()
             bucket: Bucket = client.bucket(Dataset.gcs_bucket)
+            logger.debug(
+                f"_download_dataset_files(): threading={gevent.monkey.is_module_patched('threading')}"
+            )
             blobs = [b for b in bucket.list_blobs(prefix=self.name + "/")]
             # Ignore directories (blobs ending in '/') as we don't explicitly need them
             # (non-empty directories will have their files downloaded
@@ -230,6 +248,8 @@ def should_download(blob):
             # Clear the progress bar now we're done.
             vsb.progress.stop()
             vsb.progress = None
+        # Re-apply gevent monkeypatching.
+        gevent.monkey.patch_all()

     def _load_parquet_dataset(self, kind, limit=0):
         parquet_files = [f for f in (self.cache / self.name).glob(kind + "/*.parquet")]
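
For reference, the unpatch/repatch pattern this diff applies (reload(threading) to restore the stdlib module that gevent.monkey.patch_all() replaced, then patch_all() to re-apply the patches afterwards) can be written as a standalone context manager. The following is a minimal sketch of that technique; real_threading is a hypothetical helper, not code from this PR:

    import contextlib
    import threading
    from importlib import reload

    import gevent.monkey


    @contextlib.contextmanager
    def real_threading():
        """Temporarily undo gevent's monkeypatching of `threading`.

        Hypothetical helper: code inside the block uses the native OS
        threading primitives, so it blocks the calling thread for real.
        """
        was_patched = gevent.monkey.is_module_patched("threading")
        if was_patched:
            reload(threading)  # restore the original stdlib module
        try:
            yield
        finally:
            if was_patched:
                # Re-apply the greenlet-backed patches.
                gevent.monkey.patch_all()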
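
Because the unpatched body then blocks in a non-gevent-friendly way (as the in-code comment warns), a caller running under gevent would hand it to the hub's native thread pool so greenlet heartbeats keep firing. A sketch, with `dataset` standing in for a Dataset instance:

    from gevent import get_hub

    # Dispatch the blocking download to gevent's OS-thread pool; apply()
    # blocks only the calling greenlet, not the event loop.
    get_hub().threadpool.apply(dataset._download_dataset_files)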