Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add FS.scheme property #338

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions pfio/v2/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,16 @@ class FS(abc.ABC):
'''

_cwd = ''
_scheme = ''

def __init__(self):
def __init__(self, scheme=None):
self.pid = os.getpid()
if scheme:
self._scheme = str(scheme)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like you to add a few sentences of document here to clarify this variable stores the name of custom scheme or actual implementation scheme (e.g. s3, data, or hdfs). Reading the code told me either can be here. The former if custom scheme defined, and the latter if it's not defined.

@property
def scheme(self):
return self._scheme

@property
def cwd(self):
Expand Down Expand Up @@ -426,32 +433,33 @@ def _zip_check_create_not_supported():

def _from_scheme(scheme, dirname, kwargs, bucket=None):
known_scheme = ['file', 'hdfs', 's3']
actual_scheme = scheme

# Custom scheme; using configparser for older Python. Will
# update to toml in Python 3.11 once 3.10 is in the end.
if scheme not in known_scheme:
if actual_scheme not in known_scheme:
config_dict = config.get_custom_scheme(scheme)
if config_dict is not None:
scheme = config_dict.pop('scheme') # Get the real scheme
actual_scheme = config_dict.pop('scheme') # Get the real scheme
# Custom scheme expected here
if scheme not in known_scheme:
raise ValueError("Scheme {} is not supported", scheme)
if actual_scheme not in known_scheme:
raise ValueError("Scheme {} is not supported", actual_scheme)
for k in config_dict:
if k not in kwargs:
# Don't overwrite with configuration value
kwargs[k] = config_dict[k]

if scheme == 'file':
if actual_scheme == 'file':
from .local import Local
fs = Local(dirname, **kwargs)
elif scheme == 'hdfs':
fs = Local(dirname, scheme=scheme, **kwargs)
elif actual_scheme == 'hdfs':
from .hdfs import Hdfs
fs = Hdfs(dirname, **kwargs)
elif scheme == 's3':
fs = Hdfs(dirname, scheme=scheme, **kwargs)
elif actual_scheme == 's3':
from .s3 import S3
fs = S3(bucket=bucket, prefix=dirname, **kwargs)
fs = S3(bucket=bucket, prefix=dirname, scheme=scheme, **kwargs)
else:
raise RuntimeError("scheme '{}' is not defined".format(scheme))
raise RuntimeError("scheme '{}' is not defined".format(actual_scheme))

return fs

Expand Down
4 changes: 2 additions & 2 deletions pfio/v2/hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ class Hdfs(FS):

'''

def __init__(self, cwd=None, create=False, **_):
super().__init__()
def __init__(self, cwd=None, create=False, scheme=None, **_):
super().__init__(scheme=scheme)
self._nameservice, self._fs = _create_fs()
assert self._fs is not None
self.username = self._get_principal_name()
Expand Down
2 changes: 1 addition & 1 deletion pfio/v2/http_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self,
max_cache_size: int = 1024 * 1024 * 1024,
bearer_token_path: Optional[str] = None):
assert not isinstance(fs, HTTPCachedFS)
super().__init__()
super().__init__(scheme=fs.scheme)

self.fs = fs
self.max_cache_size = max_cache_size
Expand Down
4 changes: 2 additions & 2 deletions pfio/v2/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ def __init__(self, _stat, filename):


class Local(FS):
def __init__(self, cwd=None, create=False, **_):
super().__init__()
def __init__(self, cwd=None, create=False, scheme=None, **_):
super().__init__(scheme=scheme)

if cwd is None:
self._cwd = ''
Expand Down
27 changes: 4 additions & 23 deletions pfio/v2/pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,48 +100,31 @@ class PurePath(PathLike):
Args:
args: construct paths.
fs: target file system.
scheme: specify URL scheme. (for `as_uri` method)

Note:
It conforms to `pathlib.PurePosixPath` of Python 3.12 specification.

this class not inherits any `pathlib` classes because
pfio filesystems is not suitable for pathlib abstact
and helper classes.

TODO:
`scheme` should moves to `FS`.
"""

def __init__(
self,
*args: Union[str, PathLike],
fs: FS,
scheme: Optional[str] = None,
) -> None:
if isinstance(fs, Local):
scheme = scheme or "file"
elif isinstance(fs, S3):
scheme = scheme or "s3"
elif isinstance(fs, Hdfs):
scheme = scheme or "hdfs"
elif isinstance(fs, Zip):
scheme = scheme or ""
else:
raise ValueError(f"unsupported FS: {fs}")

self._fs: FS = fs
self._scheme = scheme
self._pure = PurePosixPath(*args)
self._hash = hash(self._pure) + hash(self._fs) + hash(self._scheme)
self._hash = hash(self._pure) + hash(self._fs) + hash(self.scheme)

@property
def sep(self) -> str:
return "/"

@property
def scheme(self) -> str:
return self._scheme
return self._fs.scheme

def __hash__(self) -> int:
return self._hash
Expand Down Expand Up @@ -339,7 +322,7 @@ def with_segments(
self: SelfPurePathType,
*args: Union[str, PathLike],
) -> SelfPurePathType:
return type(self)(*args, fs=self._fs, scheme=self.scheme)
return type(self)(*args, fs=self._fs)


class Path(PurePath):
Expand All @@ -349,7 +332,6 @@ class Path(PurePath):
Args:
args: construct paths.
fs: target file system.
scheme: specify URL scheme. (for `as_uri` method)

Note:
many methods raise `NotImplementedError`
Expand All @@ -364,9 +346,8 @@ def __init__(
self,
*args: str,
fs: FS,
scheme: Optional[str] = None,
) -> None:
super().__init__(*args, fs=fs, scheme=scheme)
super().__init__(*args, fs=fs)

def _as_relative_to_fs(self) -> str:
return _removeprefix(self.as_posix(), self.anchor)
Expand Down
3 changes: 2 additions & 1 deletion pfio/v2/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,10 @@ def __init__(self, bucket, prefix=None,
mpu_chunksize=32*1024*1024,
buffering=-1,
create=False,
scheme=None,
_skip_connect=None, # For test purpose
**_):
super().__init__()
super().__init__(scheme=scheme)
self.bucket = bucket
self.create_bucket = create_bucket
if prefix is not None:
Expand Down
2 changes: 1 addition & 1 deletion pfio/v2/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Zip(FS):

def __init__(self, backend, file_path, mode='r', create=False,
local_cache=False, local_cachedir=None, **kwargs):
super().__init__()
super().__init__(scheme=backend.scheme)
self.backend = backend
self.file_path = file_path
self.mode = mode
Expand Down
4 changes: 4 additions & 0 deletions tests/v2_tests/test_custom_scheme.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ def test_ini():

with pfio.v2.from_url('foobar://pfio/') as fs:
assert isinstance(fs, pfio.v2.Local)
assert fs.scheme == "foobar"

with pfio.v2.from_url('baz://pfio/', _skip_connect=True) as s3:
assert isinstance(s3, pfio.v2.S3)
assert s3.scheme == "baz"

assert 'https://s3.example.com' == s3.kwargs['endpoint_url']
assert 'hoge' == s3.kwargs['aws_access_key_id']
Expand Down Expand Up @@ -56,9 +58,11 @@ def test_add_custom_scheme():

with pfio.v2.from_url('foobar2://pfio/') as fs:
assert isinstance(fs, pfio.v2.Local)
assert fs.scheme == "foobar2"

with pfio.v2.from_url('baz2://pfio/', _skip_connect=True) as s3:
assert isinstance(s3, pfio.v2.S3)
assert s3.scheme == "baz2"

assert 'https://s3.example.com' == s3.kwargs['endpoint_url']
assert 'hoge' == s3.kwargs['aws_access_key_id']
Expand Down
4 changes: 4 additions & 0 deletions tests/v2_tests/test_hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def test_repr_str(self):
repr(fs)
str(fs)

def test_scheme(self):
with Hdfs(self.dirname, scheme="hdfs") as fs:
assert fs.scheme == "hdfs"

def test_read_non_exist(self):
non_exist_file = "non_exist_file.txt"

Expand Down
12 changes: 12 additions & 0 deletions tests/v2_tests/test_http_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def test_httpcache_simple(target):

with gen_fs(target) as underlay:
fs = HTTPCachedFS(http_cache, underlay)

assert fs.scheme == underlay.scheme
with fs.open(filename, mode="wb") as fp:
fp.write(content)
with fs.open(filename, mode="rb") as fp:
Expand All @@ -94,6 +96,8 @@ def test_httpcache_too_large():

with gen_fs("local") as underlay:
fs = HTTPCachedFS(http_cache, underlay)
assert fs.scheme == underlay.scheme

with fs.open(filename, mode="wb") as fp:
for _ in range(1024 + 1): # 1 MB exceeds
fp.write(one_mb_array)
Expand Down Expand Up @@ -135,6 +139,8 @@ def test_httpcache_zipfile_flat(target):
with gen_fs(target) as underlay:
with underlay.open_zip(zipfilename, mode="w") as zipfs:
fs = HTTPCachedFS(http_cache, zipfs)
assert fs.scheme == underlay.scheme

with fs.open(filename1, mode="wb") as fp:
fp.write(filecontent1)
with fs.open(filename2, mode="wb") as fp:
Expand All @@ -144,6 +150,8 @@ def test_httpcache_zipfile_flat(target):

with underlay.open_zip(zipfilename, mode="r") as zipfs:
fs = HTTPCachedFS(http_cache, zipfs)
assert fs.scheme == underlay.scheme

with fs.open(filename1, mode="rb") as fp:
assert fp.read(-1) == filecontent1
with fs.open(filename2, mode="rb") as fp:
Expand Down Expand Up @@ -178,6 +186,8 @@ def test_httpcache_zipfile_archived(target):
cached_fs = HTTPCachedFS(http_cache, underlay)

with cached_fs.open_zip(zipfilename, mode="w") as fs:
assert cached_fs.scheme == underlay.scheme

with fs.open(filename1, mode="wb") as fp:
fp.write(filecontent1)
with fs.open(filename2, mode="wb") as fp:
Expand All @@ -186,6 +196,8 @@ def test_httpcache_zipfile_archived(target):
assert len(cache_content) == 0

with cached_fs.open_zip(zipfilename, mode="r") as fs:
assert cached_fs.scheme == underlay.scheme

with fs.open(filename1, mode="rb") as fp:
assert fp.read(-1) == filecontent1
with fs.open(filename2, mode="rb") as fp:
Expand Down
7 changes: 7 additions & 0 deletions tests/v2_tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ def test_repr_str(self):
str(fs)
repr(fs)

def test_scheme(self):
with Local(self.testdir.name) as fs:
assert fs.scheme == ""

with Local(self.testdir.name, scheme="file") as fs:
assert fs.scheme == "file"

def test_read_string(self):

with Local() as fs:
Expand Down
11 changes: 11 additions & 0 deletions tests/v2_tests/test_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,17 @@ def test_path_iterdir(storage: str, path: str) -> None:
assert sorted(actual_entries) == sorted(filtered)


def test_path_scheme_property(storage: str) -> None:
scheme = urlparse(storage).scheme or "file"

with from_url(url=storage) as fs:
p = PurePath(fs=fs)
assert p.scheme == scheme

q = Path(fs=fs)
assert q.scheme == scheme


def test_unlink(storage: str) -> None:
with from_url(url=storage) as fs:
target = Path("my", fs=fs)
Expand Down
5 changes: 5 additions & 0 deletions tests/v2_tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ def test_s3_repr_str(s3_fixture):
str(s3)


def test_scheme(s3_fixture):
with from_url('s3://test-bucket/base', **s3_fixture.aws_kwargs) as s3:
assert s3.scheme == "s3"


def test_s3_files(s3_fixture):
with from_url('s3://test-bucket/base',
**s3_fixture.aws_kwargs) as s3:
Expand Down
2 changes: 2 additions & 0 deletions tests/v2_tests/test_s3_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def test_s3_zip():
with from_url('s3://{}/test.zip'.format(bucket)) as z:
assert isinstance(z, Zip)
assert isinstance(z.fileobj, io.BufferedReader)
assert z.scheme == "s3"

assert zipfile.is_zipfile(z.fileobj)
with z.open('file', 'rb') as fp:
Expand All @@ -42,6 +43,7 @@ def test_s3_zip():
with from_url('s3://{}/test.zip'.format(bucket),
buffering=0) as z:
assert isinstance(z, Zip)
assert z.scheme == "s3"
assert 'buffering' in z.kwargs
assert isinstance(z.fileobj, pfio.v2.s3._ObjectReader)

Expand Down
4 changes: 4 additions & 0 deletions tests/v2_tests/test_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ def test_repr_str(self):
repr(z)
str(z)

def test_scheme(self):
with local.open_zip(self.zip_file_path) as z:
assert z.scheme == local.scheme

def test_read_bytes(self):
with local.open_zip(os.path.abspath(self.zip_file_path)) as z:
with z.open(self.zipped_file_path, "rb") as zipped_file:
Expand Down