
[v3] fix: zarr v2 compatibility fixes for Dask #2186

Merged · 20 commits · Oct 1, 2024
25 changes: 18 additions & 7 deletions src/zarr/api/asynchronous.py
@@ -7,14 +7,16 @@
import numpy as np
import numpy.typing as npt

from zarr.abc.store import Store
from zarr.core.array import Array, AsyncArray, get_array_metadata
from zarr.core.common import JSON, AccessModeLiteral, ChunkCoords, MemoryOrder, ZarrFormat
from zarr.core.config import config
from zarr.core.group import AsyncGroup
from zarr.core.metadata.v2 import ArrayV2Metadata
from zarr.core.metadata.v3 import ArrayV3Metadata
from zarr.store import (
from zarr.storage import (
StoreLike,
StorePath,
make_store_path,
)

@@ -225,6 +227,7 @@ async def open(
Return type depends on what exists in the given store.
"""
zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)

store_path = await make_store_path(store, mode=mode, storage_options=storage_options)

if path is not None:
@@ -243,9 +246,9 @@
return await open_group(store=store_path, zarr_format=zarr_format, mode=mode, **kwargs)

try:
return await open_array(store=store_path, zarr_format=zarr_format, mode=mode, **kwargs)
return await open_array(store=store_path, zarr_format=zarr_format, **kwargs)
except KeyError:
return await open_group(store=store_path, zarr_format=zarr_format, mode=mode, **kwargs)
return await open_group(store=store_path, zarr_format=zarr_format, **kwargs)


async def open_consolidated(*args: Any, **kwargs: Any) -> AsyncGroup:
@@ -319,7 +322,8 @@ async def save_array(
or _default_zarr_version()
)

store_path = await make_store_path(store, mode="w", storage_options=storage_options)
mode = kwargs.pop("mode", None)
store_path = await make_store_path(store, mode=mode, storage_options=storage_options)
if path is not None:
store_path = store_path / path
new = await AsyncArray.create(
@@ -496,7 +500,9 @@ async def group(

zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)

store_path = await make_store_path(store, storage_options=storage_options)
mode = None if isinstance(store, Store) else cast(AccessModeLiteral, "a")

store_path = await make_store_path(store, mode=mode, storage_options=storage_options)
if path is not None:
store_path = store_path / path

@@ -769,7 +775,11 @@ async def create(
if meta_array is not None:
warnings.warn("meta_array is not yet implemented", RuntimeWarning, stacklevel=2)

mode = kwargs.pop("mode", cast(AccessModeLiteral, "r" if read_only else "w"))
mode = kwargs.pop("mode", None)
if mode is None:
if not isinstance(store, Store | StorePath):
mode = "a"

store_path = await make_store_path(store, mode=mode, storage_options=storage_options)
if path is not None:
store_path = store_path / path
@@ -945,7 +955,8 @@ async def open_array(
The opened array.
"""

store_path = await make_store_path(store, storage_options=storage_options)
mode = kwargs.pop("mode", None)
store_path = await make_store_path(store, mode=mode)
if path is not None:
store_path = store_path / path

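For context, a rough sketch (not part of this diff) of the call pattern the mode handling above is meant to support: callers such as Dask can pass mode explicitly instead of having it hard-coded to "w". The local path below is hypothetical, and the keyword plumbing is assumed to match the async API shown in this file.

import asyncio

import numpy as np

import zarr.api.asynchronous as async_api


async def demo() -> None:
    # mode is now popped from **kwargs and forwarded to make_store_path
    # rather than being forced to "w"
    await async_api.save_array("data/example.zarr", np.arange(10), mode="a")
    # open() no longer re-passes mode to open_array/open_group; the
    # store_path it builds already carries the access mode
    node = await async_api.open(store="data/example.zarr", mode="r")
    print(node)


asyncio.run(demo())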
2 changes: 1 addition & 1 deletion src/zarr/api/synchronous.py
@@ -11,7 +11,7 @@
if TYPE_CHECKING:
from zarr.core.buffer import NDArrayLike
from zarr.core.common import JSON, AccessModeLiteral, ChunkCoords, ZarrFormat
from zarr.store import StoreLike
from zarr.storage import StoreLike

__all__ = [
"array",
59 changes: 39 additions & 20 deletions src/zarr/core/array.py
@@ -3,13 +3,14 @@
import json
from asyncio import gather
from dataclasses import dataclass, field, replace
from logging import getLogger
from typing import TYPE_CHECKING, Any, Literal, cast

import numpy as np
import numpy.typing as npt

from zarr._compat import _deprecate_positional_args
from zarr.abc.store import set_or_delete
from zarr.abc.store import Store, set_or_delete
from zarr.codecs import BytesCodec
from zarr.codecs._v2 import V2Compressor, V2Filters
from zarr.core.attributes import Attributes
@@ -19,7 +20,7 @@
NDBuffer,
default_buffer_prototype,
)
from zarr.core.chunk_grids import RegularChunkGrid, _guess_chunks
from zarr.core.chunk_grids import RegularChunkGrid, normalize_chunks
from zarr.core.chunk_key_encodings import (
ChunkKeyEncoding,
DefaultChunkKeyEncoding,
@@ -67,10 +68,8 @@
from zarr.core.metadata.v3 import ArrayV3Metadata
from zarr.core.sync import collect_aiterator, sync
from zarr.registry import get_pipeline_class
from zarr.store import StoreLike, StorePath, make_store_path
from zarr.store.common import (
ensure_no_existing_node,
)
from zarr.storage import StoreLike, make_store_path
from zarr.storage.common import StorePath, ensure_no_existing_node

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator, Sequence
@@ -82,6 +81,8 @@
# Array and AsyncArray are defined in the base ``zarr`` namespace
__all__ = ["create_codec_pipeline", "parse_array_metadata"]

logger = getLogger(__name__)


def parse_array_metadata(data: Any) -> ArrayV2Metadata | ArrayV3Metadata:
if isinstance(data, ArrayV2Metadata | ArrayV3Metadata):
@@ -222,15 +223,14 @@ async def create(

shape = parse_shapelike(shape)

if chunk_shape is None:
if chunks is None:
chunk_shape = chunks = _guess_chunks(shape=shape, typesize=np.dtype(dtype).itemsize)
else:
chunks = parse_shapelike(chunks)
if chunks is not None and chunk_shape is not None:
raise ValueError("Only one of chunk_shape or chunks can be provided.")

chunk_shape = chunks
elif chunks is not None:
raise ValueError("Only one of chunk_shape or chunks must be provided.")
dtype = np.dtype(dtype)
if chunks:
_chunks = normalize_chunks(chunks, shape, dtype.itemsize)
else:
_chunks = normalize_chunks(chunk_shape, shape, dtype.itemsize)

if zarr_format == 3:
if dimension_separator is not None:
@@ -253,7 +253,7 @@
store_path,
shape=shape,
dtype=dtype,
chunk_shape=chunk_shape,
chunk_shape=_chunks,
fill_value=fill_value,
chunk_key_encoding=chunk_key_encoding,
codecs=codecs,
@@ -276,7 +276,7 @@
store_path,
shape=shape,
dtype=dtype,
chunks=chunk_shape,
chunks=_chunks,
dimension_separator=dimension_separator,
fill_value=fill_value,
order=order,
@@ -404,6 +404,10 @@ async def open(
metadata_dict = await get_array_metadata(store_path, zarr_format=zarr_format)
return cls(store_path=store_path, metadata=metadata_dict)

@property
def store(self) -> Store:
return self.store_path.store

@property
def ndim(self) -> int:
return len(self.metadata.shape)
@@ -831,6 +835,10 @@ def open(
async_array = sync(AsyncArray.open(store))
return cls(async_array)

@property
def store(self) -> Store:
return self._async_array.store

@property
def ndim(self) -> int:
return self._async_array.ndim
@@ -2380,15 +2388,26 @@ def chunks_initialized(array: Array | AsyncArray) -> tuple[str, ...]:
def _build_parents(node: AsyncArray | AsyncGroup) -> list[AsyncGroup]:
from zarr.core.group import AsyncGroup, GroupMetadata

required_parts = node.store_path.path.split("/")[:-1]
parents = []
store = node.store_path.store
path = node.store_path.path
if not path:
Contributor: Do you know whether this branch is covered by an existing test? If not, it might be good to add one.

Member Author: Yes, this path is highly exercised.

return []

required_parts = path.split("/")[:-1]
parents = [
# the root group
AsyncGroup(
metadata=GroupMetadata(zarr_format=node.metadata.zarr_format),
store_path=StorePath(store=store, path=""),
)
]

for i, part in enumerate(required_parts):
path = "/".join(required_parts[:i] + [part])
p = "/".join(required_parts[:i] + [part])
parents.append(
AsyncGroup(
metadata=GroupMetadata(zarr_format=node.metadata.zarr_format),
store_path=StorePath(store=node.store_path.store, path=path),
store_path=StorePath(store=store, path=p),
)
)

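For illustration only (not part of the diff), a sketch of what the array-level changes surface to users. The store path is hypothetical, and the top-level create() is assumed to forward chunks down to AsyncArray.create as in zarr-python 2.x.

import zarr

# an int (or dask-style nested tuples, True/None for auto, False, or -1/None
# entries) is now accepted for chunks and normalized via normalize_chunks
arr = zarr.create(shape=(100, 100), chunks=10, dtype="i4",
                  store="data/example.zarr", zarr_format=2)
print(arr.chunks)  # expected: (10, 10)
# the new Array.store property exposes the underlying Store directly,
# alongside the existing store_path
print(arr.store)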
46 changes: 45 additions & 1 deletion src/zarr/core/chunk_grids.py
@@ -2,11 +2,12 @@

import itertools
import math
import numbers
import operator
from abc import abstractmethod
from dataclasses import dataclass
from functools import reduce
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import numpy as np

@@ -97,6 +98,49 @@ def _guess_chunks(
return tuple(int(x) for x in chunks)


def normalize_chunks(chunks: Any, shape: tuple[int, ...], typesize: int) -> tuple[int, ...]:
"""Convenience function to normalize the `chunks` argument for an array
with the given `shape`."""

# N.B., expect shape already normalized

# handle auto-chunking
if chunks is None or chunks is True:
return _guess_chunks(shape, typesize)

# handle no chunking
if chunks is False:
return shape

# handle 1D convenience form
if isinstance(chunks, numbers.Integral):
chunks = tuple(int(chunks) for _ in shape)

# handle dask-style chunks (iterable of iterables)
if all(isinstance(c, (tuple | list)) for c in chunks):
# take first chunk size for each dimension
chunks = tuple(
c[0] for c in chunks
) # TODO: check/error/warn for irregular chunks (e.g. if c[0] != c[1:-1])

# handle bad dimensionality
if len(chunks) > len(shape):
raise ValueError("too many dimensions in chunks")

# handle underspecified chunks
if len(chunks) < len(shape):
# assume chunks across remaining dimensions
chunks += shape[len(chunks) :]

# handle None or -1 in chunks
if -1 in chunks or None in chunks:
chunks = tuple(
s if c == -1 or c is None else int(c) for s, c in zip(shape, chunks, strict=False)
)

return tuple(int(c) for c in chunks)


@dataclass(frozen=True)
class ChunkGrid(Metadata):
@classmethod
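A quick sketch of how the new normalize_chunks helper behaves for the input forms handled above; shapes and typesize are chosen arbitrarily for illustration.

from zarr.core.chunk_grids import normalize_chunks

auto = normalize_chunks(None, (100, 100), 4)                          # None/True: defer to the _guess_chunks heuristic
assert normalize_chunks(False, (100, 100), 4) == (100, 100)           # False: one chunk spanning the array
assert normalize_chunks(10, (100, 100), 4) == (10, 10)                # int: broadcast to every dimension
assert normalize_chunks((10,), (100, 100), 4) == (10, 100)            # underspecified: remaining dims use the shape
assert normalize_chunks(((5, 5), (25, 75)), (10, 100), 4) == (5, 25)  # dask-style: first block per dimension
assert normalize_chunks((-1, None), (10, 100), 4) == (10, 100)        # -1/None: full extent of that dimension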
22 changes: 17 additions & 5 deletions src/zarr/core/group.py
@@ -29,8 +29,8 @@
)
from zarr.core.config import config
from zarr.core.sync import SyncMixin, sync
from zarr.store import StoreLike, StorePath, make_store_path
from zarr.store.common import ensure_no_existing_node
from zarr.storage import StoreLike, make_store_path
from zarr.storage.common import StorePath, ensure_no_existing_node

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Generator, Iterable, Iterator
@@ -176,7 +176,9 @@ async def open(
# alternatively, we could warn and favor v3
raise ValueError("Both zarr.json and .zgroup objects exist")
if zarr_json_bytes is None and zgroup_bytes is None:
raise FileNotFoundError(store_path)
raise FileNotFoundError(
f"could not find zarr.json or .zgroup objects in {store_path}"
)
# set zarr_format based on which keys were found
if zarr_json_bytes is not None:
zarr_format = 3
@@ -698,6 +700,10 @@ async def _members(
"Object at %s is not recognized as a component of a Zarr hierarchy.", key
)

async def keys(self) -> AsyncGenerator[str, None]:
async for key, _ in self.members():
yield key

async def contains(self, member: str) -> bool:
# TODO: this can be made more efficient.
try:
Expand Down Expand Up @@ -821,15 +827,18 @@ def __delitem__(self, key: str) -> None:
self._sync(self._async_group.delitem(key))

def __iter__(self) -> Iterator[str]:
raise NotImplementedError
yield from self.keys()

def __len__(self) -> int:
raise NotImplementedError
return self.nmembers()

def __setitem__(self, key: str, value: Any) -> None:
"""__setitem__ is not supported in v3"""
raise NotImplementedError

def __repr__(self) -> str:
return f"<Group {self.store_path}>"

async def update_attributes_async(self, new_attributes: dict[str, Any]) -> Group:
new_metadata = replace(self.metadata, attributes=new_attributes)

@@ -904,6 +913,9 @@ def members(self, max_depth: int | None = 0) -> tuple[tuple[str, Array | Group],

return tuple((kv[0], _parse_async_node(kv[1])) for kv in _members)

def keys(self) -> Generator[str, None]:
yield from self._sync_iter(self._async_group.keys())

def __contains__(self, member: str) -> bool:
return self._sync(self._async_group.contains(member))

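For reference, a small sketch (not part of the diff) of the dict-like behaviour the Group changes enable; the member names are hypothetical and create_array is assumed to auto-chunk when no chunk shape is given.

import zarr

root = zarr.group()  # in-memory group
root.create_group("foo")
root.create_array("bar", shape=(10,), dtype="i4")

print(len(root))            # __len__ now returns nmembers()
print(sorted(root))         # __iter__ now yields member keys: ['bar', 'foo']
print(sorted(root.keys()))  # the new Group.keys() generator
print(root)                 # __repr__, e.g. <Group memory://...>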
15 changes: 15 additions & 0 deletions src/zarr/storage/__init__.py
@@ -0,0 +1,15 @@
from zarr.storage.common import StoreLike, StorePath, make_store_path
from zarr.storage.local import LocalStore
from zarr.storage.memory import MemoryStore
from zarr.storage.remote import RemoteStore
from zarr.storage.zip import ZipStore

__all__ = [
"LocalStore",
"MemoryStore",
"RemoteStore",
"StoreLike",
"StorePath",
"ZipStore",
"make_store_path",
]
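
A minimal sketch of the relocated public entry points (the module moved from zarr.store to zarr.storage); the in-memory example is illustrative only, and the names used are exactly those re-exported above.

from zarr.storage import LocalStore, MemoryStore, RemoteStore, ZipStore  # store classes
from zarr.storage import StoreLike, StorePath, make_store_path           # helpers and typing

store = MemoryStore()                   # simple in-memory store
root = StorePath(store=store, path="")  # a StorePath addressing the store root
print(root)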
File renamed without changes.