diff --git a/src/fmu/ensemble/ensemble.py b/src/fmu/ensemble/ensemble.py
index 39fd8910..480d57a8 100644
--- a/src/fmu/ensemble/ensemble.py
+++ b/src/fmu/ensemble/ensemble.py
@@ -72,7 +72,7 @@ class ScratchEnsemble(object):
             or relative path to a realization RUNPATH, third column is
             the basename of the Eclipse simulation, relative to RUNPATH.
             Fourth column is not used.
-        runpathfilter (str): If supplied, the only the runpaths in
+        runpathfilter (str): If supplied, only the runpaths in
             the runpathfile which contain this string will be included
             Use to select only a specific realization f.ex.
         autodiscovery (boolean): True by default, means that the class
@@ -302,16 +302,13 @@ def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None):
                 loaded_reals = [
                     executor.submit(
                         ScratchRealization,
-                        row["runpath"],
-                        index=int(row["index"]),
+                        row.runpath,
+                        index=int(row.index),
                         autodiscovery=False,
-                        find_files=[
-                            row["eclbase"] + ".DATA",
-                            row["eclbase"] + ".UNSMRY",
-                        ],
+                        find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY"],
                         batch=batch,
                     ).result()
-                    for _, row in runpath_df.iterrows()
+                    for row in runpath_df.itertuples()
                 ]
             else:
                 logger.info(
@@ -320,13 +317,13 @@ def add_from_runpathfile(self, runpath, runpathfilter=None, batch=None):
                 )
                 loaded_reals = [
                     ScratchRealization(
-                        row["runpath"],
-                        index=int(row["index"]),
+                        row.runpath,
+                        index=int(row.index),
                         autodiscovery=False,
-                        find_files=[row["eclbase"] + ".DATA", row["eclbase"] + ".UNSMRY"],
+                        find_files=[row.eclbase + ".DATA", row.eclbase + ".UNSMRY"],
                         batch=batch,
                     )
-                    for _, row in runpath_df.iterrows()
+                    for row in runpath_df.itertuples()
                 ]
         for real in loaded_reals:
             self.realizations[real.index] = real
@@ -829,10 +826,16 @@ def load_smry(
             ]
         )

+        # process_batch() will modify each Realization object
+        # and add the loaded smry data to the list of internalized
+        # data, under a well-defined name (share/results/tables/unsmry--<time_index>.csv)
+
+        # Since load_smry() should also return the aggregation
+        # of the loaded smry data, we need to pick up this
+        # data and aggregate it.
+
         if isinstance(time_index, list):
             time_index = "custom"
-        # Note the dependency that the load_smry() function in
-        # ScratchRealization will store to this key-name:
         return self.get_df("share/results/tables/unsmry--" + time_index + ".csv")

     def get_volumetric_rates(self, column_keys=None, time_index=None):
@@ -1022,7 +1025,7 @@ def apply(self, callback, **kwargs):
         # It is tempting to just call process_batch() here, but then we
         # don't know how to collect the results from this particular
         # apply() operation (if we enforced nonempty localpath, we could)
-        # > kwargs["calllback"] = callback
+        # > kwargs["callback"] = callback
         # > ens.process_batch(batch=[{"apply": **kwargs}])  # (untested)
         if use_concurrent():
             with ProcessPoolExecutor() as executor:
@@ -1035,8 +1038,8 @@
             # Reassemble a list of dataframes from the pickled results
             # of the ProcessPool:
             dframes_dict_from_apply = {
-                r_idx: dframe
-                for (r_idx, dframe) in zip(
+                realidx: dframe
+                for (realidx, dframe) in zip(
                     real_indices, [x.result() for x in futures_reals]
                 )
             }
@@ -1048,16 +1051,15 @@
             for realidx, dataframe in dframes_dict_from_apply.items():
                 self.realizations[realidx].data[kwargs["localpath"]] = dataframe
             dframes_from_apply = [
-                dframe.assign(REAL=r_idx)
-                for (r_idx, dframe) in dframes_dict_from_apply.items()
+                dframe.assign(REAL=realidx)
+                for (realidx, dframe) in dframes_dict_from_apply.items()
             ]
         else:
-            dframes_from_apply = []
-            for realidx, realization in self.realizations.items():
-                dframes_from_apply.append(
-                    realization.apply(callback, **kwargs).assign(REAL=realidx)
-                )
+            dframes_from_apply = [
+                realization.apply(callback, **kwargs).assign(REAL=realidx)
+                for (realidx, realization) in self.realizations.items()
+            ]

         return pd.concat(dframes_from_apply, sort=False, ignore_index=True)

     def get_smry_dates(
@@ -1569,6 +1571,9 @@ def get_smry(
         if use_concurrent():
             with ProcessPoolExecutor() as executor:
                 real_indices = self.realizations.keys()
+                # Note that we cannot use process_batch()
+                # here as we need dataframes in return, not
+                # realizations.
                 futures_dframes = [
                     executor.submit(
                         realization.get_smry,
diff --git a/src/fmu/ensemble/virtualensemble.py b/src/fmu/ensemble/virtualensemble.py
index 9a3a33fa..cda5164b 100644
--- a/src/fmu/ensemble/virtualensemble.py
+++ b/src/fmu/ensemble/virtualensemble.py
@@ -9,7 +9,6 @@
 import re
 import shutil
 import fnmatch
-import datetime

 import six
 import yaml
@@ -647,7 +646,7 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False):
             lazy_load (bool): If True, loading of dataframes from disk
                 will be postponed until get_df() is actually called.
         """
-        start_time = datetime.datetime.now()
+        start_time = time.time()
         if fmt not in ["csv", "parquet"]:
             raise ValueError("Unknown format for from_disk: %s" % fmt)

@@ -723,7 +722,7 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False):
             # IT MIGHT BE INCORRECT IF LAZY_LOAD...
             self.update_realindices()

-        end_time = datetime.datetime.now()
+        end_time = time.time()
         if lazy_load:
             lazy_str = "(lazy) "
         else:
@@ -731,7 +730,7 @@ def from_disk(self, filesystempath, fmt="parquet", lazy_load=False):
         logger.info(
             "Loading ensemble from disk %stook %g seconds",
             lazy_str,
-            (end_time - start_time).total_seconds(),
+            end_time - start_time,
         )

     def _load_frame_fromdisk(self, key, filename):
diff --git a/tests/test_batch.py b/tests/test_batch.py
index c2cb5157..4cae5b9c 100644
--- a/tests/test_batch.py
+++ b/tests/test_batch.py
@@ -6,7 +6,6 @@

 import os
 import time
-import datetime

 import yaml
 import pandas as pd
@@ -110,22 +109,17 @@ def test_speedup():
     )

     set_concurrent(True)
-    # really_concurrent = use_concurrent()
-    start_time = datetime.datetime.now()
+    start_time = time.time()
     ens.process_batch(batch=[{"apply": {"callback": sleeper}}])
-    end_time = datetime.datetime.now()
-    conc_elapsed = (end_time - start_time).total_seconds()
+    end_time = time.time()
+    conc_elapsed = end_time - start_time
     print("FMU_CONCURRENCY: {}".format(use_concurrent()))
     print("Elapsed time for concurrent batch apply sleep: {}".format(conc_elapsed))

     set_concurrent(False)
-    start_time = datetime.datetime.now()
+    start_time = time.time()
     ens.process_batch(batch=[{"apply": {"callback": sleeper}}])
-    end_time = datetime.datetime.now()
-    seq_elapsed = (end_time - start_time).total_seconds()
+    end_time = time.time()
+    seq_elapsed = end_time - start_time
     print("FMU_CONCURRENCY: {}".format(use_concurrent()))
     print("Elapsed time for sequential batch apply sleep: {}".format(seq_elapsed))
-
-    # Can't enforce this, it depends on physical hardware availability.
-    # if really_concurrent:
-    #     assert seq_elapsed > conc_elapsed * 4
diff --git a/tests/test_webviz_subsurface_testdata.py b/tests/test_webviz_subsurface_testdata.py
index 7156d981..d64d6aca 100644
--- a/tests/test_webviz_subsurface_testdata.py
+++ b/tests/test_webviz_subsurface_testdata.py
@@ -5,7 +5,6 @@
 from __future__ import print_function

 import os
-import datetime

 import pytest

@@ -94,10 +93,10 @@ def test_webviz_subsurface_testdata_batch():
     """
     check_testdata()

-    start_time = datetime.datetime.now()
+    start_time = time.time()
     _do_load_webviz_subsurface_testdata_batch()
-    end_time = datetime.datetime.now()
-    elapsed = (end_time - start_time).total_seconds()
+    end_time = time.time()
+    elapsed = end_time - start_time
     print("FMU_CONCURRENCY: {}".format(use_concurrent()))
     print("Elapsed time for batch ensemble initialization: {}".format(elapsed))

@@ -111,9 +110,9 @@ def test_webviz_subsurface_testdata_sequential_batch():
     """
     check_testdata()
     set_concurrent(False)
-    start_time = datetime.datetime.now()
+    start_time = time.time()
     _do_load_webviz_subsurface_testdata_batch()
-    end_time = datetime.datetime.now()
-    elapsed = (end_time - start_time).total_seconds()
+    end_time = time.time()
+    elapsed = end_time - start_time
     print("FMU_CONCURRENCY: {}".format(use_concurrent()))
     print("Elapsed time for batch ensemble initialization: {}".format(elapsed))
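# ---------------------------------------------------------------------------
# Reviewer sketch (not part of the patch): a minimal, self-contained example
# of the two idioms this diff standardizes on. The DataFrame contents below
# are made up for illustration; only the column names (index, runpath,
# eclbase) mirror the runpath_df used in add_from_runpathfile().
#
# 1) DataFrame.itertuples() replaces iterrows(): rows come back as
#    namedtuples, so columns are read as attributes (row.runpath) instead of
#    row["runpath"], without building a pandas Series per row.
# 2) time.time() replaces datetime.datetime.now()/.total_seconds() for plain
#    wall-clock timing of the loading step.

import time

import pandas as pd

runpath_df = pd.DataFrame(
    {
        "index": [0, 1],
        "runpath": ["real-0/iter-0", "real-1/iter-0"],
        "eclbase": ["ECLBASE", "ECLBASE"],
    }
)

start_time = time.time()
for row in runpath_df.itertuples():
    # Attribute access on the namedtuple row, as in the patched code:
    print(int(row.index), row.runpath, row.eclbase + ".DATA")
elapsed = time.time() - start_time
print("Looped over runpath_df in %g seconds" % elapsed)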