Skip to content

Commit

Permalink
Forward batch command from ensembles and ensemblesets
Browse files Browse the repository at this point in the history
Batch processing after init on ensembles

Demonstrate that batch works with apply

Ensembles load ScratchRealizations concurrently

Implement functionality for turning off concurrency

wip, trying to multiprocess process_batch in ensemble

Fix parent commit. Now process_batch works for ensembles

WIP: check this diff later

Repair get_smryvalues, but perhaps deprecated

Move batch testing to designated test file and yaml-application
  • Loading branch information
berland committed Mar 13, 2020
1 parent d0b08e1 commit daec7ec
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 49 deletions.
1 change: 1 addition & 0 deletions src/fmu/ensemble/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

del theversion

from .ensemble import use_concurrent # noqa
from .ensemble import ScratchEnsemble # noqa
from .realization import ScratchRealization # noqa
from .ensembleset import EnsembleSet # noqa
Expand Down
28 changes: 28 additions & 0 deletions src/fmu/ensemble/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Common functions for fmu.ensemble"""

import os
import sys


def use_concurrent():
    """Determine whether we should use concurrency or not.

    Concurrency is on by default whenever ``concurrent.futures`` has
    been imported (the importing module is expected to have tried that
    import), but can be explicitly disabled through the environment
    variable ``FMU_CONCURRENCY`` set to ``"0"`` or ``"false"``
    (case-insensitive).

    Returns:
        bool: True if concurrency mode should be used
    """
    env_name = "FMU_CONCURRENCY"
    if "concurrent.futures" not in sys.modules:
        # If concurrent.futures is not available, we have no choice:
        return False
    env_var = os.environ.get(env_name)
    if env_var is None:
        # Concurrency is opt-out, so default to enabled:
        return True
    # os.environ values are always strings; normalize before comparing:
    return env_var.lower() not in ("0", "false")
124 changes: 88 additions & 36 deletions src/fmu/ensemble/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
from ecl import EclDataType
from ecl.eclfile import EclKW

try:
from concurrent.futures import ProcessPoolExecutor
except (ImportError, ModuleNotFoundError):
pass

from .etc import Interaction
from .realization import ScratchRealization
from .virtualrealization import VirtualRealization
from .virtualensemble import VirtualEnsemble
from .ensemblecombination import EnsembleCombination
from .realization import parse_number
from .common import use_concurrent

xfmu = Interaction()
logger = xfmu.functionlogger(__name__)
Expand Down Expand Up @@ -239,13 +245,29 @@ def add_realizations(
globbedpaths = glob.glob(paths)

count = 0
for realdir in globbedpaths:
realization = ScratchRealization(
realdir,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
)
if use_concurrent():
with ProcessPoolExecutor() as executor:
loaded_reals = [
executor.submit(
ScratchRealization,
realdir,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
).result()
for realdir in globbedpaths
]
else:
loaded_reals = [
ScratchRealization(
realdir,
realidxregexp=realidxregexp,
autodiscovery=autodiscovery,
batch=batch,
)
for realdir in globbedpaths
]
for realdir, realization in zip(globbedpaths, loaded_reals):
if realization.index is None:
logger.critical(
"Could not determine realization index for path %s", realdir
Expand Down Expand Up @@ -520,18 +542,30 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals
pd.Dataframe: with loaded data aggregated. Column 'REAL'
distinguishes each realization's data.
"""
for index, realization in self._realizations.items():
try:
realization.load_file(localpath, fformat, convert_numeric, force_reread)
except ValueError:
# This would at least occur for unsupported fileformat,
# and that we should not skip.
logger.critical("load_file() failed in realization %d", index)
raise ValueError
except IOError:
# At ensemble level, we allow files to be missing in
# some realizations
logger.warning("Could not read %s for realization %d", localpath, index)
self.process_batch(
batch=[
{
"load_file": {
"localpath": localpath,
"fformat": fformat,
"convert_numeric": convert_numeric,
"force_reread": force_reread,
}
}
]
)
#for index, realization in self._realizations.items():
# try:
# realization.load_file(localpath, fformat, convert_numeric, force_reread)
# except ValueError:
# # This would at least occur for unsupported fileformat,
# # and that we should not skip.
# logger.critical("load_file() failed in realization %d", index)
# raise ValueError
# except IOError:
# # At ensemble level, we allow files to be missing in
# # some realizations
# logger.warning("Could not read %s for realization %d", localpath, index)
if self.get_df(localpath).empty:
raise ValueError("No ensemble data found for {}".format(localpath))
return self.get_df(localpath)
Expand Down Expand Up @@ -727,23 +761,25 @@ def load_smry(
"""
if not stacked:
raise NotImplementedError
# Future: Multithread this!
for realidx, realization in self._realizations.items():
# We do not store the returned DataFrames here,
# instead we look them up afterwards using get_df()
# Downside is that we have to compute the name of the
# cached object as it is not returned.
logger.info("Loading smry from realization %s", realidx)
realization.load_smry(
time_index=time_index,
column_keys=column_keys,
cache_eclsum=cache_eclsum,
start_date=start_date,
end_date=end_date,
include_restart=include_restart,
)
self.process_batch(
batch=[
{
"load_smry": {
"column_keys": column_keys,
"time_index": time_index,
"cache_eclsum": cache_eclsum,
"start_date": start_date,
"end_date": end_date,
"include_restart": include_restart,
}
}
]
)

if isinstance(time_index, list):
time_index = "custom"
# Note the dependency that the load_smry() function in
# ScratchRealization will store to this key-name:
return self.get_df("share/results/tables/unsmry--" + time_index + ".csv")

def get_volumetric_rates(self, column_keys=None, time_index=None):
Expand Down Expand Up @@ -884,8 +920,24 @@ def process_batch(self, batch=None):
ScratchEnsemble: This ensemble object (self), for it
to be picked up by ProcessPoolExecutor and pickling.
"""
for realization in self._realizations.values():
realization.process_batch(batch)
if use_concurrent():
with ProcessPoolExecutor() as executor:
real_indices = self._realizations.keys()
futures_reals = [
executor.submit(real.process_batch, batch)
for real in self._realizations.values()
]
# Reassemble the realization dictionary from
# the pickled results of the ProcessPool:
self._realizations = {
r_idx: real
for (r_idx, real) in zip(
real_indices, [x.result() for x in futures_reals]
)
}
else:
for realization in self._realizations.values():
realization.process_batch(batch)
return self

def apply(self, callback, **kwargs):
Expand Down
25 changes: 15 additions & 10 deletions src/fmu/ensemble/realization.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .etc import Interaction
from .virtualrealization import VirtualRealization
from .realizationcombination import RealizationCombination
from .common import use_concurrent

HAVE_ECL2DF = False
try:
Expand Down Expand Up @@ -109,7 +110,7 @@ def __init__(
columns=["FULLPATH", "FILETYPE", "LOCALPATH", "BASENAME"]
)
self._eclsum = None # Placeholder for caching
self._eclsum_include_restart = None # Flag for cached object
self._eclsum_include_restart = None

# The datastore for internalized data. Dictionary
# indexed by filenames (local to the realization).
Expand Down Expand Up @@ -898,6 +899,10 @@ def get_eclsum(self, cache=True, include_restart=True):
EclSum: object representing the summary file. None if
nothing was found.
"""
if use_concurrent():
# In concurrent mode, caching is not used as
# we do not pickle the loaded EclSum objects
cache = False
if cache and self._eclsum: # Return cached object if available
if self._eclsum_include_restart == include_restart:
return self._eclsum
Expand Down Expand Up @@ -1113,7 +1118,7 @@ def _glob_smry_keys(self, column_keys):
keys = set()
for key in column_keys:
if isinstance(key, str):
keys = keys.union(set(self._eclsum.keys(key)))
keys = keys.union(set(self.get_eclsum().keys(key)))
return list(keys)

def get_volumetric_rates(self, column_keys=None, time_index=None, time_unit=None):
Expand Down Expand Up @@ -1272,31 +1277,31 @@ def get_smryvalues(self, props_wildcard=None):
"""
Fetch selected vectors from Eclipse Summary data.
NOTE: This function might be deprecated.
Args:
props_wildcard : string or list of strings with vector
wildcards
Returns:
a dataframe with values. Raw times from UNSMRY.
Empty dataframe if no summary file data available
"""
if not self._eclsum: # check if it is cached
self.get_eclsum()

if not self._eclsum:
if not self.get_eclsum():
# Return empty, but do not store the empty dataframe in self.data
return pd.DataFrame()

props = self._glob_smry_keys(props_wildcard)

if "numpy_vector" in dir(self._eclsum):
if "numpy_vector" in dir(self.get_eclsum()):
data = {
prop: self._eclsum.numpy_vector(prop, report_only=False)
prop: self.get_eclsum().numpy_vector(prop, report_only=False)
for prop in props
}
else: # get_values() is deprecated in newer libecl
data = {
prop: self._eclsum.get_values(prop, report_only=False) for prop in props
prop: self.get_eclsum().get_values(prop, report_only=False) for prop in props
}
dates = self._eclsum.get_dates(report_only=False)
dates = self.get_eclsum().get_dates(report_only=False)
return pd.DataFrame(data=data, index=dates)

def get_smry_dates(
Expand Down
8 changes: 6 additions & 2 deletions tests/test_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from fmu.ensemble import etc
from fmu.ensemble import ScratchEnsemble, ScratchRealization
from fmu.ensemble.common import use_concurrent

try:
SKIP_FMU_TOOLS = False
Expand All @@ -24,7 +25,7 @@
SKIP_FMU_TOOLS = True

fmux = etc.Interaction()
logger = fmux.basiclogger(__name__, level="WARNING")
logger = fmux.basiclogger(__name__, level="INFO")

if not fmux.testsetup():
raise SystemExit()
Expand Down Expand Up @@ -753,7 +754,10 @@ def test_nonexisting():


def test_eclsumcaching():
"""Test caching of eclsum"""
"""Test caching of eclsum, but only if we don't use concurrency"""

if use_concurrent():
pytest.skip("Not testing caching when we use concurrency")

if "__file__" in globals():
# Easen up copying test code into interactive sessions
Expand Down
2 changes: 1 addition & 1 deletion tests/test_ensembleset.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def rms_vol2df(kwargs):
{"load_scalar": {"localpath": "npv.txt"}},
{"load_smry": {"column_keys": "FOPT", "time_index": "yearly"}},
{"load_smry": {"column_keys": "*", "time_index": "daily"}},
]
],
)
assert len(ensset5.get_df("npv.txt")) == 10
assert len(ensset5.get_df("unsmry--yearly")) == 50
Expand Down

0 comments on commit daec7ec

Please sign in to comment.