From 0e1809bf3bf2ed4d308d6727105102e36bab6698 Mon Sep 17 00:00:00 2001 From: William Moore Date: Tue, 6 Aug 2024 16:45:05 +0100 Subject: [PATCH 01/16] Don't check shard guess if output_script --- src/ome2024_ngff_challenge/resave.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 4ef116f..99e1a69 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -36,10 +36,8 @@ def guess_shards(shape: list, chunks: list): ./resave.py input.zarr output.json --output-write-details """ - # TODO: hard-coded to return the full size unless too large - if math.prod(shape) < 100_000_000: - return shape - raise ValueError(f"no shard guess: shape={shape}, chunks={chunks}") + # TODO: hard-coded to return the full size + return shape def csv_int(vstr, sep=",") -> list: @@ -417,13 +415,17 @@ def convert_image( with output_config.path.open(mode="w") as o: json.dump(details, o) else: - if output_chunks: + if output_chunks and output_shards: ds_chunks = output_chunks ds_shards = output_shards elif output_read_details: # read row by row and overwrite ds_chunks = details[idx]["chunks"] ds_shards = details[idx]["shards"] + else: + # if we're going to convert, let's validate the guess... + if not output_script and math.prod(ds_shards) > 100_000_000: + raise ValueError(f"no shard guess: shape={ds_shape}, chunks={ds_chunks}") if output_script: chunk_txt = ",".join(map(str, ds_chunks)) From 8b3dd1f1cd444de7803f1102b71c85a79ec6d454 Mon Sep 17 00:00:00 2001 From: William Moore Date: Wed, 7 Aug 2024 12:11:06 +0100 Subject: [PATCH 02/16] ruff fix --- src/ome2024_ngff_challenge/resave.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 99e1a69..49ef600 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -422,10 +422,11 @@ def convert_image( # read row by row and overwrite ds_chunks = details[idx]["chunks"] ds_shards = details[idx]["shards"] - else: + elif not output_script and math.prod(ds_shards) > 100_000_000: # if we're going to convert, let's validate the guess... - if not output_script and math.prod(ds_shards) > 100_000_000: - raise ValueError(f"no shard guess: shape={ds_shape}, chunks={ds_chunks}") + raise ValueError( + f"no shard guess: shape={ds_shape}, chunks={ds_chunks}" + ) if output_script: chunk_txt = ",".join(map(str, ds_chunks)) From 8788444f955d1ddf173c35428a8bc211683e9a2d Mon Sep 17 00:00:00 2001 From: William Moore Date: Wed, 7 Aug 2024 16:14:33 +0100 Subject: [PATCH 03/16] Don't require output-chunks AND output-shards together --- src/ome2024_ngff_challenge/resave.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 49ef600..30ac474 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -415,18 +415,21 @@ def convert_image( with output_config.path.open(mode="w") as o: json.dump(details, o) else: - if output_chunks and output_shards: - ds_chunks = output_chunks - ds_shards = output_shards - elif output_read_details: + if output_read_details: # read row by row and overwrite ds_chunks = details[idx]["chunks"] ds_shards = details[idx]["shards"] - elif not output_script and math.prod(ds_shards) > 100_000_000: - # if we're going to convert, let's validate the guess... - raise ValueError( - f"no shard guess: shape={ds_shape}, chunks={ds_chunks}" - ) + else: + if output_chunks: + ds_chunks = output_chunks + if output_shards: + ds_shards = output_shards + elif not output_script and math.prod(ds_shards) > 100_000_000: + # if we're going to convert, and we guessed the shards, + # let's validate the guess... + raise ValueError( + f"no shard guess: shape={ds_shape}, chunks={ds_chunks}" + ) if output_script: chunk_txt = ",".join(map(str, ds_chunks)) From b4cc2024d52d1517e4199cc88f1e5c91808cea6d Mon Sep 17 00:00:00 2001 From: William Moore Date: Thu, 8 Aug 2024 10:14:59 +0100 Subject: [PATCH 04/16] Fix lint error --- src/ome2024_ngff_challenge/resave.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 30ac474..b1b63ae 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -37,6 +37,7 @@ def guess_shards(shape: list, chunks: list): ./resave.py input.zarr output.json --output-write-details """ # TODO: hard-coded to return the full size + assert chunks is not None # fixes unused parameter return shape From 2823ec382d6eae1b0ef6efa1cbae0d65ddd40506 Mon Sep 17 00:00:00 2001 From: William Moore Date: Thu, 8 Aug 2024 11:11:11 +0100 Subject: [PATCH 05/16] Add --log option and add ts configs logging --- src/ome2024_ngff_challenge/resave.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index b1b63ae..d8d3e7c 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -28,6 +28,14 @@ # Helpers # +class SafeEncoder(json.JSONEncoder): + # Handle any TypeErrors so we are safe to use this for logging + # E.g. dtype obj is not JSON serializable + def default(self, obj): + try: + return super().default(obj) + except TypeError: + return str(obj) def guess_shards(shape: list, chunks: list): """ @@ -52,7 +60,7 @@ def csv_int(vstr, sep=",") -> list: values.append(v) except ValueError as ve: raise argparse.ArgumentError( - message="Invalid value %s, values must be a number" % v0 + message=f'Invalid value {v0}, values must be a number' ) from ve return values @@ -236,7 +244,7 @@ def check_or_delete_path(self): else: shutil.rmtree(self.path) else: - raise Exception(f"{self.path} exists. Exiting") + raise Exception(f"{self.path} exists. Use --output-overwrite to overwrite") def open_group(self): # Needs zarr_format=2 or we get ValueError("store mode does not support writing") @@ -339,6 +347,13 @@ def convert_array( write_config["create"] = True write_config["delete_existing"] = output_config.overwrite + LOGGER.debug(f"""input_config: +{json.dumps(input_config.ts_config, indent=4)} + """) + LOGGER.debug(f"""write_config: +{json.dumps(write_config, indent=4, cls=SafeEncoder)} + """) + verify_config = base_config.copy() write = ts.open(write_config).result() @@ -682,6 +697,8 @@ def cli(args=sys.argv[1:]): parser.add_argument("--rocrate-organism", type=str) parser.add_argument("--rocrate-modality", type=str) parser.add_argument("--rocrate-skip", action="store_true") + parser.add_argument("--log", default="warn", + help="warn, 'info' or 'debug'") group_ex = parser.add_mutually_exclusive_group() group_ex.add_argument( "--output-write-details", @@ -705,7 +722,11 @@ def cli(args=sys.argv[1:]): parser.add_argument("output_path", type=Path) ns = parser.parse_args(args) - logging.basicConfig() + # configure logging + numeric_level = getattr(logging, ns.log.upper(), None) + if not isinstance(numeric_level, int): + raise ValueError(f"Invalid log level: {ns.log}. Use 'info' or 'debug'") + logging.basicConfig(level=numeric_level) rocrate = None if not ns.rocrate_skip: From 9d0b99b1993ea7021c57f307ce4d5400478543fe Mon Sep 17 00:00:00 2001 From: William Moore Date: Thu, 8 Aug 2024 11:31:02 +0100 Subject: [PATCH 06/16] ruff formatting --- src/ome2024_ngff_challenge/resave.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index d8d3e7c..23fb376 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -28,6 +28,7 @@ # Helpers # + class SafeEncoder(json.JSONEncoder): # Handle any TypeErrors so we are safe to use this for logging # E.g. dtype obj is not JSON serializable @@ -37,6 +38,7 @@ def default(self, obj): except TypeError: return str(obj) + def guess_shards(shape: list, chunks: list): """ Method to calculate best shard sizes. These values can be written to @@ -60,7 +62,7 @@ def csv_int(vstr, sep=",") -> list: values.append(v) except ValueError as ve: raise argparse.ArgumentError( - message=f'Invalid value {v0}, values must be a number' + message=f"Invalid value {v0}, values must be a number" ) from ve return values @@ -244,7 +246,9 @@ def check_or_delete_path(self): else: shutil.rmtree(self.path) else: - raise Exception(f"{self.path} exists. Use --output-overwrite to overwrite") + raise Exception( + f"{self.path} exists. Use --output-overwrite to overwrite" + ) def open_group(self): # Needs zarr_format=2 or we get ValueError("store mode does not support writing") @@ -697,8 +701,7 @@ def cli(args=sys.argv[1:]): parser.add_argument("--rocrate-organism", type=str) parser.add_argument("--rocrate-modality", type=str) parser.add_argument("--rocrate-skip", action="store_true") - parser.add_argument("--log", default="warn", - help="warn, 'info' or 'debug'") + parser.add_argument("--log", default="warn", help="warn, 'info' or 'debug'") group_ex = parser.add_mutually_exclusive_group() group_ex.add_argument( "--output-write-details", From f17a6de9638acb1d9493a34576aa7916e0737393 Mon Sep 17 00:00:00 2001 From: William Moore Date: Thu, 8 Aug 2024 11:43:25 +0100 Subject: [PATCH 07/16] Fix pylint warning --- src/ome2024_ngff_challenge/resave.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 23fb376..8cad1a8 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -32,11 +32,11 @@ class SafeEncoder(json.JSONEncoder): # Handle any TypeErrors so we are safe to use this for logging # E.g. dtype obj is not JSON serializable - def default(self, obj): + def default(self, o): try: - return super().default(obj) + return super().default(o) except TypeError: - return str(obj) + return str(o) def guess_shards(shape: list, chunks: list): From 717fb5d2986c8946bf3d6a2b81ae027de58dfdce Mon Sep 17 00:00:00 2001 From: William Moore Date: Mon, 12 Aug 2024 16:59:05 +0100 Subject: [PATCH 08/16] Use dask map_blocks() to read and write a chunk at a time --- pyproject.toml | 1 + src/ome2024_ngff_challenge/resave.py | 24 ++++++++++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 48c18ca..9e088fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ classifiers = [ ] [tool.poetry.dependencies] +dask ">=2024.5.2" python = "^3.10" rocrate = "^0.10" s3fs = ">=2024.6.1" diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 4ef116f..5880496 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -11,6 +11,7 @@ import time from pathlib import Path +import dask.array as da import numpy as np import tensorstore as ts import tqdm @@ -345,8 +346,27 @@ def convert_array( write = ts.open(write_config).result() before = TSMetrics(input_config.ts_config, write_config) - future = write.write(read) - future.result() + + # We want to read & write a chunk at a time with TensorStore + # but don't have a map_blocks() function for TensorStore? + # So we use Dask Array's map_blocks() purely to iterate through chunks, + # using the array-location of each chunk to read/write in TensorStore + my_zeros = da.zeros(read.shape, chunks=chunks) + def domap(data, block_info=None): + # handle fake calling of function to establish dtype + if block_info is None: + return data + # array-location e.g. [(1, 2), (512, 1024), (0, 512)] + array_location = block_info[0]["array-location"] + LOGGER.debug(f"array_location: ${array_location}") + slice_tuple = tuple([slice(dim[0], dim[1]) for dim in array_location]) + future = write[slice_tuple].write(read[slice_tuple]) + future.result() + return data + + # trigger domap on each chunk + my_zeros.map_blocks(domap).compute() + after = TSMetrics(input_config.ts_config, write_config, before) LOGGER.info(f"""Re-encode (tensorstore) {input_config} to {output_config} From 89905d6a626d8b4c106961e028f35ff0ce166fb6 Mon Sep 17 00:00:00 2001 From: William Moore Date: Tue, 13 Aug 2024 15:42:39 +0100 Subject: [PATCH 09/16] Remove usage of dask for map_blocks --- pyproject.toml | 1 - src/ome2024_ngff_challenge/resave.py | 35 ++++++++++++++-------------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9e088fa..48c18ca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,6 @@ classifiers = [ ] [tool.poetry.dependencies] -dask ">=2024.5.2" python = "^3.10" rocrate = "^0.10" s3fs = ">=2024.6.1" diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 2ec2b41..5092cb1 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -9,9 +9,9 @@ import shutil import sys import time +from itertools import product from pathlib import Path -import dask.array as da import numpy as np import tensorstore as ts import tqdm @@ -52,6 +52,20 @@ def guess_shards(shape: list, chunks: list): return shape +def chunk_iter(shape: list, chunks: list): + """ + Returns a series of tuples, each containing chunck slice + E.g. for 2D shape/chunks: ((slice(0, 512, 1), slice(0, 512, 1)), (slice(0, 512, 1), slice(512, 1024, 1))...) + Thanks to Davis Bennett. + """ + assert(len(shape) == len(chunks)) + chunk_iters = [] + for chunk_size, dim_size in zip(chunks, shape): + chunk_tuple = tuple(slice(c_index * chunk_size, min(dim_size, c_index * chunk_size + chunk_size), 1) for c_index in range(-(-dim_size // chunk_size))) + chunk_iters.append(chunk_tuple) + return tuple(product(*chunk_iters)) + + def csv_int(vstr, sep=",") -> list: """Convert a string of comma separated values to integers @returns iterable of floats @@ -365,25 +379,10 @@ def convert_array( before = TSMetrics(input_config.ts_config, write_config) - # We want to read & write a chunk at a time with TensorStore - # but don't have a map_blocks() function for TensorStore? - # So we use Dask Array's map_blocks() purely to iterate through chunks, - # using the array-location of each chunk to read/write in TensorStore - my_zeros = da.zeros(read.shape, chunks=chunks) - def domap(data, block_info=None): - # handle fake calling of function to establish dtype - if block_info is None: - return data - # array-location e.g. [(1, 2), (512, 1024), (0, 512)] - array_location = block_info[0]["array-location"] - LOGGER.debug(f"array_location: ${array_location}") - slice_tuple = tuple([slice(dim[0], dim[1]) for dim in array_location]) + for slice_tuple in chunk_iter(read.shape, chunks): + LOGGER.debug(f"array_location: {slice_tuple}") future = write[slice_tuple].write(read[slice_tuple]) future.result() - return data - - # trigger domap on each chunk - my_zeros.map_blocks(domap).compute() after = TSMetrics(input_config.ts_config, write_config, before) From 4b6a0f36b7bf167a293ed0d32a2750dd45e56ad5 Mon Sep 17 00:00:00 2001 From: William Moore Date: Tue, 13 Aug 2024 15:48:50 +0100 Subject: [PATCH 10/16] Write a shard at a time instead of chunk --- src/ome2024_ngff_challenge/resave.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 5092cb1..6a76580 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -379,7 +379,9 @@ def convert_array( before = TSMetrics(input_config.ts_config, write_config) - for slice_tuple in chunk_iter(read.shape, chunks): + # read & write a chunk (or shard) at a time: + blocks = shards if shards is not None else chunks + for slice_tuple in chunk_iter(read.shape, blocks): LOGGER.debug(f"array_location: {slice_tuple}") future = write[slice_tuple].write(read[slice_tuple]) future.result() From 6e353d7ff573daba764763591ef22ed8ee8b5642 Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Mon, 19 Aug 2024 16:27:31 +0200 Subject: [PATCH 11/16] Fix build --- src/ome2024_ngff_challenge/resave.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 6a76580..3debaf4 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -54,14 +54,21 @@ def guess_shards(shape: list, chunks: list): def chunk_iter(shape: list, chunks: list): """ - Returns a series of tuples, each containing chunck slice + Returns a series of tuples, each containing chunk slice E.g. for 2D shape/chunks: ((slice(0, 512, 1), slice(0, 512, 1)), (slice(0, 512, 1), slice(512, 1024, 1))...) Thanks to Davis Bennett. """ - assert(len(shape) == len(chunks)) + assert len(shape) == len(chunks) chunk_iters = [] for chunk_size, dim_size in zip(chunks, shape): - chunk_tuple = tuple(slice(c_index * chunk_size, min(dim_size, c_index * chunk_size + chunk_size), 1) for c_index in range(-(-dim_size // chunk_size))) + chunk_tuple = tuple( + slice( + c_index * chunk_size, + min(dim_size, c_index * chunk_size + chunk_size), + 1, + ) + for c_index in range(-(-dim_size // chunk_size)) + ) chunk_iters.append(chunk_tuple) return tuple(product(*chunk_iters)) From c23c196dc0ca49be666c4d8fd78cf9cf53a79008 Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Mon, 19 Aug 2024 20:36:22 +0200 Subject: [PATCH 12/16] Add --output-threads flag for simultaneous writes --- src/ome2024_ngff_challenge/resave.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 3debaf4..2a93d1f 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -9,7 +9,7 @@ import shutil import sys import time -from itertools import product +from itertools import batched, product from pathlib import Path import numpy as np @@ -324,6 +324,7 @@ def convert_array( dimension_names: list, chunks: list, shards: list, + threads: int, ): read = input_config.ts_read() @@ -388,10 +389,15 @@ def convert_array( # read & write a chunk (or shard) at a time: blocks = shards if shards is not None else chunks - for slice_tuple in chunk_iter(read.shape, blocks): - LOGGER.debug(f"array_location: {slice_tuple}") - future = write[slice_tuple].write(read[slice_tuple]) - future.result() + for idx, batch in enumerate(batched(chunk_iter(read.shape, blocks), threads)): + futures = [] + for slice_tuple in batch: + future = write[slice_tuple].write(read[slice_tuple]) + LOGGER.info(f"batch {idx}: {slice_tuple} scheduled -- {future}") + futures.append((slice_tuple, future)) + for slice_tuple, future in futures: + future.result() + LOGGER.info(f"batch {idx}: {slice_tuple} completed -- {future}") after = TSMetrics(input_config.ts_config, write_config, before) @@ -420,6 +426,7 @@ def convert_image( output_read_details: str | None, output_write_details: bool, output_script: bool, + threads: int, ): dimension_names = None # top-level version... @@ -494,6 +501,7 @@ def convert_image( dimension_names, ds_chunks, ds_shards, + threads, ) @@ -603,6 +611,7 @@ def main(ns: argparse.Namespace, rocrate: ROCrateWriter | None = None) -> int: ns.output_read_details, ns.output_write_details, ns.output_script, + ns.output_threads, ) converted += 1 @@ -656,6 +665,7 @@ def main(ns: argparse.Namespace, rocrate: ROCrateWriter | None = None) -> int: ns.output_read_details, ns.output_write_details, ns.output_script, + ns.output_threads, ) converted += 1 # Note: plates can *also* contain this metadata @@ -698,6 +708,7 @@ def main(ns: argparse.Namespace, rocrate: ROCrateWriter | None = None) -> int: ns.output_read_details, ns.output_write_details, ns.output_script, + ns.output_threads, ) converted += 1 else: @@ -723,6 +734,12 @@ def cli(args=sys.argv[1:]): parser.add_argument("--output-region", default="us-east-1") parser.add_argument("--output-overwrite", action="store_true") parser.add_argument("--output-script", action="store_true") + parser.add_argument( + "--output-threads", + type=int, + default=16, + help="number of simultaneous write threads", + ) parser.add_argument("--rocrate-name", type=str) parser.add_argument("--rocrate-description", type=str) parser.add_argument("--rocrate-license", type=str) From 704b1b3d35cf72899ba49c58426a4fb18b5a97e8 Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Mon, 19 Aug 2024 21:09:47 +0200 Subject: [PATCH 13/16] Copy batched implementation for python<3.12 --- src/ome2024_ngff_challenge/resave.py | 29 +++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 2a93d1f..f94018d 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -2,6 +2,7 @@ from __future__ import annotations import argparse +import itertools import json import logging import math @@ -9,7 +10,6 @@ import shutil import sys import time -from itertools import batched, product from pathlib import Path import numpy as np @@ -30,6 +30,29 @@ # +class Batched: + """ + implementation of itertools.batched for pre-3.12 Python versions + from https://mathspp.com/blog/itertools-batched + """ + + def __init__(self, iterable, n: int): + if n < 1: + msg = f"n must be at least one ({n})" + raise ValueError(msg) + self.iter = iter(iterable) + self.n = n + + def __iter__(self): + return self + + def __next__(self): + batch = tuple(itertools.islice(self.iter, self.n)) + if not batch: + raise StopIteration() + return batch + + class SafeEncoder(json.JSONEncoder): # Handle any TypeErrors so we are safe to use this for logging # E.g. dtype obj is not JSON serializable @@ -70,7 +93,7 @@ def chunk_iter(shape: list, chunks: list): for c_index in range(-(-dim_size // chunk_size)) ) chunk_iters.append(chunk_tuple) - return tuple(product(*chunk_iters)) + return tuple(itertools.product(*chunk_iters)) def csv_int(vstr, sep=",") -> list: @@ -389,7 +412,7 @@ def convert_array( # read & write a chunk (or shard) at a time: blocks = shards if shards is not None else chunks - for idx, batch in enumerate(batched(chunk_iter(read.shape, blocks), threads)): + for idx, batch in enumerate(Batched(chunk_iter(read.shape, blocks), threads)): futures = [] for slice_tuple in batch: future = write[slice_tuple].write(read[slice_tuple]) From 2c4dd36aff5452bab5a920b3c6928d45d0a7da42 Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Mon, 19 Aug 2024 21:19:10 +0200 Subject: [PATCH 14/16] Implement txn suggestion from @JoOkuma --- src/ome2024_ngff_challenge/resave.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index f94018d..4bfb7fb 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -413,14 +413,10 @@ def convert_array( # read & write a chunk (or shard) at a time: blocks = shards if shards is not None else chunks for idx, batch in enumerate(Batched(chunk_iter(read.shape, blocks), threads)): - futures = [] - for slice_tuple in batch: - future = write[slice_tuple].write(read[slice_tuple]) - LOGGER.info(f"batch {idx}: {slice_tuple} scheduled -- {future}") - futures.append((slice_tuple, future)) - for slice_tuple, future in futures: - future.result() - LOGGER.info(f"batch {idx}: {slice_tuple} completed -- {future}") + with ts.Transaction() as txn: + for slice_tuple in batch: + write.with_transaction(txn)[slice_tuple] = read[slice_tuple] + LOGGER.info(f"batch {idx}: {slice_tuple} scheduled in transaction") after = TSMetrics(input_config.ts_config, write_config, before) From 782216efa8f6e29eac542b93ea27a12c25b195af Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Tue, 20 Aug 2024 09:36:12 +0200 Subject: [PATCH 15/16] Introduce trace logging as well --- src/ome2024_ngff_challenge/resave.py | 29 +++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index 4bfb7fb..e2f347f 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -397,12 +397,18 @@ def convert_array( write_config["create"] = True write_config["delete_existing"] = output_config.overwrite - LOGGER.debug(f"""input_config: + LOGGER.log( + 5, + f"""input_config: {json.dumps(input_config.ts_config, indent=4)} - """) - LOGGER.debug(f"""write_config: + """, + ) + LOGGER.log( + 5, + f"""write_config: {json.dumps(write_config, indent=4, cls=SafeEncoder)} - """) + """, + ) verify_config = base_config.copy() @@ -765,7 +771,9 @@ def cli(args=sys.argv[1:]): parser.add_argument("--rocrate-organism", type=str) parser.add_argument("--rocrate-modality", type=str) parser.add_argument("--rocrate-skip", action="store_true") - parser.add_argument("--log", default="warn", help="warn, 'info' or 'debug'") + parser.add_argument( + "--log", default="warn", help="'error', 'warn', 'info', 'debug' or 'trace'" + ) group_ex = parser.add_mutually_exclusive_group() group_ex.add_argument( "--output-write-details", @@ -790,10 +798,17 @@ def cli(args=sys.argv[1:]): ns = parser.parse_args(args) # configure logging - numeric_level = getattr(logging, ns.log.upper(), None) + if ns.log.upper() == "TRACE": + numeric_level = 5 + else: + numeric_level = getattr(logging, ns.log.upper(), None) if not isinstance(numeric_level, int): raise ValueError(f"Invalid log level: {ns.log}. Use 'info' or 'debug'") - logging.basicConfig(level=numeric_level) + logging.basicConfig( + level=numeric_level, + format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) rocrate = None if not ns.rocrate_skip: From 3e175becf534df73b1541f0816eda98ec762a9c0 Mon Sep 17 00:00:00 2001 From: Josh Moore Date: Tue, 20 Aug 2024 09:36:41 +0200 Subject: [PATCH 16/16] Improve transaction logging --- src/ome2024_ngff_challenge/resave.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py index e2f347f..e4a8fe1 100755 --- a/src/ome2024_ngff_challenge/resave.py +++ b/src/ome2024_ngff_challenge/resave.py @@ -419,10 +419,21 @@ def convert_array( # read & write a chunk (or shard) at a time: blocks = shards if shards is not None else chunks for idx, batch in enumerate(Batched(chunk_iter(read.shape, blocks), threads)): + start = time.time() with ts.Transaction() as txn: + LOGGER.log(5, f"batch {idx:03d}: scheduling transaction size={len(batch)}") for slice_tuple in batch: write.with_transaction(txn)[slice_tuple] = read[slice_tuple] - LOGGER.info(f"batch {idx}: {slice_tuple} scheduled in transaction") + LOGGER.log( + 5, f"batch {idx:03d}: {slice_tuple} scheduled in transaction" + ) + LOGGER.log(5, f"batch {idx:03d}: waiting on transaction size={len(batch)}") + stop = time.time() + elapsed = stop - start + avg = float(elapsed) / len(batch) + LOGGER.debug( + f"batch {idx:03d}: completed transaction size={len(batch)} in {stop-start:0.2f}s (avg={avg:0.2f})" + ) after = TSMetrics(input_config.ts_config, write_config, before)