From 30b7c0007d61eefa062deda23b9bafc29fb263a3 Mon Sep 17 00:00:00 2001
From: Chuck Daniels
Date: Thu, 23 Jan 2025 17:00:10 -0500
Subject: [PATCH] Add `group` option to `dataset_to_icechunk` (#383)

* Bump ruff pre-commit hook and fix formatting

* Bump ruff target version to match min Python version

* Add `group` param to `dataset_to_icechunk` function

* Add unit tests for non-root groups

Fixes #341
---
 .pre-commit-config.yaml                      |   2 +-
 docs/releases.rst                            |   7 +
 pyproject.toml                               |   7 +-
 virtualizarr/tests/test_readers/conftest.py  |   2 +-
 .../tests/test_writers/test_icechunk.py      | 154 +++++++++---------
 virtualizarr/writers/icechunk.py             |  78 +++++----
 6 files changed, 144 insertions(+), 106 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 51953239..a0892673 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -11,7 +11,7 @@ repos:
   - repo: https://github.com/astral-sh/ruff-pre-commit
     # Ruff version.
-    rev: "v0.8.6"
+    rev: v0.9.2
    hooks:
      # Run the linter.
      - id: ruff
diff --git a/docs/releases.rst b/docs/releases.rst
index 4a42bea4..c887134d 100644
--- a/docs/releases.rst
+++ b/docs/releases.rst
@@ -15,6 +15,9 @@ New Features
   for the `to_icechunk` method to add timestamps as checksums when writing virtual
   references to an icechunk store. This is useful for ensuring that virtual
   references are not stale when reading from an icechunk store, which can happen if
   the underlying data has changed since the virtual references were written.
+- Add ``group=None`` keyword-only parameter to ``dataset_to_icechunk`` function to
+  allow writing to a nested group at the specified path (root group, if not specified).
+  (:issue:`341`) By `Chuck Daniels `_.
 
 Breaking changes
 ~~~~~~~~~~~~~~~~
@@ -26,6 +29,10 @@
   Also a warning is no longer thrown when ``indexes=None`` is passed to
   ``open_virtual_dataset``, and the recommendations in the docs updated to match.
   This also means that ``xarray.combine_by_coords`` will now work when the necessary
   dimension coordinates are specified in ``loadable_variables``.
   (:issue:`18`, :pull:`357`, :pull:`358`) By `Tom Nicholas `_.
+- For function ``dataset_to_icechunk``, parameters ``append_dim`` and ``last_updated_at``
+  are now keyword-only parameters, rather than positional or keyword. This change is
+  breaking *only* where arguments for these parameters are currently given positionally.
+  (:issue:`341`) By `Chuck Daniels `_.
 
 Deprecations
 ~~~~~~~~~~~~
diff --git a/pyproject.toml b/pyproject.toml
index e45ca563..912f58b6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -111,14 +111,15 @@ ignore_missing_imports = true
 # Same as Black.
 line-length = 88
 indent-width = 4
-target-version = "py39"
+target-version = "py310"
 
 exclude = [
     "docs",
-    ".eggs"]
+    ".eggs"
+]
 
 [tool.ruff.lint]
-# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
+# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
 # Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
 # McCabe complexity (`C901`) by default.
select = ["E4", "E7", "E9", "F", "I"] diff --git a/virtualizarr/tests/test_readers/conftest.py b/virtualizarr/tests/test_readers/conftest.py index f96447db..98663223 100644 --- a/virtualizarr/tests/test_readers/conftest.py +++ b/virtualizarr/tests/test_readers/conftest.py @@ -306,7 +306,7 @@ def filter_and_cf_roundtrip_hdf5_file(tmpdir, request): from random import randint - filepath = f"{tmpdir}/{request.param}_{randint(0,100)}_cf_roundtrip.nc" + filepath = f"{tmpdir}/{request.param}_{randint(0, 100)}_cf_roundtrip.nc" ds.to_netcdf(filepath, engine="h5netcdf", encoding=encoding) return filepath diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 95515096..3f697a90 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -10,10 +10,9 @@ import numpy as np import numpy.testing as npt -from xarray import Coordinates, Dataset, concat, open_dataset, open_zarr -from xarray.core.variable import Variable -from zarr import Array, Group, group # type: ignore -from zarr.core.metadata import ArrayV3Metadata # type: ignore +import xarray as xr +import zarr +from zarr.core.metadata import ArrayV3Metadata from virtualizarr.manifests import ChunkManifest, ManifestArray from virtualizarr.readers.common import separate_coords @@ -25,10 +24,10 @@ @pytest.fixture(scope="function") -def icechunk_storage(tmpdir) -> "Storage": +def icechunk_storage(tmp_path: Path) -> "Storage": from icechunk import Storage - return Storage.new_local_filesystem(str(tmpdir)) + return Storage.new_local_filesystem(str(tmp_path)) @pytest.fixture(scope="function") @@ -40,28 +39,30 @@ def icechunk_filestore(icechunk_storage: "Storage") -> "IcechunkStore": return session.store +@pytest.mark.parametrize("group_path", [None, "", "/a", "a", "/a/b", "a/b", "a/b/"]) def test_write_new_virtual_variable( - icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset + icechunk_filestore: "IcechunkStore", + vds_with_manifest_arrays: xr.Dataset, + group_path: Optional[str], ): vds = vds_with_manifest_arrays - dataset_to_icechunk(vds, icechunk_filestore) + dataset_to_icechunk(vds, icechunk_filestore, group=group_path) # check attrs - root_group = group(store=icechunk_filestore) - assert isinstance(root_group, Group) - assert root_group.attrs == {"something": 0} + group = zarr.group(store=icechunk_filestore, path=group_path) + assert isinstance(group, zarr.Group) + assert group.attrs.asdict() == {"something": 0} # TODO check against vds, then perhaps parametrize? # check array exists - assert "a" in root_group - arr = root_group["a"] - assert isinstance(arr, Array) + assert "a" in group + arr = group["a"] + assert isinstance(arr, zarr.Array) # check array metadata - # TODO why doesn't a .zarr_format or .version attribute exist on zarr.Array? 
-    # assert arr.zarr_format == 3
+    assert arr.metadata.zarr_format == 3
     assert arr.shape == (2, 3)
     assert arr.chunks == (2, 3)
     assert arr.dtype == np.dtype("<i8")
-) -> Variable:
+) -> xr.Variable:
     manifest = generate_chunk_manifest(
         file_uri,
         shape=shape,
@@ -458,7 +459,7 @@ def gen_virtual_variable(
         zarr_format=zarr_format,
     )
     ma = ManifestArray(chunkmanifest=manifest, zarray=zarray)
-    return Variable(
+    return xr.Variable(
         data=ma,
         dims=dims,
         encoding=encoding,
@@ -480,9 +481,9 @@ def gen_virtual_dataset(
     length: int = 48,
     dims: Optional[list[str]] = None,
     zarr_format: Literal[2, 3] = 2,
-    coords: Optional[Coordinates] = None,
-) -> Dataset:
-    ds = open_dataset(file_uri)
+    coords: Optional[xr.Coordinates] = None,
+) -> xr.Dataset:
+    ds = xr.open_dataset(file_uri)
     ds_dims: list[str] = cast(list[str], list(ds.dims))
     dims = dims or ds_dims
     var = gen_virtual_variable(
@@ -500,7 +501,7 @@ def gen_virtual_dataset(
         zarr_format=zarr_format,
         attrs=ds[variable_name].attrs,
     )
-    return Dataset(
+    return xr.Dataset(
         {variable_name: var},
         coords=coords,
         attrs=ds.attrs,
@@ -538,12 +539,12 @@ def test_append_virtual_ref_without_encoding(
     icechunk_filestore_append = repo.writable_session("main")
     dataset_to_icechunk(vds, icechunk_filestore_append.store, append_dim="x")
     icechunk_filestore_append.commit("appended data again")
-    array = open_zarr(
+    array = xr.open_zarr(
         icechunk_filestore_append.store, consolidated=False, zarr_format=3
     )
-    expected_ds = open_dataset(simple_netcdf4)
-    expected_array = concat([expected_ds, expected_ds, expected_ds], dim="x")
+    expected_ds = xr.open_dataset(simple_netcdf4)
+    expected_array = xr.concat([expected_ds, expected_ds, expected_ds], dim="x")
     xrt.assert_identical(array, expected_array)
 
 
 def test_append_virtual_ref_with_encoding(
@@ -593,12 +594,15 @@
     icechunk_filestore_append = icechunk_repo.writable_session("main")
     dataset_to_icechunk(vds2, icechunk_filestore_append.store, append_dim="time")
     icechunk_filestore_append.commit("appended data")
-    new_ds = open_zarr(
+    new_ds = xr.open_zarr(
         icechunk_filestore_append.store, consolidated=False, zarr_format=3
     )
-    expected_ds1, expected_ds2 = open_dataset(filepath1), open_dataset(filepath2)
-    expected_ds = concat([expected_ds1, expected_ds2], dim="time").drop_vars(
+    expected_ds1, expected_ds2 = (
+        xr.open_dataset(filepath1),
+        xr.open_dataset(filepath2),
+    )
+    expected_ds = xr.concat([expected_ds1, expected_ds2], dim="time").drop_vars(
         ["time", "lat", "lon"], errors="ignore"
     )
     # Because we encode attributes, attributes may differ, for example
@@ -696,7 +700,9 @@ async def test_append_with_multiple_root_arrays(
     icechunk_filestore.commit(
         "test commit"
     )  # need to commit it in order to append to it in the next lines
-    new_ds = open_zarr(icechunk_filestore.store, consolidated=False, zarr_format=3)
+    new_ds = xr.open_zarr(
+        icechunk_filestore.store, consolidated=False, zarr_format=3
+    )
     first_time_chunk_before_append = await icechunk_filestore.store.get(
         "time/c/0", prototype=default_buffer_prototype()
     )
@@ -710,12 +716,15 @@
             "time/c/0", prototype=default_buffer_prototype()
         )
     ) == first_time_chunk_before_append
-    new_ds = open_zarr(
+    new_ds = xr.open_zarr(
         icechunk_filestore_append.store, consolidated=False, zarr_format=3
     )
-    expected_ds1, expected_ds2 = open_dataset(filepath1), open_dataset(filepath2)
-    expected_ds = concat([expected_ds1, expected_ds2], dim="time")
+    expected_ds1, expected_ds2 = (
+        xr.open_dataset(filepath1),
+        xr.open_dataset(filepath2),
+    )
+    expected_ds = xr.concat([expected_ds1, expected_ds2], dim="time")
     xrt.assert_equal(new_ds, expected_ds)
 
 
 # When appending to a virtual ref with compression, it succeeds
@@ -776,12 +785,12 @@ def test_append_with_compression_succeeds(
     icechunk_filestore_append = icechunk_repo.writable_session("main")
     dataset_to_icechunk(vds2, icechunk_filestore_append.store, append_dim="time")
     icechunk_filestore_append.commit("appended data")
-    updated_ds = open_zarr(
+    updated_ds = xr.open_zarr(
         store=icechunk_filestore_append.store, consolidated=False, zarr_format=3
     )
-    expected_ds1, expected_ds2 = open_dataset(file1), open_dataset(file2)
-    expected_ds = concat([expected_ds1, expected_ds2], dim="time")
+    expected_ds1, expected_ds2 = xr.open_dataset(file1), xr.open_dataset(file2)
+    expected_ds = xr.concat([expected_ds1, expected_ds2], dim="time")
     expected_ds = expected_ds.drop_vars(["lon", "lat", "time"], errors="ignore")
     xrt.assert_equal(updated_ds, expected_ds)
@@ -887,13 +896,12 @@ def test_append_dim_not_in_dims_raises_error(
 
     # Attempt to append using a non-existent append_dim "z"
     icechunk_filestore_append = icechunk_repo.writable_session("main")
+
     with pytest.raises(
         ValueError,
-        match="append_dim z does not match any existing dataset dimensions",
+        match="append_dim 'z' does not match any existing dataset dimensions",
     ):
         dataset_to_icechunk(vds, icechunk_filestore_append.store, append_dim="z")
 
 
-# TODO test writing to a group that isn't the root group
-
 # TODO test with S3 / minio
diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py
index e6869c6f..5ecf5afa 100644
--- a/virtualizarr/writers/icechunk.py
+++ b/virtualizarr/writers/icechunk.py
@@ -26,6 +26,8 @@
 def dataset_to_icechunk(
     ds: Dataset,
     store: "IcechunkStore",
+    *,
+    group: Optional[str] = None,
     append_dim: Optional[str] = None,
     last_updated_at: Optional[datetime] = None,
 ) -> None:
@@ -38,52 +40,67 @@
     ----------
     ds: xr.Dataset
     store: IcechunkStore
+    group: Optional[str]
+        Path to the group in which to store the dataset, defaulting to the root group.
     append_dim: Optional[str]
-        Name of the dimension along which to append data. If provided, the dataset must have a dimension with this name.
+        Name of the dimension along which to append data. If provided, the dataset must
+        have a dimension with this name.
     last_updated_at: Optional[datetime]
-        The time at which the virtual dataset was last updated. When specified, if any of the virtual chunks written in this
-        session are modified in storage after this time, icechunk will raise an error at runtime when trying to read the
-        virtual chunk. When not specified, icechunk will not check for modifications to the virtual chunks at runtime.
+        The time at which the virtual dataset was last updated. When specified, if any
+        of the virtual chunks written in this session are modified in storage after this
+        time, icechunk will raise an error at runtime when trying to read the virtual
+        chunk. When not specified, icechunk will not check for modifications to the
+        virtual chunks at runtime.
""" try: from icechunk import IcechunkStore # type: ignore[import-not-found] from zarr import Group # type: ignore[import-untyped] + from zarr.storage import StorePath except ImportError: raise ImportError( "The 'icechunk' and 'zarr' version 3 libraries are required to use this function" - ) + ) from None if not isinstance(store, IcechunkStore): - raise TypeError(f"expected type IcechunkStore, but got type {type(store)}") - elif not isinstance(last_updated_at, (type(None), datetime)): raise TypeError( - f"expected type Optional[datetime], but got type {type(last_updated_at)}" + f"store: expected type IcechunkStore, but got type {type(store)}" + ) + + if not isinstance(group, (type(None), str)): + raise TypeError( + f"group: expected type Optional[str], but got type {type(group)}" + ) + + if not isinstance(last_updated_at, (type(None), datetime)): + raise TypeError( + "last_updated_at: expected type Optional[datetime]," + f" but got type {type(last_updated_at)}" ) if store.read_only: raise ValueError("supplied store is read-only") - # TODO only supports writing to the root group currently - # TODO pass zarr_format kwarg? - if append_dim is not None: - if append_dim not in ds.dims: - raise ValueError( - f"append_dim {append_dim} does not match any existing dataset dimensions" - ) - root_group = Group.open(store=store, zarr_format=3) + if append_dim and append_dim not in ds.dims: + raise ValueError( + f"append_dim {append_dim!r} does not match any existing dataset dimensions" + ) + + store_path = StorePath(store, path=group or "") + + if append_dim: + group_object = Group.open(store=store_path, zarr_format=3) else: - root_group = Group.from_store(store=store) + group_object = Group.from_store(store=store_path, zarr_format=3) - # TODO this is Frozen, the API for setting attributes must be something else - # root_group.attrs = ds.attrs - # for k, v in ds.attrs.items(): - # root_group.attrs[k] = encode_zarr_attr_value(v) + group_object.update_attributes( + {k: encode_zarr_attr_value(v) for k, v in ds.attrs.items()} + ) return write_variables_to_icechunk_group( ds.variables, ds.attrs, store=store, - group=root_group, + group=group_object, append_dim=append_dim, last_updated_at=last_updated_at, ) @@ -93,7 +110,7 @@ def write_variables_to_icechunk_group( variables, attrs, store, - group, + group: "Group", append_dim: Optional[str] = None, last_updated_at: Optional[datetime] = None, ): @@ -113,7 +130,12 @@ def write_variables_to_icechunk_group( # of xarrays zarr integration to ignore having to format the attributes ourselves. 
     ds = Dataset(loadable_variables, attrs=attrs)
     ds.to_zarr(
-        store, zarr_format=3, consolidated=False, mode="a", append_dim=append_dim
+        store,
+        group=group.name,
+        zarr_format=3,
+        consolidated=False,
+        mode="a",
+        append_dim=append_dim,
     )
 
     # Then finish by writing the virtual variables to the same group
@@ -219,9 +241,9 @@ def write_virtual_variable_to_icechunk(
         fill_value=zarray.fill_value,
     )
 
-    # TODO it would be nice if we could assign directly to the .attrs property
-    for k, v in var.attrs.items():
-        arr.attrs[k] = encode_zarr_attr_value(v)
+    arr.update_attributes(
+        {k: encode_zarr_attr_value(v) for k, v in var.attrs.items()}
+    )
 
     _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"}
     for k, v in var.encoding.items():
@@ -267,7 +289,7 @@ def write_manifest_virtual_refs(
 ) -> None:
     """Write all the virtual references for one array manifest at once."""
 
-    key_prefix = f"{group.name}{arr_name}"
+    key_prefix = f"{group.name}/{arr_name}"
 
     # loop over every reference in the ChunkManifest for that array
    # TODO inefficient: this should be replaced with something that sets all (new) references for the array at once
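
Example usage with the new `group` parameter. The snippet below is a minimal
sketch, not taken from this patch: it assumes icechunk's `Repository.create` and
`writable_session` API (the same calls the tests above rely on), and the input
file, repository path, and group path are illustrative only:

    import xarray as xr
    from icechunk import Repository, Storage

    from virtualizarr import open_virtual_dataset
    from virtualizarr.writers.icechunk import dataset_to_icechunk

    # Illustrative inputs: any readable NetCDF file and writable directory.
    vds = open_virtual_dataset("data.nc")
    storage = Storage.new_local_filesystem("/tmp/icechunk-repo")

    repo = Repository.create(storage)
    session = repo.writable_session("main")

    # New in this patch: write the virtual dataset into the nested group
    # "a/b" instead of the root group. Note that `group` is keyword-only.
    dataset_to_icechunk(vds, session.store, group="a/b")
    session.commit("write virtual refs into group a/b")

    # Read back from the same nested group, mirroring the tests above.
    ds = xr.open_zarr(session.store, group="a/b", consolidated=False, zarr_format=3)

With `group=None` (the default) the behavior is unchanged: the dataset is
written to the root group.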