From a2d0fb17b5181d19240cbf5f738a378b06bebbdc Mon Sep 17 00:00:00 2001 From: Eddie Bergman Date: Tue, 11 Jun 2024 11:11:38 +0200 Subject: [PATCH] fix(runtime): explicitly load previous results (#106) --- neps/runtime.py | 730 +++++++++----------- neps/search_spaces/hyperparameters/float.py | 3 +- neps/status/status.py | 33 +- neps/utils/data_loading.py | 89 ++- neps/utils/types.py | 2 +- tests/__init__.py | 0 6 files changed, 421 insertions(+), 436 deletions(-) create mode 100644 tests/__init__.py diff --git a/neps/runtime.py b/neps/runtime.py index 1e7df59a..6687b2ef 100644 --- a/neps/runtime.py +++ b/neps/runtime.py @@ -64,7 +64,7 @@ Mapping, Union, ) -from typing_extensions import TypeAlias +from typing_extensions import Self, TypeAlias import numpy as np @@ -76,7 +76,6 @@ POST_EVAL_HOOK_SIGNATURE, ConfigID, ConfigResult, - Metadata, RawConfig, ) @@ -125,232 +124,29 @@ def get_shared_state_poll_and_timeout() -> tuple[float, float | None]: @dataclass class SuccessReport: - """A successful report of the evaluation of a configuration. + """A successful report of the evaluation of a configuration.""" - Attributes: - trial: The trial that was evaluated - id: The identifier of the configuration - loss: The loss of the evaluation - cost: The cost of the evaluation - results: The results of the evaluation - config: The configuration that was evaluated - metadata: Additional metadata about the configuration - pipeline_dir: The directory where the configuration was evaluated - previous: The report of the previous iteration of this trial - time_sampled: The time the configuration was sampled - time_end: The time the configuration was evaluated - """ - - trial: Trial - id: ConfigID loss: float cost: float | None account_for_cost: bool results: Mapping[str, Any] - config: RawConfig - metadata: Metadata - pipeline_dir: Path - previous: Report | None # NOTE: Assumption only one previous report - time_sampled: float - time_end: float - - def __post_init__(self) -> None: - if "time_end" not in self.metadata: - self.metadata["time_end"] = self.time_end - if "time_sampled" not in self.metadata: - self.metadata["time_sampled"] = self.time_sampled - - @property - def disk(self) -> TrialOnDisk: - """Access the disk information of the trial.""" - return TrialOnDisk(pipeline_dir=self.pipeline_dir) - - def to_config_result( - self, - config_to_search_space: Callable[[RawConfig], SearchSpace], - ) -> ConfigResult: - """Convert the report to a `ConfigResult` object.""" - return ConfigResult( - self.id, - config=config_to_search_space(self.config), - result=self.results, - metadata=self.metadata, - ) @dataclass class ErrorReport: - """A failed report of the evaluation of a configuration. 
+ """A failed report of the evaluation of a configuration.""" - Attributes: - trial: The trial that was evaluated - id: The identifier of the configuration - loss: The loss of the evaluation, if any - cost: The cost of the evaluation, if any - results: The results of the evaluation - config: The configuration that was evaluated - metadata: Additional metadata about the configuration - pipeline_dir: The directory where the configuration was evaluated - previous: The report of the previous iteration of this trial - time_sampled: The time the configuration was sampled - time_end: The time the configuration was evaluated - """ - - trial: Trial - id: ConfigID err: Exception tb: str | None loss: float | None cost: float | None account_for_cost: bool results: Mapping[str, Any] - config: RawConfig - metadata: Metadata - pipeline_dir: Path - previous: Report | None # NOTE: Assumption only one previous report - time_sampled: float - time_end: float - - def __post_init__(self) -> None: - if "time_end" not in self.metadata: - self.metadata["time_end"] = self.time_end - if "time_sampled" not in self.metadata: - self.metadata["time_sampled"] = self.time_sampled - - @property - def disk(self) -> TrialOnDisk: - """Access the disk information of the trial.""" - return TrialOnDisk(pipeline_dir=self.pipeline_dir) - - def to_config_result( - self, - config_to_search_space: Callable[[RawConfig], SearchSpace], - ) -> ConfigResult: - """Convert the report to a `ConfigResult` object.""" - return ConfigResult( - self.id, - config=config_to_search_space(self.config), - result="error", - metadata=self.metadata, - ) Report: TypeAlias = Union[SuccessReport, ErrorReport] -@dataclass -class TrialOnDisk: - """The disk information of a trial. - - Attributes: - pipeline_dir: The directory where the trial is stored - id: The unique identifier of the trial - config_file: The path to the configuration file - result_file: The path to the result file - metadata_file: The path to the metadata file - optimization_dir: The directory from which optimization is running - previous_config_id_file: The path to the previous config id file - previous_pipeline_dir: The directory of the previous configuration - lock: The lock for the trial. Obtaining this lock indicates the worker - is evaluating this trial. - """ - - pipeline_dir: Path - - id: ConfigID = field(init=False) - config_file: Path = field(init=False) - result_file: Path = field(init=False) - error_file: Path = field(init=False) - metadata_file: Path = field(init=False) - optimization_dir: Path = field(init=False) - previous_config_id_file: Path = field(init=False) - previous_pipeline_dir: Path | None = field(init=False) - lock: Locker = field(init=False) - - def __post_init__(self) -> None: - self.id = self.pipeline_dir.name[len("config_") :] - self.config_file = self.pipeline_dir / "config.yaml" - self.result_file = self.pipeline_dir / "result.yaml" - self.error_file = self.pipeline_dir / "error.yaml" - self.metadata_file = self.pipeline_dir / "metadata.yaml" - - # NOTE: This is a bit of an assumption! 
- self.optimization_dir = self.pipeline_dir.parent - - self.previous_config_id_file = self.pipeline_dir / "previous_config.id" - if not empty_file(self.previous_config_id_file): - with self.previous_config_id_file.open("r") as f: - self.previous_config_id = f.read().strip() - - self.previous_pipeline_dir = ( - self.pipeline_dir.parent / f"config_{self.previous_config_id}" - ) - else: - self.previous_pipeline_dir = None - self.pipeline_dir.mkdir(exist_ok=True, parents=True) - self.lock = Locker(self.pipeline_dir / ".config_lock") - - class State(Enum): - """The state of a trial.""" - - PENDING = "pending" - IN_PROGRESS = "in_progress" - SUCCESS = "success" - ERROR = "error" - CORRUPTED = "corrupted" - - def raw_config(self) -> dict[str, Any]: - """Deserialize the configuration from disk.""" - return deserialize(self.config_file) - - def state(self) -> TrialOnDisk.State: - """The state of the trial.""" - if not empty_file(self.result_file): - return TrialOnDisk.State.SUCCESS - if not empty_file(self.error_file): - return TrialOnDisk.State.ERROR - if self.lock.is_locked(): - return TrialOnDisk.State.IN_PROGRESS - if not empty_file(self.config_file): - return TrialOnDisk.State.PENDING - - return TrialOnDisk.State.CORRUPTED - - @classmethod - def from_dir(cls, pipeline_dir: Path) -> TrialOnDisk: - """Create a `Trial.Disk` object from a directory.""" - return cls(pipeline_dir=pipeline_dir) - - def load( - self, - ) -> tuple[ - RawConfig, - Metadata, - ConfigID | None, - dict[str, Any] | tuple[Exception, str | None] | None, - ]: - """Load the trial from disk.""" - config = deserialize(self.config_file) - metadata = deserialize(self.metadata_file) - - result: dict[str, Any] | tuple[Exception, str | None] | None - if not empty_file(self.result_file): - result = deserialize(self.result_file) - assert isinstance(result, dict) - elif not empty_file(self.error_file): - error_tb = deserialize(self.error_file) - result = (Exception(error_tb["err"]), error_tb.get("tb")) - else: - result = None - - if not empty_file(self.previous_config_id_file): - previous_config_id = self.previous_config_id_file.read_text().strip() - else: - previous_config_id = None - - return config, metadata, previous_config_id, result - - @dataclass class Trial: """A trial is a configuration and it's associated data. @@ -362,7 +158,7 @@ class Trial: id: Unique identifier for the configuration config: The configuration to evaluate pipeline_dir: Directory where the configuration is evaluated - previous: The report of the previous iteration of this trial + previous: The previous trial before this trial. 
time_sampled: The time the configuration was sampled metadata: Additional metadata about the configuration """ @@ -370,18 +166,45 @@ class Trial: id: ConfigID config: Mapping[str, Any] pipeline_dir: Path - previous: Report | None + previous: Trial | None + report: Report | None time_sampled: float metadata: dict[str, Any] _lock: Locker = field(init=False) - disk: TrialOnDisk = field(init=False) + disk: Trial.Disk = field(init=False) + + def to_config_result( + self, + config_to_search_space: Callable[[RawConfig], SearchSpace], + ) -> ConfigResult: + """Convert the report to a `ConfigResult` object.""" + result: ERROR | Mapping[str, Any] = ( + "error" + if self.report is None or isinstance(self.report, ErrorReport) + else self.report.results + ) + return ConfigResult( + self.id, + config=config_to_search_space(self.config), + result=result, + metadata=self.metadata, + ) + + class State(Enum): + """The state of a trial.""" + + PENDING = "pending" + IN_PROGRESS = "in_progress" + SUCCESS = "success" + ERROR = "error" + CORRUPTED = "corrupted" def __post_init__(self) -> None: if "time_sampled" not in self.metadata: self.metadata["time_sampled"] = self.time_sampled self.pipeline_dir.mkdir(exist_ok=True, parents=True) self._lock = Locker(self.pipeline_dir / ".config_lock") - self.disk = TrialOnDisk(pipeline_dir=self.pipeline_dir) + self.disk = Trial.Disk(pipeline_dir=self.pipeline_dir) @property def config_file(self) -> Path: @@ -393,50 +216,132 @@ def metadata_file(self) -> Path: """The path to the metadata file.""" return self.pipeline_dir / "metadata.yaml" + @classmethod + def from_dir(cls, pipeline_dir: Path, *, previous: Trial | None = None) -> Self: + """Create a `Trial` object from a directory. + + Args: + pipeline_dir: The directory where the trial is stored + previous: The previous trial before this trial. + You can use this to prevent loading the previous trial from disk, + if it exists, i.e. a caching shortcut. + + Returns: + The trial object. + """ + return cls.from_disk( + Trial.Disk.from_dir(pipeline_dir), + previous=previous, + ) + + @classmethod + def from_disk(cls, disk: Trial.Disk, *, previous: Trial | None = None) -> Self: + """Create a `Trial` object from a disk. + + Args: + disk: The disk information of the trial. + previous: The previous trial before this trial. + You can use this to prevent loading the previous trial from disk, + if it exists, i.e. a caching shortcut. + + Returns: + The trial object. 
+ """ + try: + config = deserialize(disk.config_file) + except Exception as e: + logger.error( + f"Error loading config from {disk.config_file}: {e}", + exc_info=True, + ) + config = {} + + try: + metadata = deserialize(disk.metadata_file) + time_sampled = metadata["time_sampled"] + except Exception as e: + logger.error( + f"Error loading metadata from {disk.metadata_file}: {e}", + exc_info=True, + ) + metadata = {} + time_sampled = float("nan") + + try: + result: dict[str, Any] | tuple[Exception, str | None] | None + report: Report | None + if not empty_file(disk.result_file): + result = deserialize(disk.result_file) + + assert isinstance(result, dict) + report = SuccessReport( + loss=result["loss"], + cost=result.get("cost", None), + account_for_cost=result.get("account_for_cost", True), + results=result, + ) + elif not empty_file(disk.error_file): + error_tb = deserialize(disk.error_file) + result = deserialize(disk.result_file) + report = ErrorReport( + # NOTE: Not sure we can easily get the original exception type, + # once serialized + err=Exception(error_tb["err"]), + tb=error_tb.get("tb"), + loss=result.get("loss", None), + cost=result.get("cost", None), + account_for_cost=result.get("account_for_cost", True), + results=result, + ) + else: + report = None + except Exception as e: + logger.error( + f"Error loading result from {disk.result_file}: {e}", + exc_info=True, + ) + report = None + + try: + if previous is None and disk.previous_pipeline_dir is not None: + previous = Trial.from_dir(disk.previous_pipeline_dir) + except Exception as e: + logger.error( + f"Error loading previous from {disk.previous_pipeline_dir}: {e}", + exc_info=True, + ) + previous = None + + return cls( + id=disk.config_id, + config=config, + pipeline_dir=disk.pipeline_dir, + report=report, + previous=previous, + time_sampled=time_sampled, + metadata=metadata, + ) + @property def previous_config_id_file(self) -> Path: """The path to the previous configuration id file.""" return self.pipeline_dir / "previous_config.id" - def error( - self, - err: Exception, - tb: str | None = None, - *, - time_end: float | None = None, - ) -> ErrorReport: + def create_error_report(self, err: Exception, tb: str | None = None) -> ErrorReport: """Create a [`Report`][neps.runtime.Report] object with an error.""" - time_end = time_end if time_end is not None else time.time() - if time_end not in self.metadata: - self.metadata["time_end"] = time_end - # TODO(eddiebergman): For now we assume the loss and cost for an error is None # and that we don't account for cost and there are no possible results. 
return ErrorReport( - config=self.config, - id=self.id, loss=None, cost=None, account_for_cost=False, results={}, err=err, tb=tb, - pipeline_dir=self.pipeline_dir, - previous=self.previous, - trial=self, - metadata=self.metadata, - time_sampled=self.time_sampled, - time_end=time_end, ) - def success( - self, - result: float | Mapping[str, Any], - *, - time_end: float | None = None, - ) -> SuccessReport: + def create_success_report(self, result: float | Mapping[str, Any]) -> SuccessReport: """Check if the trial has succeeded.""" - time_end = time_end if time_end is not None else time.time() _result: dict[str, Any] = {} if isinstance(result, Mapping): if "loss" not in result: @@ -471,20 +376,113 @@ def success( _account_for_cost = _result.get("account_for_cost", True) return SuccessReport( - config=self.config, - id=self.id, loss=_result["loss"], cost=_cost, account_for_cost=_account_for_cost, results=_result, - pipeline_dir=self.pipeline_dir, - previous=self.previous, - trial=self, - metadata=self.metadata, - time_sampled=self.time_sampled, - time_end=time_end, ) + @dataclass + class Disk: + """The disk information of a trial. + + Attributes: + pipeline_dir: The directory where the trial is stored + id: The unique identifier of the trial + config_file: The path to the configuration file + result_file: The path to the result file + metadata_file: The path to the metadata file + optimization_dir: The directory from which optimization is running + previous_config_id_file: The path to the previous config id file + previous_pipeline_dir: The directory of the previous configuration + lock: The lock for the trial. Obtaining this lock indicates the worker + is evaluating this trial. + """ + + pipeline_dir: Path + + config_id: ConfigID = field(init=False) + config_file: Path = field(init=False) + result_file: Path = field(init=False) + error_file: Path = field(init=False) + metadata_file: Path = field(init=False) + optimization_dir: Path = field(init=False) + previous_config_id_file: Path = field(init=False) + previous_config_id: ConfigID | None = field(init=False) + previous_pipeline_dir: Path | None = field(init=False) + lock: Locker = field(init=False) + + def __post_init__(self) -> None: + self.config_id = self.pipeline_dir.name[len("config_") :] + self.config_file = self.pipeline_dir / "config.yaml" + self.result_file = self.pipeline_dir / "result.yaml" + self.error_file = self.pipeline_dir / "error.yaml" + self.metadata_file = self.pipeline_dir / "metadata.yaml" + + # NOTE: This is a bit of an assumption! 
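# --- A standalone sketch (not part of the patch) of the per-trial directory
# layout that `Trial.Disk` derives from a single `pipeline_dir` in the
# __post_init__ above; the id "42_1" is a made-up example following the
# `config_<id>` naming convention.
from pathlib import Path

pipeline_dir = Path("results") / "config_42_1"
config_id = pipeline_dir.name[len("config_"):]          # -> "42_1"
config_file = pipeline_dir / "config.yaml"              # the sampled config
result_file = pipeline_dir / "result.yaml"              # present on SUCCESS
error_file = pipeline_dir / "error.yaml"                # present on ERROR
metadata_file = pipeline_dir / "metadata.yaml"          # times, budget info
previous_id_file = pipeline_dir / "previous_config.id"  # link to parent trial
lock_file = pipeline_dir / ".config_lock"               # held while evaluating
assert config_id == "42_1"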
+ self.optimization_dir = self.pipeline_dir.parent + + self.previous_config_id_file = self.pipeline_dir / "previous_config.id" + if not empty_file(self.previous_config_id_file): + with self.previous_config_id_file.open("r") as f: + self.previous_config_id = f.read().strip() + + self.previous_pipeline_dir = ( + self.pipeline_dir.parent / f"config_{self.previous_config_id}" + ) + else: + self.previous_pipeline_dir = None + self.previous_config_id = None + + self.pipeline_dir.mkdir(exist_ok=True, parents=True) + self.lock = Locker(self.pipeline_dir / ".config_lock") + + def raw_config(self) -> dict[str, Any]: + """Deserialize the configuration from disk.""" + return deserialize(self.config_file) + + def state(self) -> Trial.State: # noqa: PLR0911 + """The state of the trial.""" + result_file_exists = not empty_file(self.result_file) + error_file_exists = not empty_file(self.error_file) + config_file_exists = not empty_file(self.config_file) + + # NOTE: We don't handle the case where it's locked and there is a result + # or error file existing, namely as this might introduce a race condition, + # where the result/error is being written to while the lock still exists. + + if error_file_exists: + # Should not have a results file if there is an error file + if result_file_exists: + return Trial.State.CORRUPTED + + # Should have a config file if there is an error file + if not config_file_exists: + return Trial.State.CORRUPTED + + return Trial.State.ERROR + + if result_file_exists: + # Should have a config file if there is a results file + if not config_file_exists: + return Trial.State.CORRUPTED + + return Trial.State.SUCCESS + + if self.lock.is_locked(): + # Should have a config to evaluate if it's locked + if not config_file_exists: + return Trial.State.CORRUPTED + + return Trial.State.IN_PROGRESS + + return Trial.State.PENDING + + @classmethod + def from_dir(cls, pipeline_dir: Path) -> Trial.Disk: + """Create a `Trial.Disk` object from a directory.""" + return cls(pipeline_dir=pipeline_dir) + @dataclass class StatePaths: @@ -547,14 +545,22 @@ class SharedState: paths: StatePaths = field(init=False) create_dirs: bool = False lock: Locker = field(init=False) - evaluated_trials: dict[ConfigID, Report] = field(default_factory=dict) - pending_trials: dict[ConfigID, Trial] = field(default_factory=dict) - in_progress_trials: dict[ConfigID, Trial] = field(default_factory=dict) + + trials: dict[ConfigID, tuple[Trial, Trial.State]] = field(default_factory=dict) + """Mapping from a configid to the trial and it's last known state, including if + it's been evaluated.""" def __post_init__(self) -> None: self.paths = StatePaths(root=self.base_dir, create_dirs=self.create_dirs) self.lock = Locker(self.base_dir / ".decision_lock") + def trials_by_state(self) -> dict[Trial.State, list[Trial]]: + """Get the trials grouped by their state.""" + _dict: dict[Trial.State, list[Trial]] = {state: [] for state in Trial.State} + for trial, state in self.trials.values(): + _dict[state].append(trial) + return _dict + def check_optimizer_info_on_disk_matches( self, optimizer_info: dict[str, Any], @@ -601,119 +607,58 @@ def use_sampler( with sampler.using_state(self.paths.optimizer_state_file): yield sampler - def update_from_disk(self) -> None: # noqa: C901, PLR0912, PLR0915 + def update_from_disk(self) -> None: """Update the shared state from disk.""" trial_dirs = (p for p in self.paths.results_dir.iterdir() if p.is_dir()) - trials_on_disk = [TrialOnDisk.from_dir(p) for p in trial_dirs] - - for trial_on_disk in 
trials_on_disk: - state = trial_on_disk.state() - - if state in (TrialOnDisk.State.SUCCESS, TrialOnDisk.State.ERROR): - if trial_on_disk.id in self.evaluated_trials: - continue - - # It's been evaluated and we can move it out of pending - self.pending_trials.pop(trial_on_disk.id, None) - self.in_progress_trials.pop(trial_on_disk.id, None) - - raw_config, metadata, previous_config_id, result = trial_on_disk.load() - - # NOTE: Assuming that the previous one will always have been - # evaluated, if there is a previous one. - previous_report = None - if previous_config_id is not None: - previous_report = self.evaluated_trials[previous_config_id] - - trial = Trial( - id=trial_on_disk.id, - config=raw_config, - pipeline_dir=trial_on_disk.pipeline_dir, - previous=previous_report, - time_sampled=metadata["time_sampled"], - metadata=metadata, - ) + _disks = [Trial.Disk.from_dir(p) for p in trial_dirs] + _disk_lookup = {disk.config_id: disk for disk in _disks} + + # NOTE: We sort all trials such that we process previous trials first, i.e. + # if trial_3 has trial_2 as previous, we process trial_2 first, which + # requires trial_1 to have been processed first. + def _depth(trial: Trial.Disk) -> int: + depth = 0 + previous = trial.previous_config_id + while previous is not None: + depth += 1 + previous_trial = _disk_lookup.get(previous) + if previous_trial is None: + raise RuntimeError( + "Previous trial not found on disk when processing a trial." + " This should not happen as if a tria has a previous trial," + " then it should be present and evaluated on disk.", + ) + previous = previous_trial.previous_config_id + + return depth + + # This allows is to traverse linearly and used cached values of previous + # trial data loading, as done below. + _disks.sort(key=_depth) + + for disk in _disks: + config_id = disk.config_id + state = disk.state() + + if state is Trial.State.CORRUPTED: + logger.warning(f"Trial {config_id} was corrupted somehow!") - report: Report - if isinstance(result, dict): - report = trial.success(result, time_end=metadata["time_end"]) - elif isinstance(result, tuple): - err, tb = result - report = trial.error(err=err, tb=tb, time_end=metadata["time_end"]) - elif result is None: + previous: Trial | None = None + if disk.previous_config_id is not None: + previous, _ = self.trials.get(disk.previous_config_id, (None, None)) + if previous is None: raise RuntimeError( - "Result should not have been None, this is a bug!", - "Please report this to the developers with some sample code" - " if possible.", + "Previous trial not found in memory when processing a trial." + " This should not happen as if a trial has a previous trial," + " then it should be present and evaluated in memory.", ) - else: - raise TypeError(f"Unknown result type {type(result)}") - - self.evaluated_trials[trial_on_disk.id] = report - - elif state is TrialOnDisk.State.PENDING: - assert trial_on_disk.id not in self.evaluated_trials - if trial_on_disk.id in self.pending_trials: - continue - - raw_config, metadata, previous_config_id, result = trial_on_disk.load() - - # NOTE: Assuming that the previous one will always have been evaluated, - # if there is a previous one. 
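# --- A standalone sketch (not part of the patch) of the ordering trick used
# by the new `update_from_disk` above: sorting trial directories by the
# length of their `previous` chain guarantees every trial is processed after
# the trial it continues, so the previous trial is already cached in memory.
# The plain `previous_of` dict stands in for `Trial.Disk.previous_config_id`.
from __future__ import annotations


def depth(config_id: str, previous_of: dict[str, str | None]) -> int:
    d = 0
    prev = previous_of[config_id]
    while prev is not None:
        d += 1
        prev = previous_of[prev]
    return d


previous_of = {"1": None, "2": "1", "3": "2", "4": None}
order = sorted(previous_of, key=lambda c: depth(c, previous_of))
assert order == ["1", "4", "2", "3"]  # roots first, then continuations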
- previous_report = None - if previous_config_id is not None: - previous_report = self.evaluated_trials[previous_config_id] - - trial = Trial( - id=trial_on_disk.id, - config=raw_config, - pipeline_dir=trial_on_disk.pipeline_dir, - previous=previous_report, - time_sampled=metadata["time_sampled"], - metadata=metadata, - ) - self.pending_trials[trial_on_disk.id] = trial - - elif state is TrialOnDisk.State.IN_PROGRESS: - assert trial_on_disk.id not in self.evaluated_trials - if trial_on_disk.id in self.in_progress_trials: - continue - - # If this was previously in the pending queue, jsut pop - # it into the in progress queue - previously_pending_trial = self.pending_trials.pop(trial_on_disk.id, None) - if previously_pending_trial is not None: - self.in_progress_trials[trial_on_disk.id] = previously_pending_trial - continue - - # Otherwise it's the first time we saw it so we have to load it in - raw_config, metadata, previous_config_id, result = trial_on_disk.load() - - # NOTE: Assuming that the previous one will always have been evaluated, - # if there is a previous one. - previous_report = None - if previous_config_id is not None: - previous_report = self.evaluated_trials[previous_config_id] - - trial = Trial( - id=trial_on_disk.id, - config=raw_config, - pipeline_dir=trial_on_disk.pipeline_dir, - previous=previous_report, - time_sampled=metadata["time_sampled"], - metadata=metadata, - ) - self.pending_trials[trial_on_disk.id] = trial - elif state == TrialOnDisk.State.CORRUPTED: - logger.warning(f"Removing corrupted trial {trial_on_disk.id}") - try: - shutil.rmtree(trial_on_disk.pipeline_dir) - except Exception as e: - logger.exception(e) + cached_trial = self.trials.get(config_id, None) - else: - raise ValueError(f"Unknown state {state} for trial {trial_on_disk.id}") + # If not currently cached or it was and had a state change + if cached_trial is None or cached_trial[1] != state: + trial = Trial.from_disk(disk, previous=previous) + self.trials[config_id] = (trial, state) @contextmanager def sync(self, *, lock: bool = True) -> Iterator[None]: @@ -777,7 +722,7 @@ def _worker_should_continue( def _sample_trial_from_optimizer( optimizer: BaseOptimizer, config_dir_f: Callable[[ConfigID], Path], - evaluated_trials: Mapping[ConfigID, Report], + evaluated_trials: Mapping[ConfigID, Trial], pending_trials: Mapping[ConfigID, Trial], ) -> Trial: optimizer.load_results( @@ -795,21 +740,11 @@ def _sample_trial_from_optimizer( if prev_config_id is not None: previous = evaluated_trials[prev_config_id] - # NOTE(eddiebergman): So weirdly enough, `SearchSpace.hp_values()` will - # get the raw .value of everything, EXCEPT for `GraphParameter` which will - # just give the whole parameter. This assertion is used to check those - # are the only two things coming through here... 
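# --- A standalone sketch (not part of the patch) of the caching rule in the
# new `update_from_disk` above: a trial is only (re)loaded from disk when it
# is unseen or its on-disk state differs from the last state cached in
# memory. `load` is a hypothetical stand-in for `Trial.from_disk`.
from __future__ import annotations

from typing import Callable

cache: dict[str, tuple[str, str]] = {}  # config_id -> (trial, last seen state)


def refresh(config_id: str, state: str, load: Callable[[str], str]) -> None:
    cached = cache.get(config_id)
    if cached is None or cached[1] != state:  # unseen, or state has changed
        cache[config_id] = (load(config_id), state)


refresh("1", "pending", load=lambda c: f"trial-{c}")
refresh("1", "pending", load=lambda c: 1 / 0)  # cache hit: loader never runs
refresh("1", "success", load=lambda c: f"trial-{c}-done")  # changed: reload
assert cache["1"] == ("trial-1-done", "success")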
- # This caused some nightmare to debug bug which led to a hack - # in the `SearchSpace.load_from()` - # - # -- from neps.search_spaces import GraphParameter, Parameter - # -- for k, v in config.items(): - # -- assert isinstance(v, GraphParameter) or not isinstance(v, Parameter) - time_sampled = time.time() return Trial( id=config_id, config=config, + report=None, time_sampled=time_sampled, pipeline_dir=config_dir_f(config_id), previous=previous, @@ -901,10 +836,14 @@ def launch_runtime( # noqa: PLR0913, C901, PLR0915 break with shared_state.sync(lock=True): + trials_by_state = shared_state.trials_by_state() if not _worker_should_continue( max_evaluations_total, - n_inprogress=len(shared_state.pending_trials), - n_evaluated=len(shared_state.evaluated_trials), + n_inprogress=len(trials_by_state[Trial.State.IN_PROGRESS]), + n_evaluated=( + len(trials_by_state[Trial.State.SUCCESS]) + + len(trials_by_state[Trial.State.ERROR]) + ), continue_until_max_evaluation_completed=continue_until_max_evaluation_completed, ): logger.info("Maximum total evaluations is reached, shutting down") @@ -923,16 +862,24 @@ def launch_runtime( # noqa: PLR0913, C901, PLR0915 logger.debug("Sampling a new configuration") + evaluated = ( + trials_by_state[Trial.State.SUCCESS] + + trials_by_state[Trial.State.ERROR] + ) + pending = ( + trials_by_state[Trial.State.PENDING] + + trials_by_state[Trial.State.IN_PROGRESS] + ) trial = _sample_trial_from_optimizer( sampler, shared_state.paths.config_dir, - evaluated_trials=shared_state.evaluated_trials, - pending_trials=shared_state.pending_trials, + evaluated_trials={trial.id: trial for trial in evaluated}, + pending_trials={trial.id: trial for trial in pending}, ) serialize(trial.config, trial.config_file) serialize(trial.metadata, trial.metadata_file) if trial.previous is not None: - trial.previous_config_id_file.write_text(trial.previous.trial.id) + trial.previous_config_id_file.write_text(trial.previous.id) logger.debug(f"Sampled config {trial.id}") @@ -948,7 +895,6 @@ def launch_runtime( # noqa: PLR0913, C901, PLR0915 # TODO(eddiebergman): Right now if a trial crashes, it's cost is not accounted # for, this should probably removed from BaseOptimizer as it does not need # to know this and the runtime can fill this in for it. - report: Report try: user_result = _evaluate_config(trial, evaluation_fn, logger) except Exception as e: # noqa: BLE001 @@ -964,31 +910,30 @@ def launch_runtime( # noqa: PLR0913, C901, PLR0915 ) logger.exception(e) tb = traceback.format_exc() - report = trial.error(e, tb=tb, time_end=time.time()) - shared_state.evaluated_trials[trial.id] = report - shared_state.pending_trials.pop(trial.id, None) - shared_state.in_progress_trials.pop(trial.id, None) + trial.report = trial.create_error_report(e, tb=tb) + trial.metadata["time_end"] = time.time() + + shared_state.trials[trial.id] = (trial, Trial.State.ERROR) - serialize({"err": str(e), "tb": tb}, report.disk.error_file) - serialize(report.metadata, report.disk.metadata_file) + serialize({"err": str(e), "tb": tb}, trial.disk.error_file) + serialize(trial.metadata, trial.disk.metadata_file) else: - report = trial.success(user_result, time_end=time.time()) - if sampler.budget is not None and report.cost is None: + trial.report = trial.create_success_report(user_result) + trial.metadata["time_end"] = time.time() + if sampler.budget is not None and trial.report.cost is None: raise ValueError( - "The evaluation function result should contain " - f"a 'cost' field when used with a budget. 
Got {report.results}", + "The evaluation function result should contain a 'cost'" + f"field when used with a budget. Got {trial.report.results}", ) with shared_state.lock(poll=_poll, timeout=_timeout): - shared_state.evaluated_trials[trial.id] = report - shared_state.pending_trials.pop(trial.id, None) - shared_state.in_progress_trials.pop(trial.id, None) + shared_state.trials[trial.id] = (trial, Trial.State.SUCCESS) - eval_cost = report.cost + eval_cost = trial.report.cost account_for_cost = False if eval_cost is not None: - account_for_cost = report.account_for_cost + account_for_cost = trial.report.account_for_cost budget_metadata = { "max": sampler.budget, "used": sampler.used_budget, @@ -997,8 +942,8 @@ def launch_runtime( # noqa: PLR0913, C901, PLR0915 } trial.metadata.update(budget_metadata) - serialize(report.metadata, report.disk.metadata_file) - serialize(report.results, report.disk.result_file) + serialize(trial.metadata, trial.disk.metadata_file) + serialize(trial.report.results, trial.disk.result_file) if account_for_cost: assert eval_cost is not None with shared_state.use_sampler(sampler, serialize_seed=False): @@ -1006,14 +951,15 @@ def launch_runtime( # noqa: PLR0913, C901, PLR0915 _result: ERROR | dict[str, Any] if post_evaluation_hook is not None: + report = trial.report if isinstance(report, ErrorReport): _result = "error" elif isinstance(report, SuccessReport): _result = dict(report.results) else: - raise TypeError( - f"Unknown result type '{type(report)}' for report: {report}" - ) + _type = type(report) + raise TypeError(f"Unknown result type '{_type}' for report: {report}") + post_evaluation_hook( trial.config, trial.id, diff --git a/neps/search_spaces/hyperparameters/float.py b/neps/search_spaces/hyperparameters/float.py index 311f00b1..b780f3ff 100644 --- a/neps/search_spaces/hyperparameters/float.py +++ b/neps/search_spaces/hyperparameters/float.py @@ -170,7 +170,8 @@ def normalized_to_value(self, normalized_value: float) -> float: low, high = self.lower, self.upper normalized_value = normalized_value * (high - low) + low - return np.exp(normalized_value) if self.log else normalized_value + _value = np.exp(normalized_value) if self.log else normalized_value + return float(_value) @override def _get_non_unique_neighbors( diff --git a/neps/status/status.py b/neps/status/status.py index 42f720ca..0199a911 100644 --- a/neps/status/status.py +++ b/neps/status/status.py @@ -3,14 +3,15 @@ # ruff: noqa: T201 from __future__ import annotations +from itertools import chain from pathlib import Path from typing import TYPE_CHECKING, Any import pandas as pd -from neps.runtime import ErrorReport, SharedState +from neps.runtime import ErrorReport, SharedState, Trial from neps.utils._locker import Locker -from neps.utils.types import _ConfigResultForStats +from neps.utils.types import ConfigID, _ConfigResultForStats if TYPE_CHECKING: from neps.search_spaces.search_space import SearchSpace @@ -38,19 +39,27 @@ def get_summary_dict( shared_state = SharedState(root_directory) shared_state.update_from_disk() - evaluated = { - _id: _ConfigResultForStats( - _id, - report.config, - "error" if isinstance(report, ErrorReport) else report.results, - report.metadata, + trials_by_state = shared_state.trials_by_state() + + evaluated: dict[ConfigID, _ConfigResultForStats] = {} + + for trial in chain( + trials_by_state[Trial.State.SUCCESS], + trials_by_state[Trial.State.ERROR], + ): + assert trial.report is not None + _result_for_stats = _ConfigResultForStats( + trial.id, + trial.config, + "error" 
if isinstance(trial.report, ErrorReport) else trial.report.results, + trial.metadata, ) - for _id, report in shared_state.evaluated_trials.items() - } + evaluated[trial.id] = _result_for_stats + in_progress = { - _id: trial.config for _id, trial in shared_state.in_progress_trials.items() + trial.id: trial.config for trial in trials_by_state[Trial.State.IN_PROGRESS] } - pending = {_id: trial.config for _id, trial in shared_state.pending_trials.items()} + pending = {trial.id: trial.config for trial in trials_by_state[Trial.State.PENDING]} summary: dict[str, Any] = {} diff --git a/neps/utils/data_loading.py b/neps/utils/data_loading.py index 1c9be51b..0bdb15e3 100644 --- a/neps/utils/data_loading.py +++ b/neps/utils/data_loading.py @@ -12,8 +12,8 @@ import numpy as np import yaml -from neps.runtime import ErrorReport, SharedState -from neps.utils.types import ERROR, ResultDict, _ConfigResultForStats +from neps.runtime import ErrorReport, SharedState, Trial +from neps.utils.types import ERROR, ConfigID, ResultDict, _ConfigResultForStats def _get_loss( @@ -137,16 +137,26 @@ def read_tasks_and_dev_stages_from_disk( state = SharedState(Path(dev_dir_path)) state.update_from_disk() - result = { - _id: _ConfigResultForStats( - _id, - report.config, - "error" if isinstance(report, ErrorReport) else report.results, - report.metadata, + trials_by_state = state.trials_by_state() + + evaluated: dict[ConfigID, _ConfigResultForStats] = {} + + for trial in chain( + trials_by_state[Trial.State.SUCCESS], + trials_by_state[Trial.State.ERROR], + ): + assert trial.report is not None + _result_for_stats = _ConfigResultForStats( + trial.id, + trial.config, + "error" + if isinstance(trial.report, ErrorReport) + else trial.report.results, + trial.metadata, ) - for _id, report in state.evaluated_trials.items() - } - results[task_id][dev_id] = result + evaluated[trial.id] = _result_for_stats + + results[task_id][dev_id] = evaluated return results @@ -172,16 +182,26 @@ def read_user_prior_results_from_disk( continue state = SharedState(prior_dir) - with state.sync(): - results[prior_dir.name] = { - _id: _ConfigResultForStats( - _id, - report.config, - "error" if isinstance(report, ErrorReport) else report.results, - report.metadata, + with state.sync(lock=False): + evaluated: dict[ConfigID, _ConfigResultForStats] = {} + trials_by_state = state.trials_by_state() + + for trial in chain( + trials_by_state[Trial.State.SUCCESS], + trials_by_state[Trial.State.ERROR], + ): + assert trial.report is not None + _result_for_stats = _ConfigResultForStats( + trial.id, + trial.config, + "error" + if isinstance(trial.report, ErrorReport) + else trial.report.results, + trial.metadata, ) - for _id, report in state.evaluated_trials.items() - } + evaluated[trial.id] = _result_for_stats + + results[prior_dir.name] = evaluated return results @@ -265,7 +285,7 @@ class BestLossesDict(TypedDict): # TODO(unknown): Implement summarize results for nested working directories # with multiple experiments -def summarize_results( +def summarize_results( # noqa: C901 working_dir: str | Path, final_task_id: int | None = None, final_dev_id: int | None = None, @@ -301,6 +321,7 @@ def summarize_results( if sub_dir: seed_dir = seed_dir / sub_dir # noqa: PLW2901 + final_results: dict[ConfigID, _ConfigResultForStats] if final_task_id is not None and final_dev_id is not None: results = read_tasks_and_dev_stages_from_disk([seed_dir]) @@ -308,16 +329,24 @@ def summarize_results( final_results = results[final_task_id][final_dev_id] else: state = 
SharedState(Path(seed_dir)) - with state.sync(): - final_results = { - _id: _ConfigResultForStats( - _id, - report.config, - "error" if isinstance(report, ErrorReport) else report.results, - report.metadata, + with state.sync(lock=False): + trials_by_state = state.trials_by_state() + + final_results = {} + for trial in chain( + trials_by_state[Trial.State.SUCCESS], + trials_by_state[Trial.State.ERROR], + ): + assert trial.report is not None + _result_for_stats = _ConfigResultForStats( + trial.id, + trial.config, + "error" + if isinstance(trial.report, ErrorReport) + else trial.report.results, + trial.metadata, ) - for _id, report in state.evaluated_trials.items() - } + final_results[trial.id] = _result_for_stats # This part is copied from neps.status() best_loss: float = float("inf") diff --git a/neps/utils/types.py b/neps/utils/types.py index 3b16601b..d0e8c46c 100644 --- a/neps/utils/types.py +++ b/neps/utils/types.py @@ -91,7 +91,7 @@ class _ConfigResultForStats: @property def loss(self) -> float | ERROR: if isinstance(self.result, dict): - return self.result["loss"] + return float(self.result["loss"]) return "error" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b
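The status.py and data_loading.py hunks above all converge on the same consumer pattern introduced by this patch: instead of the old `evaluated_trials` / `pending_trials` / `in_progress_trials` dicts, callers group the single `SharedState.trials` mapping by `Trial.State` and chain the SUCCESS and ERROR buckets wherever "evaluated" trials are needed. A minimal standalone sketch of that pattern, with plain strings standing in for `Trial` objects:

from __future__ import annotations

from enum import Enum
from itertools import chain


class State(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    ERROR = "error"
    CORRUPTED = "corrupted"


def trials_by_state(trials: dict[str, tuple[str, State]]) -> dict[State, list[str]]:
    # Mirrors SharedState.trials_by_state: every state gets a (possibly
    # empty) bucket, so consumers can index without membership checks.
    grouped: dict[State, list[str]] = {state: [] for state in State}
    for trial, state in trials.values():
        grouped[state].append(trial)
    return grouped


trials = {
    "1": ("trial-1", State.SUCCESS),
    "2": ("trial-2", State.ERROR),
    "3": ("trial-3", State.PENDING),
}
by_state = trials_by_state(trials)
evaluated = list(chain(by_state[State.SUCCESS], by_state[State.ERROR]))
assert evaluated == ["trial-1", "trial-2"]
assert by_state[State.PENDING] == ["trial-3"]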