From 8e8b0293056121b567d373ea7db0ef390b482375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Tue, 24 Mar 2020 11:03:26 +0100 Subject: [PATCH] Fix concurrency, tests pass Add optional test-case for webviz-subsurface-data --- .travis.yml | 10 +++ src/fmu/ensemble/common.py | 27 ++++--- src/fmu/ensemble/ensemble.py | 50 ++++-------- src/fmu/ensemble/realization.py | 14 +++- tests/test_webviz_subsurface_testdata.py | 97 ++++++++++++++++++++++++ 5 files changed, 150 insertions(+), 48 deletions(-) create mode 100644 tests/test_webviz_subsurface_testdata.py diff --git a/.travis.yml b/.travis.yml index c1297b4e..26eb4b2f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,6 +9,15 @@ python: - "3.7" - "3.8" +env: + - FMU_CONCURRENCY=True + - FMU_CONCURRENCY=False + +jobs: + exclude: + - python: "2.7" + env: FMU_CONCURRENCY=True + addons: apt: packages: @@ -23,6 +32,7 @@ install: - ls - pip install . - pip install .[tests] + - pip install concurrent || echo # Supposed to fail on Py27 script: - python -c "import fmu.ensemble" diff --git a/src/fmu/ensemble/common.py b/src/fmu/ensemble/common.py index ecac5804..b06f912e 100644 --- a/src/fmu/ensemble/common.py +++ b/src/fmu/ensemble/common.py @@ -3,26 +3,33 @@ import os import sys +import six + def use_concurrent(): """Determine whether we should use concurrency or not This is based on both an environment variable - and presence of concurrent.futures. + and presence of concurrent.futures, and on Python version + (Py2 deliberately not attempted to support) Returns: bool: True if concurrency mode should be used """ + if six.PY2: + # Py2-support not attempted + return False env_name = "FMU_CONCURRENCY" if "concurrent.futures" in sys.modules: if env_name not in os.environ: return True - else: - env_var = os.environ[env_name] - if str(env_var) == "0" or str(env_var).lower() == "false": - return False - else: - return True - else: - # If concurrent.futures is not available, we end here. - return False + env_var = os.environ[env_name] + if ( + str(env_var) == "0" + or str(env_var).lower() == "false" + or str(env_var).lower() == "no" + ): + return False + return True + # If concurrent.futures is not available to import, we end here. + return False diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py index 84c36930..3b8676e6 100644 --- a/src/fmu/ensemble/ensemble.py +++ b/src/fmu/ensemble/ensemble.py @@ -18,8 +18,10 @@ from ecl.eclfile import EclKW try: + # We always try to import concurrent.futures, but + # whether it is used depend on common.use_concurrent from concurrent.futures import ProcessPoolExecutor -except (ImportError, ModuleNotFoundError): +except ImportError: pass from .etc import Interaction @@ -333,8 +335,11 @@ def remove_realizations(self, realindices): realindices = [realindices] popped = 0 for index in realindices: - self.realizations.pop(index, None) - popped += 1 + if index in self.realizations.keys(): + self.realizations.pop(index, None) + popped += 1 + else: + logger.warning("Can't remove realization %d, it is not there", index) logger.info("removed %d realization(s)", popped) def to_virtual(self, name=None): @@ -530,18 +535,6 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals } ] ) - # for index, realization in self._realizations.items(): - # try: - # realization.load_file(localpath, fformat, convert_numeric, force_reread) - # except ValueError: - # # This would at least occur for unsupported fileformat, - # # and that we should not skip. - # logger.critical("load_file() failed in realization %d", index) - # raise ValueError - # except IOError: - # # At ensemble level, we allow files to be missing in - # # some realizations - # logger.warning("Could not read %s for realization %d", localpath, index) if self.get_df(localpath).empty: raise ValueError("No ensemble data found for {}".format(localpath)) return self.get_df(localpath) @@ -790,23 +783,6 @@ def load_smry( """ if not stacked: raise NotImplementedError -#<<<<<<< HEAD -# # Future: Multithread this! -# for realidx, realization in self.realizations.items(): -# # We do not store the returned DataFrames here, -# # instead we look them up afterwards using get_df() -# # Downside is that we have to compute the name of the -# # cached object as it is not returned. -# logger.info("Loading smry from realization %s", realidx) -# realization.load_smry( -# time_index=time_index, -# column_keys=column_keys, -# cache_eclsum=cache_eclsum, -# start_date=start_date, -# end_date=end_date, -# include_restart=include_restart, -# ) -#======= self.process_batch( batch=[ { @@ -822,7 +798,6 @@ def load_smry( ] ) -#>>>>>>> Forward batch command from ensembles and ensemblesets if isinstance(time_index, list): time_index = "custom" # Note the dependency that the load_smry() function in @@ -971,8 +946,10 @@ def process_batch(self, batch=None): with ProcessPoolExecutor() as executor: real_indices = self.realizations.keys() futures_reals = [ - executor.submit(real.process_batch, batch) - for real in self._realizations.values() + executor.submit( + real.process_batch, batch, excepts=(OSError, IOError) + ) + for real in self.realizations.values() ] # Reassemble the realization dictionary from # the pickled results of the ProcessPool: @@ -984,7 +961,8 @@ def process_batch(self, batch=None): } else: for realization in self.realizations.values(): - realization.process_batch(batch) + realization.process_batch(batch, excepts=(OSError, IOError)) + return self def apply(self, callback, **kwargs): diff --git a/src/fmu/ensemble/realization.py b/src/fmu/ensemble/realization.py index 16c4f7d7..5e5e273c 100644 --- a/src/fmu/ensemble/realization.py +++ b/src/fmu/ensemble/realization.py @@ -176,7 +176,7 @@ def __init__( logger.info("Initialized %s", abspath) - def process_batch(self, batch): + def process_batch(self, batch, excepts=None): """Process a list of functions to run/apply This is equivalent to calling each function individually @@ -188,6 +188,8 @@ def process_batch(self, batch): batch (list): Each list element is a dictionary with one key, being a function names, value pr key is a dict with keyword arguments to be supplied to each function. + excepts (tuple): Tuple of exceptions that are to be ignored in + each individual realization. Returns: ScratchRealization: This realization object (self), for it to be picked up by ProcessPoolExecutor and pickling. @@ -217,7 +219,15 @@ def process_batch(self, batch): logger.warning("process_batch skips illegal function: %s", fn_name) continue assert isinstance(cmd[fn_name], dict) - getattr(self, fn_name)(**cmd[fn_name]) + if excepts is None: + getattr(self, fn_name)(**cmd[fn_name]) + else: + try: + getattr(self, fn_name)(**cmd[fn_name]) + except excepts as exception: + logger.info( + "Ignoring exception in real %d: %s", self.index, str(exception) + ) return self def runpath(self): diff --git a/tests/test_webviz_subsurface_testdata.py b/tests/test_webviz_subsurface_testdata.py new file mode 100644 index 00000000..8792e5fa --- /dev/null +++ b/tests/test_webviz_subsurface_testdata.py @@ -0,0 +1,97 @@ +"""Testing loading the webviz subsurface testdata.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import datetime + +import pytest + +from fmu.ensemble import ScratchEnsemble +from fmu.ensemble.common import use_concurrent + + +def check_testdata(): + """Check if we have webviz subsurface testdata, skip if not""" + testdir = os.path.dirname(os.path.abspath(__file__)) + if not os.path.exists(os.path.join(testdir, "data/webviz-subsurface-testdata")): + print("Skipping loading webviz-subsurface-testdata") + print("Do") + print(" $ cd tests/data") + print( + " $ git clone --depth 1 " + "https://github.com/equinor/webviz-subsurface-testdata" + ) + print("to download and use with pytest") + pytest.skip() + + +def test_webviz_subsurface_testdata(): + """Check that we can load the webviz subsurface testdata""" + + check_testdata() + + if "__file__" in globals(): + # Easen up copying test code into interactive sessions + testdir = os.path.dirname(os.path.abspath(__file__)) + else: + testdir = os.path.abspath(".") + + ensdir = os.path.join(testdir, "data/webviz-subsurface-testdata/reek_fullmatrix/") + ens = ScratchEnsemble("reek_fullmatrix", ensdir + "realization-*/iter-0") + + smry_monthly = ens.load_smry() + assert "REAL" in smry_monthly + assert len(smry_monthly["REAL"].unique()) == 40 + + ens.load_csv("share/results/tables/relperm.csv") + ens.load_csv("share/results/tables/equil.csv") + ens.load_csv("share/results/tables/rft.csv") + ens.load_csv("share/results/tables/pvt.csv") + + ens.load_csv("share/results/volumes/simulator_volume_fipnum.csv") + ens.load_csv("share/results/volumes/geogrid--oil.csv") + ens.load_csv("share/results/volumes/simgrid--oil.csv") + + assert len(ens.keys()) == 11 + + +def test_webviz_subsurface_testdata_batch(): + """Check that we can load the webviz subsurface testdata in batch + + Also display timings, this should hopefully reveal that concurrent operations + are actually faster. + """ + + check_testdata() + + testdir = os.path.dirname(os.path.abspath(__file__)) + ensdir = os.path.join(testdir, "data/webviz-subsurface-testdata/reek_fullmatrix/") + start_time = datetime.datetime.now() + batch_cmds = [ + {"load_smry": {"column_keys": "*", "time_index": "yearly"}}, + {"load_smry": {"column_keys": "*", "time_index": "monthly"}}, + {"load_smry": {"column_keys": "*", "time_index": "last"}}, + {"load_smry": {"column_keys": "*", "time_index": "daily"}}, + {"load_csv": {"localpath": "share/results/tables/relperm.csv"}}, + {"load_csv": {"localpath": "share/results/tables/equil.csv"}}, + {"load_csv": {"localpath": "share/results/tables/rft.csv"}}, + {"load_csv": {"localpath": "share/results/tables/pvt.csv"}}, + { + "load_csv": { + "localpath": "share/results/volumes/simulator_volume_fipnum.csv" + } + }, + {"load_csv": {"localpath": "share/results/volumes/geogrid--oil.csv"}}, + {"load_csv": {"localpath": "share/results/volumes/simgrid--oil.csv"}}, + ] + ens = ScratchEnsemble( + "reek_fullmatrix", ensdir + "realization-*/iter-0", batch=batch_cmds + ) + end_time = datetime.datetime.now() + elapsed = (end_time - start_time).total_seconds() + print("FMU_CONCURRENCY: {}".format(use_concurrent())) + print("Elapsed time for batch ensemble initialization: {}".format(elapsed)) + assert len(ens.keys()) == 3 + len(batch_cmds) # 3 more than length of batch