Skip to content

Commit

Permalink
Minor changes after review
Browse files: browse the repository at this point in the history
  • Loading branch information
berland committed Apr 17, 2020
1 parent 24ded58 commit 6db6234
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 47 deletions.
53 changes: 29 additions & 24 deletions src/fmu/ensemble/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ScratchEnsemble(object):
or relative path to a realization RUNPATH, third column is
the basename of the Eclipse simulation, relative to RUNPATH.
Fourth column is not used.
runpathfilter (str): If supplied, the only the runpaths in
runpathfilter (str): If supplied, only the runpaths in
the runpathfile which contain this string will be included
Use this to select only a specific realization, for example.
autodiscovery (boolean): True by default, means that the class
Expand Down Expand Up @@ -302,16 +302,13 @@ def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None):
loaded_reals = [
executor.submit(
ScratchRealization,
row["runpath"],
index=int(row["index"]),
row.runpath,
index=int(row.index),
autodiscovery=False,
find_files=[
row["eclbase"] + ".DATA",
row["eclbase"] + ".UNSMRY",
],
find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY",],
batch=batch,
).result()
for _, row in runpath_df.iterrows()
for row in runpath_df.itertuples()
]
else:
logger.info(
Expand All @@ -320,13 +317,13 @@ def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None):
)
loaded_reals = [
ScratchRealization(
row["runpath"],
index=int(row["index"]),
row.runpath,
index=int(row.index),
autodiscovery=False,
find_files=[row["eclbase"] + ".DATA", row["eclbase"] + ".UNSMRY"],
find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY"],
batch=batch,
)
for _, row in runpath_df.iterrows()
for row in runpath_df.itertuples()
]
for real in loaded_reals:
self.realizations[real.index] = real
Expand Down Expand Up @@ -829,10 +826,16 @@ def load_smry(
]
)

# process_batch() will modify each Realization object
# and add the loaded smry data to the list of internalized
# data, under a well-defined name (<dir>/unsmry--<timeindexstr>.csv)

# Since load_smry() also should return the aggregation
# of the loaded smry data, we need to pick up this
# data and aggregate it.

if isinstance(time_index, list):
time_index = "custom"
# Note the dependency that the load_smry() function in
# ScratchRealization will store to this key-name:
return self.get_df("share/results/tables/unsmry--" + time_index + ".csv")

def get_volumetric_rates(self, column_keys=None, time_index=None):
Expand Down Expand Up @@ -1022,7 +1025,7 @@ def apply(self, callback, **kwargs):
# It is tempting to just call process_batch() here, but then we
# don't know how to collect the results from this particular
# apply() operation (if we enforced nonempty localpath, we could)
# > kwargs["calllback"] = callback
# > kwargs["callback"] = callback
# > ens.process_batch(batch=[{"apply": **kwargs}]) # (untested)
if use_concurrent():
with ProcessPoolExecutor() as executor:
Expand All @@ -1035,8 +1038,8 @@ def apply(self, callback, **kwargs):
# Reassemble a list of dataframes from the pickled results
# of the ProcessPool:
dframes_dict_from_apply = {
r_idx: dframe
for (r_idx, dframe) in zip(
realidx: dframe
for (realidx, dframe) in zip(
real_indices, [x.result() for x in futures_reals]
)
}
Expand All @@ -1048,16 +1051,15 @@ def apply(self, callback, **kwargs):
for realidx, dataframe in dframes_dict_from_apply.items():
self.realizations[realidx].data[kwargs["localpath"]] = dataframe
dframes_from_apply = [
dframe.assign(REAL=r_idx)
for (r_idx, dframe) in dframes_dict_from_apply.items()
dframe.assign(REAL=realidx)
for (realidx, dframe) in dframes_dict_from_apply.items()
]

else:
dframes_from_apply = []
for realidx, realization in self.realizations.items():
dframes_from_apply.append(
realization.apply(callback, **kwargs).assign(REAL=realidx)
)
dframes_from_apply = [
realization.apply(callback, **kwargs).assign(REAL=realidx)
for (realidx, realization) in self.realizations.items()
]
return pd.concat(dframes_from_apply, sort=False, ignore_index=True)

def get_smry_dates(
Expand Down Expand Up @@ -1569,6 +1571,9 @@ def get_smry(
if use_concurrent():
with ProcessPoolExecutor() as executor:
real_indices = self.realizations.keys()
# Note that we cannot use process_batch()
# here as we need dataframes in return, not
# realizations.
futures_dframes = [
executor.submit(
realization.get_smry,
Expand Down
7 changes: 3 additions & 4 deletions src/fmu/ensemble/virtualensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import re
import shutil
import fnmatch
import datetime

import six
import yaml
Expand Down Expand Up @@ -647,7 +646,7 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False):
lazy_load (bool): If True, loading of dataframes from disk
will be postponed until get_df() is actually called.
"""
start_time = datetime.datetime.now()
start_time = time.time()
if fmt not in ["csv", "parquet"]:
raise ValueError("Unknown format for from_disk: %s" % fmt)

Expand Down Expand Up @@ -723,15 +722,15 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False):
# IT MIGHT BE INCORRECT IF LAZY_LOAD...
self.update_realindices()

end_time = datetime.datetime.now()
end_time = time.time()
if lazy_load:
lazy_str = "(lazy) "
else:
lazy_str = ""
logger.info(
"Loading ensemble from disk %stook %g seconds",
lazy_str,
(end_time - start_time).total_seconds(),
end_time - start_time,
)

def _load_frame_fromdisk(self, key, filename):
Expand Down
18 changes: 6 additions & 12 deletions tests/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import os
import time
import datetime

import yaml
import pandas as pd
Expand Down Expand Up @@ -110,22 +109,17 @@ def test_speedup():
)

set_concurrent(True)
# really_concurrent = use_concurrent()
start_time = datetime.datetime.now()
start_time = time.time()
ens.process_batch(batch=[{"apply": {"callback": sleeper}}])
end_time = datetime.datetime.now()
conc_elapsed = (end_time - start_time).total_seconds()
end_time = time.time()
conc_elapsed = end_time - start_time
print("FMU_CONCURRENCY: {}".format(use_concurrent()))
print("Elapsed time for concurrent batch apply sleep: {}".format(conc_elapsed))

set_concurrent(False)
start_time = datetime.datetime.now()
start_time = time.time()
ens.process_batch(batch=[{"apply": {"callback": sleeper}}])
end_time = datetime.datetime.now()
seq_elapsed = (end_time - start_time).total_seconds()
end_time = time.time()
seq_elapsed = end_time - start_time
print("FMU_CONCURRENCY: {}".format(use_concurrent()))
print("Elapsed time for sequential batch apply sleep: {}".format(seq_elapsed))

# Can't enforce this, it depends on physical hardware availability.
# if really_concurrent:
# assert seq_elapsed > conc_elapsed * 4
13 changes: 6 additions & 7 deletions tests/test_webviz_subsurface_testdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from __future__ import print_function

import os
import datetime

import pytest

Expand Down Expand Up @@ -94,10 +93,10 @@ def test_webviz_subsurface_testdata_batch():
"""

check_testdata()
start_time = datetime.datetime.now()
start_time = time.time()
_do_load_webviz_subsurface_testdata_batch()
end_time = datetime.datetime.now()
elapsed = (end_time - start_time).total_seconds()
end_time = time.time()
elapsed = end_time - start_time
print("FMU_CONCURRENCY: {}".format(use_concurrent()))
print("Elapsed time for batch ensemble initialization: {}".format(elapsed))

Expand All @@ -111,9 +110,9 @@ def test_webviz_subsurface_testdata_sequential_batch():

check_testdata()
set_concurrent(False)
start_time = datetime.datetime.now()
start_time = time.time()
_do_load_webviz_subsurface_testdata_batch()
end_time = datetime.datetime.now()
elapsed = (end_time - start_time).total_seconds()
end_time = time.time()
elapsed = end_time - start_time
print("FMU_CONCURRENCY: {}".format(use_concurrent()))
print("Elapsed time for batch ensemble initialization: {}".format(elapsed))

0 comments on commit 6db6234

Please sign in to comment.