Bgen to zarr implementation sgkit-dev#16
eric-czech committed Sep 22, 2020
1 parent 4ee4ee2 commit bbdbb56
Showing 5 changed files with 136 additions and 95 deletions.
1 change: 1 addition & 0 deletions requirements.txt
@@ -7,3 +7,4 @@ xarray
zarr
bgen_reader>=4.0.5
git+https://github.com/pystatgen/sgkit
git+https://github.com/eric-czech/rechunker.git
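The new requirement pins a fork of rechunker by git URL rather than the PyPI release; this commit relies on its rechunk_dataset API, which appears later in this diff in rechunk_bgen. A minimal sketch of that call pattern, with illustrative store paths and an empty encoding (both are assumptions, not part of the diff):

# Sketch of the rechunker fork's API as used by this commit; paths and
# the empty encoding are illustrative, not part of the diff.
import xarray as xr
from rechunker import api as rechunker_api

ds = xr.open_zarr("input.zarr", concat_characters=False)
plan = rechunker_api.rechunk_dataset(
    ds,
    encoding={},                 # per-variable zarr encoding (chunks, compressor)
    max_mem="4GB",               # memory budget for each rechunking task
    target_store="output.zarr",
    temp_store="tmp.zarr",       # intermediate store used during rechunking
    executor="dask",
)
plan.execute()                   # materialize the rechunked dataset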
6 changes: 5 additions & 1 deletion setup.cfg
@@ -54,7 +54,7 @@ ignore =
[isort]
default_section = THIRDPARTY
known_first_party = sgkit
known_third_party = bgen_reader,dask,numpy,pytest,setuptools,xarray,zarr
known_third_party = bgen_reader,dask,numpy,pytest,rechunker,setuptools,xarray,zarr
multi_line_output = 3
include_trailing_comma = True
force_grid_wrap = 0
@@ -67,6 +67,8 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-setuptools.*]
ignore_missing_imports = True
[mypy-rechunker.*]
ignore_missing_imports = True
[mypy-bgen_reader.*]
ignore_missing_imports = True
[mypy-sgkit.*]
@@ -75,3 +77,5 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-sgkit_bgen.tests.*]
disallow_untyped_defs = False
[mypy-sgkit_bgen.*]
allow_redefinition = True
4 changes: 2 additions & 2 deletions sgkit_bgen/__init__.py
@@ -1,3 +1,3 @@
from .bgen_reader import read_bgen, rechunk_from_zarr, rechunk_to_zarr # noqa: F401
from .bgen_reader import bgen_to_zarr, read_bgen, rechunk_bgen # noqa: F401

__all__ = ["read_bgen", "rechunk_from_zarr", "rechunk_to_zarr"]
__all__ = ["read_bgen", "rechunk_bgen", "bgen_to_zarr"]
107 changes: 73 additions & 34 deletions sgkit_bgen/bgen_reader.py
@@ -1,6 +1,7 @@
"""BGEN reader implementation (using bgen_reader)"""
import tempfile
from pathlib import Path
from typing import Any, Dict, Hashable, MutableMapping, Optional, Tuple, Union
from typing import Any, Dict, Hashable, Mapping, MutableMapping, Optional, Tuple, Union

import dask.array as da
import dask.dataframe as dd
@@ -12,11 +13,11 @@
from bgen_reader._metafile import create_metafile
from bgen_reader._reader import infer_metafile_filepath
from bgen_reader._samples import generate_samples, read_samples_file
from rechunker import api as rechunker_api
from xarray import Dataset
from xarray.backends.zarr import ZarrStore

from sgkit import create_genotype_dosage_dataset
from sgkit.typing import ArrayLike
from sgkit.typing import ArrayLike, DType
from sgkit.utils import encode_array

PathType = Union[str, Path]
@@ -241,6 +242,8 @@ def read_bgen(

def encode_variables(
ds: Dataset,
chunk_length: int,
chunk_width: int,
compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2),
probability_dtype: Optional[Any] = "uint8",
) -> Dict[Hashable, Dict[str, Any]]:
@@ -249,6 +252,8 @@ def encode_variables(
e = {}
if compressor is not None:
e.update({"compressor": compressor})
if v in GT_DATA_VARS:
e.update({"chunks": (chunk_length, chunk_width) + ds[v].shape[2:]})
if probability_dtype is not None and v == "call_genotype_probability":
dtype = np.dtype(probability_dtype)
# Xarray will decode into float32 so any int greater than
@@ -287,16 +292,16 @@ def pack_variables(ds: Dataset) -> Dataset:
return ds


def unpack_variables(ds: Dataset, dtype: Any = "float32") -> Dataset:
def unpack_variables(ds: Dataset, dtype: DType = "float32") -> Dataset:
# Restore homozygous reference GP
gp = ds["call_genotype_probability"].astype(dtype)
gp = ds["call_genotype_probability"].astype(dtype) # type: ignore[no-untyped-call]
if gp.sizes["genotypes"] != 2:
raise ValueError(
"Expecting variable 'call_genotype_probability' to have genotypes "
f"dimension of size 2 (received sizes = {dict(gp.sizes)})"
)
ds = ds.drop_vars("call_genotype_probability")
ds["call_genotype_probability"] = xr.concat( # type: ignore[no-untyped-call]
ds["call_genotype_probability"] = xr.concat(
[1 - gp.sum(dim="genotypes", skipna=False), gp], dim="genotypes"
)

@@ -309,44 +314,78 @@ def unpack_variables(ds: Dataset, dtype: Any = "float32") -> Dataset:
return ds


def rechunk_to_zarr(
def rechunk_bgen(
ds: Dataset,
store: Union[PathType, MutableMapping[str, bytes]],
output: Union[PathType, MutableMapping[str, bytes]],
*,
mode: str = "w",
chunk_length: int = 10_000,
chunk_width: int = 10_000,
chunk_width: int = 1_000,
compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2),
probability_dtype: Optional[Any] = "uint8",
probability_dtype: Optional[DType] = "uint8",
max_mem: str = "4GB",
pack: bool = True,
compute: bool = True,
) -> ZarrStore:
tempdir: Optional[PathType] = None,
) -> Dataset:
if isinstance(output, Path):
output = str(output)

chunk_length = min(chunk_length, ds.dims["variants"])
chunk_width = min(chunk_width, ds.dims["samples"])

if pack:
ds = pack_variables(ds)
for v in set(GT_DATA_VARS) & set(ds):
chunk_size = da.asarray(ds[v]).chunksize[0]
if chunk_length % chunk_size != 0:
raise ValueError(
f"Chunk size in variant dimension for variable '{v}' ({chunk_size}) "
f"must evenly divide target chunk size {chunk_length}"
)
ds[v] = ds[v].chunk(chunks=dict(samples=chunk_width)) # type: ignore[dict-item]

encoding = encode_variables(
ds, compressor=compressor, probability_dtype=probability_dtype
ds,
chunk_length=chunk_length,
chunk_width=chunk_width,
compressor=compressor,
probability_dtype=probability_dtype,
)
return ds.to_zarr(store, mode=mode, encoding=encoding or None, compute=compute) # type: ignore[arg-type]
with tempfile.TemporaryDirectory(
prefix="bgen_to_zarr_", suffix=".zarr", dir=tempdir
) as tmpdir:
rechunked = rechunker_api.rechunk_dataset(
ds,
encoding=encoding,
max_mem=max_mem,
target_store=output,
temp_store=tmpdir,
executor="dask",
)
rechunked.execute()

ds: Dataset = xr.open_zarr(output, concat_characters=False) # type: ignore[no-untyped-call]
if pack:
ds = unpack_variables(ds)

return ds


def rechunk_from_zarr(
store: Union[PathType, MutableMapping[str, bytes]],
def bgen_to_zarr(
input: PathType,
output: Union[PathType, MutableMapping[str, bytes]],
region: Optional[Mapping[Hashable, Any]] = None,
chunk_length: int = 10_000,
chunk_width: int = 10_000,
mask_and_scale: bool = True,
chunk_width: int = 1_000,
temp_chunk_length: int = 100,
compressor: Optional[Any] = zarr.Blosc(cname="zstd", clevel=7, shuffle=2),
probability_dtype: Optional[DType] = "uint8",
max_mem: str = "4GB",
pack: bool = True,
tempdir: Optional[PathType] = None,
) -> Dataset:
# Always use concat_characters=False to avoid https://github.com/pydata/xarray/issues/4405
ds = xr.open_zarr(store, mask_and_scale=mask_and_scale, concat_characters=False) # type: ignore[no-untyped-call]
for v in set(GT_DATA_VARS) & set(ds):
ds[v] = ds[v].chunk(chunks=dict(variants=chunk_length, samples=chunk_width))
# Workaround for https://github.com/pydata/xarray/issues/4380
del ds[v].encoding["chunks"]
return ds # type: ignore[no-any-return]
ds = read_bgen(input, chunks=(temp_chunk_length, -1, -1))
if region is not None:
ds = ds.isel(indexers=region)
return rechunk_bgen(
ds,
output,
chunk_length=chunk_length,
chunk_width=chunk_width,
compressor=compressor,
probability_dtype=probability_dtype,
max_mem=max_mem,
pack=pack,
tempdir=tempdir,
)
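bgen_to_zarr reads the BGEN file with small variant chunks (temp_chunk_length), optionally subsets the Dataset via isel, and hands off to rechunk_bgen, which writes the final chunking through a temporary store using rechunker. A sketch of a region-limited conversion, with illustrative paths:

# Convert only the first 100 variants; probabilities are packed and
# encoded as uint8 by default. Paths are illustrative.
from sgkit_bgen import bgen_to_zarr

ds = bgen_to_zarr(
    "example.bgen",
    "example.zarr",
    region=dict(variants=slice(0, 100)),  # forwarded to Dataset.isel
    chunk_length=10_000,                  # output chunks in the variants dimension
    chunk_width=1_000,                    # output chunks in the samples dimension
    temp_chunk_length=100,                # variant chunk size used while reading
    max_mem="4GB",                        # rechunker memory budget
)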
113 changes: 55 additions & 58 deletions sgkit_bgen/tests/test_bgen_reader.py
@@ -9,8 +9,8 @@
from sgkit_bgen.bgen_reader import (
GT_DATA_VARS,
BgenReader,
rechunk_from_zarr,
rechunk_to_zarr,
bgen_to_zarr,
rechunk_bgen,
unpack_variables,
)

@@ -44,19 +44,27 @@
[np.nan, 1.018, 0.010, 0.160, 0.991] # Generated using bgen-reader directly
)

EXPECTED_DIMS = dict(variants=199, samples=500, genotypes=3, alleles=2)


def _shape(*dims: str) -> Tuple[int, ...]:
return tuple(EXPECTED_DIMS[d] for d in dims)


@pytest.mark.parametrize("chunks", CHUNKS)
def test_read_bgen(shared_datadir, chunks):
path = shared_datadir / "example.bgen"
ds = read_bgen(path, chunks=chunks)

# check some of the data (in different chunks)
assert ds["call_dosage"].shape == (199, 500)
assert ds["call_dosage"].shape == _shape("variants", "samples")
npt.assert_almost_equal(ds["call_dosage"].values[1][0], 1.987, decimal=3)
npt.assert_almost_equal(ds["call_dosage"].values[100][0], 0.160, decimal=3)
npt.assert_array_equal(ds["call_dosage_mask"].values[0, 0], [True])
npt.assert_array_equal(ds["call_dosage_mask"].values[0, 1], [False])
assert ds["call_genotype_probability"].shape == (199, 500, 3)
assert ds["call_genotype_probability"].shape == _shape(
"variants", "samples", "genotypes"
)
npt.assert_almost_equal(
ds["call_genotype_probability"].values[1][0], [0.005, 0.002, 0.992], decimal=3
)
@@ -137,39 +145,45 @@ def test_read_bgen__raise_on_invalid_indexers(shared_datadir):
reader[([0], [0], [0])]


def _rechunk_to_zarr(
def _rechunk_bgen(
shared_datadir: Path, tmp_path: Path, **kwargs: Any
) -> Tuple[xr.Dataset, str]:
) -> Tuple[xr.Dataset, xr.Dataset, str]:
path = shared_datadir / "example.bgen"
ds = read_bgen(path, chunks=(10, -1, -1))
store = tmp_path / "example.zarr"
rechunk_to_zarr(ds, store, **kwargs)
return ds, str(store)
dsr = rechunk_bgen(ds, store, **kwargs)
return ds, dsr, str(store)


def _open_zarr(store: str, **kwargs: Any) -> xr.Dataset:
# Force concat_characters=False to avoid https://github.com/pydata/xarray/issues/4405
return xr.open_zarr(store, concat_characters=False, **kwargs) # type: ignore[no-any-return,no-untyped-call]


@pytest.mark.parametrize("chunk_width", [10, 50, 500])
def test_rechunk_to_zarr__chunk_size(shared_datadir, tmp_path, chunk_width):
_, store = _rechunk_to_zarr(
shared_datadir, tmp_path, chunk_width=chunk_width, pack=False
@pytest.mark.parametrize("target_chunks", [(10, 10), (50, 50), (100, 50), (50, 100)])
def test_rechunk_bgen__target_chunks(shared_datadir, tmp_path, target_chunks):
_, dsr, store = _rechunk_bgen(
shared_datadir,
tmp_path,
chunk_length=target_chunks[0],
chunk_width=target_chunks[1],
pack=False,
)
dsr = _open_zarr(store)
for v in GT_DATA_VARS:
# Chunks shape should equal (
# length of chunks on read,
# width of chunks on rechunk
# )
assert dsr[v].data.chunksize[0] == 10
assert dsr[v].data.chunksize[1] == chunk_width
assert dsr[v].data.chunksize[:2] == target_chunks


def test_rechunk_from_zarr__self_consistent(shared_datadir, tmp_path):
# With no probability dtype or packing, rechunk_{to,from}_zarr is a noop
ds, dsr, store = _rechunk_bgen(
shared_datadir, tmp_path, probability_dtype=None, pack=False
)
xr.testing.assert_allclose(ds.compute(), dsr.compute()) # type: ignore[no-untyped-call]


@pytest.mark.parametrize("dtype", ["uint8", "uint16"])
def test_rechunk_to_zarr__probability_encoding(shared_datadir, tmp_path, dtype):
ds, store = _rechunk_to_zarr(
def test_rechunk_bgen__probability_encoding(shared_datadir, tmp_path, dtype):
ds, _, store = _rechunk_bgen(
shared_datadir, tmp_path, probability_dtype=dtype, pack=False
)
dsr = _open_zarr(store, mask_and_scale=False)
@@ -184,61 +198,44 @@ def test_rechunk_to_zarr__probability_encoding(shared_datadir, tmp_path, dtype):
np.testing.assert_allclose(ds[v], dsr[v], atol=tolerance)


def test_rechunk_to_zarr__variable_packing(shared_datadir, tmp_path):
ds, store = _rechunk_to_zarr(
def test_rechunk_bgen__variable_packing(shared_datadir, tmp_path):
ds, dsr, store = _rechunk_bgen(
shared_datadir, tmp_path, probability_dtype=None, pack=True
)
dsr = _open_zarr(store, mask_and_scale=True)
dsr = unpack_variables(dsr)
# A minor tolerance is necessary here when packing is enabled
# because one of the genotype probabilities is constructed from the others
xr.testing.assert_allclose(ds.compute(), dsr.compute(), atol=1e-6) # type: ignore[no-untyped-call]


def test_rechunk_to_zarr__raise_on_invalid_chunk_length(shared_datadir, tmp_path):
with pytest.raises(
ValueError,
match="Chunk size in variant dimension for variable .* must evenly divide target chunk size",
):
_rechunk_to_zarr(shared_datadir, tmp_path, chunk_length=11)


@pytest.mark.parametrize("chunks", [(10, 10), (50, 50), (100, 50), (50, 100)])
def test_rechunk_from_zarr__target_chunks(shared_datadir, tmp_path, chunks):
ds, store = _rechunk_to_zarr(
shared_datadir,
tmp_path,
chunk_length=chunks[0],
chunk_width=chunks[1],
pack=False,
)
ds = rechunk_from_zarr(store, chunk_length=chunks[0], chunk_width=chunks[1])
for v in GT_DATA_VARS:
assert ds[v].data.chunksize[:2] == chunks


@pytest.mark.parametrize("dtype", ["uint32", "int8", "float32"])
def test_rechunk_from_zarr__invalid_probability_type(shared_datadir, tmp_path, dtype):
def test_rechunk_bgen__invalid_probability_type(shared_datadir, tmp_path, dtype):
with pytest.raises(ValueError, match="Probability integer dtype invalid"):
_rechunk_to_zarr(shared_datadir, tmp_path, probability_dtype=dtype)
_rechunk_bgen(shared_datadir, tmp_path, probability_dtype=dtype)


def test_unpack_variables__invalid_gp_dims(shared_datadir, tmp_path):
# Validate that an error is thrown when variables are
# unpacked without being packed in the first place
_, store = _rechunk_to_zarr(shared_datadir, tmp_path, pack=False)
dsr = _open_zarr(store, mask_and_scale=True)
_, dsr, store = _rechunk_bgen(shared_datadir, tmp_path, pack=False)
with pytest.raises(
ValueError,
match="Expecting variable 'call_genotype_probability' to have genotypes dimension of size 2",
):
unpack_variables(dsr)


def test_rechunk_from_zarr__self_consistent(shared_datadir, tmp_path):
# With no probability dtype or packing, rechunk_{to,from}_zarr is a noop
ds, store = _rechunk_to_zarr(
shared_datadir, tmp_path, probability_dtype=None, pack=False
)
dsr = rechunk_from_zarr(store)
xr.testing.assert_allclose(ds.compute(), dsr.compute()) # type: ignore[no-untyped-call]
@pytest.mark.parametrize(
"region", [None, dict(variants=slice(0, 100)), dict(samples=slice(0, 50))]
)
def test_bgen_to_zarr(shared_datadir, tmp_path, region):
input = shared_datadir / "example.bgen"
output = tmp_path / "example.zarr"
ds = bgen_to_zarr(input, output, region=region)
expected_dims = {
k: EXPECTED_DIMS[k]
if region is None or k not in region
else region[k].stop - region[k].start
for k in EXPECTED_DIMS
}
actual_dims = {k: v for k, v in ds.dims.items() if k in expected_dims}
assert actual_dims == expected_dims
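
For a manual check of the converted store's on-disk chunking, mirroring test_rechunk_bgen__target_chunks, one could inspect the zarr arrays directly (the store path is illustrative):

# Sketch: confirm each genotype variable was written with the requested
# (chunk_length, chunk_width) chunking.
import zarr
from sgkit_bgen.bgen_reader import GT_DATA_VARS

group = zarr.open_group("example.zarr", mode="r")
for v in GT_DATA_VARS:
    print(v, group[v].chunks[:2])  # expect (chunk_length, chunk_width)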
