Fix concurrency, tests pass
Add optional test case for webviz-subsurface-testdata
berland committed Apr 14, 2020
1 parent f146192 commit 2221324
Showing 5 changed files with 147 additions and 48 deletions.
10 changes: 10 additions & 0 deletions .travis.yml
@@ -9,6 +9,15 @@ python:
- "3.7"
- "3.8"

env:
- FMU_CONCURRENCY=True
- FMU_CONCURRENCY=False

jobs:
exclude:
- python: "2.7"
env: FMU_CONCURRENCY=True

addons:
apt:
packages:
@@ -23,6 +32,7 @@ install:
- ls
- pip install .
- pip install .[tests]
- pip install concurrent || echo # Supposed to fail on Py27

script:
- python -c "import fmu.ensemble"
27 changes: 17 additions & 10 deletions src/fmu/ensemble/common.py
@@ -3,26 +3,33 @@
import os
import sys

import six


def use_concurrent():
"""Determine whether we should use concurrency or not
This is based on both an environment variable
and presence of concurrent.futures.
and presence of concurrent.futures, and on Python version
    (Python 2 support is deliberately not attempted)
Returns:
bool: True if concurrency mode should be used
"""
if six.PY2:
# Py2-support not attempted
return False
env_name = "FMU_CONCURRENCY"
if "concurrent.futures" in sys.modules:
if env_name not in os.environ:
return True
else:
env_var = os.environ[env_name]
if str(env_var) == "0" or str(env_var).lower() == "false":
return False
else:
return True
else:
# If concurrent.futures is not available, we end here.
return False
env_var = os.environ[env_name]
if (
str(env_var) == "0"
or str(env_var).lower() == "false"
or str(env_var).lower() == "no"
):
return False
return True
# If concurrent.futures is not available to import, we end here.
return False
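For context, a minimal sketch (editor's illustration, not part of the commit) of how the FMU_CONCURRENCY switch is meant to be exercised; the accepted values "0"/"false"/"no" and the module path fmu.ensemble.common are taken from the diff above:

import os
import concurrent.futures  # noqa: F401 -- use_concurrent() checks sys.modules for this

from fmu.ensemble.common import use_concurrent

os.environ["FMU_CONCURRENCY"] = "no"  # "0", "false" and "no" all disable concurrency
assert use_concurrent() is False

del os.environ["FMU_CONCURRENCY"]  # unset means concurrency is enabled (Python 3 only)
assert use_concurrent() is True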
50 changes: 14 additions & 36 deletions src/fmu/ensemble/ensemble.py
@@ -18,8 +18,10 @@
from ecl.eclfile import EclKW

try:
# We always try to import concurrent.futures, but
    # whether it is used depends on common.use_concurrent
from concurrent.futures import ProcessPoolExecutor
except (ImportError, ModuleNotFoundError):
except ImportError:
pass

from .etc import Interaction
@@ -333,8 +335,11 @@ def remove_realizations(self, realindices):
realindices = [realindices]
popped = 0
for index in realindices:
self.realizations.pop(index, None)
popped += 1
if index in self.realizations.keys():
self.realizations.pop(index, None)
popped += 1
else:
logger.warning("Can't remove realization %d, it is not there", index)
logger.info("removed %d realization(s)", popped)

def to_virtual(self, name=None):
@@ -530,18 +535,6 @@ def load_file(self, localpath, fformat, convert_numeric=False, force_reread=Fals
}
]
)
# for index, realization in self._realizations.items():
# try:
# realization.load_file(localpath, fformat, convert_numeric, force_reread)
# except ValueError:
# # This would at least occur for unsupported fileformat,
# # and that we should not skip.
# logger.critical("load_file() failed in realization %d", index)
# raise ValueError
# except IOError:
# # At ensemble level, we allow files to be missing in
# # some realizations
# logger.warning("Could not read %s for realization %d", localpath, index)
if self.get_df(localpath).empty:
raise ValueError("No ensemble data found for {}".format(localpath))
return self.get_df(localpath)
@@ -790,23 +783,6 @@ def load_smry(
"""
if not stacked:
raise NotImplementedError
#<<<<<<< HEAD
# # Future: Multithread this!
# for realidx, realization in self.realizations.items():
# # We do not store the returned DataFrames here,
# # instead we look them up afterwards using get_df()
# # Downside is that we have to compute the name of the
# # cached object as it is not returned.
# logger.info("Loading smry from realization %s", realidx)
# realization.load_smry(
# time_index=time_index,
# column_keys=column_keys,
# cache_eclsum=cache_eclsum,
# start_date=start_date,
# end_date=end_date,
# include_restart=include_restart,
# )
#=======
self.process_batch(
batch=[
{
@@ -822,7 +798,6 @@
]
)

#>>>>>>> Forward batch command from ensembles and ensemblesets
if isinstance(time_index, list):
time_index = "custom"
# Note the dependency that the load_smry() function in
@@ -971,8 +946,10 @@ def process_batch(self, batch=None):
with ProcessPoolExecutor() as executor:
real_indices = self.realizations.keys()
futures_reals = [
executor.submit(real.process_batch, batch)
for real in self._realizations.values()
executor.submit(
real.process_batch, batch, excepts=(OSError, IOError)
)
for real in self.realizations.values()
]
# Reassemble the realization dictionary from
# the pickled results of the ProcessPool:
@@ -984,7 +961,8 @@
}
else:
for realization in self.realizations.values():
realization.process_batch(batch)
realization.process_batch(batch, excepts=(OSError, IOError))

return self

def apply(self, callback, **kwargs):
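As an illustration only (not from the commit), a batch of loader commands can now be dispatched from the ensemble level roughly like this; the ensemble path is hypothetical, while the function names and keyword arguments mirror those in the new test file further down:

from fmu.ensemble import ScratchEnsemble

# Hypothetical ensemble path; any realization-*/iter-* layout applies:
ens = ScratchEnsemble("demo", "/scratch/demo/realization-*/iter-0")
ens.process_batch(
    batch=[
        {"load_smry": {"column_keys": "*", "time_index": "monthly"}},
        {"load_csv": {"localpath": "share/results/tables/relperm.csv"}},
    ]
)
print(ens.keys())  # loaded data is afterwards retrievable through get_df()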
14 changes: 12 additions & 2 deletions src/fmu/ensemble/realization.py
@@ -176,7 +176,7 @@ def __init__(

logger.info("Initialized %s", abspath)

def process_batch(self, batch):
def process_batch(self, batch, excepts=None):
"""Process a list of functions to run/apply
This is equivalent to calling each function individually
@@ -188,6 +188,8 @@ def process_batch(self, batch):
batch (list): Each list element is a dictionary with one key,
                being a function name; the value per key is a dict with keyword
arguments to be supplied to each function.
excepts (tuple): Tuple of exceptions that are to be ignored in
each individual realization.
Returns:
ScratchRealization: This realization object (self), for it
to be picked up by ProcessPoolExecutor and pickling.
@@ -217,7 +219,15 @@
logger.warning("process_batch skips illegal function: %s", fn_name)
continue
assert isinstance(cmd[fn_name], dict)
getattr(self, fn_name)(**cmd[fn_name])
if excepts is None:
getattr(self, fn_name)(**cmd[fn_name])
else:
try:
getattr(self, fn_name)(**cmd[fn_name])
except excepts as exception:
logger.info(
"Ignoring exception in real %d: %s", self.index, str(exception)
)
return self

def runpath(self):
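For illustration (not part of the commit), the new excepts argument lets a realization-level batch tolerate missing files instead of aborting; the realization path and the possibly-missing CSV name below are hypothetical:

from fmu.ensemble import ScratchRealization

real = ScratchRealization("/scratch/demo/realization-0/iter-0")  # hypothetical path
real.process_batch(
    batch=[
        {"load_smry": {"column_keys": "*", "time_index": "yearly"}},
        {"load_csv": {"localpath": "share/results/tables/maybe_missing.csv"}},
    ],
    excepts=(OSError, IOError),  # a missing CSV is logged and skipped, not raised
)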
94 changes: 94 additions & 0 deletions tests/test_webviz_subsurface_testdata.py
@@ -0,0 +1,94 @@
"""Testing loading the webviz subsurface testdata."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import datetime

import pytest

from fmu.ensemble import etc
from fmu.ensemble import ScratchEnsemble, ScratchRealization
from fmu.ensemble.common import use_concurrent


def check_testdata():
"""Check if we have webviz subsurface testdata, skip if not"""
testdir = os.path.dirname(os.path.abspath(__file__))
if not os.path.exists(os.path.join(testdir, "data/webviz-subsurface-testdata")):
print("Skipping loading webviz-subsurface-testdata")
print("Do")
print(" $ cd tests/data")
print(
" $ git clone git clone --depth 1 https://github.com/equinor/webviz-subsurface-testdata"
)
print("to download and use with pytest")
pytest.skip()


def test_webviz_subsurface_testdata():
"""Check that we can load the webviz subsurface testdata"""

check_testdata()

if "__file__" in globals():
        # Ease copying test code into interactive sessions
testdir = os.path.dirname(os.path.abspath(__file__))
else:
testdir = os.path.abspath(".")

ensdir = os.path.join(testdir, "data/webviz-subsurface-testdata/reek_fullmatrix/")
ens = ScratchEnsemble("reek_fullmatrix", ensdir + "realization-*/iter-0")

smry_monthly = ens.load_smry()
assert "REAL" in smry_monthly
assert len(smry_monthly["REAL"].unique()) == 40

ens.load_csv("share/results/tables/relperm.csv")
ens.load_csv("share/results/tables/equil.csv")
ens.load_csv("share/results/tables/rft.csv")
ens.load_csv("share/results/tables/pvt.csv")

ens.load_csv("share/results/volumes/simulator_volume_fipnum.csv")
ens.load_csv("share/results/volumes/geogrid--oil.csv")
ens.load_csv("share/results/volumes/simgrid--oil.csv")

assert len(ens.keys()) == 11


def test_webviz_subsurface_testdata_batch():
"""Check that we can load the webviz subsurface testdata in batch
    Also display timings; these should reveal that concurrent operations
    are actually faster.
"""
    check_testdata()

    testdir = os.path.dirname(os.path.abspath(__file__))
ensdir = os.path.join(testdir, "data/webviz-subsurface-testdata/reek_fullmatrix/")
start_time = datetime.datetime.now()
batch_cmds = [
{"load_smry": {"column_keys": "*", "time_index": "yearly"}},
{"load_smry": {"column_keys": "*", "time_index": "monthly"}},
{"load_smry": {"column_keys": "*", "time_index": "last"}},
{"load_smry": {"column_keys": "*", "time_index": "daily"}},
{"load_csv": {"localpath": "share/results/tables/relperm.csv"}},
{"load_csv": {"localpath": "share/results/tables/equil.csv"}},
{"load_csv": {"localpath": "share/results/tables/rft.csv"}},
{"load_csv": {"localpath": "share/results/tables/pvt.csv"}},
{
"load_csv": {
"localpath": "share/results/volumes/simulator_volume_fipnum.csv"
}
},
{"load_csv": {"localpath": "share/results/volumes/geogrid--oil.csv"}},
{"load_csv": {"localpath": "share/results/volumes/simgrid--oil.csv"}},
]
ens = ScratchEnsemble(
"reek_fullmatrix", ensdir + "realization-*/iter-0", batch=batch_cmds
)
end_time = datetime.datetime.now()
elapsed = (end_time - start_time).total_seconds()
print("FMU_CONCURRENCY: {}".format(use_concurrent()))
print("Elapsed time for batch ensemble initialization: {}".format(elapsed))
assert len(ens.keys()) == 3 + len(batch_cmds) # 3 more than length of batch
