Skip to content

Support async FSMap objects in zarr.open #2774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 50 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
656a565
WIP: Support fsspec mutable mapping objects in zarr.open
maxrjones Jan 28, 2025
877eb80
Simplify library availability checking
maxrjones Feb 6, 2025
90cc08d
Merge branch 'main' into support-FSMap
maxrjones Feb 6, 2025
f04145c
Improve test coverage
maxrjones Feb 13, 2025
825cd6e
Merge branch 'main' into support-FSMap
maxrjones Feb 13, 2025
c4bfb06
Improve error messages
maxrjones Feb 13, 2025
06f35f2
Consolidate code
maxrjones Feb 13, 2025
e792e01
Make test more readable
maxrjones Feb 13, 2025
ed11018
Make async instances from sync fsmap objects
maxrjones Feb 13, 2025
e586001
Move test to fsspec store
maxrjones Feb 13, 2025
3f9a34c
Re-add type ignore
maxrjones Feb 13, 2025
4d1bd26
"Update docstring"
maxrjones Feb 13, 2025
4b7a5eb
Merge branch 'main' into support-FSMap
maxrjones Feb 13, 2025
abc5fdf
Add another test
maxrjones Feb 13, 2025
5d8e8ca
Merge branch 'main' into support-FSMap
d-v-b Feb 14, 2025
cb2db7d
Require auto_mkdir for LocalFileSystem
maxrjones Feb 14, 2025
ed9639f
Merge branch 'main' into support-FSMap
maxrjones Feb 14, 2025
46e8bff
Update test location
maxrjones Feb 14, 2025
a126d4b
Merge branch 'main' into support-FSMap
dcherian Feb 14, 2025
50c18f0
Merge branch 'main' into support-FSMap
maxrjones Feb 27, 2025
7517f72
Convert older filesystems to async
maxrjones Feb 27, 2025
3ae719b
Use if on fsspec versions rather than try; else
maxrjones Feb 27, 2025
d4d2256
Always use asynchronous=True in _make_async
maxrjones Feb 27, 2025
b4a2bd1
Merge branch 'main' into support-FSMap
maxrjones May 30, 2025
28f8420
Improve tests
maxrjones May 30, 2025
b782704
Apply suggestions from code review
maxrjones May 30, 2025
466bdd2
Apply more code suggestions
maxrjones May 30, 2025
b89a1a5
Fix typing error
maxrjones May 30, 2025
95ddf7c
Merge branch 'main' into support-FSMap
maxrjones May 30, 2025
9dcb558
Test remote stores in min_deps env
maxrjones May 30, 2025
f2b076a
Remove redundant import
maxrjones May 30, 2025
7032ca1
Merge branch 'main' into support-FSMap
maxrjones Jun 2, 2025
ac4c64c
Test warning
maxrjones Jun 2, 2025
27441e9
Lint
maxrjones Jun 2, 2025
696761e
Add pytest pin
dstansby Jun 3, 2025
9326025
Merge branch 'pin-pytest' into support-FSMap
maxrjones Jun 3, 2025
7441486
Add release note
maxrjones Jun 3, 2025
18ee24d
Generate coverage on min_deps and upstream jobs
maxrjones Jun 3, 2025
c1acdcf
Update src/zarr/storage/_fsspec.py
maxrjones Jun 3, 2025
7f48751
More useful error messages
maxrjones Jun 3, 2025
a02b259
Add TypeAlias
maxrjones Jun 3, 2025
7e0f2d6
Fix typing for no fsspec installation
maxrjones Jun 3, 2025
c05bd1f
Merge branch 'main' into support-FSMap
maxrjones Jun 5, 2025
ec38155
Merge branch 'main' into support-FSMap
d-v-b Jun 6, 2025
9ca4781
Merge branch 'main' into support-FSMap
d-v-b Jun 7, 2025
21e6493
Merge branch 'main' into support-FSMap
d-v-b Jun 7, 2025
d0a753d
Merge branch 'main' into support-FSMap
maxrjones Jun 9, 2025
3b06c53
Move imports
maxrjones Jun 9, 2025
98f00dd
Don't mutate FSMap object
maxrjones Jun 9, 2025
9039c65
Merge branch 'main' into support-FSMap
maxrjones Jun 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ jobs:
hatch env run -e ${{ matrix.dependency-set }} list-env
- name: Run Tests
run: |
hatch env run --env ${{ matrix.dependency-set }} run
hatch env run --env ${{ matrix.dependency-set }} run-coverage
- name: Upload coverage
uses: codecov/codecov-action@v5
with:
Expand Down
1 change: 1 addition & 0 deletions changes/2774.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `zarr.storage.FsspecStore.from_mapper()` so that `zarr.open()` supports stores of type `fsspec.mapping.FSMap`.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ dependencies = [
'obstore==0.5.*',
# test deps
'zarr[test]',
'zarr[remote_tests]',
]

[tool.hatch.envs.min_deps.scripts]
Expand Down
24 changes: 20 additions & 4 deletions src/zarr/storage/_common.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

import importlib.util
import json
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Self
from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias

from zarr.abc.store import ByteRequest, Store
from zarr.core.buffer import Buffer, default_buffer_prototype
Expand All @@ -12,6 +13,12 @@
from zarr.storage._memory import MemoryStore
from zarr.storage._utils import normalize_path

_has_fsspec = importlib.util.find_spec("fsspec")
if _has_fsspec:
from fsspec.mapping import FSMap
else:
FSMap = None

Check warning on line 20 in src/zarr/storage/_common.py

View check run for this annotation

Codecov / codecov/patch

src/zarr/storage/_common.py#L20

Added line #L20 was not covered by tests

if TYPE_CHECKING:
from zarr.core.buffer import BufferPrototype

Expand Down Expand Up @@ -227,7 +234,7 @@
return False


StoreLike = Store | StorePath | Path | str | dict[str, Buffer]
StoreLike: TypeAlias = Store | StorePath | FSMap | Path | str | dict[str, Buffer]


async def make_store_path(
Expand Down Expand Up @@ -314,9 +321,18 @@
# We deliberate only consider dict[str, Buffer] here, and not arbitrary mutable mappings.
# By only allowing dictionaries, which are in-memory, we know that MemoryStore appropriate.
store = await MemoryStore.open(store_dict=store_like, read_only=_read_only)
elif _has_fsspec and isinstance(store_like, FSMap):
if path:
raise ValueError(
"'path' was provided but is not used for FSMap store_like objects. Specify the path when creating the FSMap instance instead."
)
if storage_options:
raise ValueError(
"'storage_options was provided but is not used for FSMap store_like objects. Specify the storage options when creating the FSMap instance instead."
)
store = FsspecStore.from_mapper(store_like, read_only=_read_only)
else:
msg = f"Unsupported type for store_like: '{type(store_like).__name__}'" # type: ignore[unreachable]
raise TypeError(msg)
raise TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'")

result = await StorePath.open(store, path=path_normalized, mode=mode)

Expand Down
84 changes: 74 additions & 10 deletions src/zarr/storage/_fsspec.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations

import json
import warnings
from contextlib import suppress
from typing import TYPE_CHECKING, Any

from packaging.version import parse as parse_version

from zarr.abc.store import (
ByteRequest,
OffsetByteRequest,
Expand All @@ -17,7 +20,9 @@
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable

from fsspec import AbstractFileSystem
from fsspec.asyn import AsyncFileSystem
from fsspec.mapping import FSMap

from zarr.core.buffer import BufferPrototype
from zarr.core.common import BytesLike
Expand All @@ -30,6 +35,42 @@
)


def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem:
"""Convert a sync FSSpec filesystem to an async FFSpec filesystem

If the filesystem class supports async operations, a new async instance is created
from the existing instance.

If the filesystem class does not support async operations, the existing instance
is wrapped with AsyncFileSystemWrapper.
"""
import fsspec

fsspec_version = parse_version(fsspec.__version__)
if fs.async_impl and fs.asynchronous:
# Already an async instance of an async filesystem, nothing to do
return fs
if fs.async_impl:
# Convert sync instance of an async fs to an async instance
fs_dict = json.loads(fs.to_json())
fs_dict["asynchronous"] = True
return fsspec.AbstractFileSystem.from_json(json.dumps(fs_dict))

# Wrap sync filesystems with the async wrapper
if type(fs) is fsspec.implementations.local.LocalFileSystem and not fs.auto_mkdir:
raise ValueError(
f"LocalFilesystem {fs} was created with auto_mkdir=False but Zarr requires the filesystem to automatically create directories"
)
if fsspec_version < parse_version("2024.12.0"):
raise ImportError(
f"The filesystem '{fs}' is synchronous, and the required "
"AsyncFileSystemWrapper is not available. Upgrade fsspec to version "
"2024.12.0 or later to enable this functionality."
)

return fsspec.implementations.asyn_wrapper.AsyncFileSystemWrapper(fs, asynchronous=True)


class FsspecStore(Store):
"""
Store for remote data based on FSSpec.
Expand Down Expand Up @@ -137,6 +178,38 @@ def from_upath(
allowed_exceptions=allowed_exceptions,
)

@classmethod
def from_mapper(
cls,
fs_map: FSMap,
read_only: bool = False,
allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
) -> FsspecStore:
"""
Create a FsspecStore from a FSMap object.

Parameters
----------
fs_map : FSMap
Fsspec mutable mapping object.
read_only : bool
Whether the store is read-only, defaults to False.
allowed_exceptions : tuple, optional
The exceptions that are allowed to be raised when accessing the
store. Defaults to ALLOWED_EXCEPTIONS.

Returns
-------
FsspecStore
"""
fs = _make_async(fs_map.fs)
return cls(
fs=fs,
path=fs_map.root,
read_only=read_only,
allowed_exceptions=allowed_exceptions,
)

@classmethod
def from_url(
cls,
Expand Down Expand Up @@ -175,16 +248,7 @@ def from_url(

fs, path = url_to_fs(url, **opts)
if not fs.async_impl:
try:
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

fs = AsyncFileSystemWrapper(fs, asynchronous=True)
except ImportError as e:
raise ImportError(
f"The filesystem for URL '{url}' is synchronous, and the required "
"AsyncFileSystemWrapper is not available. Upgrade fsspec to version "
"2024.12.0 or later to enable this functionality."
) from e
fs = _make_async(fs)

# fsspec is not consistent about removing the scheme from the path, so check and strip it here
# https://github.com/fsspec/filesystem_spec/issues/1722
Expand Down
127 changes: 119 additions & 8 deletions tests/test_store/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
import re
from typing import TYPE_CHECKING, Any

import numpy as np
import pytest
from packaging.version import parse as parse_version

import zarr.api.asynchronous
from zarr import Array
from zarr.abc.store import OffsetByteRequest
from zarr.core.buffer import Buffer, cpu, default_buffer_prototype
from zarr.core.sync import _collect_aiterator, sync
from zarr.storage import FsspecStore
from zarr.storage._fsspec import _make_async
from zarr.testing.store import StoreTests

if TYPE_CHECKING:
import pathlib
from collections.abc import Generator
from pathlib import Path

Expand Down Expand Up @@ -191,7 +195,11 @@ async def test_fsspec_store_from_uri(self, store: FsspecStore) -> None:
)
assert dict(group.attrs) == {"key": "value"}

meta["attributes"]["key"] = "value-2" # type: ignore[index]
meta = {
"attributes": {"key": "value-2"},
"zarr_format": 3,
"node_type": "group",
}
await store.set(
"directory-2/zarr.json",
self.buffer_cls.from_bytes(json.dumps(meta).encode()),
Expand All @@ -201,7 +209,11 @@ async def test_fsspec_store_from_uri(self, store: FsspecStore) -> None:
)
assert dict(group.attrs) == {"key": "value-2"}

meta["attributes"]["key"] = "value-3" # type: ignore[index]
meta = {
"attributes": {"key": "value-3"},
"zarr_format": 3,
"node_type": "group",
}
await store.set(
"directory-3/zarr.json",
self.buffer_cls.from_bytes(json.dumps(meta).encode()),
Expand Down Expand Up @@ -264,32 +276,131 @@ async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None:
await store.delete_dir("test_prefix")


def array_roundtrip(store: FsspecStore) -> None:
"""
Round trip an array using a Zarr store

Args:
store: FsspecStore
"""
data = np.ones((3, 3))
arr = zarr.create_array(store=store, overwrite=True, data=data)
assert isinstance(arr, Array)
# Read set values
arr2 = zarr.open_array(store=store)
assert isinstance(arr2, Array)
np.testing.assert_array_equal(arr[:], data)


@pytest.mark.skipif(
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
def test_wrap_sync_filesystem() -> None:
def test_wrap_sync_filesystem(tmp_path: pathlib.Path) -> None:
"""The local fs is not async so we should expect it to be wrapped automatically"""
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

store = FsspecStore.from_url("local://test/path")

store = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": True})
assert isinstance(store.fs, AsyncFileSystemWrapper)
assert store.fs.async_impl
array_roundtrip(store)


@pytest.mark.skipif(
parse_version(fsspec.__version__) >= parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
def test_wrap_sync_filesystem_raises(tmp_path: pathlib.Path) -> None:
"""The local fs is not async so we should expect it to be wrapped automatically"""
with pytest.raises(ImportError, match="The filesystem .*"):
FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": True})


@pytest.mark.skipif(
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
def test_no_wrap_async_filesystem() -> None:
"""An async fs should not be wrapped automatically; fsspec's https filesystem is such an fs"""
"""An async fs should not be wrapped automatically; fsspec's s3 filesystem is such an fs"""
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

store = FsspecStore.from_url("https://test/path")

store = FsspecStore.from_url(
f"s3://{test_bucket_name}/foo/spam/",
storage_options={"endpoint_url": endpoint_url, "anon": False, "asynchronous": True},
read_only=False,
)
assert not isinstance(store.fs, AsyncFileSystemWrapper)
assert store.fs.async_impl
array_roundtrip(store)


@pytest.mark.skipif(
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
def test_open_fsmap_file(tmp_path: pathlib.Path) -> None:
min_fsspec_with_async_wrapper = parse_version("2024.12.0")
current_version = parse_version(fsspec.__version__)

fs = fsspec.filesystem("file", auto_mkdir=True)
mapper = fs.get_mapper(tmp_path)

if current_version < min_fsspec_with_async_wrapper:
# Expect ImportError for older versions
with pytest.raises(
ImportError,
match=r"The filesystem .* is synchronous, and the required AsyncFileSystemWrapper is not available.*",
):
array_roundtrip(mapper)
else:
# Newer versions should work
array_roundtrip(mapper)


@pytest.mark.skipif(
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
def test_open_fsmap_file_raises(tmp_path: pathlib.Path) -> None:
fsspec = pytest.importorskip("fsspec.implementations.local")
fs = fsspec.LocalFileSystem(auto_mkdir=False)
mapper = fs.get_mapper(tmp_path)
with pytest.raises(ValueError, match="LocalFilesystem .*"):
array_roundtrip(mapper)


@pytest.mark.parametrize("asynchronous", [True, False])
def test_open_fsmap_s3(asynchronous: bool) -> None:
s3_filesystem = s3fs.S3FileSystem(
asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False
)
mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/")
array_roundtrip(mapper)


def test_open_s3map_raises() -> None:
with pytest.raises(TypeError, match="Unsupported type for store_like:.*"):
zarr.open(store=0, mode="w", shape=(3, 3))
s3_filesystem = s3fs.S3FileSystem(asynchronous=True, endpoint_url=endpoint_url, anon=False)
mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/")
with pytest.raises(
ValueError, match="'path' was provided but is not used for FSMap store_like objects"
):
zarr.open(store=mapper, path="bar", mode="w", shape=(3, 3))
with pytest.raises(
ValueError,
match="'storage_options was provided but is not used for FSMap store_like objects",
):
zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3))


@pytest.mark.parametrize("asynchronous", [True, False])
def test_make_async(asynchronous: bool) -> None:
s3_filesystem = s3fs.S3FileSystem(
asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False
)
fs = _make_async(s3_filesystem)
assert fs.asynchronous


@pytest.mark.skipif(
Expand Down