Commit

update docs
Karl5766 committed Aug 17, 2024
1 parent b846450 commit fa0f934
Showing 4 changed files with 99 additions and 5 deletions.
80 changes: 80 additions & 0 deletions docs/GettingStarted/boilerplate.rst
@@ -29,6 +29,46 @@ page. We will use a local laptop as an example.
print((da.zeros((3, 3)) + 1).compute().sum().item()) # will output 9
Dask Logging Setup
******************

Distributed logging is hard (Dask supports Python's logging module, but I've had some trouble getting it
right). Dask does, however, provide a simple strategy for debug logging, described in `this page
<https://docs.dask.org/en/latest/how-to/debug.html>`_: log as usual in the main thread, and use
dask.distributed.print to print debugging messages from inside worker threads. For convenience, I also echo
stdout and stderr into separate log files so the output persists even if you accidentally close the command
window. Below is an example:

.. code-block:: Python

    if __name__ == '__main__':
        import sys
        import numpy as np
        import dask
        import dask.config
        import dask.array as da
        from dask.distributed import Client
        from dask.distributed import print as dprint
        import cvpl_tools.im.fs as imfs

        # echo stdout and stderr into log files so the output persists even if the
        # command window is closed accidentally
        logfile_stdout = open('log_stdout.txt', mode='w')
        logfile_stderr = open('log_stderr.txt', mode='w')
        sys.stdout = imfs.MultiOutputStream(sys.stdout, logfile_stdout)
        sys.stderr = imfs.MultiOutputStream(sys.stderr, logfile_stderr)

        # TMP_PATH: path to a temporary directory of your choice
        with dask.config.set({'temporary_directory': TMP_PATH}):
            client = Client(threads_per_worker=6, n_workers=1)
            print((da.zeros((3, 3)) + 1).compute().sum().item())  # will output 9

            def map_fn(block, block_info=None):
                # inside a worker thread, use dask.distributed.print instead of print
                dprint(f'map_fn is called with input {block}')
                return block + 1

            arr = da.zeros((3, 3), dtype=np.uint8).map_blocks(map_fn, meta=np.array(tuple(), dtype=np.uint8))
            print('result is:', arr.compute())

After running this program, you should see outputs in both the command window and the log_stdout.txt and
log_stderr.txt files under your working directory.

CacheDirectory
**************

@@ -40,3 +80,43 @@ that do not change across execution of the program. The **CacheDirectory** class
assigns paths for these intermediate results, based on their cache ID (cid) and the parent CacheDirectory
they belong to.

In cvpl_tools' model of caching, a root cache directory is created or loaded when the program starts to run,
and every cache directory may contain many sub-cache-directories or data directories, which in turn hold the
intermediate files. To create a cache directory, we can write

.. code-block:: Python

    if __name__ == '__main__':
        import os
        import cvpl_tools.im.fs as imfs

        with imfs.CacheDirectory(
                f'{TMP_PATH}/CacheDirectory',
                remove_when_done=False,
                read_if_exists=True) as temp_directory:
            # Use case #1. Create a data directory for caching computation results
            cache_exists, cache_path = temp_directory.cache(is_dir=False, cid='some_cache_path')
            if not cache_exists:
                os.makedirs(cache_path.path, exist_ok=True)
                # PUT CODE HERE: Now write your data into cache_path.path and load it back later

            # Use case #2. Create a sub-directory and pass it to other processes for caching
            def multi_step_computation(cache_at: imfs.CacheDirectory):
                cache_exists, cache_path = cache_at.cache(is_dir=False, cid='A')
                if not cache_exists:
                    A = computeA()
                    save(cache_path.path, A)
                A = load(cache_path.path)

                cache_exists_B, cache_path_B = cache_at.cache(is_dir=False, cid='B')
                if not cache_exists_B:
                    B = computeBFromA(A)
                    save(cache_path_B.path, B)
                B = load(cache_path_B.path)
                return B

            sub_temp_directory = temp_directory.cache(is_dir=True, cid='mult_step_cache')
            result = multi_step_computation(cache_at=sub_temp_directory)

After running the above code once, cache files will be created. The second time the code is run, the computation
steps will be skipped. This sort of hierarchical caching is convenient for working with complex processes that
can be broken down into smaller and simpler compute steps.
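
The computeA, computeBFromA, save and load calls above are placeholders for your own code. As a rough
illustration only (not part of cvpl_tools), here is what they might look like if the intermediate results are
NumPy arrays saved in .npy format:

.. code-block:: Python

    import numpy as np

    def computeA():
        # stand-in for the first expensive computation step
        return np.arange(9).reshape((3, 3))

    def computeBFromA(A):
        # a second step that depends on the result of the first
        return A * 2

    def save(path, arr):
        # np.save appends '.npy' to the path if it is not already there
        np.save(path, arr)

    def load(path):
        if not path.endswith('.npy'):
            path = path + '.npy'
        return np.load(path)
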
5 changes: 3 additions & 2 deletions docs/GettingStarted/segmentation_pipeline.rst
@@ -154,6 +154,9 @@ and two optional parameters: cid and viewer.
Running the Pipeline
********************

See `Boilerplate Code <GettingStarted/boilerplate>`_ for an explanation of the boilerplate used below; it is
required to run the following example.

Now that we have defined an ExampleSegProcess class, the next step is to write the script that uses the pipeline
to segment an input dataset. Note we need a Dask cluster and a temporary directory set up before running the
forward() method.
@@ -182,7 +185,5 @@ forward() method.
client.close()
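
Only the tail of the example appears above. As a rough sketch only (the exact signatures of
ExampleSegProcess.forward and CacheDirectory may differ from the real cvpl_tools API, and the input array below
is a placeholder), the setup typically looks like this:

.. code-block:: Python

    if __name__ == '__main__':
        import numpy as np
        import dask.array as da
        from dask.distributed import Client
        import cvpl_tools.im.fs as imfs

        with imfs.CacheDirectory(
                f'{TMP_PATH}/CacheDirectory',
                remove_when_done=False,
                read_if_exists=True) as temp_directory:
            client = Client(threads_per_worker=6, n_workers=1)

            im = da.zeros((64, 64, 64), dtype=np.float32)  # placeholder for the input dataset
            process = ExampleSegProcess()
            result = process.forward(im, cid='example_seg', viewer=None)

            client.close()
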
See `Boilerplate Code <GettingStarted/boilerplate>`_ for explanation of the boilerplate code.

To learn more, see the API pages for cvpl_tools.im.seg_process, cvpl_tools.im.fs and
cvpl_tools.im.ndblock modules.
6 changes: 3 additions & 3 deletions docs/index.rst
@@ -40,8 +40,8 @@ or on cloud.

cvpl_tools.napari.zarr.py <API/napari_zarr>
cvpl_tools.ome_zarr.io.py <API/ome_zarr_io>
cvpl_tools.im.fs <API/imfs>
cvpl_tools.im.ndblock <API/ndblock>
cvpl_tools.im.seg_process <API/seg_process>
cvpl_tools.im.fs.py <API/imfs>
cvpl_tools.im.ndblock.py <API/ndblock>
cvpl_tools.im.seg_process.py <API/seg_process>


13 changes: 13 additions & 0 deletions src/cvpl_tools/im/fs.py
@@ -447,3 +447,16 @@ def __enter__(self):

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.remove_tmp()


class MultiOutputStream:
    """File-like object that duplicates writes to multiple underlying streams."""

    def __init__(self, *files):
        self.files = files

    def write(self, message):
        for file in self.files:
            file.write(message)

    def flush(self):
        for file in self.files:
            file.flush()
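

# Usage sketch (illustrative, mirroring the boilerplate docs page): redirect stdout so that
# every print goes both to the console and to a log file.
#
#     import sys
#     logfile = open('log_stdout.txt', mode='w')
#     sys.stdout = MultiOutputStream(sys.stdout, logfile)
#     print('this goes to the console and to log_stdout.txt')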
