From d13275649fd0a7a938f09745f71787cc7edb2c18 Mon Sep 17 00:00:00 2001 From: eric-czech Date: Mon, 24 Aug 2020 18:03:29 -0400 Subject: [PATCH] bgen_to_zarr implementation #16 --- setup.cfg | 4 +- sgkit_bgen/__init__.py | 4 +- sgkit_bgen/bgen_reader.py | 150 ++++++++++++++++++++++++--- sgkit_bgen/tests/data/.gitignore | 2 + sgkit_bgen/tests/test_bgen_reader.py | 135 ++++++++++++++++++++++-- 5 files changed, 269 insertions(+), 26 deletions(-) create mode 100644 sgkit_bgen/tests/data/.gitignore diff --git a/setup.cfg b/setup.cfg index 35ae76e..74678b4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -54,7 +54,7 @@ ignore = [isort] default_section = THIRDPARTY known_first_party = sgkit -known_third_party = bgen_reader,dask,numpy,pytest,setuptools,xarray +known_third_party = bgen_reader,dask,numpy,pytest,setuptools,xarray,zarr multi_line_output = 3 include_trailing_comma = True force_grid_wrap = 0 @@ -71,5 +71,7 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-sgkit.*] ignore_missing_imports = True +[mypy-zarr.*] +ignore_missing_imports = True [mypy-sgkit_bgen.tests.*] disallow_untyped_defs = False diff --git a/sgkit_bgen/__init__.py b/sgkit_bgen/__init__.py index aa9465b..ec2b66e 100644 --- a/sgkit_bgen/__init__.py +++ b/sgkit_bgen/__init__.py @@ -1,3 +1,3 @@ -from .bgen_reader import read_bgen # noqa: F401 +from .bgen_reader import read_bgen, rechunk_from_zarr, rechunk_to_zarr # noqa: F401 -__all__ = ["read_bgen"] +__all__ = ["read_bgen", "rechunk_from_zarr", "rechunk_to_zarr"] diff --git a/sgkit_bgen/bgen_reader.py b/sgkit_bgen/bgen_reader.py index f90babb..dec235c 100644 --- a/sgkit_bgen/bgen_reader.py +++ b/sgkit_bgen/bgen_reader.py @@ -1,16 +1,19 @@ """BGEN reader implementation (using bgen_reader)""" from pathlib import Path -from typing import Any, Dict, Tuple, Union +from typing import Any, Dict, Hashable, MutableMapping, Optional, Tuple, Union import dask.array as da import dask.dataframe as dd import numpy as np +import xarray as xr +import zarr from bgen_reader._bgen_file import bgen_file from bgen_reader._bgen_metafile import bgen_metafile from bgen_reader._metafile import create_metafile from bgen_reader._reader import infer_metafile_filepath from bgen_reader._samples import generate_samples, read_samples_file from xarray import Dataset +from xarray.backends.zarr import ZarrStore from sgkit import create_genotype_dosage_dataset from sgkit.typing import ArrayLike @@ -38,6 +41,13 @@ def _to_dict(df: dd.DataFrame, dtype: Any = None) -> Dict[str, da.Array]: VARIANT_DF_DTYPE = dict([(f[0], f[1]) for f in VARIANT_FIELDS]) VARIANT_ARRAY_DTYPE = dict([(f[0], f[2]) for f in VARIANT_FIELDS]) +GT_DATA_VARS = [ + "call_genotype_probability", + "call_genotype_probability_mask", + "call_dosage", + "call_dosage_mask", +] + class BgenReader: @@ -79,15 +89,7 @@ def split(allele_row: np.ndarray) -> np.ndarray: return np.apply_along_axis(split, 1, alleles[:, np.newaxis]) - variant_alleles = variant_arrs["allele_ids"].map_blocks(split_alleles) - - def max_str_len(arr: ArrayLike) -> Any: - return arr.map_blocks( - lambda s: np.char.str_len(s.astype(str)), dtype=np.int8 - ).max() - - max_allele_length = max(max_str_len(variant_alleles).compute()) - self.variant_alleles = variant_alleles.astype(f"S{max_allele_length}") + self.variant_alleles = variant_arrs["allele_ids"].map_blocks(split_alleles) with bgen_file(self.path) as bgen: sample_path = self.path.with_suffix(".sample") @@ -172,6 +174,7 @@ def read_bgen( chunks: Union[str, int, Tuple[int, ...]] = "auto", lock: bool = False, persist: 
bool = True, + dtype: Any = "float32", ) -> Dataset: """Read BGEN dataset. @@ -194,23 +197,23 @@ def read_bgen( memory, by default True. This is an important performance consideration as the metadata file for this data will be read multiple times when False. + dtype : Any + Genotype probability array data type, by default float32. Warnings -------- Only bi-allelic, diploid BGEN files are currently supported. """ - bgen_reader = BgenReader(path, persist) + bgen_reader = BgenReader(path, persist, dtype=dtype) variant_contig, variant_contig_names = encode_array(bgen_reader.contig.compute()) variant_contig_names = list(variant_contig_names) variant_contig = variant_contig.astype("int16") - - variant_position = np.array(bgen_reader.pos, dtype=int) - variant_alleles = np.array(bgen_reader.variant_alleles, dtype="S1") - variant_id = np.array(bgen_reader.variant_id, dtype=str) - - sample_id = np.array(bgen_reader.sample_id, dtype=str) + variant_position = np.asarray(bgen_reader.pos, dtype=int) + variant_alleles = np.asarray(bgen_reader.variant_alleles, dtype="S") + variant_id = np.asarray(bgen_reader.variant_id, dtype=str) + sample_id = np.asarray(bgen_reader.sample_id, dtype=str) call_genotype_probability = da.from_array( bgen_reader, @@ -234,3 +237,116 @@ def read_bgen( ) return ds + + +def encode_variables( + ds: Dataset, + compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2), + probability_dtype: Optional[Any] = "uint8", +) -> Dict[Hashable, Dict[str, Any]]: + encoding = {} + for v in ds: + e = {} + if compressor is not None: + e.update({"compressor": compressor}) + if probability_dtype is not None and v == "call_genotype_probability": + dtype = np.dtype(probability_dtype) + # Xarray will decode into float32 so any int greater than + # 16 bits will cause overflow/underflow + # See https://en.wikipedia.org/wiki/Floating-point_arithmetic#Internal_representation + # *bits precision column for single precision floats + if dtype not in [np.uint8, np.uint16]: + raise ValueError( + "Probability integer dtype invalid, must " + f"be uint8 or uint16 not {probability_dtype}" + ) + divisor = np.iinfo(dtype).max - 1 + e.update( + { + "dtype": probability_dtype, + "add_offset": -1.0 / divisor, + "scale_factor": 1.0 / divisor, + "_FillValue": 0, + } + ) + if e: + encoding[v] = e + return encoding + + +def pack_variables(ds: Dataset) -> Dataset: + # Remove dosage as it is unnecessary and should be redefined + # based on encoded probabilities later (w/ reduced precision) + ds = ds.drop_vars(["call_dosage", "call_dosage_mask"], errors="ignore") + + # Remove homozygous reference GP and redefine mask + gp = ds["call_genotype_probability"][..., 1:] + gp_mask = ds["call_genotype_probability_mask"].any(dim="genotypes") + ds = ds.drop_vars(["call_genotype_probability", "call_genotype_probability_mask"]) + ds = ds.assign(call_genotype_probability=gp, call_genotype_probability_mask=gp_mask) + return ds + + +def unpack_variables(ds: Dataset, dtype: Any = "float32") -> Dataset: + # Restore homozygous reference GP + gp = ds["call_genotype_probability"].astype(dtype) + if gp.sizes["genotypes"] != 2: + raise ValueError( + "Expecting variable 'call_genotype_probability' to have genotypes " + f"dimension of size 2 (received sizes = {dict(gp.sizes)})" + ) + ds = ds.drop_vars("call_genotype_probability") + ds["call_genotype_probability"] = xr.concat( # type: ignore[no-untyped-call] + [1 - gp.sum(dim="genotypes", skipna=False), gp], dim="genotypes" + ) + + # Restore dosage + ds["call_dosage"] = gp[..., 0] + 
2 * gp[..., 1] + ds["call_dosage_mask"] = ds["call_genotype_probability_mask"] + ds["call_genotype_probability_mask"] = ds[ + "call_genotype_probability_mask" + ].broadcast_like(ds["call_genotype_probability"]) + return ds + + +def rechunk_to_zarr( + ds: Dataset, + store: Union[PathType, MutableMapping[str, bytes]], + *, + mode: str = "w", + chunk_length: int = 10_000, + chunk_width: int = 10_000, + compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2), + probability_dtype: Optional[Any] = "uint8", + pack: bool = True, + compute: bool = True, +) -> ZarrStore: + if pack: + ds = pack_variables(ds) + for v in set(GT_DATA_VARS) & set(ds): + chunk_size = da.asarray(ds[v]).chunksize[0] + if chunk_length % chunk_size != 0: + raise ValueError( + f"Chunk size in variant dimension for variable '{v}' ({chunk_size}) " + f"must evenly divide target chunk size {chunk_length}" + ) + ds[v] = ds[v].chunk(chunks=dict(samples=chunk_width)) # type: ignore[dict-item] + encoding = encode_variables( + ds, compressor=compressor, probability_dtype=probability_dtype + ) + return ds.to_zarr(store, mode=mode, encoding=encoding or None, compute=compute) # type: ignore[arg-type] + + +def rechunk_from_zarr( + store: Union[PathType, MutableMapping[str, bytes]], + chunk_length: int = 10_000, + chunk_width: int = 10_000, + mask_and_scale: bool = True, +) -> Dataset: + # Always use concat_characters=False to avoid https://github.com/pydata/xarray/issues/4405 + ds = xr.open_zarr(store, mask_and_scale=mask_and_scale, concat_characters=False) # type: ignore[no-untyped-call] + for v in set(GT_DATA_VARS) & set(ds): + ds[v] = ds[v].chunk(chunks=dict(variants=chunk_length, samples=chunk_width)) + # Workaround for https://github.com/pydata/xarray/issues/4380 + del ds[v].encoding["chunks"] + return ds # type: ignore[no-any-return] diff --git a/sgkit_bgen/tests/data/.gitignore b/sgkit_bgen/tests/data/.gitignore new file mode 100644 index 0000000..4685de1 --- /dev/null +++ b/sgkit_bgen/tests/data/.gitignore @@ -0,0 +1,2 @@ +*.metadata2.mmm +*.metafile diff --git a/sgkit_bgen/tests/test_bgen_reader.py b/sgkit_bgen/tests/test_bgen_reader.py index 7f91d77..9ccd05c 100644 --- a/sgkit_bgen/tests/test_bgen_reader.py +++ b/sgkit_bgen/tests/test_bgen_reader.py @@ -1,8 +1,18 @@ +from pathlib import Path +from typing import Any, Tuple + import numpy as np import numpy.testing as npt import pytest +import xarray as xr from sgkit_bgen import read_bgen -from sgkit_bgen.bgen_reader import BgenReader +from sgkit_bgen.bgen_reader import ( + GT_DATA_VARS, + BgenReader, + rechunk_from_zarr, + rechunk_to_zarr, + unpack_variables, +) CHUNKS = [ (100, 200, 3), @@ -61,7 +71,7 @@ def test_read_bgen(shared_datadir, chunks): ) -def test_read_bgen_with_sample_file(shared_datadir): +def test_read_bgen__with_sample_file(shared_datadir): # The example file was generated using # qctool -g sgkit_bgen/tests/data/example.bgen -og sgkit_bgen/tests/data/example-separate-samples.bgen -os sgkit_bgen/tests/data/example-separate-samples.sample -incl-samples sgkit_bgen/tests/data/samples # Then editing example-separate-samples.sample to change the sample IDs @@ -71,7 +81,7 @@ def test_read_bgen_with_sample_file(shared_datadir): assert ds["sample_id"].values.tolist() == ["s1", "s2", "s3", "s4", "s5"] -def test_read_bgen_with_no_samples(shared_datadir): +def test_read_bgen__with_no_samples(shared_datadir): # The example file was generated using # qctool -g sgkit_bgen/tests/data/example.bgen -og sgkit_bgen/tests/data/example-no-samples.bgen -os 
sgkit_bgen/tests/data/example-no-samples.sample -bgen-omit-sample-identifier-block -incl-samples sgkit_bgen/tests/data/samples # Then deleting example-no-samples.sample @@ -88,7 +98,7 @@ def test_read_bgen_with_no_samples(shared_datadir): @pytest.mark.parametrize("chunks", CHUNKS) -def test_read_bgen_fancy_index(shared_datadir, chunks): +def test_read_bgen__fancy_index(shared_datadir, chunks): path = shared_datadir / "example.bgen" ds = read_bgen(path, chunks=chunks) npt.assert_almost_equal( @@ -98,7 +108,7 @@ def test_read_bgen_fancy_index(shared_datadir, chunks): @pytest.mark.parametrize("chunks", CHUNKS) -def test_read_bgen_scalar_index(shared_datadir, chunks): +def test_read_bgen__scalar_index(shared_datadir, chunks): path = shared_datadir / "example.bgen" ds = read_bgen(path, chunks=chunks) for i, ix in enumerate(INDEXES): @@ -116,7 +126,13 @@ def test_read_bgen_scalar_index(shared_datadir, chunks): ) -def test_read_bgen_raise_on_invalid_indexers(shared_datadir): +def test_read_bgen__fixlen_strings(shared_datadir): + path = shared_datadir / "example.bgen" + ds = read_bgen(path, fixed_length_strings=True) + assert ds["variant_allele"].dtype == "S1" + + +def test_read_bgen__raise_on_invalid_indexers(shared_datadir): path = shared_datadir / "example.bgen" reader = BgenReader(path) with pytest.raises(IndexError, match="Indexer must be tuple"): @@ -125,3 +141,110 @@ def test_read_bgen_raise_on_invalid_indexers(shared_datadir): reader[(slice(None),)] with pytest.raises(IndexError, match="Indexer must contain only slices or ints"): reader[([0], [0], [0])] + + +def _rechunk_to_zarr( + shared_datadir: Path, tmp_path: Path, **kwargs: Any +) -> Tuple[xr.Dataset, str]: + path = shared_datadir / "example.bgen" + ds = read_bgen(path, chunks=(10, -1, -1)) + store = tmp_path / "example.zarr" + rechunk_to_zarr(ds, store, **kwargs) + return ds, str(store) + + +def _open_zarr(store: str, **kwargs: Any) -> xr.Dataset: + # Force concat_characters False to avoid to avoid https://github.com/pydata/xarray/issues/4405 + return xr.open_zarr(store, concat_characters=False, **kwargs) # type: ignore[no-any-return,no-untyped-call] + + +@pytest.mark.parametrize("chunk_width", [10, 50, 500]) +def test_rechunk_to_zarr__chunk_size(shared_datadir, tmp_path, chunk_width): + _, store = _rechunk_to_zarr( + shared_datadir, tmp_path, chunk_width=chunk_width, pack=False + ) + dsr = _open_zarr(store) + for v in GT_DATA_VARS: + # Chunks shape should equal ( + # length of chunks on read, + # width of chunks on rechunk + # ) + assert dsr[v].data.chunksize[0] == 10 + assert dsr[v].data.chunksize[1] == chunk_width + + +@pytest.mark.parametrize("dtype", ["uint8", "uint16"]) +def test_rechunk_to_zarr__probability_encoding(shared_datadir, tmp_path, dtype): + ds, store = _rechunk_to_zarr( + shared_datadir, tmp_path, probability_dtype=dtype, pack=False + ) + dsr = _open_zarr(store, mask_and_scale=False) + v = "call_genotype_probability" + assert dsr[v].shape == ds[v].shape + assert dsr[v].dtype == dtype + dsr = _open_zarr(store, mask_and_scale=True) + # There are two missing calls which equates to + # 6 total nan values across 3 possible genotypes + assert np.isnan(dsr[v].values).sum() == 6 + tolerance = 1.0 / (np.iinfo(dtype).max - 1) + np.testing.assert_allclose(ds[v], dsr[v], atol=tolerance) + + +def test_rechunk_to_zarr__variable_packing(shared_datadir, tmp_path): + ds, store = _rechunk_to_zarr( + shared_datadir, tmp_path, probability_dtype=None, pack=True + ) + dsr = _open_zarr(store, mask_and_scale=True) + dsr = 
unpack_variables(dsr) + # A minor tolerance is necessary here when packing is enabled + # because one of the genotype probabilities is constructed from the others + xr.testing.assert_allclose(ds.compute(), dsr.compute(), atol=1e-6) # type: ignore[no-untyped-call] + + +def test_rechunk_to_zarr__raise_on_invalid_chunk_length(shared_datadir, tmp_path): + with pytest.raises( + ValueError, + match="Chunk size in variant dimension for variable .* must evenly divide target chunk size", + ): + _rechunk_to_zarr(shared_datadir, tmp_path, chunk_length=11) + + +@pytest.mark.parametrize("chunks", [(10, 10), (50, 50), (100, 50), (50, 100)]) +def test_rechunk_from_zarr__target_chunks(shared_datadir, tmp_path, chunks): + ds, store = _rechunk_to_zarr( + shared_datadir, + tmp_path, + chunk_length=chunks[0], + chunk_width=chunks[1], + pack=False, + ) + ds = rechunk_from_zarr(store, chunk_length=chunks[0], chunk_width=chunks[1]) + for v in GT_DATA_VARS: + assert ds[v].data.chunksize[:2] == chunks + + +@pytest.mark.parametrize("dtype", ["uint32", "int8", "float32"]) +def test_rechunk_from_zarr__invalid_probability_type(shared_datadir, tmp_path, dtype): + with pytest.raises(ValueError, match="Probability integer dtype invalid"): + _rechunk_to_zarr(shared_datadir, tmp_path, probability_dtype=dtype) + + +def test_unpack_variables__invalid_gp_dims(shared_datadir, tmp_path): + # Validate that an error is thrown when variables are + # unpacked without being packed in the first place + _, store = _rechunk_to_zarr(shared_datadir, tmp_path, pack=False) + dsr = _open_zarr(store, mask_and_scale=True) + with pytest.raises( + ValueError, + match="Expecting variable 'call_genotype_probability' to have genotypes dimension of size 2", + ): + unpack_variables(dsr) + + +def test_rechunk_from_zarr__self_consistent(shared_datadir, tmp_path): + # With no probability dtype or packing, rechunk_{to,from}_zarr is a noop + ds, store = _rechunk_to_zarr( + shared_datadir, tmp_path, probability_dtype=None, pack=False + ) + dsr = rechunk_from_zarr(store) + xr.testing.assert_allclose(ds.compute(), dsr.compute()) # type: ignore[no-untyped-call]
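

For context, a minimal end-to-end usage sketch of the API added by this patch. This snippet is not part of the change itself; the file paths and chunk sizes are illustrative (they mirror the test fixtures), and only functions and parameters introduced above are used.

from sgkit_bgen import read_bgen, rechunk_from_zarr, rechunk_to_zarr
from sgkit_bgen.bgen_reader import unpack_variables

bgen_path = "example.bgen"  # hypothetical input path
store = "example.zarr"      # hypothetical intermediate Zarr store

# Read with chunking in the variants dimension only, so each chunk spans all samples
ds = read_bgen(bgen_path, chunks=(10, -1, -1))

# Write a packed, uint8-encoded intermediate store rechunked in the samples dimension;
# the target chunk_length must be a multiple of the variant chunk size used on read (10 here)
rechunk_to_zarr(
    ds,
    store,
    chunk_length=1_000,
    chunk_width=1_000,
    probability_dtype="uint8",
    pack=True,
)

# Re-open with the target chunking in both dimensions, then restore the
# homozygous-reference probability and the dosage variables dropped by packing
dsr = rechunk_from_zarr(store, chunk_length=1_000, chunk_width=1_000)
dsr = unpack_variables(dsr)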
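
As a sketch of what the encode_variables change does (again, not something the patch contains): call_genotype_probability is stored as a small unsigned integer with CF-style scale_factor/add_offset/_FillValue attributes, and xarray decodes it back into float32 on read, which is why only uint8 and uint16 are accepted (wider integers exceed float32 precision). The NumPy round trip below uses the patch's uint8 parameters; the probability values are made up for illustration.

import numpy as np

# Round-trip sketch of the uint8 probability encoding configured by encode_variables:
# code 0 is reserved as _FillValue for missing calls and the remaining 255 codes
# span [0, 1] in steps of 1/254.
dtype = np.uint8
divisor = np.iinfo(dtype).max - 1  # 254
scale_factor = 1.0 / divisor
add_offset = -1.0 / divisor

probs = np.array([0.0, 0.25, 0.5, 1.0, np.nan], dtype="float32")  # illustrative values

# Pack the way xarray/CF encoding does: (value - add_offset) / scale_factor,
# with NaN mapped to the fill value 0
packed = np.where(np.isnan(probs), 0, np.round((probs - add_offset) / scale_factor)).astype(dtype)

# Unpack the way xarray does on read: scale_factor * value + add_offset,
# with the fill value mapped back to NaN
unpacked = np.where(packed == 0, np.nan, scale_factor * packed + add_offset)

# Quantization error is bounded by 1/254, the tolerance used in
# test_rechunk_to_zarr__probability_encoding
assert np.allclose(probs, unpacked, atol=1.0 / divisor, equal_nan=True)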