NDBlock support storage_options; general refactor
Karl5766 committed Sep 27, 2024
1 parent 3cb356f commit edd1fa7
Showing 9 changed files with 138 additions and 89 deletions.
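Taken together, the diffs below replace the old preferred_chunksize/multiscale keyword arguments with a single storage_options dict. As a hedged orientation sketch (the array shape and cache path are illustrative, not from the commit; the key names are the ones the diff reads out), a caller of cvpl_tools.im.fs would now write:

    import dask.array as da
    import numcodecs
    import cvpl_tools.im.fs as imfs

    im = da.zeros((64, 128, 128), dtype='int32', chunks=(32, 64, 64))  # illustrative image
    storage_options = {
        'preferred_chunksize': (32, 64, 64),  # rechunk before writing (dask arrays only)
        'multiscale': 0,                      # number of ome-zarr downsample layers
        'compressor': numcodecs.Blosc(cname='lz4', clevel=9, shuffle=numcodecs.Blosc.BITSHUFFLE),
    }
    imfs.save('cache/my_image', im, storage_options)   # path is hypothetical
    im2 = imfs.load('cache/my_image', storage_options)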
19 changes: 13 additions & 6 deletions src/cvpl_tools/im/dask_label.py
@@ -4,9 +4,11 @@
 """
 
 import dask.array as da
+import numcodecs
 import numpy as np
 import numpy.typing as npt
-from cvpl_tools.im.ndblock import NDBlock
+
+from cvpl_tools.im.ndblock import NDBlock, dumps_numpy, loads_numpy
 from cvpl_tools.im.partd_server import SQLiteKVStore, SQLitePartd, SqliteServer
 from cvpl_tools.im.fs import CacheDirectory, CachePointer
 import os
@@ -84,6 +86,8 @@ def label(im: npt.NDArray | da.Array | NDBlock,
     if viewer_args is None:
         viewer_args = {}
     is_logging = viewer_args.get('logging', False)
+    compressor = numcodecs.Blosc(cname='lz4', clevel=9, shuffle=numcodecs.Blosc.BITSHUFFLE)
+    vargs = dict(compressor=compressor)  # this is for compressing labels of uint8 or int32 types
 
     if isinstance(im, np.ndarray):
         return scipy_label(im, output=output_dtype)
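The Blosc configuration added here is standard numcodecs; as a self-contained illustration of what this codec does to a small integer label array (an aside for the reader, not code from the commit):

    import numcodecs
    import numpy as np

    compressor = numcodecs.Blosc(cname='lz4', clevel=9, shuffle=numcodecs.Blosc.BITSHUFFLE)
    labels = np.random.randint(0, 5, size=(64, 64)).astype(np.int32)  # low-entropy labels
    compressed = compressor.encode(labels)  # bytes, typically far smaller than labels.nbytes
    restored = np.frombuffer(compressor.decode(compressed), dtype=np.int32).reshape(labels.shape)
    assert np.array_equal(labels, restored)  # lossless round trip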
@@ -104,7 +108,8 @@ def to_max(block: npt.NDArray, block_info: dict):
     print('Locally label the image')
     locally_labeled = cdir.cache_im(
         lambda: im.map_blocks(map_block, meta=np.zeros(tuple(), dtype=output_dtype)),
-        cid='locally_labeled_without_cumsum'
+        cid='locally_labeled_without_cumsum',
+        viewer_args=vargs
     )
 
     def compute_nlbl_np_arr():
Expand Down Expand Up @@ -176,7 +181,7 @@ def compute_slices(block: npt.NDArray, block2: npt.NDArray, block_info: dict = N
sli_idx = face * (block.shape[ax] - 1)
sli = np.take(block, indices=sli_idx, axis=ax)
client.append({
indstr: pickle.dumps(sli)
indstr: dumps_numpy(sli, compressor)
})
block_index[ax] -= face
client.close()
@@ -185,7 +190,8 @@ def compute_slices(block: npt.NDArray, block2: npt.NDArray, block_info: dict = None):
     locally_labeled = cdir.cache_im(
         lambda: da.map_blocks(compute_slices, locally_labeled, cumsum_da_arr,
                               meta=np.zeros(tuple(), dtype=output_dtype)),
-        cid='locally_labeled_with_cumsum'
+        cid='locally_labeled_with_cumsum',
+        viewer_args=vargs
     )
 
     if is_logging:
@@ -195,7 +201,7 @@ def compute_slices(block: npt.NDArray, block2: npt.NDArray, block_info: dict = None):
     for value1, value2 in read_kv_store.read_all():
         if value1 is None or value2 is None:
             continue
-        sli1, sli2 = pickle.loads(value1).flatten(), pickle.loads(value2).flatten()
+        sli1, sli2 = loads_numpy(value1, compressor).flatten(), loads_numpy(value2, compressor).flatten()
         sli = np.stack((sli1, sli2), axis=1)
         tups = cvpl_algorithms.np_unique(sli, axis=0)
         for row in tups.tolist():
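dumps_numpy and loads_numpy are new helpers imported from cvpl_tools.im.ndblock; their implementation is not part of this excerpt. A minimal sketch consistent with the call sites above (dumps_numpy(arr, compressor) producing bytes, loads_numpy(b, compressor) recovering the array) might look like the following; treat it as an assumption, not the actual ndblock code:

    import pickle
    import numpy as np

    def dumps_numpy(arr: np.ndarray, compressor) -> bytes:
        # Serialize the array, then compress the pickle bytes with the codec
        return compressor.encode(pickle.dumps(arr))

    def loads_numpy(b: bytes, compressor) -> np.ndarray:
        # Invert dumps_numpy: decompress, then unpickle
        return pickle.loads(compressor.decode(b))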
Expand Down Expand Up @@ -241,7 +247,8 @@ def local_to_global(block, block_info, ind_map_scatter):
globally_labeled = cdir.cache_im(
lambda: locally_labeled.map_blocks(func=local_to_global, meta=np.zeros(tuple(), dtype=output_dtype),
ind_map_scatter=ind_map_scatter),
cid='globally_labeled'
cid='globally_labeled',
viewer_args=vargs
)
result_arr = globally_labeled
if not is_dask:
Expand Down
66 changes: 37 additions & 29 deletions src/cvpl_tools/im/fs.py
@@ -9,6 +9,7 @@
 from typing import Any
 
 import napari
+import numcodecs
 import numpy as np
 import cvpl_tools.im.ndblock as cvpl_ndblock
 from cvpl_tools.im.ndblock import NDBlock
@@ -51,21 +52,26 @@ def str_to_chunksize(chunksize_str: str):
 
 def save(file: str,
          im,
-         preferred_chunksize: tuple[int, ...] = None,
-         multiscale: int = 0):
+         storage_options: dict = None):
     """Save an image object into given path
 
     Supported im object types:
     - np.ndarray
     - dask.Array
     - cvpl_tools.im.ndblock.NDBlock
 
+    Storage options
+        preferred_chunksize (tuple[int, ...]) = None
+            chunk sizes to save as; will rechunk if different from current size; only applies to dask arrays.
+        multiscale (int) = 0
+            The number of downsample layers for save ome-zarr; only applies if the image is a dask image
+        compressor = None
+            The compressor to use to compress array or chunks
+
     Args:
         file: The full/relative path to the directory to be saved to
         im: Object to be saved
-        preferred_chunksize: chunk sizes to save as; will rechunk if different from current size; only applies to
-            dask arrays.
-        multiscale: The number of downsample layers for save ome-zarr; only applies if the image is a dask image
+        storage_options: Specifies options in saving method and saved file format
     """
     if isinstance(im, np.ndarray):
         old_chunksize = im.shape
Expand All @@ -78,33 +84,40 @@ def save(file: str,
fmt = ImageFormat.NDBLOCK
else:
raise ValueError(f'Unexpected input type im {type(im)}')
if preferred_chunksize is None:

if storage_options is None:
preferred_chunksize = old_chunksize
else:
preferred_chunksize = storage_options.get('preferred_chunksize') or old_chunksize

if isinstance(im, np.ndarray):
NDBlock.save(file, NDBlock(im))
NDBlock.save(file, NDBlock(im), storage_options=storage_options)
elif isinstance(im, da.Array):
if old_chunksize != preferred_chunksize:
im = im.rechunk(preferred_chunksize)
NDBlock.save(file, NDBlock(im), downsample_level=multiscale)
NDBlock.save(file, NDBlock(im), storage_options=storage_options)
elif isinstance(im, NDBlock):
if im.get_repr_format() == cvpl_ndblock.ReprFormat.DASK_ARRAY and old_chunksize != preferred_chunksize:
im = NDBlock(im.get_arr().rechunk(preferred_chunksize))
NDBlock.save(file, im, downsample_level=multiscale)
NDBlock.save(file, im, storage_options=storage_options)
else:
raise ValueError(f'Unexpected input type im {type(im)}')
with open(f'{file}/.save_meta.txt', mode='w') as outfile:
outfile.write(str(fmt.value))
outfile.write(f'\n{chunksize_to_str(old_chunksize)}\n{chunksize_to_str(preferred_chunksize)}')

compressor = storage_options.get('compressor')
outfile.write(f'\ncompressor:{repr(compressor)}')

def load(file: str):

def load(file: str, storage_options: dict = None):
"""Load an image from the given directory.
The image is one saved by cvpl_tools.im.fs.save()
Args:
file: Full path to the directory to be read from
storage_options: Specifies options in saving method and saved file format
Returns:
Recreated image; this method attempts to keep meta and content of the loaded image stays
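With the two added write calls, .save_meta.txt now records the compressor on a fourth line after the format value and the two chunk-size strings. Assuming chunksize_to_str joins sizes with commas and ImageFormat.DASK_ARRAY has enum value 1 (neither is shown in this excerpt), a saved dask array's meta file would look roughly like:

    1
    32,64,64
    32,64,64
    compressor:Blosc(cname='lz4', clevel=9, shuffle=BITSHUFFLE, blocksize=0)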
Expand All @@ -115,13 +128,13 @@ def load(file: str):
fmt = ImageFormat(int(items[0]))
old_chunksize, preferred_chunksize = str_to_chunksize(items[1]), str_to_chunksize(items[2])
if fmt == ImageFormat.NUMPY:
im = NDBlock.load(file).get_arr()
im = NDBlock.load(file, storage_options=storage_options).get_arr()
elif fmt == ImageFormat.DASK_ARRAY:
im = NDBlock.load(file).get_arr()
im = NDBlock.load(file, storage_options=storage_options).get_arr()
if old_chunksize != preferred_chunksize:
im = im.rechunk(old_chunksize)
elif fmt == ImageFormat.NDBLOCK:
im = NDBlock.load(file)
im = NDBlock.load(file, storage_options=storage_options)
if im.get_repr_format() == cvpl_ndblock.ReprFormat.DASK_ARRAY and old_chunksize != preferred_chunksize:
im = NDBlock(im.get_arr().rechunk(old_chunksize))
else:
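Note that load() rechunks a dask array back to its pre-save chunk size, so preferred_chunksize is purely a storage-layout choice. A hedged sketch (path and sizes illustrative):

    import dask.array as da
    import cvpl_tools.im.fs as imfs

    arr = da.zeros((64, 64), dtype='uint8', chunks=(64, 64))
    opts = {'preferred_chunksize': (32, 32)}
    imfs.save('cache/arr', arr, opts)    # written as (32, 32) chunks
    arr2 = imfs.load('cache/arr', opts)  # rechunked back to (64, 64) on load
    assert arr2.chunks == ((64,), (64,))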
@@ -174,17 +187,13 @@ def display(file: str, viewer_args: dict):
 
 
 def cache_im(fn,
              cpath: CachePath,
-             save_fn=save,
-             load_fn=load,
              cache_level: int | float = 0,
              viewer_args: dict = None):
     """Caches an image object
 
     Args:
         fn: Computes the image if it's not already cached
         cpath: The cache ID within this directory
-        save_fn: fn(file: str, im) Used to save the image to file
-        load_fn: fn(file: str) Used to load the image from file
         cache_level: cache level of this operation; note even if the caching is skipped, if there is
             a cache file already available on disk then the file will still be read
         viewer_args: contains viewer and arguments passed to the viewer's add image functions
@@ -198,23 +207,28 @@
 
     viewer_args = copy.copy(viewer_args)  # since we will pop off some attributes
 
     preferred_chunksize = viewer_args.pop('preferred_chunksize', None)
-    multiscale = viewer_args.pop('multiscale', 0)
+    multiscale = viewer_args.pop('multiscale', None)
+    storage_options = viewer_args.pop('storage_options', {})
+    if preferred_chunksize is not None:
+        storage_options['preferred_chunksize'] = preferred_chunksize
+    if multiscale is not None:
+        storage_options['multiscale'] = multiscale
 
     raw_path = cpath.abs_path
-    skip_cache = viewer_args.get('skip_cache', False) or cache_level > cache_level
+    skip_cache = viewer_args.get('skip_cache', False) or cache_level > viewer_args.get('cache_level', np.inf)
     if not cpath.exists:
         im = fn()
         if skip_cache:
             return im
-        save_fn(raw_path, im, preferred_chunksize=preferred_chunksize, multiscale=multiscale)
+        save(raw_path, im, storage_options)
 
     assert os.path.exists(raw_path), f'Directory should be created at path {raw_path}, but it is not found'
-    if not skip_cache and viewer_args.get('viewer', None) is not None:
+    if not skip_cache and viewer_args.get('viewer') is not None:
         viewer_args['layer_args'] = copy.copy(viewer_args.get('layer_args', {}))
         viewer_args['layer_args'].setdefault('name', cpath.cid)
         display(raw_path, viewer_args)
 
-    loaded = load_fn(raw_path)
+    loaded = load(raw_path, storage_options)
 
     return loaded
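This is the plumbing that lets the dask_label.py call sites above pass options through viewer_args: cache_im pops preferred_chunksize, multiscale and storage_options off viewer_args, folds the first two into storage_options, and hands the merged dict to save() and load(). A hedged call-site sketch (cdir and expensive_compute are hypothetical stand-ins):

    import numcodecs

    compressor = numcodecs.Blosc(cname='lz4', clevel=9, shuffle=numcodecs.Blosc.BITSHUFFLE)
    im = cdir.cache_im(
        lambda: expensive_compute(),  # only evaluated on a cache miss
        cid='my_step',
        viewer_args=dict(
            preferred_chunksize=(32, 64, 64),            # folded into storage_options
            multiscale=0,                                # folded in when not None
            storage_options={'compressor': compressor},
        ),
    )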

@@ -397,8 +411,6 @@ def __init__(self,
                 False, then only the temporary folders within the directory will be removed. (The entire subtree
                 will be traversed to find any file or directory whose is_tmp is True, and they will be removed)
             read_if_exists: If True, will read from the existing directory at the given path
-            cache_level: specifies how much caching to be done; caching operations with level > this will be ignored;
-                default to inf (cache all)
             parent: parent of the directory in the directory structure; None if is root
             exists: True if the directory is read from an already existing cache; False if it is created anew
         """
@@ -578,17 +590,13 @@ def cache_subdir(self, cid: str = None) -> CacheDirectory:
 
     def cache_im(self,
                  fn,
                  cid: str = None,
-                 save_fn=save,
-                 load_fn=load,
                  cache_level: int | float = 0,
                  viewer_args: dict = None):
         """Caches an image object
 
         Args:
             fn: Computes the image if it's not already cached
             cid: The cache ID within this directory
-            save_fn: fn(file: str, im) Used to save the image to file
-            load_fn: fn(file: str) Used to load the image from file
             cache_level: cache level of this operation; note even if the caching is skipped, if there is
                 a cache file already available on disk then the file will still be read
             viewer_args: contains viewer and arguments passed to the viewer's add image functions
@@ -597,7 +605,7 @@ def cache_im(self,
             The cached image loaded
         """
         cpath = self.cache_subpath(cid=cid)
-        return cache_im(fn, cpath, save_fn, load_fn, cache_level, viewer_args)
+        return cache_im(fn, cpath, cache_level, viewer_args)
 
     def remove_tmp(self):
         """traverse all subnodes and self, removing those with is_tmp=True"""
(Diffs for the remaining 7 changed files did not load and are not shown.)
