From faf06e25fb2f4fcdf38c6788c6ab9262d8efc23a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 6 Aug 2024 17:42:28 -0400 Subject: [PATCH] [python] Ingest/outgest round-trip improvements (#2804) (#2850) * anndata-based dataframe round-trip tests * factor+document {in,out}gest DF logic * anndata- and df-based dataframe round-trip tests Co-authored-by: Ryan Williams --- .../tiledbsoma/io/_registration/signatures.py | 97 +++++-- apis/python/src/tiledbsoma/io/ingest.py | 21 +- apis/python/src/tiledbsoma/io/outgest.py | 109 ++++---- .../tests/test_dataframe_io_roundtrips.py | 236 ++++++++++++++++++ 4 files changed, 383 insertions(+), 80 deletions(-) create mode 100644 apis/python/tests/test_dataframe_io_roundtrips.py diff --git a/apis/python/src/tiledbsoma/io/_registration/signatures.py b/apis/python/src/tiledbsoma/io/_registration/signatures.py index 3afc7982b..9caf394d1 100644 --- a/apis/python/src/tiledbsoma/io/_registration/signatures.py +++ b/apis/python/src/tiledbsoma/io/_registration/signatures.py @@ -1,5 +1,5 @@ import json -from typing import Dict, Optional +from typing import Dict, Optional, Union import anndata as ad import attrs @@ -69,25 +69,92 @@ def _string_dict_from_pandas_dataframe( """ df = df.head(1).copy() # since reset_index can be expensive on full data - if df.index.name is None or df.index.name == "index": - df.reset_index(inplace=True) - if default_index_name in df: - if "index" in df: - # Avoid the warning: - # "A value is trying to be set on a copy of a slice from a DataFrame" - # which would occur if we did: - # df.drop(columns=["index"], inplace=True) - df = df.drop(columns=["index"]) - else: - df.rename(columns={"index": default_index_name}, inplace=True) - else: - df.reset_index(inplace=True) - + _prepare_df_for_ingest(df, default_index_name) arrow_table = df_to_arrow(df) arrow_schema = arrow_table.schema.remove_metadata() return _string_dict_from_arrow_schema(arrow_schema) +# Metadata indicating a SOMA DataFrame's original index column name, serialized as a JSON string or `null`. +# SOMA DataFrames are always given a `soma_joinid` index, but we want to be able to outgest a `pd.DataFrame` that is +# identical to the one we ingested, so we store an "original index name" in the DataFrame's metadata. +OriginalIndexMetadata = Union[None, str] + + +def _prepare_df_for_ingest( + df: pd.DataFrame, id_column_name: Optional[str] +) -> OriginalIndexMetadata: + """Prepare a `pd.DataFrame` for persisting as a SOMA DataFrame: demote its index to a column (to make way for a + required `soma_joinid` index), and compute and return metadata for restoring the index column and name later (on + outgest). + + If `df.index` has a name (and it's not "index", which is taken to be a default/unset value): + - `df.index.name` takes precedence over the `id_column_name` arg: the index will be reset to an eponymous column. + - That original `df.index.name` will be logged as `OriginalIndexMetadata` (for promotion back to index on outgest). + + In this case, the overall round trip is basically just: + - `reset_index` on ingest (demote index to eponymous column). + - `set_index` on outgest (restore column to index, with its original name). + + Otherwise (index name is `None` or "index"): + - A fallback name (`id_column_name` if provided, "index" otherwise) is used for the column that the index becomes. + - The returned `OriginalIndexMetadata` will be `None`. + + There are several edge cases (detailed below and in `test_dataframe_io_roundtrips.py` and + https://github.com/single-cell-data/TileDB-SOMA/issues/2829) where the index, its name, or a specific column are not + restored properly on outgest. For now, all such behavior is preserved, for backwards compatibility, but we should + look into ways of improving these "round-trip mutation" cases. See + https://github.com/single-cell-data/TileDB-SOMA/issues/2829 for more info. + """ + use_existing_index = df.index.name is not None and df.index.name != "index" + + original_index_name = None + if use_existing_index: + original_index_name = df.index.name + + df.reset_index(inplace=True) + if id_column_name is not None: + if id_column_name in df: + if "index" in df: + # The assumption here is that the column named "index" was previously an unnamed `df.index`, and + # `id_column_name` was already a column (per the grandparent `if` above). In this case, we drop the + # original unnamed `df.index`. + # TODO: This prevents outgesting the same DataFrame we ingested. We should fix it; see + # https://github.com/single-cell-data/TileDB-SOMA/issues/2829. + # + # Also note: if the DataFrame already had columns named "index" and `id_column_name`, the original + # `df.index` will have been "reset" to a column named `level_0`, and we end up just dropping the column + # named "index" here. + # + # Another version of this occurs when the original DataFrame has `df.index.name == id_column_name` and a + # column named "index". In this case, the index will have been "reset" to a column named + # `id_column_name` above, which then satisfies the grendparent `if`'s predicate, and causes us to drop + # the column named "index" here. + df.drop(columns=["index"], inplace=True) + else: + # If `id_column_name` was passed, and is not already a column in the DataFrame, we assume the original index + # was "reset" to a column named "index" (by `reset_index` above), and we rename that column to + # `id_column_name`, so that `id_column_name` matches the name of a column representing the original + # DataFrame's index. + # + # NOTE: the assumption above can break in a few ways: + # 1. The original DataFrame index has a name other than "index" or `id_column_name`… + # a. and there is a column named "index" ⇒ that column will be renamed to `id_column_name` + # b. and there is no column named "index" ⇒ the rename below is a no-op (outgest currently restores the + # original DataFrame in this case) + # 2. The original DataFrame has a column named "index": + # - That column will become `df.index` on outgest, and acquire the original `df.index.name` as its name. + # - The original index will end up as a column, on outgest: + # - If it had a name, the column will have that name. + # - Otherwise, it will end up as a column named e.g. `level_0` (or `level_1`, if a column named + # `level_0` already exists, etc.) + # + # See https://github.com/single-cell-data/TileDB-SOMA/issues/2829 for more info. + df.rename(columns={"index": id_column_name}, inplace=True) + + return original_index_name + + @attrs.define(kw_only=True) class Signature: """ diff --git a/apis/python/src/tiledbsoma/io/ingest.py b/apis/python/src/tiledbsoma/io/ingest.py index 5ae56964f..108a529ae 100644 --- a/apis/python/src/tiledbsoma/io/ingest.py +++ b/apis/python/src/tiledbsoma/io/ingest.py @@ -101,6 +101,7 @@ get_dataframe_values, signatures, ) +from ._registration.signatures import OriginalIndexMetadata, _prepare_df_for_ingest from ._util import read_h5ad _NDArr = TypeVar("_NDArr", bound=NDArray) @@ -1154,20 +1155,8 @@ def _write_dataframe( The caller should have copied anything pointing to a user-provided adata.obs, adata.var, etc. """ - original_index_name = None - if df.index is not None and df.index.name is not None and df.index.name != "index": - original_index_name = df.index.name - - df.reset_index(inplace=True) - if id_column_name is not None: - if id_column_name in df: - if "index" in df: - df.drop(columns=["index"], inplace=True) - else: - df.rename(columns={"index": id_column_name}, inplace=True) - + original_index_metadata = _prepare_df_for_ingest(df, id_column_name) df[SOMA_JOINID] = np.asarray(axis_mapping.data, dtype=np.int64) - df.set_index(SOMA_JOINID, inplace=True) return _write_dataframe_impl( @@ -1176,7 +1165,7 @@ def _write_dataframe( id_column_name, ingestion_params=ingestion_params, additional_metadata=additional_metadata, - original_index_name=original_index_name, + original_index_metadata=original_index_metadata, platform_config=platform_config, context=context, ) @@ -1189,7 +1178,7 @@ def _write_dataframe_impl( *, ingestion_params: IngestionParams, additional_metadata: AdditionalMetadata = None, - original_index_name: Optional[str] = None, + original_index_metadata: OriginalIndexMetadata = None, platform_config: Optional[PlatformConfig] = None, context: Optional[SOMATileDBContext] = None, ) -> DataFrame: @@ -1250,7 +1239,7 @@ def _write_dataframe_impl( # Save the original index name for outgest. We use JSON for elegant indication of index name # being None (in Python anyway). soma_df.metadata[_DATAFRAME_ORIGINAL_INDEX_NAME_JSON] = json.dumps( - original_index_name + original_index_metadata ) add_metadata(soma_df, additional_metadata) diff --git a/apis/python/src/tiledbsoma/io/outgest.py b/apis/python/src/tiledbsoma/io/outgest.py index 600c9f5d4..1462ed57a 100644 --- a/apis/python/src/tiledbsoma/io/outgest.py +++ b/apis/python/src/tiledbsoma/io/outgest.py @@ -124,6 +124,64 @@ def _extract_X_key( return data +def _read_dataframe( + df: DataFrame, default_index_name: Optional[str], fallback_index_name: str +) -> pd.DataFrame: + """Outgest a SOMA DataFrame to Pandas, including restoring the original index{,.name}. + + An `OriginalIndexMetadata` attached to the DataFrame, if present, contains a string (or `null`), indicating a + SOMADataFrame column which should be restored as the pd.DataFrame index. + + `default_index_name`, if provided, overrides the stored `OriginalIndexMetadata`; a column with this name will be + verified to exist, and set as index of the returned `pd.DataFrame`. + + If neither `default_index_name` nor `OriginalIndexMetadata` are provided, the `fallback_index_name` will be used. + `to_anndata` passes "obs_id" / "var_id" for obs/var, matching `from_anndata`'s default `{obs,var}_id_name` values. + + NOTE: several edge cases result in the outgested DataFrame not matching the original DataFrame; see + `test_dataframe_io_roundtrips.py` / https://github.com/single-cell-data/TileDB-SOMA/issues/2829. + """ + # Read and validate the "original index metadata" stored alongside this SOMA DataFrame. + original_index_metadata = json.loads( + df.metadata.get(_DATAFRAME_ORIGINAL_INDEX_NAME_JSON, "null") + ) + if not ( + original_index_metadata is None or isinstance(original_index_metadata, str) + ): + raise ValueError( + f"{df.uri}: invalid {_DATAFRAME_ORIGINAL_INDEX_NAME_JSON} metadata: {original_index_metadata}" + ) + + pdf: pd.DataFrame = df.read().concat().to_pandas() + # SOMA DataFrames always have a `soma_joinid` added, as part of the ingest process, which we remove on outgest. + pdf.drop(columns=SOMA_JOINID, inplace=True) + + default_index_name = default_index_name or original_index_metadata + if default_index_name is not None: + # One or both of the following was true: + # - Original DataFrame had an index name (other than "index") ⇒ that name was written as `OriginalIndexMetadata` + # - `default_index_name` was provided (e.g. `{obs,var}_id_name` args to `to_anndata`) + # + # ⇒ Verify a column with that name exists, and set it as index (keeping its name). + if default_index_name not in pdf.keys(): + raise ValueError( + f"Requested ID column name {default_index_name} not found in input: {pdf.keys()}" + ) + pdf.set_index(default_index_name, inplace=True) + else: + # The assumption here is that the original index was unnamed, and was given a "fallback name" (e.g. "obs_id", + # "var_id") during ingest that matches the `fallback_index_name` arg here. In this case, we restore that column + # as index, and remove the name. + # + # NOTE: several edge cases result in the outgested DF not matching the original DF; see + # https://github.com/single-cell-data/TileDB-SOMA/issues/2829. + if fallback_index_name in pdf: + pdf.set_index(fallback_index_name, inplace=True) + pdf.index.name = None + + return pdf + + # ---------------------------------------------------------------- def to_anndata( experiment: Experiment, @@ -197,55 +255,8 @@ def to_anndata( # * Else if the names used at ingest time are available, use them. # * Else use the default/fallback name. - # Restore the original index name for outgest. We use JSON for elegant indication of index - # name being None (in Python anyway). It may be 'null' which maps to Pyhton None. - obs_id_name = obs_id_name or json.loads( - experiment.obs.metadata.get(_DATAFRAME_ORIGINAL_INDEX_NAME_JSON, '"obs_id"') - ) - var_id_name = var_id_name or json.loads( - measurement.var.metadata.get(_DATAFRAME_ORIGINAL_INDEX_NAME_JSON, '"var_id"') - ) - - obs_df = experiment.obs.read().concat().to_pandas() - obs_df.drop([SOMA_JOINID], axis=1, inplace=True) - if obs_id_name is not None: - if obs_id_name not in obs_df.keys(): - raise ValueError( - f"requested obs IDs column name {obs_id_name} not found in input: {obs_df.keys()}" - ) - obs_df.set_index(obs_id_name, inplace=True) - else: - # There are multiple cases to be handled here, all tested in CI. - # This else-block handle this one: - # - # orig.ident nCount_RNA ... - # ATGCCAGAACGACT 0 70.0 ... - # CATGGCCTGTGCAT 0 85.0 ... - # GAACCTGATGAACC 0 87.0 ... - # - # Namely: - # * The input AnnData dataframe had an index with no name - # * In the SOMA experiment we name that column "obs_id" and our index is "soma_joinid" - # * On outgest we drop "soma_joinid" - # * The thing we named "obs_id" needs to become the index again ... - # * ... and it needs to be nameless. - if "obs_id" in obs_df: - obs_df.set_index("obs_id", inplace=True) - obs_df.index.name = None - - var_df = measurement.var.read().concat().to_pandas() - - var_df.drop([SOMA_JOINID], axis=1, inplace=True) - if var_id_name is not None: - if var_id_name not in var_df.keys(): - raise ValueError( - f"requested var IDs column name {var_id_name} not found in input: {var_df.keys()}" - ) - var_df.set_index(var_id_name, inplace=True) - else: - if "var_id" in var_df: - var_df.set_index("var_id", inplace=True) - var_df.index.name = None + obs_df = _read_dataframe(experiment.obs, obs_id_name, "obs_id") + var_df = _read_dataframe(measurement.var, var_id_name, "var_id") nobs = len(obs_df.index) nvar = len(var_df.index) diff --git a/apis/python/tests/test_dataframe_io_roundtrips.py b/apis/python/tests/test_dataframe_io_roundtrips.py new file mode 100644 index 000000000..6415115ea --- /dev/null +++ b/apis/python/tests/test_dataframe_io_roundtrips.py @@ -0,0 +1,236 @@ +import json +from dataclasses import asdict, dataclass, fields +from inspect import getfullargspec +from os.path import join +from pathlib import Path +from typing import List, Optional, Tuple + +import numpy as np +import pandas as pd +import pyarrow as pa +import pytest +from anndata import AnnData +from pandas._testing import assert_frame_equal + +from tiledbsoma import SOMA_JOINID, DataFrame, Experiment +from tiledbsoma.io._common import _DATAFRAME_ORIGINAL_INDEX_NAME_JSON +from tiledbsoma.io._registration import AxisIDMapping +from tiledbsoma.io.ingest import IngestionParams, _write_dataframe, from_anndata +from tiledbsoma.io.outgest import _read_dataframe, to_anndata + + +def parse_col(col_str: str) -> Tuple[Optional[str], List[str]]: + """Parse a "column string" of the form `val1,val2,...` or `name=val1,val2,...`.""" + pcs = col_str.split("=") + if len(pcs) == 1: + return None, col_str.split(",") + elif len(pcs) == 2: + name, vals_str = pcs + vals = vals_str.split(",") + return name, vals + else: + raise ValueError(f"Invalid column string: {col_str}") + + +def make_df(index_str: str, **cols) -> pd.DataFrame: + """DataFrame construction helper, for tests. + + - index and columns are provided as strings of the form `name=val1,val2,...`. + - `name=` is optional for the initial (`index_str`) arg. + """ + cols = dict([(col, parse_col(col_str)[1]) for col, col_str in cols.items()]) + index = None + index_name = None + if index_str: + index_name, index = parse_col(index_str) + df = pd.DataFrame(cols, index=index) + df.index.name = index_name + return df + + +@dataclass +class RoundTrip: + # Test-case name + name: str + # DataFrame to ingest + original_df: pd.DataFrame + # Expected DataFrame after outgest + outgested_df: pd.DataFrame + # Columns that should be persisted; all are expected to be of type "large string" (for the purposes of these test + # cases); the required `soma_joinid` (with type `int64 not null`) is excluded from this list, but always verified to + # also exist. + persisted_column_names: List[str] + # Expected "original index metadata" (attached to the persisted SOMA DataFrame) + persisted_metadata: Optional[str] = None + # Argument passed to `_write_dataframe` on ingest (default here matches `from_anndata`'s "obs" path) + ingest_id_column_name: Optional[str] = "obs_id" + # Argument passed to `_read_dataframe` on outgest (default here matches `to_anndata`'s "obs_id" path) + outgest_default_index_name: Optional[str] = None + # Argument passed to `_read_dataframe` on outgest (default here matches `to_anndata`'s "obs_id" path) + outgest_fallback_index_name: Optional[str] = "obs_id" + + +def parametrize_roundtrips(roundtrips: List[RoundTrip]): + def wrapper(fn): + # Test-case IDs + ids = [rt.name for rt in roundtrips] + # Convert `RoundTrip`s to "values" arrays, filtered and reordered to match kwargs expected by the wrapped + # function + fields_names = [f.name for f in fields(RoundTrip)] + spec = getfullargspec(fn) + names = [arg for arg in spec.args if arg in fields_names] + values = [ + {name: rt_dict[name] for name in names}.values() + for rt_dict in [asdict(rt) for rt in roundtrips] + ] + # Delegate to PyTest `parametrize` + return pytest.mark.parametrize( + names, # arg names + values, # arg value lists + ids=ids, # test-case names + )(fn) + + return wrapper + + +# fmt: off +ROUND_TRIPS = [ + RoundTrip( + '1. `df.index` named "index"', + make_df("index=xx,yy,zz", col0="aa,bb,cc", col1="AA,BB,CC"), + # ⇒ index name is lost + make_df( "xx,yy,zz", col0="aa,bb,cc", col1="AA,BB,CC"), + [ "obs_id", "col0", "col1", ], + ), + RoundTrip( + '2. DataFrame has a column named `obs_id`', + make_df("xx,yy,zz", col0="AA,BB,CC", obs_id="aa,bb,cc"), + # ⇒ `obs_id` column becomes `df.index`, loses name + # ⇒ Original `df.index` dropped + make_df("aa,bb,cc", col0="AA,BB,CC"), + [ "col0", "obs_id", ], + ), + RoundTrip( + '3. DataFrame has a column named "index", and `df.index` is unnamed', + make_df("xx,yy,zz", col0="aa,bb,cc", index="AA,BB,CC"), + # ⇒ "index" column is promoted to `df.index` (unnamed) + # ⇒ Original (unnamed) index becomes a column named `level_0` + make_df("AA,BB,CC", level_0="xx,yy,zz", col0="aa,bb,cc"), + [ "level_0", "col0", "obs_id", ], + ), + RoundTrip( + '4. DataFrame has a column named "index", and `df.index` is named `id_column_name` (default `obs_id`)', + make_df("obs_id=xx,yy,zz", col0="aa,bb,cc", index="AA,BB,CC"), + # ⇒ "index" column is dropped + make_df("obs_id=xx,yy,zz", col0="aa,bb,cc"), + [ "obs_id", "col0", ], + "obs_id", + ), + RoundTrip( + '5. DataFrame has a column named "index" and df.index has another name', + make_df("idx=xx,yy,zz", col0="aa,bb,cc", index="AA,BB,CC"), + # ⇒ "index" column renamed to `obs_id` + make_df("idx=xx,yy,zz", col0="aa,bb,cc", obs_id="AA,BB,CC"), + [ "idx", "col0", "obs_id", ], + "idx", + ), + RoundTrip( + '6. DataFrame has columns named "index" and `id_column_name` (default: obs_id), and `df.index` is unnamed:', + make_df("xx,yy,zz", obs_id="aa,bb,cc", index="AA,BB,CC"), + # ⇒ unnamed index → column named `level_0` + # ⇒ `obs_id` column → unnamed index + # ⇒ "index" column dropped + make_df("aa,bb,cc", level_0="xx,yy,zz"), + [ "level_0", "obs_id", ], + ), + RoundTrip( + '7. DataFrame has columns named "index" and `id_column_name` (default: obs_id), and `df.index` has a name:', + make_df("idx=xx,yy,zz", obs_id="aa,bb,cc", index="AA,BB,CC"), + # ⇒ "index" column is dropped + make_df("idx=xx,yy,zz", obs_id="aa,bb,cc"), + [ "idx", "obs_id", ], + "idx", + ), +] +# fmt: on + + +def verify_metadata( + sdf: DataFrame, persisted_column_names: List[str], persisted_metadata: Optional[str] +): + # Verify column names and types + schema = sdf.schema + assert schema.names == [SOMA_JOINID, *persisted_column_names] + [soma_joinid_type, *string_col_types] = schema.types + assert soma_joinid_type == pa.int64() and schema.field(0).nullable is False + for string_col_type in string_col_types: + assert string_col_type == pa.large_string() + + # Verify "original index metadata" + actual_index_metadata = json.loads( + sdf.metadata[_DATAFRAME_ORIGINAL_INDEX_NAME_JSON] + ) + assert actual_index_metadata == persisted_metadata + + +@parametrize_roundtrips(ROUND_TRIPS) +def test_adata_io_roundtrips( + tmp_path: Path, + original_df: pd.DataFrame, + persisted_column_names: List[str], + persisted_metadata: Optional[str], + ingest_id_column_name: Optional[str], + outgest_default_index_name: Optional[str], + outgested_df: pd.DataFrame, +): + uri = str(tmp_path) + n_obs = len(original_df) + var = pd.DataFrame({"var1": [1, 2, 3], "var2": ["a", "b", "c"]}) # unused + n_var = len(var) + X = np.array([0] * n_obs * n_var).reshape(n_obs, n_var) # unused + adata0 = AnnData(X=X, obs=original_df, var=var) + ingested_uri = from_anndata(uri, adata0, "meas", obs_id_name=ingest_id_column_name) + assert ingested_uri == uri + + # Verify column names and types + obs_uri = join(uri, "obs") + obs = DataFrame.open(obs_uri) + verify_metadata(obs, persisted_column_names, persisted_metadata) + + # Verify outgested pd.DataFrame + with Experiment.open(ingested_uri) as exp: + adata1 = to_anndata(exp, "meas", obs_id_name=outgest_default_index_name) + outgested_obs = adata1.obs + assert_frame_equal(outgested_obs, outgested_df) + + +@parametrize_roundtrips(ROUND_TRIPS) +def test_df_io_roundtrips( + tmp_path: Path, + original_df: pd.DataFrame, + persisted_column_names: List[str], + persisted_metadata: Optional[str], + ingest_id_column_name: Optional[str], + outgest_default_index_name: Optional[str], + outgest_fallback_index_name: Optional[str], + outgested_df: pd.DataFrame, +): + uri = str(tmp_path) + _write_dataframe( + uri, + original_df, + id_column_name=ingest_id_column_name, + axis_mapping=AxisIDMapping(data=tuple(range(len(original_df)))), + ingestion_params=IngestionParams("write", None), + ).close() + + sdf = DataFrame.open(uri) + verify_metadata(sdf, persisted_column_names, persisted_metadata) + + # Verify outgested pd.DataFrame + actual_outgested_df = _read_dataframe( + sdf, + default_index_name=outgest_default_index_name, + fallback_index_name=outgest_fallback_index_name, + ) + assert_frame_equal(actual_outgested_df, outgested_df)