From dbc26b2b1267e51e1c1931aa1a7cd7b9e1a0f070 Mon Sep 17 00:00:00 2001 From: Wolfgang Preimesberger Date: Thu, 29 Aug 2024 15:25:38 +0200 Subject: [PATCH] Rename parallel function (#36) * Rename parallel function * Rename parallel function * Update changelog * Update changelog --- CHANGELOG.rst | 20 +++++++++----------- src/repurpose/img2ts.py | 6 +++--- src/repurpose/process.py | 14 +++++++++++--- src/repurpose/ts2img.py | 6 +++--- tests/test_process.py | 4 ++-- 5 files changed, 28 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 458fd9d..8eaa90b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -6,16 +6,23 @@ Unreleased changes in master branch =================================== - -Version 0.11 + +Version 0.12 ============ +- Updates to repurpose (img2ts) for performant conversion (for non-orthogonal data) +- img2ts logging was improved +- The method for parallelization was updated to allow different backends +- A check was implemented to repeatedly try to append to a (temporarily) unavailable file +- Input grid for img2ts is now used from the input dataset if not specified by the user. This allows e.g. conversion of swath data. +Version 0.11 +============ - Use joblib for parallel processing framework, improved logging - Added option to parallelize Img2Ts process - Fix bug where a wrong grid attribute was used. Version 0.10 ============ - - Ts2Img module was rebuilt. Allows conversion of time series with NN lookup. - Added example notebook for converting ASCAT time series into regularly gridded images. - Added a simple parallelization framework, with logging and error handling. @@ -23,52 +30,43 @@ Version 0.10 Version 0.9 =========== - - Update for new pyscaffold - Fixed bug where resampling failed when a BasicGrid was passed instead of a CellGrid Version 0.8 =========== - - Update pyscaffold package structure (pyscaffold 3) - Drop py2 support - Add pypi deployment to travis. Version 0.7 =========== - - Add resample functions (from pytesmo) Version 0.6 =========== - - Update setup.cfg Version 0.5 =========== - - Update readme - Update pyscaffold version in setup.py because of compatibility issues with setuptools 39 Version 0.4 =========== - - Enable compression by default. Version 0.3 =========== - - Enable image to timeseries conversion if missing images are encountered. Version 0.2 =========== - - First public version - Rename to repurpose - Improve test coverage Version 0.1 =========== - - initial version supporting image to time series conversion - draft for time series to image conversion diff --git a/src/repurpose/img2ts.py b/src/repurpose/img2ts.py index f3acc16..663cc97 100644 --- a/src/repurpose/img2ts.py +++ b/src/repurpose/img2ts.py @@ -1,4 +1,4 @@ -from repurpose.process import parallel_process_async, idx_chunks +from repurpose.process import parallel_process, idx_chunks import pynetcf.time_series as nc from pygeogrids.grids import CellGrid import repurpose.resample as resamp @@ -686,7 +686,7 @@ def calc(self): # time information is contained in `celldata` FUNC = self._write_non_orthogonal - parallel_process_async( + parallel_process( FUNC=FUNC, ITER_KWARGS=ITER_KWARGS, STATIC_KWARGS=STATIC_KWARGS, @@ -748,7 +748,7 @@ def img_bulk(self): ITER_KWARGS = {'date': dates} - results = parallel_process_async( + results = parallel_process( self._read_image, ITER_KWARGS=ITER_KWARGS, STATIC_KWARGS={'target_grid': target_grid, diff --git a/src/repurpose/process.py b/src/repurpose/process.py index 8164467..448520c 100644 --- a/src/repurpose/process.py +++ b/src/repurpose/process.py @@ -1,5 +1,6 @@ import time import os +import warnings # Note: Must be set BEFORE the first numpy import!! os.environ['MKL_NUM_THREADS'] = '1' @@ -197,7 +198,12 @@ def run_with_error_handling(FUNC, raise e return r -def parallel_process_async( +def parallel_process_async(*args, **kwargs): + warnings.warn("The 'parallel_process_async' method was renamed to " + "`parallel_process`.", DeprecationWarning) + return parallel_process(*args, **kwargs) + +def parallel_process( FUNC, ITER_KWARGS, STATIC_KWARGS=None, @@ -213,7 +219,7 @@ def parallel_process_async( progress_bar_label="Processed", backend="threading", sharedmem=False, - parallel_kwargs=None, + joblib_kwargs=None, ) -> list: """ Applies the passed function to all elements of the passed iterables. @@ -273,6 +279,8 @@ def parallel_process_async( sharedmem: bool, optional (default:True) Activate shared memory option (slow) WARNING: Option not fully implemented / tested. + joblib_kwargs: dict, optional (default: None) + Additional keyword arguments to pass to joblib.Parallel Returns ------- @@ -375,7 +383,7 @@ def parallel_process_async( desc=progress_bar_label, require='sharedmem' if sharedmem else None, return_as="list", - **parallel_kwargs or dict(), + **joblib_kwargs or dict(), )(delayed(run_with_error_handling)( FUNC, ignore_errors, log_queue=q, diff --git a/src/repurpose/ts2img.py b/src/repurpose/ts2img.py index e9fb2b8..d1406d0 100644 --- a/src/repurpose/ts2img.py +++ b/src/repurpose/ts2img.py @@ -1,4 +1,4 @@ -from repurpose.process import parallel_process_async, idx_chunks +from repurpose.process import parallel_process, idx_chunks import logging import numpy as np import pandas as pd @@ -245,7 +245,7 @@ def _calc_chunk(self, timestamps, preprocess_func=None, preprocess_kwargs=None, ITER_KWARGS['lons'].append(lons) ITER_KWARGS['lats'].append(lats) - stack = parallel_process_async( + stack = parallel_process( _convert, ITER_KWARGS, STATIC_KWARGS, n_proc=n_proc, show_progress_bars=True, log_path=log_path, backend=self.backend, verbose=False, ignore_errors=self.ignore_errors) @@ -525,7 +525,7 @@ def store_netcdf_images(self, path_out, fn_template=f"{datetime}.nc", STATIC_KWARGS = {'out_path': path_out, 'annual_folder': annual_folder, 'encoding': encoding} - _ = parallel_process_async( + _ = parallel_process( FUNC=_write_img, ITER_KWARGS=ITER_KWARGS, STATIC_KWARGS=STATIC_KWARGS, diff --git a/tests/test_process.py b/tests/test_process.py index fd0c1ec..e9c5002 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -5,7 +5,7 @@ import logging import pytest -from repurpose.process import parallel_process_async, idx_chunks +from repurpose.process import parallel_process, idx_chunks def test_index_chunks(): timestamps = pd.date_range('2020-07-01', '2020-07-09', freq='1D') @@ -34,7 +34,7 @@ def test_apply_to_elements(n_proc, backend): iter_kwargs = {'x': [1, 2, 3, 4]} static_kwargs = {'p': 2} with tempfile.TemporaryDirectory() as log_path: - res = parallel_process_async( + res = parallel_process( func, iter_kwargs, static_kwargs, n_proc=int(n_proc), show_progress_bars=False, verbose=False, loglevel="DEBUG", ignore_errors=True, log_path=log_path, backend=backend)