Skip to content

Commit

Permalink
[python] Ingest/outgest round-trip improvements (#2804) (#2850)
Browse files Browse the repository at this point in the history
* anndata-based dataframe round-trip tests

* factor+document {in,out}gest DF logic

* anndata- and df-based dataframe round-trip tests

Co-authored-by: Ryan Williams <[email protected]>
  • Loading branch information
github-actions[bot] and ryan-williams committed Aug 6, 2024
1 parent 8779032 commit faf06e2
Show file tree
Hide file tree
Showing 4 changed files with 383 additions and 80 deletions.
97 changes: 82 additions & 15 deletions apis/python/src/tiledbsoma/io/_registration/signatures.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from typing import Dict, Optional
from typing import Dict, Optional, Union

import anndata as ad
import attrs
Expand Down Expand Up @@ -69,25 +69,92 @@ def _string_dict_from_pandas_dataframe(
"""

df = df.head(1).copy() # since reset_index can be expensive on full data
if df.index.name is None or df.index.name == "index":
df.reset_index(inplace=True)
if default_index_name in df:
if "index" in df:
# Avoid the warning:
# "A value is trying to be set on a copy of a slice from a DataFrame"
# which would occur if we did:
# df.drop(columns=["index"], inplace=True)
df = df.drop(columns=["index"])
else:
df.rename(columns={"index": default_index_name}, inplace=True)
else:
df.reset_index(inplace=True)

_prepare_df_for_ingest(df, default_index_name)
arrow_table = df_to_arrow(df)
arrow_schema = arrow_table.schema.remove_metadata()
return _string_dict_from_arrow_schema(arrow_schema)


# Metadata indicating a SOMA DataFrame's original index column name, serialized as a JSON string or `null`.
# SOMA DataFrames are always given a `soma_joinid` index, but we want to be able to outgest a `pd.DataFrame` that is
# identical to the one we ingested, so we store an "original index name" in the DataFrame's metadata.
OriginalIndexMetadata = Union[None, str]


def _prepare_df_for_ingest(
df: pd.DataFrame, id_column_name: Optional[str]
) -> OriginalIndexMetadata:
"""Prepare a `pd.DataFrame` for persisting as a SOMA DataFrame: demote its index to a column (to make way for a
required `soma_joinid` index), and compute and return metadata for restoring the index column and name later (on
outgest).
If `df.index` has a name (and it's not "index", which is taken to be a default/unset value):
- `df.index.name` takes precedence over the `id_column_name` arg: the index will be reset to an eponymous column.
- That original `df.index.name` will be logged as `OriginalIndexMetadata` (for promotion back to index on outgest).
In this case, the overall round trip is basically just:
- `reset_index` on ingest (demote index to eponymous column).
- `set_index` on outgest (restore column to index, with its original name).
Otherwise (index name is `None` or "index"):
- A fallback name (`id_column_name` if provided, "index" otherwise) is used for the column that the index becomes.
- The returned `OriginalIndexMetadata` will be `None`.
There are several edge cases (detailed below and in `test_dataframe_io_roundtrips.py` and
https://github.com/single-cell-data/TileDB-SOMA/issues/2829) where the index, its name, or a specific column are not
restored properly on outgest. For now, all such behavior is preserved, for backwards compatibility, but we should
look into ways of improving these "round-trip mutation" cases. See
https://github.com/single-cell-data/TileDB-SOMA/issues/2829 for more info.
"""
use_existing_index = df.index.name is not None and df.index.name != "index"

original_index_name = None
if use_existing_index:
original_index_name = df.index.name

df.reset_index(inplace=True)
if id_column_name is not None:
if id_column_name in df:
if "index" in df:
# The assumption here is that the column named "index" was previously an unnamed `df.index`, and
# `id_column_name` was already a column (per the grandparent `if` above). In this case, we drop the
# original unnamed `df.index`.
# TODO: This prevents outgesting the same DataFrame we ingested. We should fix it; see
# https://github.com/single-cell-data/TileDB-SOMA/issues/2829.
#
# Also note: if the DataFrame already had columns named "index" and `id_column_name`, the original
# `df.index` will have been "reset" to a column named `level_0`, and we end up just dropping the column
# named "index" here.
#
# Another version of this occurs when the original DataFrame has `df.index.name == id_column_name` and a
# column named "index". In this case, the index will have been "reset" to a column named
# `id_column_name` above, which then satisfies the grendparent `if`'s predicate, and causes us to drop
# the column named "index" here.
df.drop(columns=["index"], inplace=True)
else:
# If `id_column_name` was passed, and is not already a column in the DataFrame, we assume the original index
# was "reset" to a column named "index" (by `reset_index` above), and we rename that column to
# `id_column_name`, so that `id_column_name` matches the name of a column representing the original
# DataFrame's index.
#
# NOTE: the assumption above can break in a few ways:
# 1. The original DataFrame index has a name other than "index" or `id_column_name`…
# a. and there is a column named "index" ⇒ that column will be renamed to `id_column_name`
# b. and there is no column named "index" ⇒ the rename below is a no-op (outgest currently restores the
# original DataFrame in this case)
# 2. The original DataFrame has a column named "index":
# - That column will become `df.index` on outgest, and acquire the original `df.index.name` as its name.
# - The original index will end up as a column, on outgest:
# - If it had a name, the column will have that name.
# - Otherwise, it will end up as a column named e.g. `level_0` (or `level_1`, if a column named
# `level_0` already exists, etc.)
#
# See https://github.com/single-cell-data/TileDB-SOMA/issues/2829 for more info.
df.rename(columns={"index": id_column_name}, inplace=True)

return original_index_name


@attrs.define(kw_only=True)
class Signature:
"""
Expand Down
21 changes: 5 additions & 16 deletions apis/python/src/tiledbsoma/io/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
get_dataframe_values,
signatures,
)
from ._registration.signatures import OriginalIndexMetadata, _prepare_df_for_ingest
from ._util import read_h5ad

_NDArr = TypeVar("_NDArr", bound=NDArray)
Expand Down Expand Up @@ -1154,20 +1155,8 @@ def _write_dataframe(
The caller should have copied anything pointing to a user-provided
adata.obs, adata.var, etc.
"""
original_index_name = None
if df.index is not None and df.index.name is not None and df.index.name != "index":
original_index_name = df.index.name

df.reset_index(inplace=True)
if id_column_name is not None:
if id_column_name in df:
if "index" in df:
df.drop(columns=["index"], inplace=True)
else:
df.rename(columns={"index": id_column_name}, inplace=True)

original_index_metadata = _prepare_df_for_ingest(df, id_column_name)
df[SOMA_JOINID] = np.asarray(axis_mapping.data, dtype=np.int64)

df.set_index(SOMA_JOINID, inplace=True)

return _write_dataframe_impl(
Expand All @@ -1176,7 +1165,7 @@ def _write_dataframe(
id_column_name,
ingestion_params=ingestion_params,
additional_metadata=additional_metadata,
original_index_name=original_index_name,
original_index_metadata=original_index_metadata,
platform_config=platform_config,
context=context,
)
Expand All @@ -1189,7 +1178,7 @@ def _write_dataframe_impl(
*,
ingestion_params: IngestionParams,
additional_metadata: AdditionalMetadata = None,
original_index_name: Optional[str] = None,
original_index_metadata: OriginalIndexMetadata = None,
platform_config: Optional[PlatformConfig] = None,
context: Optional[SOMATileDBContext] = None,
) -> DataFrame:
Expand Down Expand Up @@ -1250,7 +1239,7 @@ def _write_dataframe_impl(
# Save the original index name for outgest. We use JSON for elegant indication of index name
# being None (in Python anyway).
soma_df.metadata[_DATAFRAME_ORIGINAL_INDEX_NAME_JSON] = json.dumps(
original_index_name
original_index_metadata
)
add_metadata(soma_df, additional_metadata)

Expand Down
109 changes: 60 additions & 49 deletions apis/python/src/tiledbsoma/io/outgest.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,64 @@ def _extract_X_key(
return data


def _read_dataframe(
df: DataFrame, default_index_name: Optional[str], fallback_index_name: str
) -> pd.DataFrame:
"""Outgest a SOMA DataFrame to Pandas, including restoring the original index{,.name}.
An `OriginalIndexMetadata` attached to the DataFrame, if present, contains a string (or `null`), indicating a
SOMADataFrame column which should be restored as the pd.DataFrame index.
`default_index_name`, if provided, overrides the stored `OriginalIndexMetadata`; a column with this name will be
verified to exist, and set as index of the returned `pd.DataFrame`.
If neither `default_index_name` nor `OriginalIndexMetadata` are provided, the `fallback_index_name` will be used.
`to_anndata` passes "obs_id" / "var_id" for obs/var, matching `from_anndata`'s default `{obs,var}_id_name` values.
NOTE: several edge cases result in the outgested DataFrame not matching the original DataFrame; see
`test_dataframe_io_roundtrips.py` / https://github.com/single-cell-data/TileDB-SOMA/issues/2829.
"""
# Read and validate the "original index metadata" stored alongside this SOMA DataFrame.
original_index_metadata = json.loads(
df.metadata.get(_DATAFRAME_ORIGINAL_INDEX_NAME_JSON, "null")
)
if not (
original_index_metadata is None or isinstance(original_index_metadata, str)
):
raise ValueError(
f"{df.uri}: invalid {_DATAFRAME_ORIGINAL_INDEX_NAME_JSON} metadata: {original_index_metadata}"
)

pdf: pd.DataFrame = df.read().concat().to_pandas()
# SOMA DataFrames always have a `soma_joinid` added, as part of the ingest process, which we remove on outgest.
pdf.drop(columns=SOMA_JOINID, inplace=True)

default_index_name = default_index_name or original_index_metadata
if default_index_name is not None:
# One or both of the following was true:
# - Original DataFrame had an index name (other than "index") ⇒ that name was written as `OriginalIndexMetadata`
# - `default_index_name` was provided (e.g. `{obs,var}_id_name` args to `to_anndata`)
#
# ⇒ Verify a column with that name exists, and set it as index (keeping its name).
if default_index_name not in pdf.keys():
raise ValueError(
f"Requested ID column name {default_index_name} not found in input: {pdf.keys()}"
)
pdf.set_index(default_index_name, inplace=True)
else:
# The assumption here is that the original index was unnamed, and was given a "fallback name" (e.g. "obs_id",
# "var_id") during ingest that matches the `fallback_index_name` arg here. In this case, we restore that column
# as index, and remove the name.
#
# NOTE: several edge cases result in the outgested DF not matching the original DF; see
# https://github.com/single-cell-data/TileDB-SOMA/issues/2829.
if fallback_index_name in pdf:
pdf.set_index(fallback_index_name, inplace=True)
pdf.index.name = None

return pdf


# ----------------------------------------------------------------
def to_anndata(
experiment: Experiment,
Expand Down Expand Up @@ -197,55 +255,8 @@ def to_anndata(
# * Else if the names used at ingest time are available, use them.
# * Else use the default/fallback name.

# Restore the original index name for outgest. We use JSON for elegant indication of index
# name being None (in Python anyway). It may be 'null' which maps to Pyhton None.
obs_id_name = obs_id_name or json.loads(
experiment.obs.metadata.get(_DATAFRAME_ORIGINAL_INDEX_NAME_JSON, '"obs_id"')
)
var_id_name = var_id_name or json.loads(
measurement.var.metadata.get(_DATAFRAME_ORIGINAL_INDEX_NAME_JSON, '"var_id"')
)

obs_df = experiment.obs.read().concat().to_pandas()
obs_df.drop([SOMA_JOINID], axis=1, inplace=True)
if obs_id_name is not None:
if obs_id_name not in obs_df.keys():
raise ValueError(
f"requested obs IDs column name {obs_id_name} not found in input: {obs_df.keys()}"
)
obs_df.set_index(obs_id_name, inplace=True)
else:
# There are multiple cases to be handled here, all tested in CI.
# This else-block handle this one:
#
# orig.ident nCount_RNA ...
# ATGCCAGAACGACT 0 70.0 ...
# CATGGCCTGTGCAT 0 85.0 ...
# GAACCTGATGAACC 0 87.0 ...
#
# Namely:
# * The input AnnData dataframe had an index with no name
# * In the SOMA experiment we name that column "obs_id" and our index is "soma_joinid"
# * On outgest we drop "soma_joinid"
# * The thing we named "obs_id" needs to become the index again ...
# * ... and it needs to be nameless.
if "obs_id" in obs_df:
obs_df.set_index("obs_id", inplace=True)
obs_df.index.name = None

var_df = measurement.var.read().concat().to_pandas()

var_df.drop([SOMA_JOINID], axis=1, inplace=True)
if var_id_name is not None:
if var_id_name not in var_df.keys():
raise ValueError(
f"requested var IDs column name {var_id_name} not found in input: {var_df.keys()}"
)
var_df.set_index(var_id_name, inplace=True)
else:
if "var_id" in var_df:
var_df.set_index("var_id", inplace=True)
var_df.index.name = None
obs_df = _read_dataframe(experiment.obs, obs_id_name, "obs_id")
var_df = _read_dataframe(measurement.var, var_id_name, "var_id")

nobs = len(obs_df.index)
nvar = len(var_df.index)
Expand Down
Loading

0 comments on commit faf06e2

Please sign in to comment.