Merge pull request #422 from juaml/enh/storage-2dtimeseries
Allow storing 2D timeseries
fraimondo authored Jan 26, 2025
2 parents c54f591 + d0f39ca commit 5135251
Showing 7 changed files with 309 additions and 17 deletions.
1 change: 1 addition & 0 deletions docs/changes/newsfragments/422.feature
@@ -0,0 +1 @@
Add support for storing 2D timeseries, currently only supported by ``HDF5FeatureStorage``, by `Fede Raimondo`_
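
For context, a minimal usage sketch distilled from the tests added in this PR; the URI, the element values, and the process_meta import path are assumptions for illustration:

import numpy as np

from junifer.storage.hdf5 import HDF5FeatureStorage
from junifer.storage.utils import process_meta

storage = HDF5FeatureStorage(uri="/tmp/features.hdf5")
# Metadata for the feature to store
meta = {
    "element": {"subject": "sub-01"},
    "dependencies": ["numpy"],
    "marker": {"name": "fc"},
    "type": "BOLD",
}
meta_md5, meta_to_store, element_to_store = process_meta(meta)
storage.store_metadata(
    meta_md5=meta_md5, element=element_to_store, meta=meta_to_store
)
# 4 timepoints of a 5 x 3 matrix -> shape (4, 5, 3)
data = np.arange(60).reshape(4, 5, 3)
storage.store_timeseries_2d(
    meta_md5=meta_md5,
    element=element_to_store,
    data=data,
    col_names=["roi1", "roi2", "roi3"],
    row_names=["ev1", "ev2", "ev3", "ev4", "ev5"],
)
df = storage.read_df(feature_md5=meta_md5)  # columns like "ev1~roi1"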
33 changes: 33 additions & 0 deletions junifer/storage/base.py
@@ -226,6 +226,10 @@ def store(self, kind: str, **kwargs) -> None:
self.store_scalar_table(
meta_md5=meta_md5, element=t_element, **kwargs
)
elif kind == "timeseries_2d":
self.store_timeseries_2d(
meta_md5=meta_md5, element=t_element, **kwargs
)

def store_matrix(
self,
@@ -321,6 +325,35 @@ def store_timeseries(
klass=NotImplementedError,
)

def store_timeseries_2d(
self,
meta_md5: str,
element: dict,
data: np.ndarray,
col_names: Optional[Iterable[str]] = None,
row_names: Optional[Iterable[str]] = None,
) -> None:
"""Store 2D timeseries.
Parameters
----------
meta_md5 : str
The metadata MD5 hash.
element : dict
The element as a dictionary.
data : numpy.ndarray
The timeseries data to store.
col_names : list or tuple of str, optional
The column labels (default None).
row_names : list or tuple of str, optional
The row labels (default None).
"""
raise_error(
msg="Concrete classes need to implement store_timeseries_2d().",
klass=NotImplementedError,
)

def store_scalar_table(
self,
meta_md5: str,
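
The base-class hook above implies a layout contract: data is a 3D array of shape (n_timepoints, n_rows, n_cols), with row_names and col_names labelling the last two axes. A small sketch of that contract, using the same shapes as the tests below (the assertion is illustrative; the real validation happens in store_timeseries_2d_checks in the HDF5 backend):

import numpy as np

data = np.arange(120).reshape(6, 5, 4)      # 6 timepoints, 5 rows, 4 columns
row_names = [f"row-{i}" for i in range(5)]  # len matches data.shape[1]
col_names = [f"col-{i}" for i in range(4)]  # len matches data.shape[2]
assert data.shape[1:] == (len(row_names), len(col_names))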
87 changes: 74 additions & 13 deletions junifer/storage/hdf5.py
@@ -4,7 +4,6 @@
# Federico Raimondo <[email protected]>
# License: AGPL


from collections import defaultdict
from collections.abc import Iterable
from pathlib import Path
@@ -24,7 +23,13 @@
)
from ..utils import logger, raise_error
from .base import BaseFeatureStorage
from .utils import element_to_prefix, matrix_to_vector, store_matrix_checks
from .utils import (
element_to_prefix,
matrix_to_vector,
store_matrix_checks,
store_timeseries_2d_checks,
timeseries2d_to_vector,
)


__all__ = ["HDF5FeatureStorage"]
@@ -82,7 +87,7 @@ def _create_chunk(
chunk_size=tuple(array_chunk_size),
n_chunk=i_chunk,
)
elif kind in ["timeseries", "scalar_table"]:
elif kind in ["timeseries", "scalar_table", "timeseries_2d"]:
out = ChunkedList(
data=chunk_data,
size=element_count,
Expand All @@ -91,8 +96,8 @@ def _create_chunk(
else:
raise_error(
f"Invalid kind: {kind}. "
"Must be one of ['vector', 'matrix', 'timeseries',"
"'scalar_table']."
"Must be one of ['vector', 'matrix', 'timeseries', "
"'timeseries_2d', 'scalar_table']."
)
return out

@@ -196,8 +201,7 @@ def _fetch_correct_uri_for_io(self, element: Optional[dict]) -> str:
if not self.single_output and not element:
raise_error(
msg=(
"`element` must be provided when `single_output` "
"is False"
"`element` must be provided when `single_output` is False"
),
klass=RuntimeError,
)
@@ -514,6 +518,27 @@ def read_df(
columns = hdf_data["column_headers"]
# Convert data from 3D to 2D
reshaped_data = np.concatenate(all_data, axis=0)
elif hdf_data["kind"] == "timeseries_2d":
# Create dictionary for aggregating index data
element_idx = defaultdict(list)
all_data = []
for idx, element in enumerate(hdf_data["element"]):
                # Get the element's data
t_data = hdf_data["data"][idx]
flat_data, columns = timeseries2d_to_vector(
data=t_data,
col_names=hdf_data["column_headers"],
row_names=hdf_data["row_headers"],
)
all_data.append(flat_data)
n_timepoints = flat_data.shape[0]
# Set rows for the index
for key, val in element.items():
element_idx[key].extend([val] * n_timepoints)
# Add extra column for timepoints
element_idx["timepoint"].extend(np.arange(n_timepoints))
# Convert data from 3D to 2D
reshaped_data = np.concatenate(all_data, axis=0)
elif hdf_data["kind"] == "scalar_table":
# Create dictionary for aggregating index data
element_idx = defaultdict(list)
@@ -765,7 +790,7 @@ def _store_data(
)

t_data = stored_data["data"]
if kind in ["timeseries", "scalar_table"]:
if kind in ["timeseries", "scalar_table", "timeseries_2d"]:
t_data += data
else:
t_data = np.concatenate((t_data, data), axis=-1)
@@ -947,6 +972,44 @@ def store_timeseries(
row_header_column_name="timepoint",
)

def store_timeseries_2d(
self,
meta_md5: str,
element: dict[str, str],
data: np.ndarray,
col_names: Optional[Iterable[str]] = None,
row_names: Optional[Iterable[str]] = None,
) -> None:
"""Store a 2D timeseries.
Parameters
----------
meta_md5 : str
The metadata MD5 hash.
element : dict
The element as dictionary.
data : numpy.ndarray
The 2D timeseries data to store.
col_names : list or tuple of str, optional
The column labels (default None).
row_names : list or tuple of str, optional
The row labels (default None).
"""
store_timeseries_2d_checks(
data_shape=data.shape,
row_names_len=len(row_names), # type: ignore
col_names_len=len(col_names), # type: ignore
)
self._store_data(
kind="timeseries_2d",
meta_md5=meta_md5,
element=[element], # convert to list
data=[data], # convert to list
column_headers=col_names,
row_headers=row_names,
)

def store_scalar_table(
self,
meta_md5: str,
@@ -1014,8 +1077,7 @@ def collect(self) -> None:

# Run loop to collect metadata
logger.info(
"Collecting metadata from "
f"{self.uri.parent}/*_{self.uri.name}" # type: ignore
f"Collecting metadata from {self.uri.parent}/*_{self.uri.name}" # type: ignore
)
# Collect element files per feature MD5
elements_per_feature_md5 = defaultdict(list)
@@ -1046,8 +1108,7 @@

# Run loop to collect data per feature per file
logger.info(
"Collecting data from "
f"{self.uri.parent}/*_{self.uri.name}" # type: ignore
f"Collecting data from {self.uri.parent}/*_{self.uri.name}" # type: ignore
)
logger.info(f"Will collect {len(elements_per_feature_md5)} features.")

@@ -1092,7 +1153,7 @@
kind = static_data["kind"]

# Append the "dynamic" data
if kind in ["timeseries", "scalar_table"]:
if kind in ["timeseries", "scalar_table", "timeseries_2d"]:
chunk_data.extend(t_data["data"])
else:
chunk_data.append(t_data["data"])
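
Note how read_df flattens each element's 3D array via timeseries2d_to_vector before stacking. Judging from the tests, the flattening is equivalent to a row-major reshape with "row~col" column labels; a sketch of that equivalence (the helper itself lives in junifer/storage/utils.py, which is not shown in this diff):

import numpy as np

data = np.arange(24).reshape(2, 3, 4)  # (timepoints, rows, cols)
row_names = ["r0", "r1", "r2"]
col_names = ["c0", "c1", "c2", "c3"]

# Each timepoint's rows x cols matrix becomes one flat row of the frame
flat = data.reshape(data.shape[0], -1)
columns = [f"{r}~{c}" for r in row_names for c in col_names]
assert flat.shape == (2, len(columns))
# read_df stacks these per-element blocks and adds a "timepoint"
# level to the index, as in the timeseries_2d branch above.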
87 changes: 85 additions & 2 deletions junifer/storage/tests/test_hdf5.py
@@ -826,6 +826,70 @@ def test_store_timeseries(tmp_path: Path) -> None:
assert_array_equal(read_df.values, data)


def test_store_timeseries2d(tmp_path: Path) -> None:
"""Test 2D timeseries store.
Parameters
----------
tmp_path : pathlib.Path
The path to the test directory.
"""
uri = tmp_path / "test_store_timeseries_2d.hdf5"
storage = HDF5FeatureStorage(uri=uri)
# Metadata to store
element = {"subject": "test"}
meta = {
"element": element,
"dependencies": ["numpy"],
"marker": {"name": "fc"},
"type": "BOLD",
}
# Process the metadata
meta_md5, meta_to_store, element_to_store = process_meta(meta)
# Store metadata
storage.store_metadata(
meta_md5=meta_md5, element=element_to_store, meta=meta_to_store
)

# Data to store
data = np.array(
[[10, 11, 12], [20, 21, 22], [30, 31, 32], [40, 41, 42], [50, 51, 52]]
)
data = np.c_[[data + (i * 100) for i in range(4)]] # Generate timeseries

col_names = ["roi1", "roi2", "roi3"]
row_names = ["ev1", "ev2", "ev3", "ev4", "ev5"]

# Store 2D timeseries
storage.store_timeseries_2d(
meta_md5=meta_md5,
element=element_to_store,
data=data,
col_names=col_names,
row_names=row_names,
)

# Read into dataframe
read_data = storage.read(feature_md5=meta_md5)
# Check if data are equal
assert_array_equal(read_data["data"][0], data)
assert read_data["column_headers"] == col_names
assert read_data["row_headers"], row_names

read_df = storage.read_df(feature_md5=meta_md5)
    flattened_names = [f"{row}~{col}" for row in row_names for col in col_names]

expected_flat_data = np.array(
[10, 11, 12, 20, 21, 22, 30, 31, 32, 40, 41, 42, 50, 51, 52]
)
expected_flat_data = np.c_[
[expected_flat_data + (i * 100) for i in range(4)]
] # Generate timeseries
assert_array_equal(read_df.values, expected_flat_data)
    assert read_df.columns.to_list() == flattened_names


def test_store_scalar_table(tmp_path: Path) -> None:
"""Test scalar table store.
@@ -857,7 +921,7 @@ def test_store_scalar_table(tmp_path: Path) -> None:
col_names = ["roi1", "roi2"]
row_names = ["ev1", "ev2", "ev3"]

# Store timeseries
# Store scalar table
storage.store_scalar_table(
meta_md5=meta_md5,
element=element_to_store,
@@ -910,6 +974,12 @@ def _create_data_to_store(n_elements: int, kind: str) -> tuple[str, dict]:
"data": np.arange(20).reshape(2, 10),
"col_names": [f"col-{i}" for i in range(10)],
}
elif kind in "timeseries_2d":
data_to_store = {
"data": np.arange(120).reshape(6, 5, 4),
"row_names": [f"row-{i}" for i in range(5)],
"col_names": [f"col-{i}" for i in range(4)],
}
elif kind in "scalar_table":
data_to_store = {
"data": np.arange(50).reshape(5, 10),
@@ -961,6 +1031,7 @@ def _create_data_to_store(n_elements: int, kind: str) -> tuple[str, dict]:
(10, 5, "matrix"),
(10, 5, "timeseries"),
(10, 5, "scalar_table"),
(10, 5, "timeseries_2d"),
],
)
def test_multi_output_store_and_collect(
Expand All @@ -982,7 +1053,9 @@ def test_multi_output_store_and_collect(
"""
uri = tmp_path / "test_multi_output_store_and_collect.hdf5"
storage = HDF5FeatureStorage(
uri=uri, single_output=False, chunk_size=chunk_size
uri=uri,
single_output=False,
chunk_size=chunk_size,
)

meta_md5, all_data = _create_data_to_store(n_elements, kind)
@@ -1013,6 +1086,12 @@
element=t_data["element"],
**t_data["data"],
)
elif kind == "timeseries_2d":
storage.store_timeseries_2d(
meta_md5=meta_md5,
element=t_data["element"],
**t_data["data"],
)
elif kind == "scalar_table":
storage.store_scalar_table(
meta_md5=meta_md5,
@@ -1052,6 +1131,10 @@
data_size = np.sum([x["data"]["data"].shape[0] for x in all_data])
assert len(all_df) == data_size
idx_names = [x for x in all_df.index.names if x != "timepoint"]
elif kind == "timeseries_2d":
data_size = np.sum([x["data"]["data"].shape[0] for x in all_data])
assert len(all_df) == data_size
idx_names = [x for x in all_df.index.names if x != "timepoint"]
elif kind == "scalar_table":
data_size = np.sum([x["data"]["data"].shape[0] for x in all_data])
assert len(all_df) == data_size
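
test_multi_output_store_and_collect exercises the chunked collection path for the new kind. Roughly, the flow it covers is the following sketch (the URI and chunk size are illustrative, and per-element storing is elided):

from junifer.storage.hdf5 import HDF5FeatureStorage

# With single_output=False, each element goes to its own
# "<element-prefix>_<name>.hdf5" file next to `uri`.
storage = HDF5FeatureStorage(
    uri="/tmp/collected.hdf5", single_output=False, chunk_size=5
)
# ... store_metadata() and store_timeseries_2d() once per element ...

# collect() then merges "<uri.parent>/*_<uri.name>" into the file at
# `uri`, extending "timeseries_2d" data list-wise (see _store_data).
storage.collect()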
7 changes: 5 additions & 2 deletions junifer/storage/tests/test_storage_base.py
@@ -25,15 +25,15 @@ class MyFeatureStorage(BaseFeatureStorage):
"""Implement concrete class."""

def __init__(self, uri, single_output=True):
storage_types = ["matrix", "vector", "timeseries"]
storage_types = ["matrix", "vector", "timeseries", "timeseries_2d"]
super().__init__(
uri=uri,
storage_types=storage_types,
single_output=single_output,
)

def get_valid_inputs(self):
return ["matrix", "vector", "timeseries"]
return ["matrix", "vector", "timeseries", "timeseries_2d"]

def list_features(self):
super().list_features()
@@ -97,6 +97,9 @@ def collect(self):
with pytest.raises(NotImplementedError):
st.store(kind="timeseries", meta=meta)

with pytest.raises(NotImplementedError):
st.store(kind="timeseries_2d", meta=meta)

with pytest.raises(NotImplementedError):
st.store(kind="vector", meta=meta)
