From daec7ecc2bc3880d0afd0cf37bfa315553fb87d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Fri, 13 Dec 2019 12:46:32 +0100 Subject: [PATCH] Forward batch command from ensembles and ensemblesets Batch processing after init on ensembles Demonstrate that batch works with apply Ensembles load ScratchRealizations concurrently Implement functionality for turning off concurrency wip, trying to multiprocess process_batch in ensemble Fix parent commit. Now process_batch works for ensembles wip . check this diff later Repair get_smryvalues, but perhaps deprecated Move batch testing to designated test file and yaml-application --- src/fmu/ensemble/__init__.py | 1 + src/fmu/ensemble/common.py | 28 ++++++++ src/fmu/ensemble/ensemble.py | 124 ++++++++++++++++++++++---------- src/fmu/ensemble/realization.py | 25 ++++--- tests/test_ensemble.py | 8 ++- tests/test_ensembleset.py | 2 +- 6 files changed, 139 insertions(+), 49 deletions(-) create mode 100644 src/fmu/ensemble/common.py diff --git a/src/fmu/ensemble/__init__.py b/src/fmu/ensemble/__init__.py index 8243cce9..9bfdb34f 100644 --- a/src/fmu/ensemble/__init__.py +++ b/src/fmu/ensemble/__init__.py @@ -6,6 +6,7 @@ del theversion +from .ensemble import use_concurrent # noqa from .ensemble import ScratchEnsemble # noqa from .realization import ScratchRealization # noqa from .ensembleset import EnsembleSet # noqa diff --git a/src/fmu/ensemble/common.py b/src/fmu/ensemble/common.py new file mode 100644 index 00000000..ecac5804 --- /dev/null +++ b/src/fmu/ensemble/common.py @@ -0,0 +1,28 @@ +"""Common functions for fmu.ensemble""" + +import os +import sys + + +def use_concurrent(): + """Determine whether we should use concurrency or not + + This is based on both an environment variable + and presence of concurrent.futures. + + Returns: + bool: True if concurrency mode should be used + """ + env_name = "FMU_CONCURRENCY" + if "concurrent.futures" in sys.modules: + if env_name not in os.environ: + return True + else: + env_var = os.environ[env_name] + if str(env_var) == "0" or str(env_var).lower() == "false": + return False + else: + return True + else: + # If concurrent.futures is not available, we end here. + return False diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index c14010a3..de54d1e8 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -18,12 +18,18 @@ from ecl import EclDataType from ecl.eclfile import EclKW +try: + from concurrent.futures import ProcessPoolExecutor +except (ImportError, ModuleNotFoundError): + pass + from .etc import Interaction from .realization import ScratchRealization from .virtualrealization import VirtualRealization from .virtualensemble import VirtualEnsemble from .ensemblecombination import EnsembleCombination from .realization import parse_number +from .common import use_concurrent xfmu = Interaction() logger = xfmu.functionlogger(__name__) @@ -239,13 +245,29 @@ def add_realizations( globbedpaths = glob.glob(paths) count = 0 - for realdir in globbedpaths: - realization = ScratchRealization( - realdir, - realidxregexp=realidxregexp, - autodiscovery=autodiscovery, - batch=batch, - ) + if use_concurrent(): + with ProcessPoolExecutor() as executor: + loaded_reals = [ + executor.submit( + ScratchRealization, + realdir, + realidxregexp=realidxregexp, + autodiscovery=autodiscovery, + batch=batch, + ).result() + for realdir in globbedpaths + ] + else: + loaded_reals = [ + ScratchRealization( + realdir, + realidxregexp=realidxregexp, + autodiscovery=autodiscovery, + batch=batch, + ) + for realdir in globbedpaths + ] + for realdir, realization in zip(globbedpaths, loaded_reals): if realization.index is None: logger.critical( "Could not determine realization index for path %s", realdir @@ -520,18 +542,30 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals pd.Dataframe: with loaded data aggregated. Column 'REAL' distuinguishes each realizations data. """ - for index, realization in self._realizations.items(): - try: - realization.load_file(localpath, fformat, convert_numeric, force_reread) - except ValueError: - # This would at least occur for unsupported fileformat, - # and that we should not skip. - logger.critical("load_file() failed in realization %d", index) - raise ValueError - except IOError: - # At ensemble level, we allow files to be missing in - # some realizations - logger.warning("Could not read %s for realization %d", localpath, index) + self.process_batch( + batch=[ + { + "load_file": { + "localpath": localpath, + "fformat": fformat, + "convert_numeric": convert_numeric, + "force_reread": force_reread, + } + } + ] + ) + #for index, realization in self._realizations.items(): + # try: + # realization.load_file(localpath, fformat, convert_numeric, force_reread) + # except ValueError: + # # This would at least occur for unsupported fileformat, + # # and that we should not skip. + # logger.critical("load_file() failed in realization %d", index) + # raise ValueError + # except IOError: + # # At ensemble level, we allow files to be missing in + # # some realizations + # logger.warning("Could not read %s for realization %d", localpath, index) if self.get_df(localpath).empty: raise ValueError("No ensemble data found for {}".format(localpath)) return self.get_df(localpath) @@ -727,23 +761,25 @@ def load_smry( """ if not stacked: raise NotImplementedError - # Future: Multithread this! - for realidx, realization in self._realizations.items(): - # We do not store the returned DataFrames here, - # instead we look them up afterwards using get_df() - # Downside is that we have to compute the name of the - # cached object as it is not returned. - logger.info("Loading smry from realization %s", realidx) - realization.load_smry( - time_index=time_index, - column_keys=column_keys, - cache_eclsum=cache_eclsum, - start_date=start_date, - end_date=end_date, - include_restart=include_restart, - ) + self.process_batch( + batch=[ + { + "load_smry": { + "column_keys": column_keys, + "time_index": time_index, + "cache_eclsum": cache_eclsum, + "start_date": start_date, + "end_date": end_date, + "include_restart": include_restart, + } + } + ] + ) + if isinstance(time_index, list): time_index = "custom" + # Note the dependency that the load_smry() function in + # ScratchRealization will store to this key-name: return self.get_df("share/results/tables/unsmry--" + time_index + ".csv") def get_volumetric_rates(self, column_keys=None, time_index=None): @@ -884,8 +920,24 @@ def process_batch(self, batch=None): ScratchEnsemble: This ensemble object (self), for it to be picked up by ProcessPoolExecutor and pickling. """ - for realization in self._realizations.values(): - realization.process_batch(batch) + if use_concurrent(): + with ProcessPoolExecutor() as executor: + real_indices = self._realizations.keys() + futures_reals = [ + executor.submit(real.process_batch, batch) + for real in self._realizations.values() + ] + # Reassemble the realization dictionary from + # the pickled results of the ProcessPool: + self._realizations = { + r_idx: real + for (r_idx, real) in zip( + real_indices, [x.result() for x in futures_reals] + ) + } + else: + for realization in self._realizations.values(): + realization.process_batch(batch) return self def apply(self, callback, **kwargs): diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py index b7cffed7..9aa566fb 100644 --- a/src/fmu/ensemble/realization.py +++ b/src/fmu/ensemble/realization.py @@ -33,6 +33,7 @@ from .etc import Interaction from .virtualrealization import VirtualRealization from .realizationcombination import RealizationCombination +from .common import use_concurrent HAVE_ECL2DF = False try: @@ -109,7 +110,7 @@ def __init__( columns=["FULLPATH", "FILETYPE", "LOCALPATH", "BASENAME"] ) self._eclsum = None # Placeholder for caching - self._eclsum_include_restart = None # Flag for cached object + self._eclsum_include_restart = None # The datastore for internalized data. Dictionary # indexed by filenames (local to the realization). @@ -898,6 +899,10 @@ def get_eclsum(self, cache=True, include_restart=True): EclSum: object representing the summary file. None if nothing was found. """ + if use_concurrent(): + # In concurrent mode, caching is not used as + # we do not pickle the loaded EclSum objects + cache = False if cache and self._eclsum: # Return cached object if available if self._eclsum_include_restart == include_restart: return self._eclsum @@ -1113,7 +1118,7 @@ def _glob_smry_keys(self, column_keys): keys = set() for key in column_keys: if isinstance(key, str): - keys = keys.union(set(self._eclsum.keys(key))) + keys = keys.union(set(self.get_eclsum().keys(key))) return list(keys) def get_volumetric_rates(self, column_keys=None, time_index=None, time_unit=None): @@ -1272,6 +1277,8 @@ def get_smryvalues(self, props_wildcard=None): """ Fetch selected vectors from Eclipse Summary data. + NOTE: This function might face depreciation. + Args: props_wildcard : string or list of strings with vector wildcards @@ -1279,24 +1286,22 @@ def get_smryvalues(self, props_wildcard=None): a dataframe with values. Raw times from UNSMRY. Empty dataframe if no summary file data available """ - if not self._eclsum: # check if it is cached - self.get_eclsum() - - if not self._eclsum: + if not self.get_eclsum(): + # Return empty, but do not store the empty dataframe in self.data return pd.DataFrame() props = self._glob_smry_keys(props_wildcard) - if "numpy_vector" in dir(self._eclsum): + if "numpy_vector" in dir(self.get_eclsum()): data = { - prop: self._eclsum.numpy_vector(prop, report_only=False) + prop: self.get_eclsum().numpy_vector(prop, report_only=False) for prop in props } else: # get_values() is deprecated in newer libecl data = { - prop: self._eclsum.get_values(prop, report_only=False) for prop in props + prop: self.get_eclsum().get_values(prop, report_only=False) for prop in props } - dates = self._eclsum.get_dates(report_only=False) + dates = self.get_eclsum().get_dates(report_only=False) return pd.DataFrame(data=data, index=dates) def get_smry_dates( diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py index 682787f4..f6534a0e 100644 --- a/tests/test_ensemble.py +++ b/tests/test_ensemble.py @@ -16,6 +16,7 @@ from fmu.ensemble import etc from fmu.ensemble import ScratchEnsemble, ScratchRealization +from fmu.ensemble.common import use_concurrent try: SKIP_FMU_TOOLS = False @@ -24,7 +25,7 @@ SKIP_FMU_TOOLS = True fmux = etc.Interaction() -logger = fmux.basiclogger(__name__, level="WARNING") +logger = fmux.basiclogger(__name__, level="INFO") if not fmux.testsetup(): raise SystemExit() @@ -753,7 +754,10 @@ def test_nonexisting(): def test_eclsumcaching(): - """Test caching of eclsum""" + """Test caching of eclsum, but only if we don't use concurrency""" + + if use_concurrent(): + pytest.skip("Not testing caching when we use concurrency") if "__file__" in globals(): # Easen up copying test code into interactive sessions diff --git a/tests/test_ensembleset.py b/tests/test_ensembleset.py index 18e401ee..51d3734b 100644 --- a/tests/test_ensembleset.py +++ b/tests/test_ensembleset.py @@ -242,7 +242,7 @@ def rms_vol2df(kwargs): {"load_scalar": {"localpath": "npv.txt"}}, {"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}}, {"load_smry": {"column_keys": "*", "time_index": "daily"}}, - ] + ], ) assert len(ensset5.get_df("npv.txt")) == 10 assert len(ensset5.get_df("unsmry--yearly")) == 50