diff --git a/src/ert/callbacks.py b/src/ert/callbacks.py index 6d1c297162a..d744bf4e7c7 100644 --- a/src/ert/callbacks.py +++ b/src/ert/callbacks.py @@ -6,7 +6,7 @@ from pathlib import Path from ert.config import InvalidResponseFile -from ert.storage import Ensemble +from ert.storage import Ensemble, ErtStorageException from ert.storage.realization_storage_state import RealizationStorageState from .load_status import LoadResult, LoadStatus @@ -96,36 +96,49 @@ async def forward_model_ok( realization: int, iter: int, ensemble: Ensemble, + forward_model_ok_permanent_error_future: asyncio.Future[str] | None = None, ) -> LoadResult: parameters_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "") response_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "") - try: - # We only read parameters after the prior, after that, ERT - # handles parameters - if iter == 0: - parameters_result = await _read_parameters( - run_path, - realization, - iter, - ensemble, - ) - - if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL: - response_result = await _write_responses_to_storage( - run_path, - realization, - ensemble, - ) - - except Exception as err: - logger.exception( - f"Failed to load results for realization {realization}", - exc_info=err, - ) + if forward_model_ok_permanent_error_future is None: + forward_model_ok_permanent_error_future = asyncio.Future() + if ( + forward_model_ok_permanent_error_future.done() + and forward_model_ok_permanent_error_future.exception() + ): parameters_result = LoadResult( LoadStatus.LOAD_FAILURE, - f"Failed to load results for realization {realization}, failed with: {err}", + f"Failed to load results for realization {realization}, failed with: {forward_model_ok_permanent_error_future.exception()}", ) + else: + try: + # We only read parameters after the prior, after that, ERT + # handles parameters + if iter == 0: + parameters_result = await _read_parameters( + run_path, + realization, + iter, + ensemble, + ) + + if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL: + response_result = await _write_responses_to_storage( + run_path, + realization, + ensemble, + ) + except Exception as err: + if isinstance(err, ErtStorageException): + forward_model_ok_permanent_error_future.set_exception(err) + logger.exception( + f"Failed to load results for realization {realization}", + exc_info=err, + ) + parameters_result = LoadResult( + LoadStatus.LOAD_FAILURE, + f"Failed to load results for realization {realization}, failed with: {err}", + ) final_result = parameters_result if response_result.status != LoadStatus.LOAD_SUCCESSFUL: diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index c1534d775ea..d5adf6df4a2 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -149,6 +149,7 @@ async def run( self, sem: asyncio.BoundedSemaphore, forward_model_ok_lock: asyncio.Lock, + forward_model_ok_permanent_error_future: asyncio.Future[str], checksum_lock: asyncio.Lock, max_submit: int = 1, ) -> None: @@ -165,7 +166,9 @@ async def run( if self._scheduler._manifest_queue is not None: await self._verify_checksum(checksum_lock) async with forward_model_ok_lock: - await self._handle_finished_forward_model() + await self._handle_finished_forward_model( + forward_model_ok_permanent_error_future + ) break if attempt < max_submit - 1: @@ -245,12 +248,15 @@ async def _verify_checksum( else: logger.error(f"Disk synchronization failed for {file_path}") - async def _handle_finished_forward_model(self) -> None: + async def _handle_finished_forward_model( + self, forward_model_ok_permanent_error_future: asyncio.Future[str] + ) -> None: callback_status, status_msg = await forward_model_ok( run_path=self.real.run_arg.runpath, realization=self.real.run_arg.iens, iter=self.real.run_arg.itr, ensemble=self.real.run_arg.ensemble_storage, + forward_model_ok_permanent_error_future=forward_model_ok_permanent_error_future, ) if self._message: self._message = status_msg diff --git a/src/ert/scheduler/scheduler.py b/src/ert/scheduler/scheduler.py index 8ca13446fb0..39f8839d9c8 100644 --- a/src/ert/scheduler/scheduler.py +++ b/src/ert/scheduler/scheduler.py @@ -254,6 +254,7 @@ async def execute( # this lock is to assure that no more than 1 task # does internalization at a time forward_model_ok_lock = asyncio.Lock() + forward_model_ok_permanent_error_future: asyncio.Future[str] = asyncio.Future() verify_checksum_lock = asyncio.Lock() for iens, job in self._jobs.items(): await asyncio.sleep(0) @@ -262,6 +263,7 @@ async def execute( job.run( sem, forward_model_ok_lock, + forward_model_ok_permanent_error_future, verify_checksum_lock, self._max_submit, ), diff --git a/src/ert/storage/__init__.py b/src/ert/storage/__init__.py index ccc8bc0bba0..dd380c0f12d 100644 --- a/src/ert/storage/__init__.py +++ b/src/ert/storage/__init__.py @@ -27,8 +27,7 @@ EnsembleAccessor = LocalEnsemble -class ErtStorageException(Exception): - pass +from ert.storage.local_experiment import ErtStorageException def open_storage( diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index b0f9d71671d..e1f33f81ac5 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -17,6 +17,7 @@ from typing_extensions import deprecated from ert.config.gen_kw_config import GenKwConfig +from ert.storage.local_experiment import ErtStorageException from ert.storage.mode import BaseMode, Mode, require_write from .realization_storage_state import RealizationStorageState @@ -406,8 +407,10 @@ def get_ensemble_state(self) -> list[set[RealizationStorageState]]: states : list of RealizationStorageState list of realization states. """ - - response_configs = self.experiment.response_configuration + try: + response_configs = self.experiment.response_configuration + except ErtStorageException: + response_configs = {} def _parameters_exist_for_realization(realization: int) -> bool: """ diff --git a/src/ert/storage/local_experiment.py b/src/ert/storage/local_experiment.py index ef4d88b78ed..f1f7ed13379 100644 --- a/src/ert/storage/local_experiment.py +++ b/src/ert/storage/local_experiment.py @@ -38,6 +38,10 @@ class _Index(BaseModel): name: str +class ErtStorageException(Exception): + pass + + class LocalExperiment(BaseMode): """ Represents an experiment within the local storage system of ERT. @@ -253,6 +257,10 @@ def parameter_info(self) -> dict[str, Any]: @property def response_info(self) -> dict[str, Any]: info: dict[str, Any] + if not (self.mount_point / self._responses_file).exists(): + raise ErtStorageException( + "responses.json does not exist. Please make sure storage is still valid." + ) with open(self.mount_point / self._responses_file, encoding="utf-8") as f: info = json.load(f) return info diff --git a/tests/ert/unit_tests/scheduler/test_job.py b/tests/ert/unit_tests/scheduler/test_job.py index 66cbd14b2d9..d2c1ae02925 100644 --- a/tests/ert/unit_tests/scheduler/test_job.py +++ b/tests/ert/unit_tests/scheduler/test_job.py @@ -135,7 +135,11 @@ async def load_result(**_): job_run_task = asyncio.create_task( job.run( - asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=max_submit + asyncio.Semaphore(), + asyncio.Lock(), + asyncio.Future(), + asyncio.Lock(), + max_submit=max_submit, ) ) @@ -178,7 +182,13 @@ async def test_num_cpu_is_propagated_to_driver(realization: Realization): scheduler = create_scheduler() job = Job(scheduler, realization) job_run_task = asyncio.create_task( - job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1) + job.run( + asyncio.Semaphore(), + asyncio.Lock(), + asyncio.Future(), + asyncio.Lock(), + max_submit=1, + ) ) job.started.set() job.returncode.set_result(0) @@ -201,7 +211,13 @@ async def test_realization_memory_is_propagated_to_driver(realization: Realizati scheduler = create_scheduler() job = Job(scheduler, realization) job_run_task = asyncio.create_task( - job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1) + job.run( + asyncio.Semaphore(), + asyncio.Lock(), + asyncio.Future(), + asyncio.Lock(), + max_submit=1, + ) ) job.started.set() job.returncode.set_result(0) @@ -239,7 +255,13 @@ async def test_when_waiting_for_disk_sync_times_out_an_error_is_logged( with captured_logs(log_msgs, logging.ERROR): job_run_task = asyncio.create_task( - job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1) + job.run( + asyncio.Semaphore(), + asyncio.Lock(), + asyncio.Future(), + asyncio.Lock(), + max_submit=1, + ) ) job.started.set() job.returncode.set_result(0) @@ -270,7 +292,13 @@ async def test_when_files_in_manifest_are_not_created_an_error_is_logged( with captured_logs(log_msgs, logging.ERROR): job_run_task = asyncio.create_task( - job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1) + job.run( + asyncio.Semaphore(), + asyncio.Lock(), + asyncio.Future(), + asyncio.Lock(), + max_submit=1, + ) ) job.started.set() job.returncode.set_result(0) @@ -304,7 +332,13 @@ async def test_when_checksums_do_not_match_a_warning_is_logged( with captured_logs(log_msgs, logging.WARNING): job_run_task = asyncio.create_task( - job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1) + job.run( + asyncio.Semaphore(), + asyncio.Lock(), + asyncio.Future(), + asyncio.Lock(), + max_submit=1, + ) ) job.started.set() job.returncode.set_result(0) @@ -332,7 +366,13 @@ async def test_when_no_checksum_info_is_received_a_warning_is_logged( with captured_logs(log_msgs, logging.WARNING): job_run_task = asyncio.create_task( - job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1) + job.run( + asyncio.Semaphore(), + asyncio.Lock(), + asyncio.Future(), + asyncio.Lock(), + max_submit=1, + ) ) job.started.set() job.returncode.set_result(0)