(feat): `read_elem_as_dask` method #1469
Merged
160 commits
d111f04 [ilan-gold] (feat): `read_elem_lazy` method
00be7f0 [ilan-gold] (revert): error message
fd635d7 [ilan-gold] (refactor): declare `is_csc` reading elem directly in h5
f5e7fda [ilan-gold] (chore): `read_elem_lazy` -> `read_elem_as_dask`
ae5396c [ilan-gold] (chore): remove string handling
664336a [ilan-gold] (refactor): use `elem` for h5 where posssble
2370215 [ilan-gold] Merge branch 'main' into ig/read_dask_elem
52002b6 [ilan-gold] (chore): remove invlaud syntax
5ab1ad1 [ilan-gold] Merge branch 'ig/read_dask_elem' of github.com:scverse/anndata into i…
aa1006e [ilan-gold] (fix): put dask import inside function
dda7d83 [ilan-gold] (refactor): try maybe open?
fd418f0 [ilan-gold] Merge branch 'main' into ig/read_dask_elem
23b0bfd [ilan-gold] Merge branch 'main' into ig/read_dask_elem
97b8031 [ilan-gold] Merge branch 'main' into ig/read_dask_elem
1fc4cc3 [ilan-gold] (fix): revert `encoding-version`
5ca71ea [ilan-gold] (chore): document `create_sparse_store` test function
3672c18 [ilan-gold] (chore): sort indices to prevent warning
33c3599 [ilan-gold] (fix): remove utility function `make_dask_array`
157e710 [ilan-gold] (chore): `read_sparse_as_dask_h5` -> `read_sparse_as_dask`
375000d [ilan-gold] (feat): make params of `h5_chunks` and `stride`
241904a [ilan-gold] (chore): add distributed test
42d0d22 [ilan-gold] (fix): `TypeVar` bind
0bba2c0 [ilan-gold] (chore): release note
0d0b43a [ilan-gold] (chore): `0.10.8` -> `0.11.0`
762d4c6 [ilan-gold] Merge branch 'main' into ig/read_dask_elem
c935fe0 [ilan-gold] (fix): `ruff` for default `pytest.fixture` `scope`
23e0ea2 [ilan-gold] Apply suggestions from code review
5b96c77 [ilan-gold] (fix): `Any` to `DaskArray`
0907a4e [ilan-gold] (fix): type `make_index` + fix undeclared
20ced16 [pre-commit-ci[bot]] [pre-commit.ci] auto fixes from pre-commit.com hooks
36ae8f2 [ilan-gold] Merge branch 'main' into ig/read_dask_elem
bb6607e [flying-sheep] fix rest
419691b [ilan-gold] (fix): use `chunks` kwarg
a23df34 [ilan-gold] Merge branch 'main' into ig/read_dask_elem
fd2376a [ilan-gold] (feat): expose `chunks` as an option to `read_elem_as_dask` via `data…
ae723d0 [ilan-gold] Merge branch 'ig/read_dask_elem' of github.com:scverse/anndata into i…
42b1093 [ilan-gold] (fix): `test_read_dispatched_null_case` test
78de057 [ilan-gold] (fix): disallowed spread syntax?
717b997 [ilan-gold] (refactor): reuse `compute_chunk_layout_for_axis_shape` functionality
2b86293 [ilan-gold] (fix): remove unneeded `slice` arguments
8d5a9df [ilan-gold] (fix): revert message
449fc1a [ilan-gold] (refactor): `make_index` -> `make_block_indexer`
1522de3 [ilan-gold] (fix): export from `experimental`
71c150d [ilan-gold] (fix): `callback` signature for `test_read_dispatched_null_case
b441366 [ilan-gold] (chore): `get_elem_name` helper
0307a1d [ilan-gold] (chore): use `H5Group` consistently
ee075cd [ilan-gold] (refactor): make `chunks` public facing API instead of `dataset_kwargs`
89acec4 [ilan-gold] (fix): regsiter for group not array
48b7630 [ilan-gold] (chore): add warning test
8712582 [ilan-gold] (chore): make arg order consistent
cda8aa7 [ilan-gold] (feat): add `callback` typing for `read_dispatched`
e8f62f4 [ilan-gold] (chore): use `npt.NDArray`
f6e48ac [ilan-gold] (fix): remove uneceesary union
4de3246 [ilan-gold] (chore): release note
ba817e0 [ilan-gold] (fix); try protocol docs
438d28d [ilan-gold] (feat): create `InMemoryElem` + `DictElemType` to remove `Any`
296ea3f [ilan-gold] (chore): refactor `DictElemType` -> `InMemoryArrayOrScalarType` for r…
cf13a57 [ilan-gold] (fix): use `Union`
d02ba49 [ilan-gold] (fix): more `Union`
6970a97 [ilan-gold] (refactor): `InMemoryElem` -> `InMemoryReadElem`
2282351 [ilan-gold] (chore): add needed types to public export + docs fix
810cd0a [flying-sheep] Merge branch 'main' into ig/read_dask_elem
a996081 [ilan-gold] (chore): type `write_elem` functions
f6e457b [ilan-gold] (chore): create `write_callback` protocol
a0b4057 [ilan-gold] Merge branch 'main' into ig/protocol_for_callback
4416526 [ilan-gold] (chore): export + docs
fbe44f0 [ilan-gold] (fix): add string descriptions
8c1f01d [ilan-gold] (fix): try sphinx protocol doc
a7d412a [ilan-gold] (fix): try ignoring exports
4d56396 [ilan-gold] (fix): remap callback internal usages
2012ee5 [ilan-gold] (fix): add docstring
f65f065 [flying-sheep] Discard changes to pyproject.toml
8f6ea49 [flying-sheep] re-add dep
155a21e [flying-sheep] Fix docs
daae3e5 [flying-sheep] Almost works
c415ae4 [flying-sheep] works!
00010b8 [ilan-gold] (chore): use pascal-case
0bd87fc [ilan-gold] (feat): type read/write funcs in callback
5997678 [ilan-gold] (fix): use generic for `Read` as well.
f208332 [ilan-gold] (fix): need more aliases
eb69fcb [flying-sheep] Split table, format
477bbef [ilan-gold] (refactor): move to `_types` file
103cad6 [ilan-gold] Merge branch 'ig/protocol_for_callback' of github.com:scverse/anndata…
8d23f6f [flying-sheep] bump scanpydoc
9b647c2 [flying-sheep] Some basic syntax fixes
d6d01bc [ilan-gold] Merge branch 'ig/protocol_for_callback' into ig/read_dask_elem
5ef93e1 [ilan-gold] (fix): change `Read{Callback}` type for kwargs
9cfe908 [ilan-gold] (chore): test `chunks `argument
99fc6db [ilan-gold] (fix): type `read_recarray`
b5bccc3 [ilan-gold] (fix): `GroupyStorageType` not `StorageType`
e5ea2b0 [ilan-gold] (fix): little type fixes
6ac72d6 [ilan-gold] (fix): clarify `H5File` typing
989dc65 [ilan-gold] (fix): dask doc
36b0207 [ilan-gold] (fix): dask docs
dadfb4d [ilan-gold] Merge branch 'ig/protocol_for_callback' into ig/read_dask_elem
ca6cf66 [ilan-gold] (fix): typing
eabaf35 [ilan-gold] (fix): handle case when `chunks` is `None`
4c398c3 [ilan-gold] (feat): add string-array reading
d6fc8a4 [ilan-gold] (fix): remove `string-array` because it is not tested
33aebb2 [ilan-gold] (refactor): clean up tests
701cd85 [ilan-gold] (fix): overfetching problem
43b21a2 [flying-sheep] Fix circular import
0e22449 [flying-sheep] add some typing
ec546f4 [flying-sheep] fix mapping types
7c2e4da [flying-sheep] Fix Read/Write
1ba5b99 [flying-sheep] Fix one more
49c0d49 [flying-sheep] unify names
3666735 [flying-sheep] claift ReadCallback signature
3a332ad [flying-sheep] Fix type aliases
d0f4d13 [ilan-gold] (fix): clean up typing to use `RWAble`
6e89e14 [ilan-gold] Merge branch 'main' into ig/protocol_for_callback
ea29cfa [ilan-gold] (fix): use `Union`
f4ff236 [ilan-gold] (fix): add qualname override
f50b286 [ilan-gold] (fix): ignore dask and masked array
712e085 [ilan-gold] (fix): ignore erroneous class warning
24dd18b [ilan-gold] (fix): upgrade `scanpydoc`
79d3fdc [ilan-gold] (fix): use `MutableMapping` instead of `dict` due to broken docstring
9a2be00 [ilan-gold] Merge branch 'ig/protocol_for_callback' into ig/read_dask_elem
d3bcddf [flying-sheep] Add data docs
84fdc96 [flying-sheep] Revert "(fix): use `MutableMapping` instead of `dict` due to broken d…
2608bc3 [ilan-gold] (fix): add clarification
e551e18 [flying-sheep] Simplify
13e3bb1 [ilan-gold] Merge branch 'ig/protocol_for_callback' into ig/read_dask_elem
2935e45 [ilan-gold] Merge branch 'main' into ig/read_dask_elem
bf0be15 [ilan-gold] Merge branch 'ig/read_dask_elem' of github.com:scverse/anndata into i…
9d37fc8 [ilan-gold] Merge branch 'main' into ig/read_dask_elem
1ffe43e [ilan-gold] (fix): remove double `dask` intersphinx
f9df5bc [ilan-gold] (fix): remove `_types.DaskArray` from type checking block
a85da39 [ilan-gold] (refactor): use `block_info` for resolving fetch location
3bef77c [ilan-gold] Merge branch 'ig/read_dask_elem' of github.com:scverse/anndata into i…
899184f [ilan-gold] (fix): dtype for reading
efb70ec [ilan-gold] (fix): ignore import cycle problem (why??)
118f43c [ilan-gold] (fix): add issue
f742a0a [ilan-gold] (fix): subclass `Reader` to remove `datasetkwargs`
ae68731 [ilan-gold] (fix): add message tp errpr
f5e7760 [ilan-gold] Update tests/test_io_elementwise.py
96b13a3 [ilan-gold] (fix): correct `self.callback` check
9c68e36 [ilan-gold] (fix): erroneous diffs
410aeda [ilan-gold] (fix): extra `read_elem` `dataset_kwargs`
31a30c4 [ilan-gold] (fix): remove more `dataset_kwargs` nonsense
80fe8cb [ilan-gold] (chore): add docs
b314248 [ilan-gold] (fix): use `block_info` for dense
02d4735 [ilan-gold] (fix): more erroneous diffs
6e5534a [ilan-gold] (fix): use context again
d26cfe8 [ilan-gold] (fix): change size by dimension in tests
94e43a3 [ilan-gold] (refactor): clean up `get_elem_name`
5160016 [ilan-gold] (fix): try new sphinx for error
43da9a3 [ilan-gold] (fix): return type
9735ced [ilan-gold] (fix): protocol for reading
f1730c3 [ilan-gold] (fix): bring back ignored warning
9861b56 [flying-sheep] Fix docs
235096a [flying-sheep] almost fix typing
dce9f07 [flying-sheep] add wrapper
2725ef2 [flying-sheep] move into type checking
ffe89f0 [ilan-gold] (fix): small type fxes
6cb231e [ilan-gold] Merge branch 'main' into ig/read_dask_elem
75a64fc [flying-sheep] block info types
3f734fe [flying-sheep] simplify
c4c2356 [flying-sheep] rename
cc67a9b [flying-sheep] simplify more
@@ -0,0 +1,164 @@

```python
from __future__ import annotations

from contextlib import contextmanager
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING

import h5py
import numpy as np
from scipy import sparse

import anndata as ad

from ..._core.file_backing import filename, get_elem_name
from ...compat import H5Array, H5Group, ZarrArray, ZarrGroup
from .registry import _LAZY_REGISTRY, IOSpec

if TYPE_CHECKING:
    from collections.abc import Callable, Generator, Mapping, Sequence
    from typing import Literal, ParamSpec, TypeVar

    from ..._core.sparse_dataset import CSCDataset, CSRDataset
    from ..._types import ArrayStorageType, StorageType
    from ...compat import DaskArray
    from .registry import DaskReader

    BlockInfo = Mapping[
        Literal[None],
        dict[str, Sequence[tuple[int, int]]],
    ]

    P = ParamSpec("P")
    R = TypeVar("R")


@contextmanager
def maybe_open_h5(
    path_or_group: Path | ZarrGroup, elem_name: str
) -> Generator[StorageType, None, None]:
    if not isinstance(path_or_group, Path):
        yield path_or_group
        return
    file = h5py.File(path_or_group, "r")
    try:
        yield file[elem_name]
    finally:
        file.close()


_DEFAULT_STRIDE = 1000


def compute_chunk_layout_for_axis_shape(
    chunk_axis_shape: int, full_axis_shape: int
) -> tuple[int, ...]:
    n_strides, rest = np.divmod(full_axis_shape, chunk_axis_shape)
    chunk = (chunk_axis_shape,) * n_strides
    if rest > 0:
        chunk += (rest,)
    return chunk


def make_dask_chunk(
    path_or_group: Path | ZarrGroup,
    elem_name: str,
    block_info: BlockInfo | None = None,
    *,
    wrap: Callable[[ArrayStorageType], ArrayStorageType]
    | Callable[[H5Group | ZarrGroup], CSRDataset | CSCDataset] = lambda g: g,
):
    if block_info is None:
        msg = "Block info is required"
        raise ValueError(msg)
    # We need to open the file in each task since `dask` cannot share h5py objects when using `dask.distributed`
    # https://github.com/scverse/anndata/issues/1105
    with maybe_open_h5(path_or_group, elem_name) as f:
        mtx = wrap(f)
        idx = tuple(
            slice(start, stop) for start, stop in block_info[None]["array-location"]
        )
        chunk = mtx[idx]
    return chunk


@_LAZY_REGISTRY.register_read(H5Group, IOSpec("csc_matrix", "0.1.0"))
@_LAZY_REGISTRY.register_read(H5Group, IOSpec("csr_matrix", "0.1.0"))
@_LAZY_REGISTRY.register_read(ZarrGroup, IOSpec("csc_matrix", "0.1.0"))
@_LAZY_REGISTRY.register_read(ZarrGroup, IOSpec("csr_matrix", "0.1.0"))
def read_sparse_as_dask(
    elem: H5Group | ZarrGroup,
    *,
    _reader: DaskReader,
    chunks: tuple[int, ...] | None = None,  # only tuple[int, int] is supported here
) -> DaskArray:
    import dask.array as da

    path_or_group = Path(filename(elem)) if isinstance(elem, H5Group) else elem
    elem_name = get_elem_name(elem)
    shape: tuple[int, int] = tuple(elem.attrs["shape"])
    dtype = elem["data"].dtype
    is_csc: bool = elem.attrs["encoding-type"] == "csc_matrix"

    stride: int = _DEFAULT_STRIDE
    major_dim, minor_dim = (1, 0) if is_csc else (0, 1)
    if chunks is not None:
        if len(chunks) != 2:
            raise ValueError("`chunks` must be a tuple of two integers")
        if chunks[minor_dim] != shape[minor_dim]:
            raise ValueError(
                "Only the major axis can be chunked. "
                f"Try setting chunks to {((-1, _DEFAULT_STRIDE) if is_csc else (_DEFAULT_STRIDE, -1))}"
            )
        stride = chunks[major_dim]

    shape_minor, shape_major = shape if is_csc else shape[::-1]
    chunks_major = compute_chunk_layout_for_axis_shape(stride, shape_major)
    chunks_minor = (shape_minor,)
    chunk_layout = (
        (chunks_minor, chunks_major) if is_csc else (chunks_major, chunks_minor)
    )
    memory_format = sparse.csc_matrix if is_csc else sparse.csr_matrix
    make_chunk = partial(
        make_dask_chunk, path_or_group, elem_name, wrap=ad.experimental.sparse_dataset
    )
    da_mtx = da.map_blocks(
        make_chunk,
        dtype=dtype,
        chunks=chunk_layout,
        meta=memory_format((0, 0), dtype=dtype),
    )
    return da_mtx


@_LAZY_REGISTRY.register_read(H5Array, IOSpec("array", "0.2.0"))
def read_h5_array(
    elem: H5Array, *, _reader: DaskReader, chunks: tuple[int, ...] | None = None
) -> DaskArray:
    import dask.array as da

    path = Path(elem.file.filename)
    elem_name: str = elem.name
    shape = tuple(elem.shape)
    dtype = elem.dtype
    chunks: tuple[int, ...] = (
        chunks if chunks is not None else (_DEFAULT_STRIDE,) * len(shape)
    )

    chunk_layout = tuple(
        compute_chunk_layout_for_axis_shape(chunks[i], shape[i])
        for i in range(len(shape))
    )

    make_chunk = partial(make_dask_chunk, path, elem_name)
    return da.map_blocks(make_chunk, dtype=dtype, chunks=chunk_layout)


@_LAZY_REGISTRY.register_read(ZarrArray, IOSpec("array", "0.2.0"))
def read_zarr_array(
    elem: ZarrArray, *, _reader: DaskReader, chunks: tuple[int, ...] | None = None
) -> DaskArray:
    chunks: tuple[int, ...] = chunks if chunks is not None else elem.chunks
    import dask.array as da

    return da.from_zarr(elem, chunks=chunks)
```
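To make the chunking in the diff above concrete, here is a minimal standalone sketch of how a stride turns into a per-axis chunk layout, and how that layout maps to the `(start, stop)` pairs that `make_dask_chunk` reads from `block_info[None]["array-location"]`. It reimplements `compute_chunk_layout_for_axis_shape` with the built-in `divmod` so it runs without NumPy. The helper `array_locations` is hypothetical: it only emulates the ranges dask would report and is not part of this PR or of dask.

```python
from itertools import accumulate


def compute_chunk_layout_for_axis_shape(
    chunk_axis_shape: int, full_axis_shape: int
) -> tuple[int, ...]:
    # Same logic as the helper in the diff, using the built-in divmod.
    n_strides, rest = divmod(full_axis_shape, chunk_axis_shape)
    chunk = (chunk_axis_shape,) * n_strides
    if rest > 0:
        chunk += (rest,)
    return chunk


def array_locations(layout: tuple[int, ...]) -> list[tuple[int, int]]:
    # Hypothetical helper: the (start, stop) range each block covers on one axis.
    stops = list(accumulate(layout))
    starts = [0, *stops[:-1]]
    return list(zip(starts, stops))


layout = compute_chunk_layout_for_axis_shape(1000, 2500)
print(layout)  # (1000, 1000, 500)
print(array_locations(layout))  # [(0, 1000), (1000, 2000), (2000, 2500)]

# Index built for the second block of a CSR matrix with 30 columns
# (major axis chunked, minor axis kept whole), as in `make_dask_chunk`.
idx = tuple(slice(start, stop) for start, stop in [(1000, 2000), (0, 30)])
```

The last chunk is shorter whenever the axis length is not a multiple of the stride, which is why the layout is a tuple of sizes rather than a single number.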
Ok I just spent a long time trying to debug why a zarr store was being overfetched, and it turns out it wasn't. This number is just crazy high. So I'm not entirely sure what the best way forward is. This number is (probably) good for bulk tasks like filtering or PCA but absolutely useless for random access. For example, on a CSC matrix, you end up fetching 5% of the dataset if there are 20,000 genes. So we need to be VERY careful here. Not sure what the answer is, actually. I don't think exposing `stride` is great because it's specific to csr/csc and also a subset of the functionality of `chunks`, but the flipside is that resolving the size of a sparse matrix requires looking at `attrs`.

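The 5% figure follows from `_DEFAULT_STRIDE = 1000` in the diff: a chunk spans 1000 entries of the major axis, so touching a single gene (column) of a CSC matrix with 20,000 genes reads 1000 of the 20,000 columns. A rough sketch of that arithmetic, assuming columns are about equally dense (the function name is hypothetical):

```python
_DEFAULT_STRIDE = 1000  # value from the diff


def fraction_fetched(index: int, n_major: int, stride: int = _DEFAULT_STRIDE) -> float:
    # A major-axis index falls in exactly one chunk, and the whole chunk is read.
    start = (index // stride) * stride
    stop = min(start + stride, n_major)
    return (stop - start) / n_major


print(fraction_fetched(0, 20_000))              # 1000 / 20000
print(fraction_fetched(0, 20_000, stride=100))  # 100 / 20000
```

A smaller stride reduces the overfetch for random access at the cost of more, smaller tasks for bulk operations, which is the trade-off the comment describes.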
Exposing a per-matrix (i.e., `layers`) chunking is tough with a monolithic API #1247 but a monolithic API (i.e., `read_backed`) makes it easier to deal with telling people how to do things, which is currently very difficult. I'll need to think about that PR.

But with that PR, it doesn't really make sense to expose a method called `read_elem_as_xarray` because that's way too specific - only our dataframes work there. So I'm not sure what the answer is. Perhaps changing this to `read_lazy` or something is not a bad idea.

I think this is probably a fine value for now. And agree on not exposing `stride`. Also don't know what the answer is.
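
With `stride` kept internal, the public `chunks` tuple has to determine it. The check in `read_sparse_as_dask` can be paraphrased as a standalone sketch. The function name `resolve_stride` is hypothetical, and the logic mirrors the diff as shown, where the minor-axis entry of `chunks` must equal the full minor-axis size:

```python
from __future__ import annotations

_DEFAULT_STRIDE = 1000  # value from the diff


def resolve_stride(
    shape: tuple[int, int], is_csc: bool, chunks: tuple[int, ...] | None = None
) -> int:
    # CSC is compressed along columns (axis 1), CSR along rows (axis 0).
    major_dim, minor_dim = (1, 0) if is_csc else (0, 1)
    if chunks is None:
        return _DEFAULT_STRIDE
    if len(chunks) != 2:
        raise ValueError("`chunks` must be a tuple of two integers")
    if chunks[minor_dim] != shape[minor_dim]:
        raise ValueError("Only the major axis can be chunked.")
    return chunks[major_dim]


# 5,000 cells x 20,000 genes stored as CSC: chunk the gene axis in blocks of 100.
print(resolve_stride((5000, 20000), is_csc=True, chunks=(5000, 100)))
```

Passing `chunks=(100, 20000)` for the same CSC matrix raises `ValueError`, because it would split the minor (row) axis.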