Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions src/dvc_objects/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ def info(
path: AnyFSPath,
callback: fsspec.Callback = ...,
batch_size: Optional[int] = ...,
return_exceptions: Literal[False] = ...,
**kwargs,
) -> "Entry": ...

Expand All @@ -582,12 +583,44 @@ def info(
path: list[AnyFSPath],
callback: fsspec.Callback = ...,
batch_size: Optional[int] = ...,
return_exceptions: Literal[False] = ...,
) -> list["Entry"]: ...

def info(self, path, callback=DEFAULT_CALLBACK, batch_size=None, **kwargs):
@overload
def info(
self,
path: AnyFSPath,
callback: fsspec.Callback = ...,
batch_size: Optional[int] = ...,
return_exceptions: Literal[True] = ...,
**kwargs,
) -> Union["Entry", Exception]: ...

@overload
def info(
self,
path: list[AnyFSPath],
callback: fsspec.Callback = ...,
batch_size: Optional[int] = ...,
return_exceptions: Literal[True] = ...,
) -> list[Union["Entry", Exception]]: ...

def info(
self,
path,
callback: fsspec.Callback = DEFAULT_CALLBACK,
batch_size=None,
return_exceptions=False,
**kwargs,
):
if isinstance(path, str):
return self.fs.info(path, **kwargs)
callback.set_size(len(path))
try:
return self.fs.info(path, **kwargs)
except Exception as e:
if return_exceptions:
return e
raise

jobs = batch_size or self.jobs
if self.fs.async_impl:
loop = get_loop()
Expand All @@ -596,14 +629,22 @@ def info(self, path, callback=DEFAULT_CALLBACK, batch_size=None, **kwargs):
[self.fs._info(p, **kwargs) for p in path],
batch_size=jobs,
callback=callback,
return_exceptions=return_exceptions,
),
loop,
)
return fut.result()

func = partial(self.fs.info, **kwargs)
def info_func(p):
try:
return self.fs.info(p, **kwargs)
except Exception as e:
if return_exceptions:
return e
raise

with ThreadPoolExecutor(max_workers=jobs, cancel_on_error=True) as executor:
it = executor.map(func, path)
it = executor.map(info_func, path)
return list(callback.wrap(it))

def mkdir(
Expand Down
27 changes: 27 additions & 0 deletions tests/fs/test_localfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,30 @@ def test_normpath_with_newlines():
fs = LocalFileSystem()
newline_path = os.path.join("one", "two\nthree")
assert fs.normpath(newline_path) == newline_path


def test_info_return_exceptions(tmp_path):
fs = LocalFileSystem()
path = fspath(tmp_path / "non-existent")

with pytest.raises(FileNotFoundError):
fs.info(path)

result = fs.info(path, return_exceptions=True)
assert isinstance(result, FileNotFoundError)


def test_info_return_exceptions_batch(tmp_path):
fs = LocalFileSystem()
exist_path = fspath(tmp_path / "existent")
non_exist_path = fspath(tmp_path / "non-existent")
(tmp_path / "existent").write_text("foo")

# default behavior: raises on first error
with pytest.raises(FileNotFoundError):
fs.info([exist_path, non_exist_path])

results = fs.info([exist_path, non_exist_path], return_exceptions=True)
assert isinstance(results[0], dict)
assert fs.normpath(results[0]["name"]) == exist_path
assert isinstance(results[1], FileNotFoundError)