Skip to content

Commit

Permalink
deps: update dvc-objects to >=3
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed Dec 15, 2023
1 parent 5777edd commit 3143e7e
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies = [
"dictdiffer>=0.8.1",
"pygtrie>=2.3.2",
"shortuuid>=0.5.0",
"dvc-objects>=2,<3",
"dvc-objects>=3,<4",
"diskcache>=5.2.1",
"attrs>=21.3.0",
"sqltrie>=0.9.0,<1",
Expand Down
57 changes: 44 additions & 13 deletions src/dvc_data/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@
import errno
import logging
import os
import posixpath
import typing
from collections import deque
from typing import Any, BinaryIO, NamedTuple, Optional, Tuple

from dvc_objects.fs.callbacks import DEFAULT_CALLBACK
from fsspec import AbstractFileSystem

from .utils import cached_property

if typing.TYPE_CHECKING:
from dvc_objects.fs.base import AnyFSPath, FileSystem
from dvc_objects.fs.callbacks import Callback
from dvc_objects.fs.path import Path

from dvc_data.hashfile.db import HashFileDB

Expand All @@ -40,21 +38,54 @@ def __init__(self, index: "DataIndex", **kwargs: Any):
super().__init__(**kwargs)
self.index = index

@cached_property
def path(self) -> "Path":
from dvc_objects.fs.path import Path
@classmethod
def join(cls, *parts: str) -> str:
    """Join the given path components with the POSIX separator."""
    joined = posixpath.join(*parts)
    return joined

@classmethod
def parts(cls, path: str) -> Tuple[str, ...]:
    """Split *path* into its components, keeping a leading root ("/") as
    the first component when the path is absolute.
    """
    pieces = []
    remaining = path
    while True:
        remaining, tail = posixpath.split(remaining)
        if tail:
            # Non-empty final component: collect it and keep splitting.
            pieces.insert(0, tail)
        else:
            # Empty tail: either we hit the root (keep it) or we are done.
            if remaining:
                pieces.insert(0, remaining)
            break
    return tuple(pieces)

def getcwd(self) -> str:
    """Return the current directory, which is fixed at the root marker."""
    cwd = self.root_marker
    return cwd

def normpath(self, path: str) -> str:
    """No-op normalization: index paths are returned exactly as given."""
    return path

def abspath(self, path: str) -> str:
    """Make *path* absolute, anchoring relative paths at the cwd (root)."""
    if posixpath.isabs(path):
        return self.normpath(path)
    return self.normpath(self.join(self.getcwd(), path))

def _getcwd() -> str:
return self.root_marker
def relpath(self, path: str, start: Optional[str] = None) -> str:
    """Return *path* relative to *start* (the current directory if omitted)."""
    base = "." if start is None else start
    return posixpath.relpath(self.abspath(path), start=self.abspath(base))

return Path(self.sep, getcwd=_getcwd)
def relparts(self, path: str, start: Optional[str] = None) -> Tuple[str, ...]:
    """Like ``relpath``, but with the result split into path components."""
    relative = self.relpath(path, start=start)
    return self.parts(relative)

def _get_key(self, path: str) -> Tuple[str, ...]:
path = self.path.abspath(path)
path = self.abspath(path)
if path == self.root_marker:
return ()

key = self.path.relparts(path, self.root_marker)
key = self.relparts(path, self.root_marker)
if key in ((".",), ("",)):
key = ()

Expand Down Expand Up @@ -125,13 +156,13 @@ def ls(self, path: "AnyFSPath", detail: bool = True, **kwargs: Any):

if not detail:
return [
self.path.join(path, key[-1])
self.join(path, key[-1])
for key in self.index.ls(root_key, detail=False)
]

entries = []
for key, info in self.index.ls(root_key, detail=True):
info["name"] = self.path.join(path, key[-1])
info["name"] = self.join(path, key[-1])
entries.append(info)
return entries
except KeyError as exc:
Expand Down
9 changes: 4 additions & 5 deletions src/dvc_data/hashfile/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,12 @@ def _upload_file(

from .hash import HashStreamFile

path = upload_odb.fs.path
tmp_info = path.join(upload_odb.path, tmp_fname())
tmp_info = upload_odb.fs.join(upload_odb.path, tmp_fname())
with fs.open(from_path, mode="rb") as stream:
hashed_stream = HashStreamFile(stream)
size = fs.size(from_path)
cb = callback or TqdmCallback(
desc=path.name(from_path),
desc=upload_odb.fs.name(from_path),
bytes=True,
size=size,
)
Expand Down Expand Up @@ -125,7 +124,7 @@ def _build_tree(
for root, _, fnames in walk_iter:
if DefaultIgnoreFile in fnames:
raise IgnoreInCollectedDirError(
DefaultIgnoreFile, fs.path.join(root, DefaultIgnoreFile)
DefaultIgnoreFile, fs.join(root, DefaultIgnoreFile)
)

# NOTE: we know for sure that root starts with path, so we can use
Expand Down Expand Up @@ -174,7 +173,7 @@ def _make_staging_url(fs: "FileSystem", odb: "HashFileDB", path: Optional[str]):
if path not in _url_cache:
_url_cache[path] = hashlib.sha256(path.encode("utf-8")).hexdigest()

url = fs.path.join(url, _url_cache[path])
url = fs.join(url, _url_cache[path])

return url

Expand Down
10 changes: 3 additions & 7 deletions src/dvc_data/hashfile/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def __call__(self, cache, from_path, to_fs, to_path):
if to_fs.exists(to_path):
to_fs.remove(to_path) # broken symlink

parent = to_fs.path.parent(to_path)
parent = to_fs.parent(to_path)
to_fs.makedirs(parent)
try:
transfer(
Expand Down Expand Up @@ -199,16 +199,12 @@ def _checkout(
progress_callback.set_size(sum(diff.stats.values()))
link = Link(links, callback=progress_callback)
for change in diff.deleted:
entry_path = (
fs.path.join(path, *change.old.key) if change.old.key != ROOT else path
)
entry_path = fs.join(path, *change.old.key) if change.old.key != ROOT else path
_remove(entry_path, fs, change.old.in_cache, force=force, prompt=prompt)

failed = []
for change in chain(diff.added, diff.modified):
entry_path = (
fs.path.join(path, *change.new.key) if change.new.key != ROOT else path
)
entry_path = fs.join(path, *change.new.key) if change.new.key != ROOT else path
if change.new.oid.isdir:
fs.makedirs(entry_path)
continue
Expand Down
6 changes: 3 additions & 3 deletions src/dvc_data/hashfile/db/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def oids_exist(self, oids, jobs=None, progress=noop):
def _list_paths(self, prefix=None):
assert self.path is not None
if prefix:
path = self.fs.path.join(self.path, prefix[:2])
path = self.fs.join(self.path, prefix[:2])
if not self.fs.exists(path):
return
else:
Expand All @@ -74,9 +74,9 @@ def _list_paths(self, prefix=None):

def _remove_unpacked_dir(self, hash_):
hash_path = self.oid_to_path(hash_)
path = self.fs.path.with_name(
path = self.fs.with_name(
hash_path,
self.fs.path.name(hash_path) + self.UNPACKED_DIR_SUFFIX,
self.fs.name(hash_path) + self.UNPACKED_DIR_SUFFIX,
)
self.fs.remove(path)

Expand Down
4 changes: 2 additions & 2 deletions src/dvc_data/index/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def build_entries(
if root == path:
root_key: Tuple[str, ...] = ()
else:
root_key = fs.path.relparts(root, path)
root_key = fs.relparts(root, path)

entries: Iterable[Tuple[str, Optional[Dict]]]
if detail:
Expand All @@ -69,7 +69,7 @@ def build_entries(
for name, info in entries:
try:
entry = build_entry(
fs.path.join(root, name),
fs.join(root, name),
fs,
compute_hash=compute_hash,
state=state,
Expand Down
20 changes: 10 additions & 10 deletions src/dvc_data/index/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _delete_files(
if not entries:
return

fs.remove([fs.path.join(path, *(entry.key or ())) for entry in entries])
fs.remove([fs.join(path, *(entry.key or ())) for entry in entries])


def _create_files( # noqa: C901, PLR0912, PLR0913
Expand All @@ -90,7 +90,7 @@ def _create_files( # noqa: C901, PLR0912, PLR0913
list
)
for entry in entries:
dest_path = fs.path.join(path, *entry.key)
dest_path = fs.join(path, *entry.key)
storage_info = index.storage_map[entry.key]
storage_obj = getattr(storage_info, storage)

Expand Down Expand Up @@ -158,30 +158,30 @@ def _create_files( # noqa: C901, PLR0912, PLR0913
FileStorage(
key,
fs,
fs.path.join(path, *key),
fs.join(path, *key),
)
)


def _delete_dirs(entries, path, fs):
for entry in entries:
try:
fs.rmdir(fs.path.join(path, *entry.key))
fs.rmdir(fs.join(path, *entry.key))
except OSError:
pass


def _create_dirs(entries, path, fs):
for entry in entries:
fs.makedirs(fs.path.join(path, *entry.key), exist_ok=True)
fs.makedirs(fs.join(path, *entry.key), exist_ok=True)


def _chmod_files(entries, path, fs):
if not isinstance(fs, LocalFileSystem):
return

for entry in entries:
entry_path = fs.path.join(path, *entry.key)
entry_path = fs.join(path, *entry.key)
mode = os.stat(entry_path).st_mode | stat.S_IEXEC
try:
os.chmod(entry_path, mode)
Expand Down Expand Up @@ -366,7 +366,7 @@ def apply( # noqa: PLR0913
onerror = _onerror_noop

for entry in diff.dirs_failed:
onerror(None, fs.path.join(path, *entry.key), None)
onerror(None, fs.join(path, *entry.key), None)

_delete_files(
diff.files_delete,
Expand Down Expand Up @@ -409,9 +409,9 @@ def _prune_existing_versions(
if entry.meta.version_id is None:
yield entry
else:
entry_path = fs.path.join(path, *(entry.key or ()))
assert hasattr(fs.path, "version_path")
versioned_path = fs.path.version_path(entry_path, entry.meta.version_id)
entry_path = fs.join(path, *(entry.key or ()))
assert hasattr(fs, "version_path")
versioned_path = fs.version_path(entry_path, entry.meta.version_id)
query_vers[versioned_path] = entry
for path, exists in batch_exists(
fs, query_vers.keys(), batch_size=jobs, callback=callback
Expand Down
2 changes: 1 addition & 1 deletion src/dvc_data/index/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def fetch(
else:
old = build(cache.path, cache.fs)
diff = compare(old, fs_index)
cache.fs.makedirs(cache.fs.path.parent(cache.path), exist_ok=True)
cache.fs.makedirs(cache.fs.parent(cache.path), exist_ok=True)
apply(
diff,
cache.path,
Expand Down
4 changes: 2 additions & 2 deletions src/dvc_data/index/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ def get_key(self, entry: "DataIndexEntry") -> "DataIndexKey":
def get(self, entry: "DataIndexEntry") -> Tuple["FileSystem", str]:
assert entry.key is not None
assert entry.key[: len(self.prefix)] == self.prefix
path = self.fs.path.join(self.path, *entry.key[len(self.prefix) :])
path = self.fs.join(self.path, *entry.key[len(self.prefix) :])
if self.fs.version_aware and entry.meta and entry.meta.version_id:
path = self.fs.path.version_path(path, entry.meta.version_id)
path = self.fs.version_path(path, entry.meta.version_id)
return self.fs, path

def exists(self, entry: "DataIndexEntry", refresh: bool = False) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions src/dvc_data/index/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _meta_checksum(fs: "FileSystem", meta: "Meta") -> Any:

def _onerror(cache, data, failed_keys, src_path, dest_path, exc):
if not isinstance(exc, FileNotFoundError) or cache.fs.exists(src_path):
failed_keys.add(data.fs.path.relparts(dest_path, data.path))
failed_keys.add(data.fs.relparts(dest_path, data.path))

logger.debug(
"failed to create '%s' from '%s'",
Expand Down Expand Up @@ -89,7 +89,7 @@ def push(
meta_only=True,
meta_cmp_key=partial(_meta_checksum, data.fs),
)
data.fs.makedirs(data.fs.path.parent(data.path), exist_ok=True)
data.fs.makedirs(data.fs.parent(data.path), exist_ok=True)

failed_keys: Set["DataIndexKey"] = set()

Expand Down
12 changes: 6 additions & 6 deletions src/dvc_data/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ class NotARepoError(Exception):
class Repo:
def __init__(self, root: str = "", fs: Optional[FileSystem] = None) -> None:
fs = fs or localfs
root = root or fs.path.getcwd()
control_dir: str = os.getenv("DVC_DIR") or fs.path.join(root, ".dvc")
root = root or fs.getcwd()
control_dir: str = os.getenv("DVC_DIR") or fs.join(root, ".dvc")

if not fs.isdir(control_dir):
raise NotARepoError(f"{root} is not a data repo.")

self.fs = fs or localfs
self.root = root
self._control_dir = control_dir
self._tmp_dir: str = fs.path.join(self._control_dir, "tmp")
self._object_dir: str = fs.path.join(self._control_dir, "cache")
self._tmp_dir: str = fs.join(self._control_dir, "tmp")
self._object_dir: str = fs.join(self._control_dir, "cache")

self.index = DataIndex()

Expand All @@ -36,12 +36,12 @@ def discover(
) -> "Repo":
remaining = start
fs = fs or localfs
path = start = fs.path.abspath(start)
path = start = fs.abspath(start)
while remaining:
try:
return cls(path, fs)
except NotARepoError:
path, remaining = fs.path.split(path)
path, remaining = fs.split(path)
raise NotARepoError(f"No data repository was found at {start}")

@property
Expand Down

0 comments on commit 3143e7e

Please sign in to comment.