Skip to content

Commit

Permalink
Merge pull request #344 from KantaTamura/zip-profile
Browse files Browse the repository at this point in the history
Support PPE profiling for `Zip`
  • Loading branch information
k5342 committed Aug 28, 2024
2 parents 43ea5de + bb76d3c commit 14f0232
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 46 deletions.
138 changes: 92 additions & 46 deletions pfio/v2/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,36 @@
from datetime import datetime
from typing import Optional, Set

from ._profiler import record, record_iterable
from .fs import FS, FileStat, format_repr

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())


class ZipProfileIOWrapper:
    """Proxy for a file object opened from a Zip that profiles its calls.

    Every attribute that is not defined on the proxy itself is looked up on
    the wrapped object ``fp``.  Callable attributes are handed back wrapped,
    so that each call runs inside a ``record("pfio.v2.Zip:<name>",
    trace=True)`` context; non-callable attributes are returned unchanged.

    Context-manager and iteration support are implemented explicitly
    because Python looks special methods up on the type, which bypasses
    ``__getattr__``.
    """

    def __init__(self, fp):
        """Store the file object ``fp`` that all operations delegate to."""
        self.fp = fp

    def __enter__(self):
        """Enter the wrapped object's context and return the proxy itself."""
        self.fp.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the wrapped object's context inside a profiling record."""
        with record("pfio.v2.Zip:exit-context", trace=True):
            self.fp.__exit__(exc_type, exc_value, traceback)

    def __iter__(self):
        """Return the proxy as its own iterator (see ``__next__``)."""
        return self

    def __next__(self):
        """Fetch the next item from the wrapped object under a record."""
        with record("pfio.v2.Zip:__next__", trace=True):
            return next(self.fp)

    def __getattr__(self, name):
        """Delegate a missing attribute to ``fp``, profiling callables.

        Only reached when normal attribute lookup on the proxy fails.

        Raises:
            AttributeError: if ``fp`` itself is not set yet, or if the
                wrapped object has no attribute ``name``.
        """
        if name == "fp":
            # ``fp`` can only be missing when __init__ has not run (for
            # example on an instance created through __new__).  Reading
            # self.fp below would then re-enter __getattr__ forever.
            raise AttributeError(name)

        attr = getattr(self.fp, name)
        if not callable(attr):
            return attr

        # Not every callable carries __name__; fall back to the name that
        # was requested so the record label can always be built.
        label = getattr(attr, "__name__", name)

        def wrapper(*args, **kwargs):
            with record(f"pfio.v2.Zip:{label}", trace=True):
                return attr(*args, **kwargs)
        return wrapper


class ZipFileStat(FileStat):
"""Detailed information of a file in a Zip
Expand Down Expand Up @@ -51,13 +75,15 @@ def __init__(self, zip_info):
class Zip(FS):
_readonly = True

def __init__(self, backend, file_path, mode='r', create=False,
local_cache=False, local_cachedir=None, **kwargs):
def __init__(self, backend, file_path, mode='r',
create=False, local_cache=False, local_cachedir=None,
trace=False, **kwargs):
super().__init__()
self.backend = backend
self.file_path = file_path
self.mode = mode
self.kwargs = kwargs
self.trace = trace

if create:
raise ValueError("create option is not supported")
Expand All @@ -74,13 +100,15 @@ def __init__(self, backend, file_path, mode='r', create=False,
self._reset()

def _reset(self):
obj = self.backend.open(self.file_path,
self.mode + 'b',
**self.kwargs)
self.fileobj = obj
with record("pfio.v2.Zip:create-zipfile-obj", trace=self.trace):
obj = self.backend.open(self.file_path,
self.mode + 'b',
**self.kwargs)
self.fileobj = obj

assert self.fileobj is not None
self.zipobj = zipfile.ZipFile(self.fileobj, self.mode)

assert self.fileobj is not None
self.zipobj = zipfile.ZipFile(self.fileobj, self.mode)
self.name_cache: Optional[Set[str]] = None
if self._readonly:
self.name_cache = self._names()
Expand Down Expand Up @@ -108,43 +136,59 @@ def __repr__(self):
def open(self, file_path, mode='r',
buffering=-1, encoding=None, errors=None,
newline=None, closefd=True, opener=None):
self._checkfork()
with record("pfio.v2.Zip:open", trace=self.trace):
self._checkfork()

file_path = os.path.join(self.cwd, os.path.normpath(file_path))
fp = self.zipobj.open(file_path, mode.replace('b', ''))
file_path = os.path.join(self.cwd, os.path.normpath(file_path))
fp = self.zipobj.open(file_path, mode.replace('b', ''))

if 'b' not in mode:
fp = io.TextIOWrapper(fp, encoding, errors, newline)
if 'b' not in mode:
fp = io.TextIOWrapper(fp, encoding, errors, newline)

return fp
if self.trace:
return ZipProfileIOWrapper(fp)
else:
return fp

def subfs(self, path):
    """Placeholder: always raises ``NotImplementedError``."""
    # TODO
    raise NotImplementedError

def close(self):
self._checkfork()
self.zipobj.close()
self.fileobj.close()
with record("pfio.v2.Zip:close", trace=self.trace):
self._checkfork()
self.zipobj.close()
self.fileobj.close()

def stat(self, path):
self._checkfork()
names = self._names()
path = os.path.join(self.cwd, os.path.normpath(path))
if path in names:
actual_path = path
elif not path.endswith('/') and path + '/' in names:
# handles cases when path is a directory but without trailing slash
# see issue $67
actual_path = path + '/'
else:
raise FileNotFoundError(
"{} is not found".format(path))
with record("pfio.v2.Zip:stat", trace=self.trace):
self._checkfork()
names = self._names()
path = os.path.join(self.cwd, os.path.normpath(path))
if path in names:
actual_path = path
elif not path.endswith('/') and path + '/' in names:
# handles cases when path is a directory
# but without trailing slash
# see issue $67
actual_path = path + '/'
else:
raise FileNotFoundError(
"{} is not found".format(path))

return ZipFileStat(self.zipobj.getinfo(actual_path))
return ZipFileStat(self.zipobj.getinfo(actual_path))

def list(self, path_or_prefix: Optional[str] = "", recursive=False,
         detail=False):
    """Yield whatever ``self._list`` produces, routed through the profiler.

    The arguments are forwarded to ``self._list`` unchanged.  Its iterable
    is passed to ``record_iterable`` under the name ``"pfio.v2.Zip:list"``
    with ``trace=self.trace``, and the items coming out of that are yielded
    one by one.
    """
    entries = self._list(path_or_prefix, recursive, detail)
    profiled = record_iterable("pfio.v2.Zip:list", entries,
                               trace=self.trace)
    for entry in profiled:
        yield entry

def _list(self, path_or_prefix: Optional[str] = "", recursive=False,
detail=False):
self._checkfork()

if path_or_prefix:
Expand Down Expand Up @@ -207,18 +251,19 @@ def list(self, path_or_prefix: Optional[str] = "", recursive=False,
yield return_file_name

def isdir(self, file_path: str):
self._checkfork()
file_path = os.path.join(self.cwd, file_path)
if self.exists(file_path):
return self.stat(file_path).isdir()
else:
file_path = os.path.normpath(file_path)
# check if directories are NOT included in the zip
if any(name.startswith(file_path + "/")
for name in self._names()):
return True
with record("pfio.v2.Zip:isdir", trace=self.trace):
self._checkfork()
file_path = os.path.join(self.cwd, file_path)
if self.exists(file_path):
return self.stat(file_path).isdir()
else:
file_path = os.path.normpath(file_path)
# check if directories are NOT included in the zip
if any(name.startswith(file_path + "/")
for name in self._names()):
return True

return False
return False

def mkdir(self, file_path: str, mode=0o777, *args, dir_fd=None):
    """Reject the request: always raises ``io.UnsupportedOperation``."""
    reason = "zip does not support mkdir"
    raise io.UnsupportedOperation(reason)
Expand All @@ -227,11 +272,12 @@ def makedirs(self, file_path: str, mode=0o777, exist_ok=False):
raise io.UnsupportedOperation("zip does not support makedirs")

def exists(self, file_path: str):
self._checkfork()
file_path = os.path.join(self.cwd, os.path.normpath(file_path))
namelist = self.zipobj.namelist()
return (file_path in namelist
or file_path + "/" in namelist)
with record("pfio.v2.Zip:exists", trace=self.trace):
self._checkfork()
file_path = os.path.join(self.cwd, os.path.normpath(file_path))
namelist = self.zipobj.namelist()
return (file_path in namelist
or file_path + "/" in namelist)

def rename(self, *args):
raise io.UnsupportedOperation
Expand Down
37 changes: 37 additions & 0 deletions tests/v2_tests/test_s3_zip.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import io
import json
import multiprocessing
import os
import shutil
Expand Down Expand Up @@ -186,3 +187,39 @@ def test_force_type2():
k = "dir/f"
with s3z.open(k, 'wb') as fp:
fp.write(b"bar")


@mock_aws
def test_s3_zip_profiling():
    """Profiling events are recorded when reading a zip stored on S3.

    Builds a zip locally, uploads it to a bucket, reads one member through
    ``from_url(..., trace=True)`` and then checks the names found in the
    tracer's event list.  Skipped when ``pytorch_pfn_extras`` is missing.
    """
    ppe = pytest.importorskip("pytorch_pfn_extras")

    with tempfile.TemporaryDirectory() as workdir:
        archive_path = os.path.join(workdir, "test.zip")
        fixture = ZipForTest(archive_path)
        bucket = "test-dummy-bucket"

        # Copy the locally built archive into the bucket.
        with from_url('s3://{}/'.format(bucket),
                      create_bucket=True) as bucket_fs:
            with open(archive_path, 'rb') as local_src:
                with bucket_fs.open('test.zip', 'wb') as remote_dst:
                    shutil.copyfileobj(local_src, remote_dst)

        # Start from an empty tracer so only the traced read is captured.
        ppe.profiler.clear_tracer()
        with from_url('s3://{}/test.zip'.format(bucket),
                      trace=True) as zip_fs:
            with zip_fs.open('file', 'rb') as member:
                assert fixture.content('file') == member.read()

        tracer_state = ppe.profiler.get_tracer().state_dict()
        events = json.loads(tracer_state['_event_list'])
        recorded = [event["name"] for event in events]

        expected_events = (
            "pfio.v2.Zip:create-zipfile-obj",
            "pfio.v2.Zip:open",
            "pfio.v2.Zip:read",
            "pfio.v2.Zip:close",
            "pfio.v2.S3:open",
            "pfio.v2.S3:read",
            "pfio.v2.S3:close",
            "pfio.boto3:get_object",
        )
        for event_name in expected_events:
            assert event_name in recorded
29 changes: 29 additions & 0 deletions tests/v2_tests/test_zip.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import io
import json
import multiprocessing
import os
import pickle
Expand Down Expand Up @@ -890,3 +891,31 @@ def test_is_zipfile():
with local.open_zip(zipfilename) as zfs:
for o in zfs.list(recursive=True, detail=True):
assert isinstance(o, ZipFileStat)


def test_zip_profiling():
    """Profiling events are recorded when reading a local zip.

    Builds a zip in a temporary directory, lists it recursively and reads
    every listed name through ``from_url(..., trace=True)``, then checks the
    names found in the tracer's event list.  Skipped when
    ``pytorch_pfn_extras`` is missing.
    """
    ppe = pytest.importorskip("pytorch_pfn_extras")

    with tempfile.TemporaryDirectory() as workdir:
        archive_path = os.path.join(workdir, 'test.zip')
        _ = ZipForTest(archive_path)

        # Start from an empty tracer so only the traced access is captured.
        ppe.profiler.clear_tracer()
        with from_url(archive_path, trace=True) as zip_fs:
            for member_name in zip_fs.list(recursive=True):
                with zip_fs.open(member_name, mode='r') as member:
                    _ = member.read()

        tracer_state = ppe.profiler.get_tracer().state_dict()
        events = json.loads(tracer_state['_event_list'])
        recorded = [event["name"] for event in events]

        expected_events = (
            "pfio.v2.Zip:create-zipfile-obj",
            "pfio.v2.Zip:list-0",
            "pfio.v2.Zip:list-1",
            "pfio.v2.Zip:open",
            "pfio.v2.Zip:read",
            "pfio.v2.Zip:close",
            "pfio.v2.Local:open",
            "pfio.v2.Local:read",
            "pfio.v2.Local:close",
        )
        for event_name in expected_events:
            assert event_name in recorded

0 comments on commit 14f0232

Please sign in to comment.