Skip to content

Commit

Permalink
Support PPE profiling for Local (#340)
Browse files Browse the repository at this point in the history
* Profiling with PPE

* Profiling with PPE

* Make mypy happy

* Make flake8 happy

* Make isort and mypy even happier

* Support all methods

* Align tag naming rule with PPE (module.Klass:method)

* Support PPE tracing

* Local: Support io class method

* Add trace switching

Can be specified in the `trace` argument when creating a Local() instance.

* Split PPE methods

* mypy: ignore ppe

* add trace test

* setup: add trace context

* doc: add output tracer results tips

---------

Co-authored-by: UENISHI Kota <[email protected]>
  • Loading branch information
KantaTamura and kuenishi committed Aug 26, 2024
1 parent b874d49 commit bdc81e5
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 26 deletions.
32 changes: 32 additions & 0 deletions docs/source/tips.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,35 @@ A. Default buffer size of file objects from ``pfio.v2.S3`` is 16MB,
with open_url('s3://bucket/very-large-file', 'wb') as dst:
with open('very-large-local-file', 'rb') as src:
shutil.copyfileobj(src, dst, length=16*1024*1024)
Q. How to output tracer results (readable by Chrome Tracing)?
=============================================================

A. As shown in the sample program below,
call ``initialize_writer()`` and then ``flush()`` on the tracer once tracing
is finished to write JSON that Chrome Tracing can read.

.. code-block:: python
import json
import pytorch_pfn_extras as ppe
from pfio.v2 import Local, Path, from_url

tracer = ppe.profiler.get_tracer()

# Counters must exist before the loop increments them; the names also avoid
# shadowing the ``dir`` builtin.
n_dirs = 0
n_files = 0

with Local(trace=True) as fs:
    for f in fs.list():
        if fs.isdir(f.strip()):
            n_dirs += 1
            continue

        n_files += 1
        # Read in binary mode so non-text files do not fail to decode, and
        # close every handle promptly instead of leaking one per file.
        with fs.open(f, 'rb') as fp:
            len(fp.read())

w = ppe.writing.SimpleWriter(out_dir="")

# output '['
tracer.initialize_writer("trace.json", w)
# output json dump
tracer.flush("trace.json", w)
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ ignore_missing_imports = True

[mypy-pyarrow.*]
ignore_missing_imports = True

[mypy-pytorch_pfn_extras.*]
ignore_missing_imports = True
21 changes: 21 additions & 0 deletions pfio/v2/_profiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
try:
    from pytorch_pfn_extras.profiler import record, record_iterable

except ImportError:

    class _DummyRecord:
        """No-op context manager used in place of a PPE record span."""

        def __init__(self):
            pass

        def __enter__(self):
            pass

        def __exit__(self, exc_type, exc_value, traceback):
            # Implicitly returns None, so exceptions raised inside the
            # ``with`` body are never suppressed.
            pass

    # If PPE is not available, wrap with noop.
    # ``trace`` is optional and extra positional/keyword arguments are
    # accepted and ignored, so a call written against the real PPE functions
    # does not start raising TypeError merely because PPE is not installed.
    def record(tag, trace=False, *args, **kwargs):  # type: ignore # NOQA
        """Return a context manager that does nothing.

        Args:
            tag: Name of the span; ignored.
            trace: Whether tracing was requested; ignored.
            *args: Ignored.
            **kwargs: Ignored.
        """
        return _DummyRecord()

    def record_iterable(tag, iter, trace=False, *args, **kwargs):  # type: ignore # NOQA
        """Yield every item of ``iter`` unchanged, recording nothing.

        Args:
            tag: Name of the span; ignored.
            iter: Iterable whose items are passed through.
            trace: Whether tracing was requested; ignored.
            *args: Ignored.
            **kwargs: Ignored.
        """
        yield from iter
97 changes: 72 additions & 25 deletions pfio/v2/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,32 @@
import shutil
from typing import Optional

from ._profiler import record, record_iterable
from .fs import FS, FileStat, format_repr


class LocalProfileWrapper:
    """Proxy around a file object that records each method call with PPE.

    ``Local.open`` returns this wrapper only when tracing is enabled, which
    is why every span here is recorded with ``trace=True``.

    Attribute access is forwarded to the wrapped file object. Callable
    attributes are wrapped so that each call runs inside a
    ``pfio.v2.Local:<method>`` record; other attributes are returned as-is.
    """

    # Sentinel marking the end of iteration in ``__next__``.
    _EXHAUSTED = object()

    def __init__(self, fp):
        self.fp = fp

    def __enter__(self):
        self.fp.__enter__()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Propagate the wrapped object's answer so its decision about
        # suppressing the exception is preserved.
        return self.fp.__exit__(exc_type, exc_value, traceback)

    # Implicit special-method lookups (``for line in fp``, ``next(fp)``) go
    # through the type and bypass ``__getattr__``, so iteration has to be
    # defined explicitly for the wrapper to behave like the wrapped file.
    def __iter__(self):
        return self

    def __next__(self):
        # The sentinel keeps StopIteration from being raised inside the
        # record context manager.
        with record("pfio.v2.Local:__next__", trace=True):
            item = next(self.fp, self._EXHAUSTED)
        if item is self._EXHAUSTED:
            raise StopIteration
        return item

    def __getattr__(self, name):
        # ``__getattr__`` only runs when normal lookup fails. If ``fp``
        # itself is missing (e.g. an instance created without ``__init__``
        # during copy/unpickle), fail cleanly instead of recursing forever.
        if name == "fp":
            raise AttributeError(name)

        attr = getattr(self.fp, name)
        if not callable(attr):
            return attr

        # Fall back to the requested name for callables without ``__name__``.
        tag = f"pfio.v2.Local:{getattr(attr, '__name__', name)}"

        def wrapper(*args, **kwargs):
            with record(tag, trace=True):
                return attr(*args, **kwargs)
        return wrapper


class LocalFileStat(FileStat):
"""Detailed information of a POSIX file
Expand Down Expand Up @@ -45,9 +68,11 @@ def __init__(self, _stat, filename):


class Local(FS):
def __init__(self, cwd=None, create=False, **_):
def __init__(self, cwd=None, trace=False, create=False, **_):
super().__init__()

self.trace = trace

if cwd is None:
self._cwd = ''
else:
Expand Down Expand Up @@ -87,13 +112,28 @@ def open(self, file_path, mode='r',
buffering=-1, encoding=None, errors=None,
newline=None, closefd=True, opener=None):

path = os.path.join(self.cwd, file_path)
return io.open(path, mode,
buffering, encoding, errors,
newline, closefd, opener)
with record("pfio.v2.Local:open", trace=self.trace):
path = os.path.join(self.cwd, file_path)

fp = io.open(path, mode,
buffering, encoding, errors,
newline, closefd, opener)

# Add ppe recorder to io class methods (e.g. read, write)
if self.trace:
return LocalProfileWrapper(fp)
else:
return fp

def list(self, path: Optional[str] = '', recursive=False,
         detail=False):
    """Yield the entries produced by ``_list``, recorded with PPE.

    Args:
        path: Passed through to ``_list``.
        recursive: Passed through to ``_list``.
        detail: Passed through to ``_list``.

    Yields:
        Each item produced by ``_list(path, recursive, detail)``, with the
        iteration wrapped by ``record_iterable`` under the tag
        ``pfio.v2.Local:list``.
    """
    # ``yield from`` delegates close()/throw() to the wrapped generator
    # instead of re-yielding items by hand.
    yield from record_iterable("pfio.v2.Local:list",
                               self._list(path, recursive, detail),
                               trace=self.trace)

def _list(self, path: Optional[str] = '', recursive=False,
detail=False):
path_or_prefix = os.path.join(self.cwd,
"" if path is None else path)

Expand Down Expand Up @@ -129,43 +169,50 @@ def _recursive_list(self, prefix_end_index: int, path: str,
e.path, detail)

def stat(self, path):
    """Return a ``LocalFileStat`` for ``path`` joined onto ``self.cwd``.

    The lookup is recorded under the tag ``pfio.v2.Local:stat``.
    """
    with record("pfio.v2.Local:stat", trace=self.trace):
        full_path = os.path.join(self.cwd, path)
        stat_result = os.stat(full_path)
        return LocalFileStat(stat_result, full_path)

def isdir(self, path: str):
    """Return True if ``path``, joined onto ``self.cwd``, is a directory.

    Recorded under the tag ``pfio.v2.Local:isdir`` so that, like the other
    filesystem operations of this class, it shows up in traces.
    """
    with record("pfio.v2.Local:isdir", trace=self.trace):
        path = os.path.join(self.cwd, path)
        return os.path.isdir(path)

def mkdir(self, file_path: str, mode=0o777, *args, dir_fd=None):
    """Create the directory ``file_path`` joined onto ``self.cwd``.

    Args:
        file_path: Directory to create.
        mode: Permission bits passed to ``os.mkdir``.
        *args: Extra positional arguments passed to ``os.mkdir``.
        dir_fd: Directory file descriptor passed to ``os.mkdir``.

    The call is recorded under the tag ``pfio.v2.Local:mkdir``.
    """
    with record("pfio.v2.Local:mkdir", trace=self.trace):
        path = os.path.join(self.cwd, file_path)
        # Forward the caller's dir_fd rather than always passing None.
        return os.mkdir(path, mode, *args, dir_fd=dir_fd)

def makedirs(self, file_path: str, mode=0o777, exist_ok=False):
    """Create ``file_path`` (joined onto ``self.cwd``) via ``os.makedirs``.

    ``mode`` and ``exist_ok`` are passed through unchanged. The call is
    recorded under the tag ``pfio.v2.Local:makedirs``.
    """
    with record("pfio.v2.Local:makedirs", trace=self.trace):
        target = os.path.join(self.cwd, file_path)
        result = os.makedirs(target, mode, exist_ok)
        return result

def exists(self, file_path: str):
    """Return whether ``file_path``, joined onto ``self.cwd``, exists.

    The check is recorded under the tag ``pfio.v2.Local:exists``.
    """
    with record("pfio.v2.Local:exists", trace=self.trace):
        target = os.path.join(self.cwd, file_path)
        found = os.path.exists(target)
        return found

def rename(self, src, dst):
    """Rename ``src`` to ``dst``, both joined onto ``self.cwd``.

    The call is recorded under the tag ``pfio.v2.Local:rename``.
    """
    with record("pfio.v2.Local:rename", trace=self.trace):
        source_path = os.path.join(self.cwd, src)
        dest_path = os.path.join(self.cwd, dst)
        return os.rename(source_path, dest_path)

def remove(self, file_path: str, recursive=False):
    """Remove ``file_path`` joined onto ``self.cwd``.

    With ``recursive`` set, the whole tree is removed with
    ``shutil.rmtree``. Otherwise a directory is removed with ``os.rmdir``
    and anything else with ``os.remove``. The call is recorded under the
    tag ``pfio.v2.Local:remove``.
    """
    with record("pfio.v2.Local:remove", trace=self.trace):
        target = os.path.join(self.cwd, file_path)

        # Choose the removal routine first, then apply it once.
        if recursive:
            remover = shutil.rmtree
        elif os.path.isdir(target):
            remover = os.rmdir
        else:
            remover = os.remove

        return remover(target)

def glob(self, pattern: str):
    """Return the paths under ``self.cwd`` that match ``pattern``.

    Each match is returned as a string relative to ``self.cwd``. The call
    is recorded under the tag ``pfio.v2.Local:glob``.
    """
    with record("pfio.v2.Local:glob", trace=self.trace):
        matches = []
        for item in pathlib.Path(self.cwd).glob(pattern):
            relative = item.relative_to(self.cwd)
            matches.append(str(relative))
        return matches

def _canonical_name(self, file_path: str) -> str:
    """Return ``file:/`` followed by the normalised path of ``file_path``.

    ``file_path`` is joined onto ``self.cwd`` before normalisation.
    """
    joined = os.path.join(self.cwd, file_path)
    normalized = os.path.normpath(joined)
    return "file:/" + normalized
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
'doc': ['sphinx', 'sphinx_rtd_theme'],
'bench': ['numpy>=1.19.5', 'torch>=1.9.0', 'Pillow<=8.2.0'],
'hdfs': ['pyarrow>=6.0.0'],
'trace': ['pytorch-pfn-extras'],
},
# When updating install requires, docs/requirements.txt should be updated too
install_requires=['boto3', 'deprecation', 'urllib3'],
Expand Down
44 changes: 44 additions & 0 deletions tests/v2_tests/test_local.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import io
import json
import os
import pickle
import tempfile
Expand Down Expand Up @@ -292,5 +293,48 @@ def test_from_url_create_option(self):
assert os.path.exists(path) and os.path.isdir(path)


def test_local_rw_profiling():
    """Check that ``Local(trace=...)`` controls whether PPE events are recorded.

    Skipped when pytorch_pfn_extras is not installed. Runs inside a temporary
    directory so the test leaves no ``foo.txt`` behind in the working
    directory.
    """
    ppe = pytest.importorskip("pytorch_pfn_extras")

    def recorded_event_names():
        # Names of all events currently held by the PPE tracer.
        state = ppe.profiler.get_tracer().state_dict()
        return [event["name"] for event in json.loads(state['_event_list'])]

    with tempfile.TemporaryDirectory() as d:
        with Local(d, trace=True) as fs:
            ppe.profiler.clear_tracer()

            with fs.open('foo.txt', 'w') as fp:
                fp.write('bar')

            keys = recorded_event_names()

            assert "pfio.v2.Local:open" in keys
            assert "pfio.v2.Local:write" in keys

        with Local(d, trace=True) as fs:
            ppe.profiler.clear_tracer()

            with fs.open('foo.txt', 'r') as fp:
                tmp = fp.read()
                assert tmp == 'bar'

            keys = recorded_event_names()

            assert "pfio.v2.Local:open" in keys
            assert "pfio.v2.Local:read" in keys

        # no profile case
        with Local(d, trace=False) as fs:
            ppe.profiler.clear_tracer()

            with fs.open('foo.txt', 'r') as fp:
                tmp = fp.read()
                assert tmp == 'bar'

            keys = recorded_event_names()

            assert "pfio.v2.Local:open" not in keys
            assert "pfio.v2.Local:read" not in keys


# Allow running this test module directly as a script.
# NOTE(review): unittest.main() collects TestCase classes, so plain
# pytest-style functions in this module presumably only run under pytest —
# confirm.
if __name__ == '__main__':
    unittest.main()
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
envlist = py38,py39,py310,py311,py312,doc

[testenv]
deps = .[test]
deps = .[test, trace]
skipsdist = True
commands =
flake8 pfio tests
Expand Down

0 comments on commit bdc81e5

Please sign in to comment.