Skip to content

Commit

Permalink
Have forward_model_ok not run if storage is invalid
Browse files Browse the repository at this point in the history
This commit makes it so that if a realization fails because `responses.json` is missing (which can happen if storage is deleted), we do not run `forward_model_ok` for the remaining realizations, as this is not something we can recover from.
This avoids spamming `logger.exception(...)` for an error we cannot handle.
  • Loading branch information
jonathan-eq committed Jan 24, 2025
1 parent b7aa307 commit 8479240
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 38 deletions.
63 changes: 38 additions & 25 deletions src/ert/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pathlib import Path

from ert.config import InvalidResponseFile
from ert.storage import Ensemble
from ert.storage import Ensemble, ErtStorageException
from ert.storage.realization_storage_state import RealizationStorageState

from .load_status import LoadResult, LoadStatus
Expand Down Expand Up @@ -96,36 +96,49 @@ async def forward_model_ok(
realization: int,
iter: int,
ensemble: Ensemble,
forward_model_ok_permanent_error_future: asyncio.Future[str] | None = None,
) -> LoadResult:
parameters_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "")
response_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "")
try:
# We only read parameters after the prior, after that, ERT
# handles parameters
if iter == 0:
parameters_result = await _read_parameters(
run_path,
realization,
iter,
ensemble,
)

if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL:
response_result = await _write_responses_to_storage(
run_path,
realization,
ensemble,
)

except Exception as err:
logger.exception(
f"Failed to load results for realization {realization}",
exc_info=err,
)
if forward_model_ok_permanent_error_future is None:
forward_model_ok_permanent_error_future = asyncio.Future()
if (
forward_model_ok_permanent_error_future.done()
and forward_model_ok_permanent_error_future.exception()
):
parameters_result = LoadResult(
LoadStatus.LOAD_FAILURE,
f"Failed to load results for realization {realization}, failed with: {err}",
f"Failed to load results for realization {realization}, failed with: {forward_model_ok_permanent_error_future.exception()}",
)
else:
try:
# We only read parameters after the prior, after that, ERT
# handles parameters
if iter == 0:
parameters_result = await _read_parameters(
run_path,
realization,
iter,
ensemble,
)

if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL:
response_result = await _write_responses_to_storage(
run_path,
realization,
ensemble,
)
except Exception as err:
if isinstance(err, ErtStorageException):
forward_model_ok_permanent_error_future.set_exception(err)
logger.exception(
f"Failed to load results for realization {realization}",
exc_info=err,
)
parameters_result = LoadResult(
LoadStatus.LOAD_FAILURE,
f"Failed to load results for realization {realization}, failed with: {err}",
)

final_result = parameters_result
if response_result.status != LoadStatus.LOAD_SUCCESSFUL:
Expand Down
10 changes: 8 additions & 2 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ async def run(
self,
sem: asyncio.BoundedSemaphore,
forward_model_ok_lock: asyncio.Lock,
forward_model_ok_permanent_error_future: asyncio.Future[str],
checksum_lock: asyncio.Lock,
max_submit: int = 1,
) -> None:
Expand All @@ -165,7 +166,9 @@ async def run(
if self._scheduler._manifest_queue is not None:
await self._verify_checksum(checksum_lock)
async with forward_model_ok_lock:
await self._handle_finished_forward_model()
await self._handle_finished_forward_model(
forward_model_ok_permanent_error_future
)
break

if attempt < max_submit - 1:
Expand Down Expand Up @@ -245,12 +248,15 @@ async def _verify_checksum(
else:
logger.error(f"Disk synchronization failed for {file_path}")

async def _handle_finished_forward_model(self) -> None:
async def _handle_finished_forward_model(
self, forward_model_ok_permanent_error_future: asyncio.Future[str]
) -> None:
callback_status, status_msg = await forward_model_ok(
run_path=self.real.run_arg.runpath,
realization=self.real.run_arg.iens,
iter=self.real.run_arg.itr,
ensemble=self.real.run_arg.ensemble_storage,
forward_model_ok_permanent_error_future=forward_model_ok_permanent_error_future,
)
if self._message:
self._message = status_msg
Expand Down
2 changes: 2 additions & 0 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ async def execute(
# this lock is to assure that no more than 1 task
# does internalization at a time
forward_model_ok_lock = asyncio.Lock()
forward_model_ok_permanent_error_future: asyncio.Future[str] = asyncio.Future()
verify_checksum_lock = asyncio.Lock()
for iens, job in self._jobs.items():
await asyncio.sleep(0)
Expand All @@ -262,6 +263,7 @@ async def execute(
job.run(
sem,
forward_model_ok_lock,
forward_model_ok_permanent_error_future,
verify_checksum_lock,
self._max_submit,
),
Expand Down
3 changes: 1 addition & 2 deletions src/ert/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
EnsembleAccessor = LocalEnsemble


class ErtStorageException(Exception):
pass
from ert.storage.local_experiment import ErtStorageException


def open_storage(
Expand Down
7 changes: 5 additions & 2 deletions src/ert/storage/local_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing_extensions import deprecated

from ert.config.gen_kw_config import GenKwConfig
from ert.storage.local_experiment import ErtStorageException
from ert.storage.mode import BaseMode, Mode, require_write

from .realization_storage_state import RealizationStorageState
Expand Down Expand Up @@ -406,8 +407,10 @@ def get_ensemble_state(self) -> list[set[RealizationStorageState]]:
states : list of RealizationStorageState
list of realization states.
"""

response_configs = self.experiment.response_configuration
try:
response_configs = self.experiment.response_configuration
except ErtStorageException:
response_configs = {}

def _parameters_exist_for_realization(realization: int) -> bool:
"""
Expand Down
8 changes: 8 additions & 0 deletions src/ert/storage/local_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class _Index(BaseModel):
name: str


class ErtStorageException(Exception):
    """Raised when ERT storage cannot be used, e.g. ``responses.json`` is missing."""


class LocalExperiment(BaseMode):
"""
Represents an experiment within the local storage system of ERT.
Expand Down Expand Up @@ -253,6 +257,10 @@ def parameter_info(self) -> dict[str, Any]:
@property
def response_info(self) -> dict[str, Any]:
    """Return the parsed JSON content of the experiment's responses file.

    Raises:
        ErtStorageException: If the responses file does not exist under
            the mount point (for instance because storage was deleted).
    """
    responses_path = self.mount_point / self._responses_file
    # Open the file directly instead of checking ``exists()`` first: the
    # file may vanish between a check and the open, which would leak a raw
    # FileNotFoundError instead of the storage error callers handle.
    try:
        with open(responses_path, encoding="utf-8") as f:
            info: dict[str, Any] = json.load(f)
    except FileNotFoundError as err:
        raise ErtStorageException(
            "responses.json does not exist. Please make sure storage is still valid."
        ) from err
    return info
Expand Down
54 changes: 47 additions & 7 deletions tests/ert/unit_tests/scheduler/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ async def load_result(**_):

job_run_task = asyncio.create_task(
job.run(
asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=max_submit
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=max_submit,
)
)

Expand Down Expand Up @@ -178,7 +182,13 @@ async def test_num_cpu_is_propagated_to_driver(realization: Realization):
scheduler = create_scheduler()
job = Job(scheduler, realization)
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand All @@ -201,7 +211,13 @@ async def test_realization_memory_is_propagated_to_driver(realization: Realizati
scheduler = create_scheduler()
job = Job(scheduler, realization)
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -239,7 +255,13 @@ async def test_when_waiting_for_disk_sync_times_out_an_error_is_logged(

with captured_logs(log_msgs, logging.ERROR):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -270,7 +292,13 @@ async def test_when_files_in_manifest_are_not_created_an_error_is_logged(

with captured_logs(log_msgs, logging.ERROR):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -304,7 +332,13 @@ async def test_when_checksums_do_not_match_a_warning_is_logged(

with captured_logs(log_msgs, logging.WARNING):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -332,7 +366,13 @@ async def test_when_no_checksum_info_is_received_a_warning_is_logged(

with captured_logs(log_msgs, logging.WARNING):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand Down

0 comments on commit 8479240

Please sign in to comment.