Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have forward_model_ok not run if storage in invalid (missing responses.json) #9857

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 38 additions & 25 deletions src/ert/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pathlib import Path

from ert.config import InvalidResponseFile
from ert.storage import Ensemble
from ert.storage import Ensemble, ErtStorageException
from ert.storage.realization_storage_state import RealizationStorageState

from .load_status import LoadResult, LoadStatus
Expand Down Expand Up @@ -96,36 +96,49 @@ async def forward_model_ok(
realization: int,
iter: int,
ensemble: Ensemble,
forward_model_ok_permanent_error_future: asyncio.Future[str] | None = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how the Future is usually used is that when you do somewhere:

await forward_model_ok_permanent_error_future

and somewhere else you get exception Ex you can propagate it to the future by

forward_model_ok_permanent_error_future.set_exception(Ex)

which then gets triggered by the line above (with await).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not want to await it, as it might never be set. I only use it so that multiple tasks can set the value, and the others can check if that value has already been set, meaning they should halt. It worked the same if I used myAsyncioFuture.set_result(reason_why_it_failed) or myAsyncioFuture.set_exception(exception_why_it_failed).
I will change it to use the latter one, as I already have the raw exception available.

) -> LoadResult:
parameters_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "")
response_result = LoadResult(LoadStatus.LOAD_SUCCESSFUL, "")
try:
# We only read parameters after the prior, after that, ERT
# handles parameters
if iter == 0:
parameters_result = await _read_parameters(
run_path,
realization,
iter,
ensemble,
)

if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL:
response_result = await _write_responses_to_storage(
run_path,
realization,
ensemble,
)

except Exception as err:
logger.exception(
f"Failed to load results for realization {realization}",
exc_info=err,
)
if forward_model_ok_permanent_error_future is None:
forward_model_ok_permanent_error_future = asyncio.Future()
if (
forward_model_ok_permanent_error_future.done()
and forward_model_ok_permanent_error_future.exception()
):
parameters_result = LoadResult(
LoadStatus.LOAD_FAILURE,
f"Failed to load results for realization {realization}, failed with: {err}",
f"Failed to load results for realization {realization}, failed with: {forward_model_ok_permanent_error_future.exception()}",
)
else:
try:
# We only read parameters after the prior, after that, ERT
# handles parameters
if iter == 0:
parameters_result = await _read_parameters(
run_path,
realization,
iter,
ensemble,
)

if parameters_result.status == LoadStatus.LOAD_SUCCESSFUL:
response_result = await _write_responses_to_storage(
run_path,
realization,
ensemble,
)
except Exception as err:
if isinstance(err, ErtStorageException):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ErtStorageException does not just signify that the storage mount point does not exist, but many other errors for which forward_model_ok might succeed.

forward_model_ok_permanent_error_future.set_exception(err)
logger.exception(
f"Failed to load results for realization {realization}",
exc_info=err,
)
parameters_result = LoadResult(
LoadStatus.LOAD_FAILURE,
f"Failed to load results for realization {realization}, failed with: {err}",
)

final_result = parameters_result
if response_result.status != LoadStatus.LOAD_SUCCESSFUL:
Expand Down
10 changes: 8 additions & 2 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ async def run(
self,
sem: asyncio.BoundedSemaphore,
forward_model_ok_lock: asyncio.Lock,
forward_model_ok_permanent_error_future: asyncio.Future[str],
checksum_lock: asyncio.Lock,
max_submit: int = 1,
) -> None:
Expand All @@ -165,7 +166,9 @@ async def run(
if self._scheduler._manifest_queue is not None:
await self._verify_checksum(checksum_lock)
async with forward_model_ok_lock:
await self._handle_finished_forward_model()
await self._handle_finished_forward_model(
forward_model_ok_permanent_error_future
)
break

if attempt < max_submit - 1:
Expand Down Expand Up @@ -245,12 +248,15 @@ async def _verify_checksum(
else:
logger.error(f"Disk synchronization failed for {file_path}")

async def _handle_finished_forward_model(self) -> None:
async def _handle_finished_forward_model(
self, forward_model_ok_permanent_error_future: asyncio.Future[str]
) -> None:
callback_status, status_msg = await forward_model_ok(
run_path=self.real.run_arg.runpath,
realization=self.real.run_arg.iens,
iter=self.real.run_arg.itr,
ensemble=self.real.run_arg.ensemble_storage,
forward_model_ok_permanent_error_future=forward_model_ok_permanent_error_future,
)
if self._message:
self._message = status_msg
Expand Down
2 changes: 2 additions & 0 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ async def execute(
# this lock is to assure that no more than 1 task
# does internalization at a time
forward_model_ok_lock = asyncio.Lock()
forward_model_ok_permanent_error_future: asyncio.Future[str] = asyncio.Future()
verify_checksum_lock = asyncio.Lock()
for iens, job in self._jobs.items():
await asyncio.sleep(0)
Expand All @@ -262,6 +263,7 @@ async def execute(
job.run(
sem,
forward_model_ok_lock,
forward_model_ok_permanent_error_future,
verify_checksum_lock,
self._max_submit,
),
Expand Down
3 changes: 1 addition & 2 deletions src/ert/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
EnsembleAccessor = LocalEnsemble


class ErtStorageException(Exception):
pass
from ert.storage.local_experiment import ErtStorageException


def open_storage(
Expand Down
7 changes: 5 additions & 2 deletions src/ert/storage/local_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing_extensions import deprecated

from ert.config.gen_kw_config import GenKwConfig
from ert.storage.local_experiment import ErtStorageException
from ert.storage.mode import BaseMode, Mode, require_write

from .realization_storage_state import RealizationStorageState
Expand Down Expand Up @@ -406,8 +407,10 @@ def get_ensemble_state(self) -> list[set[RealizationStorageState]]:
states : list of RealizationStorageState
list of realization states.
"""

response_configs = self.experiment.response_configuration
try:
response_configs = self.experiment.response_configuration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not immediately seem fine to ignore a storage exception here.

except ErtStorageException:
response_configs = {}

def _parameters_exist_for_realization(realization: int) -> bool:
"""
Expand Down
8 changes: 8 additions & 0 deletions src/ert/storage/local_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class _Index(BaseModel):
name: str


class ErtStorageException(Exception):
pass


class LocalExperiment(BaseMode):
"""
Represents an experiment within the local storage system of ERT.
Expand Down Expand Up @@ -253,6 +257,10 @@ def parameter_info(self) -> dict[str, Any]:
@property
def response_info(self) -> dict[str, Any]:
info: dict[str, Any]
if not (self.mount_point / self._responses_file).exists():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just changed to give a FileNotFoundError because the exception did not show full path. Yes the responses.json does not exist, but the entire directory might be wrong:
76a2ba9

raise ErtStorageException(
"responses.json does not exist. Please make sure storage is still valid."
)
with open(self.mount_point / self._responses_file, encoding="utf-8") as f:
info = json.load(f)
return info
Expand Down
54 changes: 47 additions & 7 deletions tests/ert/unit_tests/scheduler/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ async def load_result(**_):

job_run_task = asyncio.create_task(
job.run(
asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=max_submit
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=max_submit,
)
)

Expand Down Expand Up @@ -178,7 +182,13 @@ async def test_num_cpu_is_propagated_to_driver(realization: Realization):
scheduler = create_scheduler()
job = Job(scheduler, realization)
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand All @@ -201,7 +211,13 @@ async def test_realization_memory_is_propagated_to_driver(realization: Realizati
scheduler = create_scheduler()
job = Job(scheduler, realization)
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -239,7 +255,13 @@ async def test_when_waiting_for_disk_sync_times_out_an_error_is_logged(

with captured_logs(log_msgs, logging.ERROR):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -270,7 +292,13 @@ async def test_when_files_in_manifest_are_not_created_an_error_is_logged(

with captured_logs(log_msgs, logging.ERROR):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -304,7 +332,13 @@ async def test_when_checksums_do_not_match_a_warning_is_logged(

with captured_logs(log_msgs, logging.WARNING):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand Down Expand Up @@ -332,7 +366,13 @@ async def test_when_no_checksum_info_is_received_a_warning_is_logged(

with captured_logs(log_msgs, logging.WARNING):
job_run_task = asyncio.create_task(
job.run(asyncio.Semaphore(), asyncio.Lock(), asyncio.Lock(), max_submit=1)
job.run(
asyncio.Semaphore(),
asyncio.Lock(),
asyncio.Future(),
asyncio.Lock(),
max_submit=1,
)
)
job.started.set()
job.returncode.set_result(0)
Expand Down
Loading