Polinabinder/file extend #477

Open: wants to merge 19 commits into base: main
Changes from 9 commits
@@ -24,7 +24,7 @@

from bionemo.scdl.api.single_cell_row_dataset import SingleCellRowDatasetCore
from bionemo.scdl.index.row_feature_index import RowFeatureIndex
-from bionemo.scdl.io.single_cell_memmap_dataset import Mode, SingleCellMemMapDataset
+from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset
from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


@@ -213,24 +213,19 @@ def shape(self) -> Tuple[int, List[int]]:
        return self.number_of_rows(), self.number_of_variables()

    def flatten(
-        self,
-        output_path: str,
-        destroy_on_copy: bool = False,
+        self, output_path: str, destroy_on_copy: bool = False, extend_copy_size: int = 10 * 1_024 * 1_024
    ) -> None:
        """Flattens the collection into a single SingleCellMemMapDataset.

        Args:
            output_path: location to store new dataset
            destroy_on_copy: Whether to remove the current data_path
+            extend_copy_size: the number of bytes to copy into memory at a time (default 10 MiB)
+
        """
-        output = SingleCellMemMapDataset(
-            output_path,
-            num_elements=self.number_of_rows(),
-            num_rows=self.number_nonzero_values(),
-            mode=Mode.CREATE_APPEND,
-        )
-
-        output.concat(list(self.fname_to_mmap.values()))
+        output = next(iter(self.fname_to_mmap.values()))
+        single_cell_list = list(self.fname_to_mmap.values())[1:]
+        output.concat(single_cell_list, extend_copy_size=extend_copy_size, output_path=output_path)

        # Hit save!
        output.save()
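Reviewer note: a minimal usage sketch of the new `extend_copy_size` knob on `flatten`, not part of the diff. The `SingleCellCollection` import path and the directory names are assumptions (they are not shown in this PR); the call pattern follows the tests further down.

```python
# Sketch: flatten a collection of h5ad-backed datasets into one SingleCellMemMapDataset,
# tuning how many bytes are copied per chunk. Paths and the collection import are assumed.
from bionemo.scdl.io.single_cell_collection import SingleCellCollection
from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset

coll = SingleCellCollection("scdl_workdir")  # scratch directory for the collection
coll.load_h5ad_multi("h5ad_inputs/", max_workers=4, use_processes=False)

# Copy in 4 MiB chunks instead of the default 10 MiB; the merged dataset lands in "flattened/".
coll.flatten("flattened", destroy_on_copy=False, extend_copy_size=4 * 1024 * 1024)

flat = SingleCellMemMapDataset("flattened")
print(flat.shape())  # (number of rows, list of per-dataset variable counts)
```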
@@ -17,7 +17,6 @@
import json
import os
import shutil
-import tempfile
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
@@ -30,6 +29,7 @@

from bionemo.scdl.api.single_cell_row_dataset import SingleCellRowDataset
from bionemo.scdl.index.row_feature_index import RowFeatureIndex
+from bionemo.scdl.util.filecopyutil import extend_files


class FileNames(str, Enum):
@@ -62,51 +62,6 @@ class METADATA(str, Enum):
    NUM_ROWS = "num_rows"


-def _swap_mmap_array(
-    src_array: np.memmap,
-    src_path: str,
-    dest_array: np.memmap,
-    dest_path: str,
-    destroy_src: bool = False,
-) -> None:
-    """Function that swaps the location of two mmap arrays.
-
-    This is used when concatanating SingleCellMemMapDataset. This emables the
-    newly merged arrays to be stored in the same place as the original dataset.
-
-    Args:
-        src_array: the first memmap array
-        src_path: location of the first memmap array
-        dest_array: the second memmap array
-        dest_path: location of the first second array
-        destroy_src: set to True if the source array is destroyed
-
-    Raises:
-        FileNotFoundError if the source or destination path are not found.
-    """
-    if not os.path.isfile(src_path):
-        raise FileNotFoundError(f"The source file {src_path} does not exist")
-    if not os.path.isfile(dest_path):
-        raise FileNotFoundError(f"The destination file {dest_path} does not exist")
-
-    # Flush and close arrays
-    src_array.flush()
-    dest_array.flush()
-
-    del src_array
-    del dest_array
-
-    # Swap the file locations on disk using a tmp file.
-    with tempfile.TemporaryDirectory() as tempdir:
-        temp_file_name = f"{tempdir}/arr_temp"
-        shutil.move(src_path, temp_file_name)
-        shutil.move(dest_path, src_path)
-        shutil.move(temp_file_name, dest_path)
-
-    if destroy_src:
-        os.remove(src_path)


def _pad_sparse_array(row_values, row_col_ptr, n_cols: int) -> np.ndarray:
"""Creates a conventional array from a sparse one.

@@ -724,6 +679,9 @@ def shape(self) -> Tuple[int, List[int]]:
    def concat(
        self,
        other_dataset: Union[list["SingleCellMemMapDataset"], "SingleCellMemMapDataset"],
+        extend_copy_size: int = 10 * 1_024 * 1_024,
+        output_path: str | None = None,
+        destroy_on_copy: bool = False,
    ) -> None:
        """Concatenates another SingleCellMemMapDataset to the existing one.

@@ -733,6 +691,9 @@ def concat(
        Args:
            other_dataset: A SingleCellMemMapDataset or a list of
                SingleCellMemMapDatasets
+            extend_copy_size: the number of bytes to copy at a time (default 10 MiB)
+            output_path: location to store the resulting dataset
+            destroy_on_copy: whether to remove the concatenated datasets' source files after copying

        Raises:
            ValueError if the other dataset(s) are not of the same version or
@@ -759,87 +720,70 @@
        # Set our mode:
        self.mode: Mode = Mode.READ_APPEND

+        if output_path is not None:
+            shutil.move(self.data_path, output_path)
+            self.data_path = output_path
+
        mmaps = []
        mmaps.extend(other_dataset)
-        # Calculate the size of our new dataset arrays
-        total_num_elements = (self.number_nonzero_values() if self.number_of_rows() > 0 else 0) + sum(
-            [m.number_nonzero_values() for m in mmaps]
-        )
-        total_num_rows = self.number_of_rows() + sum([m.number_of_rows() for m in mmaps])
-
-        # Create new arrays to store the data, colptr, and rowptr.
-        with tempfile.TemporaryDirectory(prefix="_tmp", dir=self.data_path) as tmp:
-            data_arr, col_arr, row_arr = _create_compressed_sparse_row_memmaps(
-                num_elements=total_num_elements,
-                num_rows=total_num_rows,
-                memmap_dir_path=Path(tmp),
-                mode=Mode.CREATE_APPEND,
-                dtypes=self.dtypes,
-            )
-            # Copy the data from self and other into the new arrays.
-            cumulative_elements = 0
-            cumulative_rows = 0
-            if self.number_of_rows() > 0:
-                data_arr[cumulative_elements : cumulative_elements + self.number_nonzero_values()] = self.data.data
-                col_arr[cumulative_elements : cumulative_elements + self.number_nonzero_values()] = self.col_index.data
-                row_arr[cumulative_rows : cumulative_rows + self.number_of_rows() + 1] = self.row_index.data
-                cumulative_elements += self.number_nonzero_values()
-                cumulative_rows += self.number_of_rows()
-            for mmap in mmaps:
-                # Fill the data array for the span of this scmmap
-                data_arr[cumulative_elements : cumulative_elements + mmap.number_nonzero_values()] = mmap.data.data
-                # fill the col array for the span of this scmmap
-                col_arr[cumulative_elements : cumulative_elements + mmap.number_nonzero_values()] = mmap.col_index.data
-                # Fill the row array for the span of this scmmap
-                row_arr[cumulative_rows : cumulative_rows + mmap.number_of_rows() + 1] = (
-                    mmap.row_index + int(cumulative_elements)
-                ).data
-
-                self._feature_index.concat(mmap._feature_index)
-                # Update counters
-                cumulative_elements += mmap.number_nonzero_values()
-                cumulative_rows += mmap.number_of_rows()
-            # The arrays are swapped to ensure that the data remains stored at self.data_path and
-            # not at a temporary filepath.
-            _swap_mmap_array(
-                data_arr,
-                f"{tmp}/{FileNames.DATA.value}",
-                self.data,
-                f"{self.data_path}/{FileNames.DATA.value}",
-                destroy_src=True,
-            )
-            _swap_mmap_array(
-                col_arr,
-                f"{tmp}/{FileNames.COLPTR.value}",
-                self.col_index,
-                f"{self.data_path}/{FileNames.COLPTR.value}",
-                destroy_src=True,
-            )
-            _swap_mmap_array(
-                row_arr,
-                f"{tmp}/{FileNames.ROWPTR.value}",
-                self.row_index,
-                f"{self.data_path}/{FileNames.ROWPTR.value}",
-                destroy_src=True,
-            )
-            # Reopen the data, colptr, and rowptr arrays
-            self.data = np.memmap(
-                f"{self.data_path}/{FileNames.DATA.value}",
-                dtype=self.dtypes[f"{FileNames.DATA.value}"],
-                shape=(cumulative_elements,),
-                mode=Mode.READ_APPEND.value,
-            )
-            self.row_index = np.memmap(
-                f"{self.data_path}/{FileNames.ROWPTR.value}",
-                dtype=self.dtypes[f"{FileNames.ROWPTR.value}"],
-                shape=(cumulative_rows + 1,),
-                mode=Mode.READ_APPEND.value,
-            )
-            self.col_index = np.memmap(
-                f"{self.data_path}/{FileNames.COLPTR.value}",
-                dtype=self.dtypes[f"{FileNames.COLPTR.value}"],
-                shape=(cumulative_elements,),
-                mode=Mode.READ_APPEND.value,
-            )
+
+        # Copy the data from self and other into the new arrays.
+        cumulative_elements = self.number_nonzero_values()
+        cumulative_rows = self.number_of_rows()
+        for mmap in mmaps:
+            destination_memmap = np.memmap(
+                f"{mmap.data_path}/{FileNames.ROWPTR.value}_copy",
+                dtype=self.dtypes[f"{FileNames.ROWPTR.value}"],
+                mode="w+",
+                shape=mmap.row_index.shape,
+            )
+            destination_memmap[:] = mmap.row_index[:]
+            destination_memmap += int(cumulative_elements)
+
+            destination_memmap.flush()
+            extend_files(
+                f"{self.data_path}/{FileNames.ROWPTR.value}",
+                f"{mmap.data_path}/{FileNames.ROWPTR.value}_copy",
+                buffer_size_b=extend_copy_size,
+                delete_file2_on_complete=True,
+                offset=8,
+            )
+
+            extend_files(
+                f"{self.data_path}/{FileNames.DATA.value}",
+                f"{mmap.data_path}/{FileNames.DATA.value}",
+                buffer_size_b=extend_copy_size,
+                delete_file2_on_complete=destroy_on_copy,
+            )
+            extend_files(
+                f"{self.data_path}/{FileNames.COLPTR.value}",
+                f"{mmap.data_path}/{FileNames.COLPTR.value}",
+                buffer_size_b=extend_copy_size,
+                delete_file2_on_complete=destroy_on_copy,
+            )
+
+            self._feature_index.concat(mmap._feature_index)
+            # Update counters
+            cumulative_elements += mmap.number_nonzero_values()
+            cumulative_rows += mmap.number_of_rows()
+
+        # Reopen the data, colptr, and rowptr arrays
+        self.data = np.memmap(
+            f"{self.data_path}/{FileNames.DATA.value}",
+            dtype=self.dtypes[f"{FileNames.DATA.value}"],
+            shape=(cumulative_elements,),
+            mode=Mode.READ_APPEND.value,
+        )
+        self.row_index = np.memmap(
+            f"{self.data_path}/{FileNames.ROWPTR.value}",
+            dtype=self.dtypes[f"{FileNames.ROWPTR.value}"],
+            shape=(cumulative_rows + 1,),
+            mode=Mode.READ_APPEND.value,
+        )
+        self.col_index = np.memmap(
+            f"{self.data_path}/{FileNames.COLPTR.value}",
+            dtype=self.dtypes[f"{FileNames.COLPTR.value}"],
+            shape=(cumulative_elements,),
+            mode=Mode.READ_APPEND.value,
+        )
        self.save()
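Reviewer note: the `flatten` change above ultimately routes through this reworked `concat`; below is a minimal sketch of calling it directly, not part of the diff. Dataset paths are hypothetical, and the behavior notes are inferred only from the hunk shown here.

```python
# Sketch of the new concat signature: append ds2 into ds1 on disk using chunked file
# extension instead of a temporary full-size copy. Directory names are assumptions.
from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset

ds1 = SingleCellMemMapDataset("dataset_a")  # existing memmap-backed dataset
ds2 = SingleCellMemMapDataset("dataset_b")

ds1.concat(
    [ds2],
    extend_copy_size=1 * 1024 * 1024,  # copy 1 MiB at a time
    output_path="dataset_merged",      # dataset_a's backing files are moved here first
    destroy_on_copy=False,             # keep dataset_b's data and colptr files on disk
)
# In this version concat() ends by calling save(); ds1 now spans the rows of both datasets.
print(ds1.number_of_rows())
```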
sub-packages/bionemo-scdl/src/bionemo/scdl/util/filecopyutil.py: 63 additions, 0 deletions (new file)
@@ -0,0 +1,63 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-Apache2
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os


def extend_files(
    first: str, second: str, buffer_size_b: int = 10 * 1024 * 1024, delete_file2_on_complete: bool = False, offset=0
):
    """Concatenates the contents of `second` onto the end of `first` using memory-efficient, chunked copies.

    Optionally deletes `second` once the copy is complete.

    Parameters:
    - first (str): Path to the first file (will be extended).
    - second (str): Path to the second file (data will be read from here).
    - buffer_size_b (int): Size of the buffer, in bytes, to use for reading/writing data.
    - delete_file2_on_complete (bool): Whether to delete the second file after the operation.
    - offset (int): Number of bytes to skip at the start of `second` before copying.
    """
    with open(first, "r+b") as f1, open(second, "rb") as f2:
        size1 = os.path.getsize(first)
        size2 = os.path.getsize(second)

        # Resize file1 to the final size to accommodate both files
        f1.seek(size1 + size2 - 1 - offset)
        f1.write(b"\0")  # Extend file1

        # Move data from file2 to file1 in chunks
        read_position = offset  # Start reading at `offset` within file2
        write_position = size1  # Start appending at the end of the original data in file1
        f2.seek(read_position)

        while read_position < size2:
            # Determine how much to read/write in this iteration
            chunk_size = min(buffer_size_b, size2 - read_position)

            # Read data from file2
            new_data = f2.read(chunk_size)

            # Write the new data into file1
            f1.seek(write_position)
            f1.write(new_data)

            # Update pointers
            read_position += chunk_size
            write_position += chunk_size
            f2.seek(read_position)

    if delete_file2_on_complete:
        os.remove(second)
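Reviewer note: a small self-contained check of the `extend_files` semantics on plain files (a sketch, not part of the PR). The reading of `offset=8` in `concat` as "skip the leading row-pointer entry" is an inference from the hunk above, assuming an 8-byte row-pointer dtype; the temp filenames here are arbitrary.

```python
# With offset=2 the first two bytes of `second` are skipped, analogous to how concat()
# drops the leading entry of a row-pointer copy with offset=8.
import pathlib
import tempfile

from bionemo.scdl.util.filecopyutil import extend_files

tmp = pathlib.Path(tempfile.mkdtemp())
first, second = tmp / "first.bin", tmp / "second.bin"
first.write_bytes(b"AAAA")
second.write_bytes(b"xxBBBB")  # "xx" stands in for the skipped prefix

extend_files(str(first), str(second), buffer_size_b=2, delete_file2_on_complete=True, offset=2)

assert first.read_bytes() == b"AAAABBBB"
assert not second.exists()  # removed because delete_file2_on_complete=True
```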
@@ -75,12 +75,17 @@ def test_sccollection_multi(tmp_path, test_directory):
def test_sccollection_serialization(tmp_path, test_directory):
    coll = SingleCellCollection(tmp_path / "sccy")
    coll.load_h5ad_multi(test_directory / "", max_workers=4, use_processes=False)
    assert coll.number_of_rows() == 114
    assert coll.number_of_values() == 2092
    assert coll.number_nonzero_values() == 57

    coll.flatten(tmp_path / "flattened")
    dat = SingleCellMemMapDataset(tmp_path / "flattened")
    assert dat.number_of_rows() == 114
    assert dat.number_of_values() == 2092
    assert dat.number_nonzero_values() == 57
    assert np.isclose(coll.sparsity(), 0.972753346080306, rtol=1e-6)
    assert np.isclose(dat.sparsity(), 0.972753346080306, rtol=1e-6)

    for fn in ["col_ptr.npy", "data.npy", "features", "metadata.json", "row_ptr.npy", "version.json"]:
        assert os.path.exists(tmp_path / "flattened" / fn)

@@ -94,6 +99,7 @@ def test_sc_concat_in_flatten_cellxval(tmp_path, create_cellx_val_data):
    data = SingleCellMemMapDataset(memmap_data)
    assert np.array(data.row_index)[2] != 2  # regression test for bug
    assert np.array(data.row_index)[3] != 1149  # regression test for bug

    assert all(data.row_index == np.array([0, 440, 972, 2119]))

