diff --git a/requirements.txt b/requirements.txt
index 28ead88..566e879 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,3 +7,4 @@ xarray
 zarr
 bgen_reader>=4.0.5
 git+https://github.com/pystatgen/sgkit
+git+https://github.com/eric-czech/rechunker.git
diff --git a/setup.cfg b/setup.cfg
index 74678b4..5472909 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -54,7 +54,7 @@ ignore =
 [isort]
 default_section = THIRDPARTY
 known_first_party = sgkit
-known_third_party = bgen_reader,dask,numpy,pytest,setuptools,xarray,zarr
+known_third_party = bgen_reader,dask,numpy,pytest,rechunker,setuptools,xarray,zarr
 multi_line_output = 3
 include_trailing_comma = True
 force_grid_wrap = 0
@@ -67,6 +67,8 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-setuptools.*]
 ignore_missing_imports = True
+[mypy-rechunker.*]
+ignore_missing_imports = True
 [mypy-bgen_reader.*]
 ignore_missing_imports = True
 [mypy-sgkit.*]
@@ -75,3 +77,5 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-sgkit_bgen.tests.*]
 disallow_untyped_defs = False
+[mypy-sgkit_bgen.*]
+allow_redefinition = True
diff --git a/sgkit_bgen/__init__.py b/sgkit_bgen/__init__.py
index ec2b66e..45f9f44 100644
--- a/sgkit_bgen/__init__.py
+++ b/sgkit_bgen/__init__.py
@@ -1,3 +1,3 @@
-from .bgen_reader import read_bgen, rechunk_from_zarr, rechunk_to_zarr  # noqa: F401
+from .bgen_reader import bgen_to_zarr, read_bgen, rechunk_bgen  # noqa: F401
 
-__all__ = ["read_bgen", "rechunk_from_zarr", "rechunk_to_zarr"]
+__all__ = ["read_bgen", "rechunk_bgen", "bgen_to_zarr"]
diff --git a/sgkit_bgen/bgen_reader.py b/sgkit_bgen/bgen_reader.py
index dec235c..6927ae6 100644
--- a/sgkit_bgen/bgen_reader.py
+++ b/sgkit_bgen/bgen_reader.py
@@ -1,6 +1,7 @@
 """BGEN reader implementation (using bgen_reader)"""
+import tempfile
 from pathlib import Path
-from typing import Any, Dict, Hashable, MutableMapping, Optional, Tuple, Union
+from typing import Any, Dict, Hashable, Mapping, MutableMapping, Optional, Tuple, Union
 
 import dask.array as da
 import dask.dataframe as dd
@@ -12,11 +13,11 @@
 from bgen_reader._metafile import create_metafile
 from bgen_reader._reader import infer_metafile_filepath
 from bgen_reader._samples import generate_samples, read_samples_file
+from rechunker import api as rechunker_api
 from xarray import Dataset
-from xarray.backends.zarr import ZarrStore
 
 from sgkit import create_genotype_dosage_dataset
-from sgkit.typing import ArrayLike
+from sgkit.typing import ArrayLike, DType
 from sgkit.utils import encode_array
 
 PathType = Union[str, Path]
@@ -241,6 +242,8 @@ def read_bgen(
 
 def encode_variables(
     ds: Dataset,
+    chunk_length: int,
+    chunk_width: int,
     compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2),
     probability_dtype: Optional[Any] = "uint8",
 ) -> Dict[Hashable, Dict[str, Any]]:
@@ -249,6 +252,8 @@
         e = {}
         if compressor is not None:
             e.update({"compressor": compressor})
+        if v in GT_DATA_VARS:
+            e.update({"chunks": (chunk_length, chunk_width) + ds[v].shape[2:]})
         if probability_dtype is not None and v == "call_genotype_probability":
             dtype = np.dtype(probability_dtype)
             # Xarray will decode into float32 so any int greater than
@@ -287,16 +292,16 @@ def pack_variables(ds: Dataset) -> Dataset:
     return ds
 
 
-def unpack_variables(ds: Dataset, dtype: Any = "float32") -> Dataset:
+def unpack_variables(ds: Dataset, dtype: DType = "float32") -> Dataset:
     # Restore homozygous reference GP
-    gp = ds["call_genotype_probability"].astype(dtype)
+    gp = ds["call_genotype_probability"].astype(dtype)  # type: ignore[no-untyped-call]
     if gp.sizes["genotypes"] != 2:
         raise ValueError(
             "Expecting variable 'call_genotype_probability' to have genotypes "
             f"dimension of size 2 (received sizes = {dict(gp.sizes)})"
         )
     ds = ds.drop_vars("call_genotype_probability")
-    ds["call_genotype_probability"] = xr.concat(  # type: ignore[no-untyped-call]
+    ds["call_genotype_probability"] = xr.concat(
         [1 - gp.sum(dim="genotypes", skipna=False), gp], dim="genotypes"
     )
 
@@ -309,44 +314,78 @@ def unpack_variables(ds: Dataset, dtype: DType = "float32") -> Dataset:
     return ds
 
 
-def rechunk_to_zarr(
+def rechunk_bgen(
     ds: Dataset,
-    store: Union[PathType, MutableMapping[str, bytes]],
+    output: Union[PathType, MutableMapping[str, bytes]],
     *,
-    mode: str = "w",
     chunk_length: int = 10_000,
-    chunk_width: int = 10_000,
+    chunk_width: int = 1_000,
     compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2),
-    probability_dtype: Optional[Any] = "uint8",
+    probability_dtype: Optional[DType] = "uint8",
+    max_mem: str = "4GB",
     pack: bool = True,
-    compute: bool = True,
-) -> ZarrStore:
+    tempdir: Optional[PathType] = None,
+) -> Dataset:
+    if isinstance(output, Path):
+        output = str(output)
+
+    chunk_length = min(chunk_length, ds.dims["variants"])
+    chunk_width = min(chunk_width, ds.dims["samples"])
+
     if pack:
         ds = pack_variables(ds)
-    for v in set(GT_DATA_VARS) & set(ds):
-        chunk_size = da.asarray(ds[v]).chunksize[0]
-        if chunk_length % chunk_size != 0:
-            raise ValueError(
-                f"Chunk size in variant dimension for variable '{v}' ({chunk_size}) "
-                f"must evenly divide target chunk size {chunk_length}"
-            )
-        ds[v] = ds[v].chunk(chunks=dict(samples=chunk_width))  # type: ignore[dict-item]
+
     encoding = encode_variables(
-        ds, compressor=compressor, probability_dtype=probability_dtype
+        ds,
+        chunk_length=chunk_length,
+        chunk_width=chunk_width,
+        compressor=compressor,
+        probability_dtype=probability_dtype,
     )
-    return ds.to_zarr(store, mode=mode, encoding=encoding or None, compute=compute)  # type: ignore[arg-type]
+    with tempfile.TemporaryDirectory(
+        prefix="bgen_to_zarr_", suffix=".zarr", dir=tempdir
+    ) as tmpdir:
+        rechunked = rechunker_api.rechunk_dataset(
+            ds,
+            encoding=encoding,
+            max_mem=max_mem,
+            target_store=output,
+            temp_store=tmpdir,
+            executor="dask",
+        )
+        rechunked.execute()
+
+    ds: Dataset = xr.open_zarr(output, concat_characters=False)  # type: ignore[no-untyped-call]
+    if pack:
+        ds = unpack_variables(ds)
+
+    return ds
 
 
-def rechunk_from_zarr(
-    store: Union[PathType, MutableMapping[str, bytes]],
+def bgen_to_zarr(
+    input: PathType,
+    output: Union[PathType, MutableMapping[str, bytes]],
+    region: Optional[Mapping[Hashable, Any]] = None,
     chunk_length: int = 10_000,
-    chunk_width: int = 10_000,
-    mask_and_scale: bool = True,
+    chunk_width: int = 1_000,
+    temp_chunk_length: int = 100,
+    compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2),
+    probability_dtype: Optional[DType] = "uint8",
+    max_mem: str = "4GB",
+    pack: bool = True,
+    tempdir: Optional[PathType] = None,
 ) -> Dataset:
-    # Always use concat_characters=False to avoid https://github.com/pydata/xarray/issues/4405
-    ds = xr.open_zarr(store, mask_and_scale=mask_and_scale, concat_characters=False)  # type: ignore[no-untyped-call]
-    for v in set(GT_DATA_VARS) & set(ds):
-        ds[v] = ds[v].chunk(chunks=dict(variants=chunk_length, samples=chunk_width))
-        # Workaround for https://github.com/pydata/xarray/issues/4380
-        del ds[v].encoding["chunks"]
-    return ds  # type: ignore[no-any-return]
+    ds = read_bgen(input, chunks=(temp_chunk_length, -1, -1))
+    if region is not None:
+        ds = ds.isel(indexers=region)
+    return rechunk_bgen(
+        ds,
+        output,
+        chunk_length=chunk_length,
+        chunk_width=chunk_width,
+        compressor=compressor,
+        probability_dtype=probability_dtype,
+        max_mem=max_mem,
+        pack=pack,
+        tempdir=tempdir,
+    )
diff --git a/sgkit_bgen/tests/test_bgen_reader.py b/sgkit_bgen/tests/test_bgen_reader.py
index b9200d1..bf1406d 100644
--- a/sgkit_bgen/tests/test_bgen_reader.py
+++ b/sgkit_bgen/tests/test_bgen_reader.py
@@ -9,8 +9,8 @@
 from sgkit_bgen.bgen_reader import (
     GT_DATA_VARS,
     BgenReader,
-    rechunk_from_zarr,
-    rechunk_to_zarr,
+    bgen_to_zarr,
+    rechunk_bgen,
     unpack_variables,
 )
 
@@ -44,6 +44,12 @@
     [np.nan, 1.018, 0.010, 0.160, 0.991]  # Generated using bgen-reader directly
 )
 
+EXPECTED_DIMS = dict(variants=199, samples=500, genotypes=3, alleles=2)
+
+
+def _shape(*dims: str) -> Tuple[int, ...]:
+    return tuple(EXPECTED_DIMS[d] for d in dims)
+
 
 @pytest.mark.parametrize("chunks", CHUNKS)
 def test_read_bgen(shared_datadir, chunks):
@@ -51,12 +57,14 @@
     ds = read_bgen(path, chunks=chunks)
 
     # check some of the data (in different chunks)
-    assert ds["call_dosage"].shape == (199, 500)
+    assert ds["call_dosage"].shape == _shape("variants", "samples")
     npt.assert_almost_equal(ds["call_dosage"].values[1][0], 1.987, decimal=3)
     npt.assert_almost_equal(ds["call_dosage"].values[100][0], 0.160, decimal=3)
     npt.assert_array_equal(ds["call_dosage_mask"].values[0, 0], [True])
     npt.assert_array_equal(ds["call_dosage_mask"].values[0, 1], [False])
-    assert ds["call_genotype_probability"].shape == (199, 500, 3)
+    assert ds["call_genotype_probability"].shape == _shape(
+        "variants", "samples", "genotypes"
+    )
     npt.assert_almost_equal(
         ds["call_genotype_probability"].values[1][0], [0.005, 0.002, 0.992], decimal=3
     )
@@ -137,14 +145,14 @@ def test_read_bgen__raise_on_invalid_indexers(shared_datadir):
         reader[([0], [0], [0])]
 
 
-def _rechunk_to_zarr(
+def _rechunk_bgen(
     shared_datadir: Path, tmp_path: Path, **kwargs: Any
-) -> Tuple[xr.Dataset, str]:
+) -> Tuple[xr.Dataset, xr.Dataset, str]:
     path = shared_datadir / "example.bgen"
     ds = read_bgen(path, chunks=(10, -1, -1))
     store = tmp_path / "example.zarr"
-    rechunk_to_zarr(ds, store, **kwargs)
-    return ds, str(store)
+    dsr = rechunk_bgen(ds, store, **kwargs)
+    return ds, dsr, str(store)
 
 
 def _open_zarr(store: str, **kwargs: Any) -> xr.Dataset:
@@ -152,24 +160,30 @@ def _open_zarr(store: str, **kwargs: Any) -> xr.Dataset:
     return xr.open_zarr(store, concat_characters=False, **kwargs)  # type: ignore[no-any-return,no-untyped-call]
 
 
-@pytest.mark.parametrize("chunk_width", [10, 50, 500])
-def test_rechunk_to_zarr__chunk_size(shared_datadir, tmp_path, chunk_width):
-    _, store = _rechunk_to_zarr(
-        shared_datadir, tmp_path, chunk_width=chunk_width, pack=False
+@pytest.mark.parametrize("target_chunks", [(10, 10), (50, 50), (100, 50), (50, 100)])
+def test_rechunk_bgen__target_chunks(shared_datadir, tmp_path, target_chunks):
+    _, dsr, store = _rechunk_bgen(
+        shared_datadir,
+        tmp_path,
+        chunk_length=target_chunks[0],
+        chunk_width=target_chunks[1],
+        pack=False,
     )
-    dsr = _open_zarr(store)
     for v in GT_DATA_VARS:
-        # Chunks shape should equal (
-        #   length of chunks on read,
-        #   width of chunks on rechunk
-        # )
-        assert dsr[v].data.chunksize[0] == 10
-        assert dsr[v].data.chunksize[1] == chunk_width
+        assert dsr[v].data.chunksize[:2] == target_chunks
+
+
+def test_rechunk_bgen__self_consistent(shared_datadir, tmp_path):
+    # With no probability dtype or packing, rechunk_bgen is a no-op
+    ds, dsr, store = _rechunk_bgen(
+        shared_datadir, tmp_path, probability_dtype=None, pack=False
+    )
+    xr.testing.assert_allclose(ds.compute(), dsr.compute())  # type: ignore[no-untyped-call]
 
 
 @pytest.mark.parametrize("dtype", ["uint8", "uint16"])
-def test_rechunk_to_zarr__probability_encoding(shared_datadir, tmp_path, dtype):
-    ds, store = _rechunk_to_zarr(
+def test_rechunk_bgen__probability_encoding(shared_datadir, tmp_path, dtype):
+    ds, _, store = _rechunk_bgen(
         shared_datadir, tmp_path, probability_dtype=dtype, pack=False
     )
     dsr = _open_zarr(store, mask_and_scale=False)
@@ -184,50 +198,25 @@ def test_rechunk_to_zarr__probability_encoding(shared_datadir, tmp_path, dtype):
         np.testing.assert_allclose(ds[v], dsr[v], atol=tolerance)
 
 
-def test_rechunk_to_zarr__variable_packing(shared_datadir, tmp_path):
-    ds, store = _rechunk_to_zarr(
+def test_rechunk_bgen__variable_packing(shared_datadir, tmp_path):
+    ds, dsr, store = _rechunk_bgen(
         shared_datadir, tmp_path, probability_dtype=None, pack=True
     )
-    dsr = _open_zarr(store, mask_and_scale=True)
-    dsr = unpack_variables(dsr)
     # A minor tolerance is necessary here when packing is enabled
     # because one of the genotype probabilities is constructed from the others
     xr.testing.assert_allclose(ds.compute(), dsr.compute(), atol=1e-6)  # type: ignore[no-untyped-call]
 
 
-def test_rechunk_to_zarr__raise_on_invalid_chunk_length(shared_datadir, tmp_path):
-    with pytest.raises(
-        ValueError,
-        match="Chunk size in variant dimension for variable .* must evenly divide target chunk size",
-    ):
-        _rechunk_to_zarr(shared_datadir, tmp_path, chunk_length=11)
-
-
-@pytest.mark.parametrize("chunks", [(10, 10), (50, 50), (100, 50), (50, 100)])
-def test_rechunk_from_zarr__target_chunks(shared_datadir, tmp_path, chunks):
-    ds, store = _rechunk_to_zarr(
-        shared_datadir,
-        tmp_path,
-        chunk_length=chunks[0],
-        chunk_width=chunks[1],
-        pack=False,
-    )
-    ds = rechunk_from_zarr(store, chunk_length=chunks[0], chunk_width=chunks[1])
-    for v in GT_DATA_VARS:
-        assert ds[v].data.chunksize[:2] == chunks
-
-
 @pytest.mark.parametrize("dtype", ["uint32", "int8", "float32"])
-def test_rechunk_from_zarr__invalid_probability_type(shared_datadir, tmp_path, dtype):
+def test_rechunk_bgen__invalid_probability_type(shared_datadir, tmp_path, dtype):
     with pytest.raises(ValueError, match="Probability integer dtype invalid"):
-        _rechunk_to_zarr(shared_datadir, tmp_path, probability_dtype=dtype)
+        _rechunk_bgen(shared_datadir, tmp_path, probability_dtype=dtype)
 
 
 def test_unpack_variables__invalid_gp_dims(shared_datadir, tmp_path):
     # Validate that an error is thrown when variables are
     # unpacked without being packed in the first place
-    _, store = _rechunk_to_zarr(shared_datadir, tmp_path, pack=False)
-    dsr = _open_zarr(store, mask_and_scale=True)
+    _, dsr, store = _rechunk_bgen(shared_datadir, tmp_path, pack=False)
    with pytest.raises(
         ValueError,
         match="Expecting variable 'call_genotype_probability' to have genotypes dimension of size 2",
     ):
@@ -235,10 +224,18 @@
         unpack_variables(dsr)
 
 
-def test_rechunk_from_zarr__self_consistent(shared_datadir, tmp_path):
-    # With no probability dtype or packing, rechunk_{to,from}_zarr is a noop
-    ds, store = _rechunk_to_zarr(
-        shared_datadir, tmp_path, probability_dtype=None, pack=False
-    )
-    dsr = rechunk_from_zarr(store)
-    xr.testing.assert_allclose(ds.compute(), dsr.compute())  # type: ignore[no-untyped-call]
+@pytest.mark.parametrize(
"region", [None, dict(variants=slice(0, 100)), dict(samples=slice(0, 50))] +) +def test_bgen_to_zarr(shared_datadir, tmp_path, region): + input = shared_datadir / "example.bgen" + output = tmp_path / "example.zarr" + ds = bgen_to_zarr(input, output, region=region) + expected_dims = { + k: EXPECTED_DIMS[k] + if region is None or k not in region + else region[k].stop - region[k].start + for k in EXPECTED_DIMS + } + actual_dims = {k: v for k, v in ds.dims.items() if k in expected_dims} + assert actual_dims == expected_dims