Skip to content

Commit

Permalink
Rename parallel function (#36)
Browse files Browse the repository at this point in the history
* Rename parallel function

* Rename parallel function

* Update changelog

* Update changelog
  • Loading branch information
wpreimes authored Aug 29, 2024
1 parent 9e233ba commit dbc26b2
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 22 deletions.
20 changes: 9 additions & 11 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,69 +6,67 @@ Unreleased changes in master branch
===================================
-

Version 0.11

Version 0.12
============
- Updates to repurpose (img2ts) for performant conversion (for non-orthogonal data)
- img2ts logging was improved
- The method for parallelization was updated to allow different backends
- A check was implemented to repeatedly try to append to a (temporarily) unavailable file
- Input grid for img2ts is now used from the input dataset if not specified by the user. This allows e.g. conversion of swath data.

Version 0.11
============
- Use joblib for parallel processing framework, improved logging
- Added option to parallelize Img2Ts process
- Fix bug where a wrong grid attribute was used.

Version 0.10
============

- Ts2Img module was rebuilt. Allows conversion of time series with NN lookup.
- Added example notebook for converting ASCAT time series into regularly gridded images.
- Added a simple parallelization framework, with logging and error handling.
- Added the option to pass custom pre- and post-processing functions to ts2img.

Version 0.9
===========

- Update for new pyscaffold
- Fixed bug where resampling failed when a BasicGrid was passed instead of a CellGrid

Version 0.8
===========

- Update pyscaffold package structure (pyscaffold 3)
- Drop py2 support
- Add pypi deployment to travis.

Version 0.7
===========

- Add resample functions (from pytesmo)

Version 0.6
===========

- Update setup.cfg

Version 0.5
===========

- Update readme
- Update pyscaffold version in setup.py because of compatibility issues with setuptools 39

Version 0.4
===========

- Enable compression by default.

Version 0.3
===========

- Enable image to timeseries conversion if missing images are encountered.

Version 0.2
===========

- First public version
- Rename to repurpose
- Improve test coverage

Version 0.1
===========

- initial version supporting image to time series conversion
- draft for time series to image conversion
6 changes: 3 additions & 3 deletions src/repurpose/img2ts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from repurpose.process import parallel_process_async, idx_chunks
from repurpose.process import parallel_process, idx_chunks
import pynetcf.time_series as nc
from pygeogrids.grids import CellGrid
import repurpose.resample as resamp
Expand Down Expand Up @@ -686,7 +686,7 @@ def calc(self):
# time information is contained in `celldata`
FUNC = self._write_non_orthogonal

parallel_process_async(
parallel_process(
FUNC=FUNC,
ITER_KWARGS=ITER_KWARGS,
STATIC_KWARGS=STATIC_KWARGS,
Expand Down Expand Up @@ -748,7 +748,7 @@ def img_bulk(self):

ITER_KWARGS = {'date': dates}

results = parallel_process_async(
results = parallel_process(
self._read_image,
ITER_KWARGS=ITER_KWARGS,
STATIC_KWARGS={'target_grid': target_grid,
Expand Down
14 changes: 11 additions & 3 deletions src/repurpose/process.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
import os
import warnings

# Note: Must be set BEFORE the first numpy import!!
os.environ['MKL_NUM_THREADS'] = '1'
Expand Down Expand Up @@ -197,7 +198,12 @@ def run_with_error_handling(FUNC,
raise e
return r

def parallel_process_async(
def parallel_process_async(*args, **kwargs):
    """
    Deprecated alias of `parallel_process`, kept for backward compatibility.

    Emits a DeprecationWarning and forwards all positional and keyword
    arguments to `parallel_process`.

    Parameters
    ----------
    *args, **kwargs
        Passed on to `parallel_process`. The legacy keyword
        `parallel_kwargs` is translated to its new name `joblib_kwargs`.

    Returns
    -------
    list
        Whatever `parallel_process` returns.
    """
    # stacklevel=2 attributes the warning to the caller's line instead of
    # to this wrapper, so users can find the call they need to update.
    warnings.warn("The 'parallel_process_async' method was renamed to "
                  "`parallel_process`.", DeprecationWarning, stacklevel=2)

    # The keyword `parallel_kwargs` was renamed to `joblib_kwargs` together
    # with the function; translate it so legacy calls do not fail with a
    # TypeError. An explicitly passed `joblib_kwargs` takes precedence.
    if "parallel_kwargs" in kwargs:
        legacy_kwargs = kwargs.pop("parallel_kwargs")
        kwargs.setdefault("joblib_kwargs", legacy_kwargs)

    return parallel_process(*args, **kwargs)

def parallel_process(
FUNC,
ITER_KWARGS,
STATIC_KWARGS=None,
Expand All @@ -213,7 +219,7 @@ def parallel_process_async(
progress_bar_label="Processed",
backend="threading",
sharedmem=False,
parallel_kwargs=None,
joblib_kwargs=None,
) -> list:
"""
Applies the passed function to all elements of the passed iterables.
Expand Down Expand Up @@ -273,6 +279,8 @@ def parallel_process_async(
sharedmem: bool, optional (default: False)
Activate shared memory option (slow)
WARNING: Option not fully implemented / tested.
joblib_kwargs: dict, optional (default: None)
Additional keyword arguments to pass to joblib.Parallel
Returns
-------
Expand Down Expand Up @@ -375,7 +383,7 @@ def parallel_process_async(
desc=progress_bar_label,
require='sharedmem' if sharedmem else None,
return_as="list",
**parallel_kwargs or dict(),
**joblib_kwargs or dict(),
)(delayed(run_with_error_handling)(
FUNC, ignore_errors,
log_queue=q,
Expand Down
6 changes: 3 additions & 3 deletions src/repurpose/ts2img.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from repurpose.process import parallel_process_async, idx_chunks
from repurpose.process import parallel_process, idx_chunks
import logging
import numpy as np
import pandas as pd
Expand Down Expand Up @@ -245,7 +245,7 @@ def _calc_chunk(self, timestamps, preprocess_func=None, preprocess_kwargs=None,
ITER_KWARGS['lons'].append(lons)
ITER_KWARGS['lats'].append(lats)

stack = parallel_process_async(
stack = parallel_process(
_convert, ITER_KWARGS, STATIC_KWARGS, n_proc=n_proc,
show_progress_bars=True, log_path=log_path, backend=self.backend,
verbose=False, ignore_errors=self.ignore_errors)
Expand Down Expand Up @@ -525,7 +525,7 @@ def store_netcdf_images(self, path_out, fn_template=f"{datetime}.nc",
STATIC_KWARGS = {'out_path': path_out, 'annual_folder': annual_folder,
'encoding': encoding}

_ = parallel_process_async(
_ = parallel_process(
FUNC=_write_img,
ITER_KWARGS=ITER_KWARGS,
STATIC_KWARGS=STATIC_KWARGS,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import pytest

from repurpose.process import parallel_process_async, idx_chunks
from repurpose.process import parallel_process, idx_chunks

def test_index_chunks():
timestamps = pd.date_range('2020-07-01', '2020-07-09', freq='1D')
Expand Down Expand Up @@ -34,7 +34,7 @@ def test_apply_to_elements(n_proc, backend):
iter_kwargs = {'x': [1, 2, 3, 4]}
static_kwargs = {'p': 2}
with tempfile.TemporaryDirectory() as log_path:
res = parallel_process_async(
res = parallel_process(
func, iter_kwargs, static_kwargs, n_proc=int(n_proc),
show_progress_bars=False, verbose=False, loglevel="DEBUG",
ignore_errors=True, log_path=log_path, backend=backend)
Expand Down

0 comments on commit dbc26b2

Please sign in to comment.