Skip to content

Commit

Permalink
Adds sparse sparse to aggregate (#160)
Browse files Browse the repository at this point in the history
* adds kernel and first implementation

* sparse var enabled

* switch to direct csr construction

* updates to bincount

* adds test

* adds release note
  • Loading branch information
Intron7 authored Apr 12, 2024
1 parent a9074f4 commit e0dcc83
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 52 deletions.
2 changes: 2 additions & 0 deletions docs/release-notes/0.10.2.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

```{rubric} Features
```
* adds the option to return the results for `get.aggregate` as sparse matrices if the input is sparse {pr}`160` {smaller}`S Dicks`


```{rubric} Performance
```
Expand Down
246 changes: 198 additions & 48 deletions src/rapids_singlecell/get/_aggregated.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
from typing import TYPE_CHECKING, Literal, Union, get_args

import cupy as cp
import numpy as np
from anndata import AnnData
from cupyx.scipy import sparse as cp_sparse
from scanpy._utils import _resolve_axis
from scanpy.get._aggregated import _combine_categories, sparse_indicator
from scanpy.get._aggregated import _combine_categories

from rapids_singlecell.get import _check_mask
from rapids_singlecell.preprocessing._utils import _check_gpu_X

if TYPE_CHECKING:
from collections.abc import Collection, Iterable

import numpy as np
import pandas as pd
from numpy.typing import NDArray

Expand Down Expand Up @@ -45,15 +45,14 @@ def __init__(
*,
mask: NDArray[np.bool_] | None = None,
) -> None:
self.indicator_matrix = cp_sparse.coo_matrix(
sparse_indicator(groupby, mask=mask)
)
self.mask = mask
self.groupby = cp.array(groupby.codes, dtype=cp.int32)
self.n_cells = cp.array(np.bincount(groupby.codes), dtype=cp.float64).reshape(
-1, 1
)
self.data = data

groupby: cp.ndarray
indicator_matrix: cp_sparse.coo_matrix
data: Array

def _get_mask(self):
Expand All @@ -73,23 +72,14 @@ def count_mean_var_sparse(self, dof: int = 1):

if self.data.format == "csc":
self.data = self.data.tocsr()
means = cp.zeros(
(self.indicator_matrix.shape[0], self.data.shape[1]), dtype=cp.float64
)
var = cp.zeros(
(self.indicator_matrix.shape[0], self.data.shape[1]), dtype=cp.float64
)
sums = cp.zeros(
(self.indicator_matrix.shape[0], self.data.shape[1]), dtype=cp.float64
)
counts = cp.zeros(
(self.indicator_matrix.shape[0], self.data.shape[1]), dtype=cp.int32
)
means = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64)
var = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64)
sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64)
counts = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.int32)
block = (128,)
grid = (self.data.shape[0],)
aggr_kernel = _get_aggr_sparse_kernel(self.data.dtype)
mask = self._get_mask()
n_cells = self.indicator_matrix.sum(axis=1).astype(cp.float64)
aggr_kernel(
grid,
block,
Expand All @@ -102,43 +92,194 @@ def count_mean_var_sparse(self, dof: int = 1):
means,
var,
self.groupby,
n_cells,
self.n_cells,
mask,
self.data.shape[0],
self.data.shape[1],
),
)

var = var - cp.power(means, 2)
var *= n_cells / (n_cells - dof)
return sums, counts, means, var
var *= self.n_cells / (self.n_cells - dof)

def count_mean_var_dense(self, dof: int = 1):
results = {"sum": sums, "count_nonzero": counts, "mean": means, "var": var}

return results

def count_mean_var_sparse_sparse(self, funcs, dof: int = 1):
"""
This function is used to calculate the sum, mean, and variance of the sparse data matrix.
It uses a custom cuda-kernel to perform the aggregation.
"""

assert dof >= 0
from ._kernels._aggr_kernels import _get_aggr_dense_kernel

means = cp.zeros(
(self.indicator_matrix.shape[0], self.data.shape[1]), dtype=cp.float64
from ._kernels._aggr_kernels import (
_get_aggr_sparse_sparse_kernel,
_get_sparse_var_kernel,
)
var = cp.zeros(
(self.indicator_matrix.shape[0], self.data.shape[1]), dtype=cp.float64
)
sums = cp.zeros(
(self.indicator_matrix.shape[0], self.data.shape[1]), dtype=cp.float64

if self.data.format == "csc":
self.data = self.data.tocsr()

src_row = cp.zeros(self.data.nnz, dtype=cp.int32)
src_col = cp.zeros(self.data.nnz, dtype=cp.int32)
src_data = cp.zeros(self.data.nnz, dtype=cp.float64)
block = (128,)
grid = (self.data.shape[0],)
aggr_kernel = _get_aggr_sparse_sparse_kernel(self.data.dtype)
mask = self._get_mask()
aggr_kernel(
grid,
block,
(
self.data.indptr,
self.data.indices,
self.data.data,
src_row,
src_col,
src_data,
self.groupby,
mask,
self.data.shape[0],
),
)
counts = cp.zeros(
(self.indicator_matrix.shape[0], self.data.shape[1]), dtype=cp.int32

keys = cp.stack([src_col, src_row])
order = cp.lexsort(keys)

src_row = src_row[order]
src_col = src_col[order]
src_data = src_data[order]

_sum_duplicates_diff = cp.ElementwiseKernel(
"raw T row, raw T col",
"T diff",
"""
T diff_out = 1;
if (i == 0 || row[i - 1] == row[i] && col[i - 1] == col[i]) {
diff_out = 0;
}
diff = diff_out;
""",
"cupyx_scipy_sparse_coo_sum_duplicates_diff",
)

diff = _sum_duplicates_diff(src_row, src_col, size=src_row.size)
index = cp.cumsum(diff, dtype=cp.int32)
nnz = index[-1].get()

# calculate the rows and indices
rows = cp.zeros(nnz + 1, dtype=cp.int32)
indices = cp.zeros(nnz + 1, dtype=cp.int32)

cp.ElementwiseKernel(
"int32 src_row, int32 src_col, int32 index",
"raw int32 rows, raw int32 indices",
"""
rows[index] = src_row;
indices[index] = src_col;
""",
"cupyx_scipy_sparse_coo_sum_duplicates_assign",
)(src_row, src_col, index, rows, indices)

# Calculate the indptr
transitions = cp.where(cp.diff(rows) > 0)[0]
indptr = cp.zeros(9, dtype=cp.int32)
indptr[1:-1] = transitions + 1
indptr[-1] = nnz + 1

# Calculate the the sparse matrices
results = {}

if "sum" in funcs:
sums = cp.zeros(nnz + 1, dtype=cp.float64)
cp.ElementwiseKernel(
"float64 src, int32 index",
"raw float64 sums",
"""
atomicAdd(&sums[index], src);
""",
"create_sum_sparse_matrix",
)(src_data, index, sums)

results["sum"] = cp_sparse.csr_matrix(
(sums, indices, indptr),
shape=(self.n_cells.shape[0], self.data.shape[1]),
)
if "var" in funcs or "mean" in funcs:
means = cp.zeros(nnz + 1, dtype=cp.float64)
var = cp.zeros(nnz + 1, dtype=cp.float64)
cp.ElementwiseKernel(
"float64 src, int32 rows, int32 index, raw float64 ncells",
"raw float64 means, raw float64 var",
"""
atomicAdd(&means[index], src / ncells[rows]);
atomicAdd(&var[index], (src * src) / ncells[rows]);
""",
"create_mean_var_sparse_matrix",
)(src_data, src_row, index, self.n_cells, means, var)
if "mean" in funcs:
results["mean"] = cp_sparse.csr_matrix(
(means, indices, indptr),
shape=(self.n_cells.shape[0], self.data.shape[1]),
)
if "var" in funcs:
var = cp_sparse.csr_matrix(
(var, indices, indptr),
shape=(self.n_cells.shape[0], self.data.shape[1]),
)

sparse_var = _get_sparse_var_kernel(var.dtype)
sparse_var(
grid,
block,
(
var.indptr,
var.indices,
var.data,
means,
self.n_cells,
dof,
var.shape[0],
),
)
results["var"] = var
if "count_nonzero" in funcs:
counts = cp.zeros(nnz + 1, dtype=cp.float32)
cp.ElementwiseKernel(
"float64 src,int32 index",
"raw float32 counts",
"""
if (src != 0){
atomicAdd(&counts[index], 1.0f);
}
""",
"create_count_nonzero_sparse_matrix",
)(src_data, index, counts)
results["count_nonzero"] = cp_sparse.csr_matrix(
(counts, indices, indptr),
shape=(self.n_cells.shape[0], self.data.shape[1]),
)

return results

def count_mean_var_dense(self, dof: int = 1):
"""
This function is used to calculate the sum, mean, and variance of the sparse data matrix.
It uses a custom cuda-kernel to perform the aggregation.
"""

assert dof >= 0
from ._kernels._aggr_kernels import _get_aggr_dense_kernel

means = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64)
var = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64)
sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64)
counts = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.int32)
block = (128,)
grid = (self.data.shape[0],)
aggr_kernel = _get_aggr_dense_kernel(self.data.dtype)
mask = self._get_mask()
n_cells = self.indicator_matrix.sum(axis=1).astype(cp.float64)
aggr_kernel(
grid,
block,
Expand All @@ -149,16 +290,19 @@ def count_mean_var_dense(self, dof: int = 1):
means,
var,
self.groupby,
n_cells,
self.n_cells,
mask,
self.data.shape[0],
self.data.shape[1],
),
)

var = var - cp.power(means, 2)
var *= n_cells / (n_cells - dof)
return sums, counts, means, var
var *= self.n_cells / (self.n_cells - dof)

results = {"sum": sums, "count_nonzero": counts, "mean": means, "var": var}

return results


def aggregate(
Expand All @@ -172,6 +316,7 @@ def aggregate(
layer: str | None = None,
obsm: str | None = None,
varm: str | None = None,
return_sparse: bool = False,
) -> AnnData:
"""\
Aggregate data matrix based on some categorical grouping.
Expand Down Expand Up @@ -206,6 +351,8 @@ def aggregate(
If not None, key for aggregation data.
varm
If not None, key for aggregation data.
return_sparse
Whether to return a sparse matrix. Only works for sparse input data.
Returns
-------
Expand Down Expand Up @@ -271,24 +418,27 @@ def aggregate(

groupby = Aggregate(groupby=categorical, data=data, mask=mask)

if isinstance(data, cp.ndarray):
sums_, counts_, means_, vars_ = groupby.count_mean_var_dense(dof)
else:
sums_, counts_, means_, vars_ = groupby.count_mean_var_sparse(dof)
layers = {}

funcs = set([func] if isinstance(func, str) else func)
if unknown := funcs - set(get_args(AggType)):
raise ValueError(f"func {unknown} is not one of {get_args(AggType)}")

if isinstance(data, cp.ndarray):
result = groupby.count_mean_var_dense(dof)
else:
if return_sparse:
result = groupby.count_mean_var_sparse_sparse(funcs, dof)
else:
result = groupby.count_mean_var_sparse(dof)
layers = {}

if "sum" in funcs:
layers["sum"] = sums_
layers["sum"] = result["sum"]
if "mean" in funcs:
layers["mean"] = means_
layers["mean"] = result["mean"]
if "count_nonzero" in funcs:
layers["count_nonzero"] = counts_
layers["count_nonzero"] = result["count_nonzero"]
if "var" in funcs:
layers["var"] = vars_
layers["var"] = result["var"]

result = AnnData(
layers=layers,
Expand Down
Loading

0 comments on commit e0dcc83

Please sign in to comment.