From cc124c1432780c84a9374172fe2c1c84d76bd2f8 Mon Sep 17 00:00:00 2001 From: eddiebergman Date: Thu, 25 Apr 2024 19:55:32 +0200 Subject: [PATCH] refactor(runtime): Integrate metahyper closely --- .github/workflows/tests.yaml | 2 +- neps/api.py | 6 +- neps/metahyper/__init__.py | 2 - neps/metahyper/_locker.py | 36 -- neps/metahyper/api.py | 559 ----------------- neps/metahyper/example.py | 37 -- neps/metahyper/utils.py | 179 ------ neps/optimizers/base_optimizer.py | 65 +- .../bayesian_optimization/cost_cooling.py | 5 +- .../kernels/get_kernels.py | 2 +- .../bayesian_optimization/mf_tpe.py | 28 +- .../bayesian_optimization/models/__init__.py | 3 +- .../bayesian_optimization/optimizer.py | 10 +- neps/optimizers/grid_search/optimizer.py | 4 +- neps/optimizers/multi_fidelity/_dyhpo.py | 19 +- neps/optimizers/multi_fidelity/dyhpo.py | 18 +- neps/optimizers/multi_fidelity/hyperband.py | 13 +- neps/optimizers/multi_fidelity/mf_bo.py | 6 +- .../multi_fidelity/sampling_policy.py | 26 +- .../multi_fidelity/successive_halving.py | 32 +- .../multi_fidelity_prior/async_priorband.py | 11 +- .../prototype_optimizer.py | 4 +- neps/optimizers/random_search/optimizer.py | 4 +- neps/plot/tensorboard_eval.py | 44 +- neps/runtime.py | 578 ++++++++++++++++++ neps/search_spaces/search_space.py | 9 +- neps/status/status.py | 125 ++-- neps/types.py | 42 ++ neps/utils/__init__.py | 0 neps/utils/_locker.py | 61 ++ neps/utils/common.py | 148 ++++- neps/utils/data_loading.py | 25 +- neps/utils/files.py | 72 +++ pyproject.toml | 296 ++++++--- tests/test_metahyper/test_locking.py | 2 +- 35 files changed, 1396 insertions(+), 1077 deletions(-) delete mode 100644 neps/metahyper/__init__.py delete mode 100644 neps/metahyper/_locker.py delete mode 100644 neps/metahyper/api.py delete mode 100644 neps/metahyper/example.py delete mode 100644 neps/metahyper/utils.py create mode 100644 neps/runtime.py create mode 100644 neps/types.py create mode 100644 neps/utils/__init__.py create mode 100644 neps/utils/_locker.py create mode 100644 neps/utils/files.py diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 4c52f9d5..94be8aea 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -41,4 +41,4 @@ jobs: - name: Run pytest timeout-minutes: 15 - run: poetry run pytest -m "all_examples or metahyper or neps_api or summary_csv" + run: poetry run pytest -m "all_examples or runtime or neps_api or summary_csv" diff --git a/neps/api.py b/neps/api.py index 2584fcbe..977e8c77 100644 --- a/neps/api.py +++ b/neps/api.py @@ -1,6 +1,5 @@ """API for the neps package. 
""" - from __future__ import annotations import logging @@ -10,7 +9,8 @@ import ConfigSpace as CS -from .metahyper import instance_from_map, metahyper_run +from neps.utils.common import instance_from_map +from neps.runtime import launch_runtime from .optimizers import BaseOptimizer, SearcherMapping from .plot.tensorboard_eval import tblogger from .search_spaces.parameter import Parameter @@ -271,7 +271,7 @@ def run( if development_stage_id is not None: root_directory = Path(root_directory) / f"dev_{development_stage_id}" - metahyper_run( + launch_runtime( run_pipeline, searcher_instance, searcher_info, diff --git a/neps/metahyper/__init__.py b/neps/metahyper/__init__.py deleted file mode 100644 index 6e2aa0f7..00000000 --- a/neps/metahyper/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .api import ConfigResult, Sampler, read, metahyper_run -from .utils import instance_from_map diff --git a/neps/metahyper/_locker.py b/neps/metahyper/_locker.py deleted file mode 100644 index bd4d2528..00000000 --- a/neps/metahyper/_locker.py +++ /dev/null @@ -1,36 +0,0 @@ -import atexit -import time -from contextlib import contextmanager - -import portalocker as pl - - -class Locker: - def __init__(self, lock_path, logger): - self.logger = logger - atexit.register(self.__del__) - self.lock_path = lock_path - self.lock_file = self.lock_path.open("a") # a for security - - def __del__(self): - self.lock_file.close() - - def release_lock(self): - self.logger.debug(f"Release lock for {self.lock_path}") - pl.unlock(self.lock_file) - - def acquire_lock(self): - try: - pl.lock(self.lock_file, pl.LOCK_EX | pl.LOCK_NB) - self.logger.debug(f"Acquired lock for {self.lock_path}") - return True - except pl.exceptions.LockException: - self.logger.debug(f"Failed to acquire lock for {self.lock_path}") - return False - - @contextmanager - def acquire_force(self, time_step=1): - while not self.acquire_lock(): - time.sleep(time_step) - yield True - self.release_lock() diff --git a/neps/metahyper/api.py b/neps/metahyper/api.py deleted file mode 100644 index 1dc9c3bf..00000000 --- a/neps/metahyper/api.py +++ /dev/null @@ -1,559 +0,0 @@ -from __future__ import annotations - -import inspect -import logging -import shutil -import time -import warnings -from abc import ABC, abstractmethod -from contextlib import contextmanager -from copy import deepcopy -from dataclasses import dataclass -from pathlib import Path -from typing import Any, Iterable - -from ._locker import Locker -from .utils import YamlSerializer, find_files, non_empty_file - -warnings.simplefilter("always", DeprecationWarning) - - -@dataclass -class ConfigResult: - config: Any - result: dict - metadata: dict - - def __lt__(self, other): - return self.result["loss"] < other.result["loss"] - - -class ConfigInRun: - config: Any | None = None - config_id: str | None = None - pipeline_directory: Path | str | None = None - previous_pipeline_directory: Path | str | None = None - optimization_dir: Path | str | None = None - - @staticmethod - def store_in_run_data( - config, - config_id, - pipeline_directory, - previous_pipeline_directory, - optimization_dir, - ): - ConfigInRun.config = config - ConfigInRun.config_id = config_id - ConfigInRun.pipeline_directory = Path(pipeline_directory) - ConfigInRun.previous_pipeline_directory = previous_pipeline_directory - ConfigInRun.optimization_dir = Path(optimization_dir) - - -class Sampler(ABC): - # pylint: disable=no-self-use,unused-argument - def __init__(self, budget: int | float | None = None): - self.used_budget: int | float = 0 
- self.budget = budget - - def get_state(self) -> Any: - """Return a state for the sampler that will be used in every other thread""" - state = { - "used_budget": self.used_budget, - } - if self.budget is not None: - state["remaining_budget"] = self.budget - self.used_budget - return state - - def load_state(self, state: dict[str, Any]): - """Load a state for the sampler shared accross threads""" - self.used_budget = state["used_budget"] - - @contextmanager - def using_state(self, state_file: Path, serializer): - if state_file.exists(): - self.load_state(serializer.load(state_file)) - yield self - - serializer.dump(self.get_state(), state_file) - - def load_results( - self, results: dict[Any, ConfigResult], pending_configs: dict[Any, ConfigResult] - ) -> None: - return - - @abstractmethod - def get_config_and_ids(self) -> tuple[Any, str, str | None]: - """Sample a new configuration - - Returns: - config: serializable object representing the configuration - config_id: unique identifier for the configuration - previous_config_id: if provided, id of a previous on which this - configuration is based - """ - raise NotImplementedError - - def load_config(self, config: Any): - """Transform a serialized object into a configuration object""" - return config - - -class Configuration: - """If the configuration is not a simple dictionary, it should inherit from - this object and define the 'hp_values' method""" - - def hp_values(self): - """Should return a dictionary of the hyperparameter values""" - raise NotImplementedError - - -def _process_sampler_info( - serializer: YamlSerializer, - sampler_info: dict, - sampler_info_file: Path, - decision_locker: Locker, - logger=None, -): - """ - This function is called by the metahyper before sampling and training happens. - It performs checks on the optimizer's YAML data file to ensure data integrity - and applies sanity checks for potential user errors when running NePS. - - The function utilizes a file-locking mechanism using the `Locker` class to ensure - thread safety, preventing potential conflicts when multiple threads access the file - simultaneously. - - Args: - - serializer: The YAML serializer object used for loading and dumping data. - - sampler_info: The dictionary containing information for the optimizer. - - sampler_info_file: The path to the YAML file storing optimizer data if available. - - decision_locker: The Locker file to use during multi-thread communication. - - logger: An optional logger object for logging messages (default is 'neps'). - Note: - - The file-locking mechanism is employed to avoid potential errors in multiple threads. - """ - if logger is None: - logger = logging.getLogger("neps") - - should_break = False - while not should_break: - if decision_locker.acquire_lock(): - try: - if sampler_info_file.exists(): - optimizer_data = serializer.load(sampler_info_file) - excluded_key = "searcher_name" - sampler_info_copy = sampler_info.copy() - optimizer_data_copy = optimizer_data.copy() - sampler_info_copy.pop(excluded_key, None) - optimizer_data_copy.pop(excluded_key, None) - - if sampler_info_copy != optimizer_data_copy: - raise ValueError( - f"The sampler_info in the file {sampler_info_file} is not valid. 
" - f"Expected: {sampler_info_copy}, Found: {optimizer_data_copy}" - ) - else: - # If the file is empty or doesn't exist, write the sampler_info - serializer.dump(sampler_info, sampler_info_file, sort_keys=False) - except ValueError as ve: - # Handle specific value error - raise ve - except Exception as e: - raise RuntimeError(f"Error during data saving: {e}") from e - finally: - decision_locker.release_lock() - should_break = True - else: - time.sleep(2) - - -def _load_sampled_paths(optimization_dir: Path | str, serializer, logger): - optimization_dir = Path(optimization_dir) - base_result_directory = optimization_dir / "results" - logger.debug(f"Loading results from {base_result_directory}") - - previous_paths, pending_paths = {}, {} - for config_dir in base_result_directory.iterdir(): - if not config_dir.is_dir(): - continue - config_id = config_dir.name[len("config_") :] - config_file = config_dir / f"config{serializer.SUFFIX}" - result_file = config_dir / f"result{serializer.SUFFIX}" - - if non_empty_file(result_file): - previous_paths[config_id] = (config_dir, config_file, result_file) - elif non_empty_file(config_file): - pending_paths[config_id] = (config_dir, config_file) - else: - existing_config = find_files( - config_dir, ["config"], any_suffix=True, check_nonempty=True - ) - if existing_config: - existing_format = existing_config[0].suffix - logger.warning( - f"Found directory {config_dir} with file {existing_config[0].name}. " - f"But function was called with the serializer for " - f"'{serializer.SUFFIX}' files, not '{existing_format}'." - ) - else: - # Should probably warn the user somehow about this, although it is not - # dangerous - logger.warning( - f"Removing {config_dir} as worker died during config sampling." - ) - try: - shutil.rmtree(str(config_dir)) - except Exception: # The worker doesn't need to crash for this - logger.exception(f"Can't delete {config_dir}") - return previous_paths, pending_paths - - -def _read_config_result(result_dir: Path | str, serializer: YamlSerializer): - result_dir = Path(result_dir) - config = serializer.load_config(result_dir / "config") - result = serializer.load(result_dir / "result") - metadata = serializer.load(result_dir / "metadata") - return ConfigResult(config, result, metadata) - - -def read(optimization_dir: Path | str, serializer=None, logger=None, do_lock=True): - optimization_dir = Path(optimization_dir) - - if logger is None: - logger = logging.getLogger("metahyper") - - if do_lock: - decision_lock_file = optimization_dir / ".decision_lock" - decision_lock_file.touch(exist_ok=True) - decision_locker = Locker(decision_lock_file, logger.getChild("_locker")) - while not decision_locker.acquire_lock(): - time.sleep(2) - - if serializer is None: - serializer = YamlSerializer() - - previous_paths, pending_paths = _load_sampled_paths( - optimization_dir, serializer, logger - ) - previous_results, pending_configs, pending_configs_free = {}, {}, {} - - for config_id, (config_dir, _, _) in previous_paths.items(): - previous_results[config_id] = _read_config_result(config_dir, serializer) - - for config_id, (config_dir, config_file) in pending_paths.items(): - pending_configs[config_id] = serializer.load_config(config_file) - - config_lock_file = config_dir / ".config_lock" - config_locker = Locker(config_lock_file, logger.getChild("_locker")) - if config_locker.acquire_lock(): - pending_configs_free[config_id] = pending_configs[config_id] - - logger.debug( - f"Read in {len(previous_results)} previous results and " - 
f"{len(pending_configs)} pending evaluations " - f"({len(pending_configs_free)} without a worker)" - ) - logger.debug( - f"Read in previous_results={previous_results}, " - f"pending_configs={pending_configs}, " - f"and pending_configs_free={pending_configs_free}, " - ) - - if do_lock: - decision_locker.release_lock() - return previous_results, pending_configs, pending_configs_free - - -def _check_max_evaluations( - optimization_dir, - max_evaluations, - serializer, - logger, - continue_until_max_evaluation_completed, -): - logger.debug("Checking if max evaluations is reached") - previous_results, pending_configs, pending_configs_free = read( - optimization_dir, serializer, logger - ) - evaluation_count = len(previous_results) - - # Taking into account pending evaluations - if not continue_until_max_evaluation_completed: - evaluation_count += len(pending_configs) - len(pending_configs_free) - - return evaluation_count >= max_evaluations - - -def _sample_config(optimization_dir, sampler, serializer, logger, pre_load_hooks): - # First load the results and state of the optimizer - previous_results, pending_configs, pending_configs_free = read( - optimization_dir, serializer, logger, do_lock=False - ) - - base_result_directory = optimization_dir / "results" - - logger.debug("Sampling a new configuration") - - for hook in pre_load_hooks: - # executes operations on the sampler before setting its state - # can be used for setting custom constraints on the optimizer state - # for example, can be used to input custom grid of configs, meta learning - # information for surrogate building, any non-stationary auxiliary information - sampler = hook(sampler) - - sampler.load_results(previous_results, pending_configs) - config, config_id, previous_config_id = sampler.get_config_and_ids() - - pipeline_directory = base_result_directory / f"config_{config_id}" - pipeline_directory.mkdir(exist_ok=True) - - if pending_configs_free: - logger.warning( - f"There are {len(pending_configs_free)} configs that were sampled, but " - "have no worker assigned. Sometimes this is due to a delay in the filesystem " - "communication, but most likely some configs crashed during their execution " - "or a jobtime-limit was reached." - ) - - if previous_config_id is not None: - previous_config_id_file = pipeline_directory / "previous_config.id" - previous_config_id_file.write_text(previous_config_id) # TODO: Get rid of this - serializer.dump( - {"time_sampled": time.time(), "previous_config_id": previous_config_id}, - pipeline_directory / "metadata", - ) - previous_pipeline_directory = Path( - base_result_directory, f"config_{previous_config_id}" - ) - else: - serializer.dump({"time_sampled": time.time()}, pipeline_directory / "metadata") - previous_pipeline_directory = None - - # We want this to be the last action in sampling to catch potential crashes - serializer.dump(config, pipeline_directory / "config") - - logger.debug(f"Sampled config {config_id}") - return ( - config_id, - config, - pipeline_directory, - previous_pipeline_directory, - ) - - -def _evaluate_config( - config_id, - config, - pipeline_directory, - evaluation_fn, - previous_pipeline_directory, - logger, -): - if isinstance(config, Configuration): - config = config.hp_values() - config = deepcopy(config) - logger.info(f"Start evaluating config {config_id}") - try: - # If pipeline_directory and previous_pipeline_directory are included in the - # signature we supply their values, otherwise we simply do nothing. 
- evaluation_fn_params = inspect.signature(evaluation_fn).parameters - directory_params = [] - if "pipeline_directory" in evaluation_fn_params: - directory_params.append(pipeline_directory) - elif "working_directory" in evaluation_fn_params: - warnings.warn( - "the argument: 'working_directory' is deprecated. " - "In the function: '{}', please, " - "use 'pipeline_directory' instead. " - "version==0.5.5".format(evaluation_fn.__name__), - DeprecationWarning, - stacklevel=2, - ) - directory_params.append(pipeline_directory) - - if "previous_pipeline_directory" in evaluation_fn_params: - directory_params.append(previous_pipeline_directory) - elif "previous_working_directory" in evaluation_fn_params: - warnings.warn( - "the argument: 'previous_working_directory' is deprecated. " - "In the function: '{}', please, " - "use 'previous_pipeline_directory' instead. " - "version==0.5.5".format(evaluation_fn.__name__), - DeprecationWarning, - stacklevel=2, - ) - directory_params.append(previous_pipeline_directory) - - result = evaluation_fn( - *directory_params, - **config, - ) - - # Ensuring the result have the correct format that can be exploited by other functions - if isinstance(result, dict): - try: - result["loss"] = float(result["loss"]) - except KeyError as e: - raise ValueError("The loss should value should be provided") from e - except (TypeError, ValueError) as e: - raise ValueError("The loss should be a float") from e - else: - try: - result = float(result) - except (TypeError, ValueError) as e: - raise ValueError( - "The evaluation result should be a dictionnary or a float" - ) from e - result = {"loss": result} - except Exception: - logger.exception( - f"An error occured during evaluation of config {config_id}: " f"{config}." - ) - result = "error" - - return result, {"time_end": time.time()} - - -def metahyper_run( - evaluation_fn, - sampler: Sampler, - sampler_info: dict, - optimization_dir, - max_evaluations_total=None, - max_evaluations_per_run=None, - continue_until_max_evaluation_completed=False, - logger=None, - post_evaluation_hook=None, - overwrite_optimization_dir=False, - pre_load_hooks: Iterable | None = None, -): - serializer = YamlSerializer(sampler.load_config) - if logger is None: - logger = logging.getLogger("metahyper") - - optimization_dir = Path(optimization_dir) - if overwrite_optimization_dir and optimization_dir.exists(): - logger.warning("Overwriting working_directory") - shutil.rmtree(optimization_dir) - - sampler_state_file = optimization_dir / ".optimizer_state.yaml" - sampler_info_file = optimization_dir / ".optimizer_info.yaml" - base_result_directory = optimization_dir / "results" - base_result_directory.mkdir(parents=True, exist_ok=True) - - decision_lock_file = optimization_dir / ".decision_lock" - decision_lock_file.touch(exist_ok=True) - decision_locker = Locker(decision_lock_file, logger.getChild("_locker")) - - # Configuring the .optimizer_info.yaml file - _process_sampler_info( - serializer, sampler_info, sampler_info_file, decision_locker, logger - ) - - evaluations_in_this_run = 0 - while True: - if max_evaluations_total is not None and _check_max_evaluations( - optimization_dir, - max_evaluations_total, - serializer, - logger, - continue_until_max_evaluation_completed, - ): - logger.info("Maximum total evaluations is reached, shutting down") - break - - if ( - max_evaluations_per_run is not None - and evaluations_in_this_run >= max_evaluations_per_run - ): - logger.info("Maximum evaluations per run is reached, shutting down") - break - - if 
decision_locker.acquire_lock(): - try: - with sampler.using_state(sampler_state_file, serializer): - if sampler.budget is not None: - if sampler.used_budget >= sampler.budget: - logger.info("Maximum budget reached, shutting down") - break - ( - config_id, - config, - pipeline_directory, - previous_pipeline_directory, - ) = _sample_config( - optimization_dir, sampler, serializer, logger, pre_load_hooks - ) - # Storing the config details in ConfigInRun - ConfigInRun.store_in_run_data( - config, - config_id, - pipeline_directory, - previous_pipeline_directory, - optimization_dir, - ) - - config_lock_file = pipeline_directory / ".config_lock" - config_lock_file.touch(exist_ok=True) - config_locker = Locker(config_lock_file, logger.getChild("_locker")) - config_lock_acquired = config_locker.acquire_lock() - finally: - decision_locker.release_lock() - - if config_lock_acquired: - try: - # 1. First, we evaluate the config - result, metadata = _evaluate_config( - config_id, - config, - pipeline_directory, - evaluation_fn, - previous_pipeline_directory, - logger, - ) - - # 2. Then, we now dump all information to disk: - serializer.dump(result, pipeline_directory / "result") - - if result != "error": - # Updating the global budget - if "cost" in result: - eval_cost = float(result["cost"]) - account_for_cost = result.get("account_for_cost", True) - if account_for_cost: - with decision_locker.acquire_force(time_step=1): - with sampler.using_state( - sampler_state_file, serializer - ): - sampler.used_budget += eval_cost - - metadata["budget"] = { - "max": sampler.budget, - "used": sampler.used_budget, - "eval_cost": eval_cost, - "account_for_cost": account_for_cost, - } - elif sampler.budget is not None: - raise ValueError( - "The evaluation function result should contain " - "a 'cost' field when used with a budget" - ) - - config_metadata = serializer.load(pipeline_directory / "metadata") - config_metadata.update(metadata) - serializer.dump(config_metadata, pipeline_directory / "metadata") - - # 3. 
Anything the user might want to do after the evaluation - if post_evaluation_hook is not None: - post_evaluation_hook( - config, config_id, pipeline_directory, result, logger - ) - else: - logger.info(f"Finished evaluating config {config_id}") - finally: - config_locker.release_lock() - - evaluations_in_this_run += 1 - else: - time.sleep(3) diff --git a/neps/metahyper/example.py b/neps/metahyper/example.py deleted file mode 100644 index b1aef5e3..00000000 --- a/neps/metahyper/example.py +++ /dev/null @@ -1,37 +0,0 @@ -import time -import uuid - -import metahyper - - -class MinimalSampler(metahyper.Sampler): - def __init__(self): - super().__init__() - self.results = dict() - - def load_results(self, results, pending_configs): - self.results = results - - def get_config_and_ids(self): - config = dict(a=len(self.results)) - config_id = str(uuid.uuid4())[:6] - previous_config_id = None - return config, config_id, previous_config_id - - -def evaluation_fn(pipeline_directory, **config): # pylint: disable=unused-argument - time.sleep(15) - return 5 - - -if __name__ == "__main__": - import logging - - logging.basicConfig(level=logging.INFO) - - opt_dir = "test_opt_dir" - sampler = MinimalSampler() - metahyper.run( - evaluation_fn, sampler, optimization_dir=opt_dir, max_evaluations_total=5 - ) - previous_results, pending_configs, _ = metahyper.read(opt_dir) diff --git a/neps/metahyper/utils.py b/neps/metahyper/utils.py deleted file mode 100644 index fede6adc..00000000 --- a/neps/metahyper/utils.py +++ /dev/null @@ -1,179 +0,0 @@ -from __future__ import annotations - -import glob -import inspect -from functools import partial -from pathlib import Path -from typing import Any, Callable - -import yaml - - -def non_empty_file(file_path: Path) -> bool: - return file_path.exists() and file_path.stat().st_size != 0 - - -def find_files( - directory: Path, files: list[str], any_suffix=False, check_nonempty=False -) -> list[Path]: - found_paths = [] - for file_name in files: - pattern = f"{directory.absolute()}/**/{file_name}" - if any_suffix: - pattern += "*" - for f_path in glob.glob(pattern, recursive=True): - path_found = Path(f_path) - if path_found.is_file(): - if check_nonempty and not non_empty_file(path_found): - continue - found_paths.append(path_found) - return found_paths - - -def get_data_representation(data: Any): - """Common data representations. Other specific types should be handled - by the user in his Parameter class.""" - if isinstance(data, dict): - return {key: get_data_representation(val) for key, val in data.items()} - elif isinstance(data, list) or isinstance(data, tuple): - return [get_data_representation(val) for val in data] - elif type(data).__module__ in ["numpy", "torch"]: - data = data.tolist() - if type(data).__module__ == "numpy": - data = data.item() - return get_data_representation(data) - elif hasattr(data, "serialize"): - return get_data_representation(data.serialize()) - else: - return data - - -class MissingDependencyError(Exception): - def __init__(self, dep: str, cause: Exception, *args: Any): - super().__init__(dep, cause, *args) - self.dep = dep - self.__cause__ = cause # This is what `raise a from b` does - - def __str__(self) -> str: - return ( - f"Some required dependency-({self.dep}) to use this optional feature is " - f"missing. Please, include neps[experimental] dependency group in your " - f"installation of neps to be able to use all the optional features." 
- f" Otherwise, just install ({self.dep})" - ) - - -class YamlSerializer: - SUFFIX = ".yaml" - PRE_SERIALIZE = True - - def __init__(self, config_loader: Callable | None = None): - self.config_loader = config_loader or (lambda x: x) - - def load(self, path: Path | str, add_suffix=True): - path = str(path) - if add_suffix and Path(path).suffix != self.SUFFIX: - path = path + self.SUFFIX - - with open(path) as file_stream: - return yaml.full_load(file_stream) - - def dump(self, data: Any, path: Path | str, add_suffix=True, sort_keys=True): - if self.PRE_SERIALIZE: - data = get_data_representation(data) - path = str(path) - if add_suffix and Path(path).suffix != self.SUFFIX: - path = path + self.SUFFIX - with open(path, "w") as file_stream: - try: - return yaml.safe_dump(data, file_stream, sort_keys=sort_keys) - except yaml.representer.RepresenterError as e: - raise TypeError( - "You should return objects that are JSON-serializable. The object " - f"{e.args[1]} of type {type(e.args[1])} is not." - ) from e - - def load_config(self, path: Path | str): - if self.PRE_SERIALIZE: - return self.config_loader(self.load(path)) - return self.load(path) - - -def is_partial_class(obj): - """Check if the object is a (partial) class, or an instance""" - if isinstance(obj, partial): - obj = obj.func - return inspect.isclass(obj) - - -def instance_from_map( - mapping: dict[str, Any], - request: str | list | tuple | Any, - name: str = "mapping", - allow_any: bool = True, - as_class: bool = False, - kwargs: dict | None = None, -): - """Get an instance of an class from a mapping. - - Arguments: - mapping: Mapping from string keys to classes or instances - request: A key from the mapping. If allow_any is True, could also be an - object or a class, to use a custom object. - name: Name of the mapping used in error messages - allow_any: If set to True, allows using custom classes/objects. - as_class: If the class should be returned without beeing instanciated - kwargs: Arguments used for the new instance, if created. Its purpose is - to serve at default arguments if the user doesn't built the object. - - Raises: - ValueError: if the request is invalid (not a string if allow_any is False), - or invalid key. 
- """ - - # Split arguments of the form (request, kwargs) - args_dict = kwargs or {} - if isinstance(request, tuple) or isinstance(request, list): - if len(request) != 2: - raise ValueError( - "When building an instance and specifying arguments, " - "you should give a pair (class, arguments)" - ) - request, req_args_dict = request - if not isinstance(req_args_dict, dict): - raise ValueError("The arguments should be given as a dictionary") - args_dict = {**args_dict, **req_args_dict} - - # Then, get the class/instance from the request - if isinstance(request, str): - if request in mapping: - instance = mapping[request] - else: - raise ValueError(f"{request} doesn't exists for {name}") - elif allow_any: - instance = request - else: - raise ValueError(f"Object {request} invalid key for {name}") - - if isinstance(instance, MissingDependencyError): - raise instance - - # Check if the request is a class if it is mandatory - if (args_dict or as_class) and not is_partial_class(instance): - raise ValueError( - f"{instance} is not a class and can't be used with additional arguments" - ) - - # Give the arguments to the class - if args_dict: - instance = partial(instance, **args_dict) - - # Return the class / instance - if as_class: - return instance - if is_partial_class(instance): - try: - instance = instance() - except TypeError as e: - raise TypeError(f"{e} when calling {instance} with {args_dict}") from e - return instance diff --git a/neps/optimizers/base_optimizer.py b/neps/optimizers/base_optimizer.py index b4502eca..fce12158 100644 --- a/neps/optimizers/base_optimizer.py +++ b/neps/optimizers/base_optimizer.py @@ -3,32 +3,37 @@ import logging from abc import abstractmethod from copy import deepcopy -from typing import Any +from typing import Any, Iterator, Mapping +from typing_extensions import Self +from contextlib import contextmanager +from pathlib import Path -from ..metahyper.api import ConfigResult, Sampler +from neps.types import ConfigResult +from neps.utils.files import serialize, deserialize from ..search_spaces.search_space import SearchSpace from ..utils.common import get_rnd_state, set_rnd_state from ..utils.result_utils import get_cost, get_learning_curve, get_loss -class BaseOptimizer(Sampler): +class BaseOptimizer: """Base sampler class. 
Implements all the low-level work."""
def __init__(
self,
pipeline_space: SearchSpace,
patience: int = 50,
- logger=None,
- budget: None | int | float = None,
- loss_value_on_error: None | float = None,
- cost_value_on_error: None | float = None,
- learning_curve_on_error: None | float | list[float] = None,
+ logger: logging.Logger | None = None,
+ budget: int | float | None = None,
+ loss_value_on_error: float | None = None,
+ cost_value_on_error: float | None = None,
+ learning_curve_on_error: float | list[float] | None = None,
ignore_errors=False,
- ):
- super().__init__(budget=budget)
+ ) -> None:
if patience < 1:
raise ValueError("Patience should be at least 1")
+ self.used_budget = 0
+ self.budget = budget
self.pipeline_space = pipeline_space
self.patience = patience
self.logger = logger or logging.getLogger("neps")
@@ -41,25 +46,37 @@ def __init__(
def load_results(
self,
previous_results: dict[str, ConfigResult],
- pending_evaluations: dict[str, ConfigResult],
+ pending_evaluations: dict[str, SearchSpace],
) -> None:
raise NotImplementedError
@abstractmethod
def get_config_and_ids(self) -> tuple[SearchSpace, str, str | None]:
+ """Sample a new configuration
+
+ Returns:
+ config: serializable object representing the configuration
+ config_id: unique identifier for the configuration
+ previous_config_id: if provided, id of a previous configuration on which
+ this one is based
+ """
raise NotImplementedError
def get_state(self) -> Any: # pylint: disable=no-self-use
- return {
- "rnd_seeds": get_rnd_state(),
- **super().get_state(),
- }
+ _state = {"rnd_seeds": get_rnd_state(), "used_budget": self.used_budget}
+ if self.budget is not None:
+ # TODO(eddiebergman): Seems like this isn't used anywhere.
+ # A fuzzy find search for `remaining_budget` shows this as the
+ # only use point.
+ _state["remaining_budget"] = self.budget - self.used_budget + + return _state - def load_state(self, state: Any): # pylint: disable=no-self-use + def load_state(self, state: Any) -> None: # pylint: disable=no-self-use set_rnd_state(state["rnd_seeds"]) - super().load_state(state) + self.used_budget = state["used_budget"] - def load_config(self, config_dict): + def load_config(self, config_dict: Mapping[str, Any]) -> SearchSpace: config = deepcopy(self.pipeline_space) config.load_from(config_dict) return config @@ -91,5 +108,15 @@ def get_learning_curve(self, result: str | dict | float) -> float | Any: ignore_errors=self.ignore_errors, ) - def whoami(self): + def whoami(self) -> str: return type(self).__name__ + + @contextmanager + def using_state(self, state_file: Path) -> Iterator[Self]: + if state_file.exists(): + state = deserialize(state_file) + self.load_state(state) + + yield self + + serialize(self.get_state(), path=state_file) diff --git a/neps/optimizers/bayesian_optimization/cost_cooling.py b/neps/optimizers/bayesian_optimization/cost_cooling.py index 8077dced..c2b35397 100644 --- a/neps/optimizers/bayesian_optimization/cost_cooling.py +++ b/neps/optimizers/bayesian_optimization/cost_cooling.py @@ -2,7 +2,8 @@ from typing import Any -from ...metahyper import ConfigResult, instance_from_map +from neps.types import ConfigResult +from neps.utils.common import instance_from_map from ...optimizers.bayesian_optimization.acquisition_functions.cost_cooling import ( CostCooler, ) @@ -186,7 +187,7 @@ def __init__( def load_results( self, previous_results: dict[str, ConfigResult], - pending_evaluations: dict[str, ConfigResult], + pending_evaluations: dict[str, SearchSpace], ) -> None: # TODO(Jan): read out cost and fit cost model train_x = [el.config for el in previous_results.values()] diff --git a/neps/optimizers/bayesian_optimization/kernels/get_kernels.py b/neps/optimizers/bayesian_optimization/kernels/get_kernels.py index eb4069af..f606f442 100644 --- a/neps/optimizers/bayesian_optimization/kernels/get_kernels.py +++ b/neps/optimizers/bayesian_optimization/kernels/get_kernels.py @@ -1,6 +1,6 @@ from __future__ import annotations -from ....metahyper import instance_from_map +from neps.utils.common import instance_from_map from ....search_spaces.architecture.core_graph_grammar import CoreGraphGrammar from ....search_spaces.hyperparameters.categorical import CategoricalParameter from ....search_spaces.hyperparameters.float import FloatParameter diff --git a/neps/optimizers/bayesian_optimization/mf_tpe.py b/neps/optimizers/bayesian_optimization/mf_tpe.py index f705f9f0..7d139efe 100644 --- a/neps/optimizers/bayesian_optimization/mf_tpe.py +++ b/neps/optimizers/bayesian_optimization/mf_tpe.py @@ -9,7 +9,8 @@ from scipy.stats import spearmanr from typing_extensions import Literal -from ...metahyper import ConfigResult, instance_from_map +from neps.types import ConfigResult +from neps.utils.common import instance_from_map from ...search_spaces import ( CategoricalParameter, ConstantParameter, @@ -221,7 +222,9 @@ def _get_rung_maps(self, s: int = 0) -> dict: eta = round(1 / self.good_fraction) new_min_budget = self.min_fidelity * (1 / eta**s) nrungs = ( - np.floor(np.log(self.max_fidelity / new_min_budget) / np.log(eta)).astype(int) + np.floor(np.log(self.max_fidelity / new_min_budget) / np.log(eta)).astype( + int + ) + 1 ) _max_budget = self.max_fidelity @@ -295,7 +298,8 @@ def __call__( # TODO have this as a setting in the acq_sampler instead if only_lowest_fidelity: is_lowest_fidelity = ( 
- np.array([x_.fidelity.value for x_ in x]) == self.rung_map[self.min_rung] + np.array([x_.fidelity.value for x_ in x]) + == self.rung_map[self.min_rung] ) return np.log(self.surrogate_models["good"].pdf(x)) - np.log( self.surrogate_models["bad"].pdf(x) @@ -344,7 +348,9 @@ def _split_configs( if self.round_up: num_good_configs = np.ceil(len(configs_fid) * good_fraction).astype(int) else: - num_good_configs = np.floor(len(configs_fid) * good_fraction).astype(int) + num_good_configs = np.floor(len(configs_fid) * good_fraction).astype( + int + ) ordered_loss_indices = np.argsort(losses_fid) good_indices = ordered_loss_indices[0:num_good_configs] @@ -364,7 +370,9 @@ def _split_configs( good_configs_weights.extend( [weight_per_fidelity[fid]] * len(good_configs_fid) ) - bad_configs_weights.extend([weight_per_fidelity[fid]] * len(bad_configs_fid)) + bad_configs_weights.extend( + [weight_per_fidelity[fid]] * len(bad_configs_fid) + ) return good_configs, bad_configs, good_configs_weights, bad_configs_weights def _compute_improvement_weights(self, losses, num_good_configs, max_weight): @@ -455,7 +463,7 @@ def is_init_phase(self) -> bool: def load_results( self, previous_results: dict[str, ConfigResult], - pending_evaluations: dict[str, ConfigResult], + pending_evaluations: dict[str, SearchSpace], ) -> None: # TODO remove doubles from previous results train_y = [self.get_loss(el.result) for el in previous_results.values()] @@ -551,7 +559,9 @@ def _get_hard_promotable(self, configs): configs_per_rung[rung] += 1 cumulative_per_rung = np.flip(np.cumsum(np.flip(configs_per_rung))) - cumulative_above = np.append(np.flip(np.cumsum(np.flip(configs_per_rung[1:]))), 0) + cumulative_above = np.append( + np.flip(np.cumsum(np.flip(configs_per_rung[1:]))), 0 + ) # then check which one can make the most informed decision on promotions rungs_to_promote = cumulative_per_rung * self.good_fraction - cumulative_above @@ -593,7 +603,9 @@ def _promote_existing(self, configs_for_promotion): # i.e. 
give it zero weight in the KDE, and ensure the count is correct assert len(configs_for_promotion) > 0, "No promotable configurations" if self.promote_from_acq: - acq_values = self.__call__(configs_for_promotion, only_lowest_fidelity=False) + acq_values = self.__call__( + configs_for_promotion, only_lowest_fidelity=False + ) else: acq_values = self.__call__( configs_for_promotion, only_lowest_fidelity=False, only_good=True diff --git a/neps/optimizers/bayesian_optimization/models/__init__.py b/neps/optimizers/bayesian_optimization/models/__init__.py index 0eaeb127..c76bedfd 100755 --- a/neps/optimizers/bayesian_optimization/models/__init__.py +++ b/neps/optimizers/bayesian_optimization/models/__init__.py @@ -1,4 +1,5 @@ -from ....metahyper.utils import MissingDependencyError +from neps.utils.common import MissingDependencyError + from .gp import ComprehensiveGP from .gp_hierarchy import ComprehensiveGPHierarchy diff --git a/neps/optimizers/bayesian_optimization/optimizer.py b/neps/optimizers/bayesian_optimization/optimizer.py index 6c47ac8b..c1673d02 100644 --- a/neps/optimizers/bayesian_optimization/optimizer.py +++ b/neps/optimizers/bayesian_optimization/optimizer.py @@ -5,13 +5,17 @@ from typing_extensions import Literal -from ...metahyper import ConfigResult, instance_from_map +from neps.types import ConfigResult +from neps.utils.common import instance_from_map from ...search_spaces.hyperparameters.categorical import ( CATEGORICAL_CONFIDENCE_SCORES, CategoricalParameter, ) from ...search_spaces.hyperparameters.constant import ConstantParameter -from ...search_spaces.hyperparameters.float import FLOAT_CONFIDENCE_SCORES, FloatParameter +from ...search_spaces.hyperparameters.float import ( + FLOAT_CONFIDENCE_SCORES, + FloatParameter, +) from ...search_spaces.hyperparameters.integer import IntegerParameter from ...search_spaces.search_space import SearchSpace from ..base_optimizer import BaseOptimizer @@ -221,7 +225,7 @@ def is_init_phase(self) -> bool: def load_results( self, previous_results: dict[str, ConfigResult], - pending_evaluations: dict[str, ConfigResult], + pending_evaluations: dict[str, SearchSpace], ) -> None: train_x = [el.config for el in previous_results.values()] train_y = [self.get_loss(el.result) for el in previous_results.values()] diff --git a/neps/optimizers/grid_search/optimizer.py b/neps/optimizers/grid_search/optimizer.py index 1c988926..8a9934fc 100644 --- a/neps/optimizers/grid_search/optimizer.py +++ b/neps/optimizers/grid_search/optimizer.py @@ -2,7 +2,7 @@ import random -from ...metahyper import ConfigResult +from neps.types import ConfigResult from ...search_spaces.search_space import SearchSpace from ..base_optimizer import BaseOptimizer @@ -21,7 +21,7 @@ def __init__( def load_results( self, previous_results: dict[str, ConfigResult], - pending_evaluations: dict[str, ConfigResult], + pending_evaluations: dict[str, SearchSpace], ) -> None: self._num_previous_configs = len(previous_results) + len(pending_evaluations) diff --git a/neps/optimizers/multi_fidelity/_dyhpo.py b/neps/optimizers/multi_fidelity/_dyhpo.py index 1c948cea..f472469f 100644 --- a/neps/optimizers/multi_fidelity/_dyhpo.py +++ b/neps/optimizers/multi_fidelity/_dyhpo.py @@ -3,10 +3,12 @@ import numpy as np -from ...metahyper import ConfigResult +from neps.types import ConfigResult from ...search_spaces.search_space import FloatParameter, IntegerParameter, SearchSpace from ..base_optimizer import BaseOptimizer -from ..bayesian_optimization.acquisition_functions.base_acquisition import 
BaseAcquisition +from ..bayesian_optimization.acquisition_functions.base_acquisition import ( + BaseAcquisition, +) from ..bayesian_optimization.acquisition_samplers.base_acq_sampler import ( AcquisitionSampler, ) @@ -206,7 +208,7 @@ def num_train_configs(self): def load_results( self, previous_results: dict[str, ConfigResult], - pending_evaluations: dict[str, ConfigResult], + pending_evaluations: dict[str, SearchSpace], ) -> None: """This is basically the fit method. @@ -239,7 +241,8 @@ def _load_previous_observations(self, previous_results): self.observed_configs.add_data([config_val.config, perf], index=index) if not np.isclose( - self.observed_configs.df.loc[index, self.observed_configs.perf_col], perf + self.observed_configs.df.loc[index, self.observed_configs.perf_col], + perf, ): self.observed_configs.update_data( { @@ -276,7 +279,9 @@ def is_promotable(self, promotion_type: str = "model") -> Union[int, None]: ID of the promoted configuration, else return None. """ if promotion_type == "model": - config_id = self.model_policy.sample(is_promotion=True, **self.sampling_args) + config_id = self.model_policy.sample( + is_promotion=True, **self.sampling_args + ) elif promotion_type == "policy": config_id = self.promotion_policy.retrieve_promotions() elif promotion_type is None: @@ -290,7 +295,9 @@ def is_promotable(self, promotion_type: str = "model") -> Union[int, None]: return config_id def sample_new_config( - self, sample_type: str = "model", **kwargs # pylint: disable=unused-argument + self, + sample_type: str = "model", + **kwargs, # pylint: disable=unused-argument ) -> SearchSpace: """ Sample completely new configuration that diff --git a/neps/optimizers/multi_fidelity/dyhpo.py b/neps/optimizers/multi_fidelity/dyhpo.py index b72afd15..ebf042db 100755 --- a/neps/optimizers/multi_fidelity/dyhpo.py +++ b/neps/optimizers/multi_fidelity/dyhpo.py @@ -6,11 +6,14 @@ import numpy as np -from ...metahyper import ConfigResult, instance_from_map +from neps.types import ConfigResult +from neps.utils.common import instance_from_map from ...search_spaces.search_space import FloatParameter, IntegerParameter, SearchSpace from ..base_optimizer import BaseOptimizer from ..bayesian_optimization.acquisition_functions import AcquisitionMapping -from ..bayesian_optimization.acquisition_functions.base_acquisition import BaseAcquisition +from ..bayesian_optimization.acquisition_functions.base_acquisition import ( + BaseAcquisition, +) from ..bayesian_optimization.acquisition_samplers import AcquisitionSamplerMapping from ..bayesian_optimization.acquisition_samplers.base_acq_sampler import ( AcquisitionSampler, @@ -83,7 +86,9 @@ def __init__( ignore_errors=ignore_errors, logger=logger, ) - self.raw_tabular_space = None # placeholder, can be populated using pre_load_hook + self.raw_tabular_space = ( + None # placeholder, can be populated using pre_load_hook + ) self._budget_list: list[int | float] = [] self.step_size: int | float = step_size self.min_budget = self.pipeline_space.fidelity.lower @@ -91,7 +96,10 @@ def __init__( self.max_budget = self.pipeline_space.fidelity.upper self._initial_design_fraction = initial_design_fraction - self._initial_design_size, self._initial_design_budget = self._set_initial_design( + ( + self._initial_design_size, + self._initial_design_budget, + ) = self._set_initial_design( initial_design_size, initial_design_budget, self._initial_design_fraction ) # TODO: Write use cases for these parameters @@ -281,7 +289,7 @@ def num_train_configs(self): def load_results( self, 
previous_results: dict[str, ConfigResult], - pending_evaluations: dict[str, ConfigResult], + pending_evaluations: dict[str, SearchSpace], ) -> None: """This is basically the fit method. diff --git a/neps/optimizers/multi_fidelity/hyperband.py b/neps/optimizers/multi_fidelity/hyperband.py index 86ff2f5f..507f35f1 100644 --- a/neps/optimizers/multi_fidelity/hyperband.py +++ b/neps/optimizers/multi_fidelity/hyperband.py @@ -9,9 +9,11 @@ import numpy as np from typing_extensions import Literal -from ...metahyper import ConfigResult +from neps.types import ConfigResult from ...search_spaces.search_space import SearchSpace -from ..bayesian_optimization.acquisition_functions.base_acquisition import BaseAcquisition +from ..bayesian_optimization.acquisition_functions.base_acquisition import ( + BaseAcquisition, +) from ..bayesian_optimization.acquisition_samplers.base_acq_sampler import ( AcquisitionSampler, ) @@ -124,7 +126,7 @@ def _handle_promotions(self): def load_results( self, previous_results: dict[str, ConfigResult], - pending_evaluations: dict[str, ConfigResult], + pending_evaluations: dict[str, SearchSpace], ) -> None: super().load_results(previous_results, pending_evaluations) # important for the global HB to run the right SH @@ -302,7 +304,7 @@ def __init__( prior_confidence=prior_confidence, random_interleave_prob=random_interleave_prob, sample_default_first=sample_default_first, - sample_default_at_target=sample_default_at_target + sample_default_at_target=sample_default_at_target, ) self.sampling_args = { "inc": None, @@ -394,7 +396,8 @@ def _get_bracket_to_run(self): # Since in this version, we see the full SH rung, we fix the K to max_rung K = self.max_rung bracket_probs = [ - self.eta ** (K - s) * (K + 1) / (K - s + 1) for s in range(self.max_rung + 1) + self.eta ** (K - s) * (K + 1) / (K - s + 1) + for s in range(self.max_rung + 1) ] bracket_probs = np.array(bracket_probs) / sum(bracket_probs) bracket_next = np.random.choice(range(self.max_rung + 1), p=bracket_probs) diff --git a/neps/optimizers/multi_fidelity/mf_bo.py b/neps/optimizers/multi_fidelity/mf_bo.py index 8b14d70b..883b555b 100755 --- a/neps/optimizers/multi_fidelity/mf_bo.py +++ b/neps/optimizers/multi_fidelity/mf_bo.py @@ -7,7 +7,7 @@ import pandas as pd import torch -from ...metahyper import instance_from_map +from neps.utils.common import instance_from_map from ..bayesian_optimization.models import SurrogateModelMapping from ..multi_fidelity.utils import normalize_vectorize_config from ..multi_fidelity_prior.utils import calc_total_resources_spent, update_fidelity @@ -286,7 +286,9 @@ def update_model(self, train_x=None, train_y=None, pending_x=None, decay_t=None) if decay_t is None: decay_t = len(train_x) - train_x, train_y, train_lcs = self._fantasize_pending(train_x, train_y, pending_x) + train_x, train_y, train_lcs = self._fantasize_pending( + train_x, train_y, pending_x + ) self._fit(train_x, train_y, train_lcs) return self.surrogate_model, decay_t diff --git a/neps/optimizers/multi_fidelity/sampling_policy.py b/neps/optimizers/multi_fidelity/sampling_policy.py index fc8075e4..782def9b 100644 --- a/neps/optimizers/multi_fidelity/sampling_policy.py +++ b/neps/optimizers/multi_fidelity/sampling_policy.py @@ -10,10 +10,12 @@ import pandas as pd import torch -from ...metahyper import instance_from_map +from neps.utils.common import instance_from_map from ...search_spaces.search_space import SearchSpace from ..bayesian_optimization.acquisition_functions import AcquisitionMapping -from 
..bayesian_optimization.acquisition_functions.base_acquisition import BaseAcquisition +from ..bayesian_optimization.acquisition_functions.base_acquisition import ( + BaseAcquisition, +) from ..bayesian_optimization.acquisition_functions.prior_weighted import ( DecayingPriorWeightedAcquisition, ) @@ -169,7 +171,9 @@ def sample( policy_idx = np.random.choice(range(len(prob_weights)), p=prob_weights) policy = sorted(self.policy_map.keys())[policy_idx] - self.logger.info(f"Sampling from {policy} with weights (i, p, r)={prob_weights}") + self.logger.info( + f"Sampling from {policy} with weights (i, p, r)={prob_weights}" + ) if policy == "prior": config = self.pipeline_space.sample( @@ -201,14 +205,18 @@ def sample( # then sample with prior=True from that configuration # since the defaults are treated as the prior config = _inc.sample( - patience=self.patience, user_priors=user_priors, ignore_fidelity=True + patience=self.patience, + user_priors=user_priors, + ignore_fidelity=True, ) elif self.inc_type == "crossover": # choosing the configuration for crossover with incumbent # the weight distributed across prior adnd inc _w_priors = 1 - self.policy_map["random"] # re-calculate normalized score ratio for prior-inc - w_prior = np.clip(self.policy_map["prior"] / _w_priors, a_min=0, a_max=1) + w_prior = np.clip( + self.policy_map["prior"] / _w_priors, a_min=0, a_max=1 + ) w_inc = np.clip(self.policy_map["inc"] / _w_priors, a_min=0, a_max=1) # calculating difference of prior and inc score score_diff = np.abs(w_prior - w_inc) @@ -227,7 +235,9 @@ def sample( ) # sampling a configuration either randomly or from a prior _config = self.pipeline_space.sample( - patience=self.patience, user_priors=user_priors, ignore_fidelity=True + patience=self.patience, + user_priors=user_priors, + ignore_fidelity=True, ) # injecting hyperparameters from the sampled config into the incumbent # TODO: ideally lower crossover prob overtime @@ -541,9 +551,7 @@ def _fantasize_pending(self, *args, **kwargs): # pylint: disable=unused-argumen max_budget_configs, max_budget_perf, pending_configs ) - def sample( - self, rand_promotion_prob=0.5, seed=777, is_promotion=False, **kwargs - ): # pylint: disable=unused-argument + def sample(self, rand_promotion_prob=0.5, seed=777, is_promotion=False, **kwargs): # pylint: disable=unused-argument promoted = False # np.random.seed(seed) if np.random.random_sample() < rand_promotion_prob: diff --git a/neps/optimizers/multi_fidelity/successive_halving.py b/neps/optimizers/multi_fidelity/successive_halving.py index a3145dc2..e4cfe71f 100644 --- a/neps/optimizers/multi_fidelity/successive_halving.py +++ b/neps/optimizers/multi_fidelity/successive_halving.py @@ -10,13 +10,16 @@ import pandas as pd from typing_extensions import Literal -from ...metahyper import ConfigResult +from neps.types import ConfigResult from ...search_spaces.hyperparameters.categorical import ( CATEGORICAL_CONFIDENCE_SCORES, CategoricalParameter, ) from ...search_spaces.hyperparameters.constant import ConstantParameter -from ...search_spaces.hyperparameters.float import FLOAT_CONFIDENCE_SCORES, FloatParameter +from ...search_spaces.hyperparameters.float import ( + FLOAT_CONFIDENCE_SCORES, + FloatParameter, +) from ...search_spaces.hyperparameters.integer import IntegerParameter from ...search_spaces.search_space import SearchSpace from ..base_optimizer import BaseOptimizer @@ -102,8 +105,7 @@ def __init__( # the parameter is exposed to allow HB to call SH with different stopping rates self.early_stopping_rate = 
early_stopping_rate self.sampling_policy = sampling_policy( - pipeline_space=self.pipeline_space, - logger=self.logger + pipeline_space=self.pipeline_space, logger=self.logger ) self.promotion_policy = promotion_policy(self.eta) @@ -174,9 +176,9 @@ def _get_rung_map(self, s: int = 0) -> dict: assert s <= self.stopping_rate_limit new_min_budget = self.min_budget * (self.eta**s) nrungs = ( - np.floor(np.log(self.max_budget / new_min_budget) / np.log(self.eta)).astype( - int - ) + np.floor( + np.log(self.max_budget / new_min_budget) / np.log(self.eta) + ).astype(int) + 1 ) _max_budget = self.max_budget @@ -195,9 +197,9 @@ def _get_config_map(self, s: int = 0) -> dict: assert s <= self.stopping_rate_limit new_min_budget = self.min_budget * (self.eta**s) nrungs = ( - np.floor(np.log(self.max_budget / new_min_budget) / np.log(self.eta)).astype( - int - ) + np.floor( + np.log(self.max_budget / new_min_budget) / np.log(self.eta) + ).astype(int) + 1 ) s_max = self.stopping_rate_limit + 1 @@ -317,7 +319,7 @@ def _fit_models(self): def load_results( self, previous_results: dict[str, ConfigResult], - pending_evaluations: dict[str, ConfigResult], + pending_evaluations: dict[str, SearchSpace], ) -> None: """This is basically the fit method. @@ -374,7 +376,9 @@ def sample_new_config( return config def _generate_new_config_id(self): - return self.observed_configs.index.max() + 1 if len(self.observed_configs) else 0 + return ( + self.observed_configs.index.max() + 1 if len(self.observed_configs) else 0 + ) def get_default_configuration(self): pass @@ -422,7 +426,9 @@ def get_config_and_ids( # pylint: disable=no-self-use if self.sample_default_at_target: # sets the default config to be evaluated at the target fidelity rung_id = self.max_rung - self.logger.info("Next config will be evaluated at target fidelity.") + self.logger.info( + "Next config will be evaluated at target fidelity." 
+ ) self.logger.info("Sampling the default configuration...") config = self.pipeline_space.sample_default_configuration() diff --git a/neps/optimizers/multi_fidelity_prior/async_priorband.py b/neps/optimizers/multi_fidelity_prior/async_priorband.py index c932d45d..1bbffc9c 100644 --- a/neps/optimizers/multi_fidelity_prior/async_priorband.py +++ b/neps/optimizers/multi_fidelity_prior/async_priorband.py @@ -5,9 +5,11 @@ import numpy as np from typing_extensions import Literal -from ...metahyper import ConfigResult +from neps.types import ConfigResult from ...search_spaces.search_space import SearchSpace -from ..bayesian_optimization.acquisition_functions.base_acquisition import BaseAcquisition +from ..bayesian_optimization.acquisition_functions.base_acquisition import ( + BaseAcquisition, +) from ..bayesian_optimization.acquisition_samplers.base_acq_sampler import ( AcquisitionSampler, ) @@ -244,7 +246,7 @@ def _update_sh_bracket_state(self) -> None: def load_results( self, previous_results: dict[str, ConfigResult], - pending_evaluations: dict[str, ConfigResult], + pending_evaluations: dict[str, SearchSpace], ) -> None: super().load_results(previous_results, pending_evaluations) # important for the global HB to run the right SH @@ -263,7 +265,8 @@ def _get_bracket_to_run(self): # Since in this version, we see the full SH rung, we fix the K to max_rung K = self.max_rung bracket_probs = [ - self.eta ** (K - s) * (K + 1) / (K - s + 1) for s in range(self.max_rung + 1) + self.eta ** (K - s) * (K + 1) / (K - s + 1) + for s in range(self.max_rung + 1) ] bracket_probs = np.array(bracket_probs) / sum(bracket_probs) bracket_next = np.random.choice(range(self.max_rung + 1), p=bracket_probs) diff --git a/neps/optimizers/multiple_knowledge_sources/prototype_optimizer.py b/neps/optimizers/multiple_knowledge_sources/prototype_optimizer.py index 9ab96b1a..2cbcc454 100644 --- a/neps/optimizers/multiple_knowledge_sources/prototype_optimizer.py +++ b/neps/optimizers/multiple_knowledge_sources/prototype_optimizer.py @@ -3,7 +3,7 @@ import logging from typing import Any -from ...metahyper import ConfigResult +from neps.types import ConfigResult from ...search_spaces.search_space import SearchSpace from ...utils.data_loading import read_tasks_and_dev_stages_from_disk from .. 
import BaseOptimizer
@@ -46,7 +46,7 @@ def calculate_defaults(self):
def load_results(
self,
previous_results: dict[str, ConfigResult],
- pending_evaluations: dict[str, ConfigResult],
+ pending_evaluations: dict[str, SearchSpace],
) -> None:
self._num_previous_configs = len(previous_results) + len(pending_evaluations)
diff --git a/neps/optimizers/random_search/optimizer.py b/neps/optimizers/random_search/optimizer.py
index 4b84838d..89f786ea 100644
--- a/neps/optimizers/random_search/optimizer.py
+++ b/neps/optimizers/random_search/optimizer.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-from ...metahyper import ConfigResult
+from neps.types import ConfigResult
from ...search_spaces.search_space import SearchSpace
from ..base_optimizer import BaseOptimizer
@@ -15,7 +15,7 @@ def __init__(self, use_priors=False, ignore_fidelity=True, **optimizer_kwargs):
def load_results(
self,
previous_results: dict[str, ConfigResult],
- pending_evaluations: dict[str, ConfigResult],
+ pending_evaluations: dict[str, SearchSpace],
) -> None:
self._num_previous_configs = len(previous_results) + len(pending_evaluations)
diff --git a/neps/plot/tensorboard_eval.py b/neps/plot/tensorboard_eval.py
index 6463563e..9f1c0bd7 100644
--- a/neps/plot/tensorboard_eval.py
+++ b/neps/plot/tensorboard_eval.py
@@ -4,6 +4,7 @@ import os
import warnings
from pathlib import Path
+from typing import Any, Mapping
import numpy as np
import torch
@@ -17,7 +18,7 @@ from torch.utils.tensorboard import SummaryWriter
from torch.utils.tensorboard.summary import hparams
-from ..metahyper.api import ConfigInRun
+from neps.runtime import get_in_progress_trial
from ..status.status import get_summary_dict
from ..utils.common import get_initial_directory
@@ -40,7 +41,9 @@ class SummaryWriter_(SummaryWriter):
metrics with better formatting.
"""
- def add_hparams(self, hparam_dict: dict, metric_dict: dict, global_step: int) -> None:
+ def add_hparams(
+ self, hparam_dict: dict, metric_dict: dict, global_step: int
+ ) -> None:
if not isinstance(hparam_dict, dict) or not isinstance(metric_dict, dict):
raise TypeError("hparam_dict and metric_dict should be dictionary.")
updated_metric = {f"Summary/{key}": val for key, val in metric_dict.items()}
@@ -55,9 +58,9 @@ def add_hparams(self, hparam_dict: dict, metric_dict: dict, global_step: int) ->
class tblogger:
config_id: str | None = None
- config: dict | None = None
+ config: Mapping[str, Any] | None = None
config_working_directory: Path | None = None
- optim_path: Path | None = None
+ optimizer_dir: Path | None = None
config_previous_directory: Path | None = None
disable_logging: bool = False
@@ -77,14 +80,19 @@ class tblogger:
@staticmethod
def _initiate_internal_configurations() -> None:
"""
- Track the Configuration space data from the way handled by neps metahyper
+ Track the Configuration space data from the way handled by neps runtime
'_sample_config' to keep in sync with config ids and directories NePS is
operating on.
"""
- tblogger.config_working_directory = ConfigInRun.pipeline_directory
- tblogger.config_previous_directory = ConfigInRun.previous_pipeline_directory
- tblogger.optim_path = ConfigInRun.optimization_dir
- tblogger.config = ConfigInRun.config
+ trial = get_in_progress_trial()
+ assert trial is not None
+
+ # TODO(eddiebergman): We could just save the instance of the trial
+ # on this object, OR even just use `get_in_progress_trial()` in each call directly.
+ tblogger.config_working_directory = trial.pipeline_dir + tblogger.config_previous_directory = trial.disk.previous_pipeline_dir + tblogger.optimizer_dir = trial.disk.optimization_dir + tblogger.config = trial.config @staticmethod def _is_initialized() -> bool: @@ -384,7 +392,7 @@ def _write_hparam_config() -> None: # Just an extra safety measure if tblogger.config_writer is not None: tblogger.config_writer.add_hparams( - hparam_dict=tblogger.config, + hparam_dict=dict(tblogger.config), metric_dict=values, global_step=tblogger.current_epoch, ) @@ -401,18 +409,18 @@ def _tracking_incumbent_api() -> None: Note: The function relies on the following global variables: - - tblogger.optim_path (str) - - tblogger.summary_writer (SummaryWriter_) + - tblogger.optimizer_dir + - tblogger.summary_writer The function logs the incumbent trajectory in TensorBoard. """ - summary_dict = get_summary_dict(tblogger.optim_path, add_details=True) + summary_dict = get_summary_dict(tblogger.optimizer_dir, add_details=True) incum_tracker = summary_dict["num_evaluated_configs"] incum_val = summary_dict["best_loss"] - if tblogger.summary_writer is None and tblogger.optim_path: - tblogger.summary_writer = SummaryWriter_(tblogger.optim_path / "summary") + if tblogger.summary_writer is None and tblogger.optimizer_dir: + tblogger.summary_writer = SummaryWriter_(tblogger.optimizer_dir / "summary") tblogger.summary_writer.add_scalar( tag="Summary/Incumbent_graph", @@ -484,7 +492,7 @@ def log( curve on tensorboard (default: True) writer_config_hparam (bool, optional): Write hyperparameters logging of the configs (default: True). - write_summary_incumbent (bool, optional): Set to `True` for a live + write_summary_incumbent (bool, optional): Set to `True` for a live incumbent trajectory. extra_data (dict, optional): Additional experiment data for logging. """ @@ -509,7 +517,9 @@ def log( if extra_data is not None: for key in extra_data: if extra_data[key][0] == "scalar": - tblogger._write_scalar_config(tag=str(key), value=extra_data[key][1]) + tblogger._write_scalar_config( + tag=str(key), value=extra_data[key][1] + ) elif extra_data[key][0] == "image": tblogger._write_image_config( diff --git a/neps/runtime.py b/neps/runtime.py new file mode 100644 index 00000000..258020d2 --- /dev/null +++ b/neps/runtime.py @@ -0,0 +1,578 @@ +"""Module for the runtime of a single instance of NePS running. + +An important advantage of NePS with a running instance per worker and no +multiprocessing is that we can reliably use globals to store information such +as the currently runnig configuraiton, without interfering with other +workers which have launched. + +This allows us to have a global `Trial` object which can be accessed +using `import neps.runtime; neps.get_in_progress_trial()`. + +--- + +This module primarily handles the worker loop where important concepts are: +* **State**: The state of optimization is all of the configurations, their results and + the current state of the optimizer. +* **Shared State**: Whenever a worker wishes to read or write any state, they will _lock_ the + shared state, declaring themselves as operating on it. At this point, no other worker can + access the shared state. +* **Optimizer Hydration**: This is the process through which an optimzier instance is _hydrated_ + with the Shared State so it can make a decision, i.e. for sampling. 
Equally, we _serialize_
+ the optimizer when writing it back to the Shared State.
+* **Trial Lock**: When evaluating a configuration, a worker must _lock_ it to declare itself
+ as evaluating it. This communicates to other workers that this configuration is pending.
+
+### Loop
+We mark lines with `+` as the worker having locked the Shared State and `~` as the worker
+having locked the Trial. The trial lock `~` is allowed to fail, in which case all steps
+with a `~` are skipped and the loop continues.
+
+1. + Check exit conditions
+2. + Hydrate the optimizer
+3. + Sample a new Trial
+4. Unlock the Shared State
+5. ~ Obtain a Trial Lock
+6. ~ Set the global trial for this worker to the current trial
+7. ~ Evaluate the trial
+8. ~+ Lock the shared state
+9. ~+ Write the results of the config to disk
+10. ~+ Update the optimizer if required (used budget for evaluating trial)
+11. ~ Unlock the shared state
+12. Unlock Trial Lock
+"""
+from __future__ import annotations
+
+import inspect
+import logging
+import shutil
+import time
+import warnings
+import os
+from collections import defaultdict
+from copy import deepcopy
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any, Iterable, Mapping, Callable, TYPE_CHECKING
+from enum import Enum
+
+from neps.utils._locker import Locker
+from neps.utils.files import empty_file, serialize, deserialize
+from neps.types import ConfigLike, ERROR, ConfigResult, POST_EVAL_HOOK_SIGNATURE
+
+if TYPE_CHECKING:
+ from .optimizers.base_optimizer import BaseOptimizer
+
+# Wait time between each successive poll to see if state can be grabbed
+DEFAULT_STATE_POLL = 1
+ENVIRON_STATE_POLL_KEY = "NEPS_STATE_POLL"
+
+# Timeout before giving up on trying to grab the state, raising an error
+DEFAULT_STATE_TIMEOUT = None
+ENVIRON_STATE_TIMEOUT_KEY = "NEPS_STATE_TIMEOUT"
+
+
+# TODO(eddiebergman): We should not do this...
+warnings.simplefilter("always", DeprecationWarning)
+
+
+# NOTE: As each NEPS process is only ever evaluating a single trial,
+# this global can be retrieved in NePS and refers to what this process
+# is currently evaluating.
+_CURRENTLY_RUNNING_TRIAL_IN_PROCESS: Trial | None = None
+
+
+def get_in_progress_trial() -> Trial | None:
+ return _CURRENTLY_RUNNING_TRIAL_IN_PROCESS
+
+
+def _set_in_progress_trial(trial: Trial | None) -> None:
+ global _CURRENTLY_RUNNING_TRIAL_IN_PROCESS
+ _CURRENTLY_RUNNING_TRIAL_IN_PROCESS = trial
+
+
+@dataclass
+class Trial:
+ """A trial is a configuration and its associated data.
+
+ The object is considered mutable and the global trial currently being
+ evaluated can be accessed using `get_in_progress_trial()`.
+
+ Attributes:
+ id: Unique identifier for the configuration
+ config: The configuration to evaluate
+ pipeline_dir: Directory where the configuration is evaluated
+ prev_config_id: The id of the previous configuration evaluated for this trial.
+ metadata: Additional metadata about the configuration + results: The results of the evaluation, if any + disk: The disk information of this trial such as paths and locks + """ + + id: str + config: ConfigLike + pipeline_dir: Path + prev_config_id: str | None + metadata: dict[str, Any] + results: dict[str, Any] | ERROR | None = None + + disk: Trial.Disk = field(init=False) + + def __post_init__(self): + if self.prev_config_id is not None: + self.metadata["previous_config_id"] = self.prev_config_id + self.disk = Trial.Disk(pipeline_dir=self.pipeline_dir) + self.disk.pipeline_dir.mkdir(exist_ok=True, parents=True) + + @property + def state(self) -> Trial.State: + return self.disk.state + + def write_to_disk(self) -> Trial.Disk: + serialize(self.config, self.disk.config_file) + serialize(self.metadata, self.disk.metadata_file) + + if self.prev_config_id is not None: + self.disk.previous_config_id_file.write_text(self.prev_config_id) + + if self.results is not None: + serialize(self.results, self.disk.result_file) + + return self.disk + + class State(str, Enum): + COMPLETE = "evaluated" + IN_PROGRESS = "in_progress" + PENDING = "pending" + CORRUPTED = "corrupted" + + def __str__(self): + return self.value + + @dataclass + class Disk: + pipeline_dir: Path + + id: str = field(init=False) + config_file: Path = field(init=False) + result_file: Path = field(init=False) + metadata_file: Path = field(init=False) + optimization_dir: Path = field(init=False) + previous_config_id_file: Path = field(init=False) + previous_pipeline_dir: Path | None = field(init=False) + lock: Locker = field(init=False) + + def __post_init__(self): + self.id = self.pipeline_dir.name[len("config_") :] + self.config_file = self.pipeline_dir / "config.yaml" + self.result_file = self.pipeline_dir / "result.yaml" + self.metadata_file = self.pipeline_dir / "metadata.yaml" + + # NOTE: This is a bit of an assumption! 
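+ # We assume the trial directory is laid out as `<parent>/config_<id>` and treat
+ # that parent directory as the optimization directory for this trial.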
+ self.optimization_dir = self.pipeline_dir.parent + + self.previous_config_id_file = self.pipeline_dir / "previous_config.id" + if not empty_file(self.previous_config_id_file): + with open(self.previous_config_id_file, "r") as f: + self.previous_config_id = f.read().strip() + + self.previous_pipeline_dir = ( + self.pipeline_dir.parent / f"config_{self.previous_config_id}" + ) + else: + self.previous_pipeline_dir = None + self.pipeline_dir.mkdir(exist_ok=True, parents=True) + self.lock = Locker(self.pipeline_dir / ".config_lock") + + def config(self) -> ConfigLike: + return deserialize(self.config_file) + + @property + def state(self) -> Trial.State: + if not empty_file(self.result_file): + return Trial.State.COMPLETE + elif self.lock.is_locked(): + return Trial.State.IN_PROGRESS + elif not empty_file(self.config_file): + return Trial.State.PENDING + else: + return Trial.State.CORRUPTED + + @classmethod + def from_dir(cls, pipeline_dir: Path) -> Trial.Disk: + return cls(pipeline_dir=pipeline_dir) + + def load(self) -> Trial: + config = deserialize(self.config_file) + if not empty_file(self.metadata_file): + metadata = deserialize(self.metadata_file) + else: + metadata = {} + + if not empty_file(self.result_file): + result = deserialize(self.result_file) + else: + result = None + + if not empty_file(self.previous_config_id_file): + previous_config_id = self.previous_config_id_file.read_text().strip() + else: + previous_config_id = None + + return Trial( + id=self.id, + config=config, + pipeline_dir=self.pipeline_dir, + metadata=metadata, + prev_config_id=previous_config_id, + results=result, + ) + + # TODO(eddiebergman): Backwards compatibility on things that require the `ConfigResult` + # Ideally, we just use `Trial` objects directly for things that need all informations + # about trials. 
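+ # For now, `to_result()` repackages the on-disk config, result and metadata into the
+ # legacy `ConfigResult` that the optimizers' `load_results()` still expects.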
+ def to_result( + self, + config_transform: Callable[[ConfigLike], ConfigLike] | None = None, + ) -> ConfigResult: + config = deserialize(self.config_file) + result = deserialize(self.result_file) + metadata = deserialize(self.metadata_file) + if config_transform is not None: + config = config_transform(config) + + return ConfigResult( + id=self.id, + config=config, + result=result, + metadata=metadata, + ) + + +@dataclass +class SharedState: + base_dir: Path + create_dirs: bool = False + + lock: Locker = field(init=False) + optimizer_state_file: Path = field(init=False) + optimizer_info_file: Path = field(init=False) + results_dir: Path = field(init=False) + + def __post_init__(self) -> None: + if self.create_dirs: + self.base_dir.mkdir(parents=True, exist_ok=True) + + self.results_dir = self.base_dir / "results" + + if self.create_dirs: + self.results_dir.mkdir(exist_ok=True) + + self.lock = Locker(self.base_dir / ".decision_lock") + self.optimizer_state_file = self.base_dir / ".optimizer_state.yaml" + self.optimizer_info_file = self.base_dir / ".optimizer_info.yaml" + + def trial_refs(self) -> dict[Trial.State, list[Trial.Disk]]: + refs = [ + Trial.Disk.from_dir(pipeline_dir=pipeline_dir) + for pipeline_dir in self.results_dir.iterdir() + if pipeline_dir.is_dir() + ] + by_state: dict[Trial.State, list[Trial.Disk]] = defaultdict(list) + for ref in refs: + by_state[ref.state].append(ref) + + return by_state + + def check_optimizer_info_on_disk_matches( + self, + optimizer_info: dict[str, Any], + *, + excluded_keys: Iterable[str] = ("searcher_name",), + ) -> None: + optimizer_info_copy = optimizer_info.copy() + loaded_info = deserialize(self.optimizer_info_file) + + for key in excluded_keys: + optimizer_info_copy.pop(key, None) + loaded_info.pop(key, None) + + if optimizer_info_copy != loaded_info: + raise ValueError( + f"The sampler_info in the file {self.optimizer_info_file} is not valid. " + f"Expected: {optimizer_info_copy}, Found: {loaded_info}" + ) + + +def _evaluate_config( + trial: Trial, + evaluation_fn: Callable[..., float | Mapping[str, Any]], + logger: logging.Logger, +) -> tuple[ERROR | dict, float]: + config = trial.config + config_id = trial.id + pipeline_directory = trial.disk.pipeline_dir + previous_pipeline_directory = trial.disk.previous_pipeline_dir + + logger.info(f"Start evaluating config {config_id}") + + config = deepcopy(config) + + # If pipeline_directory and previous_pipeline_directory are included in the + # signature we supply their values, otherwise we simply do nothing. 
+ directory_params = []
+
+ evaluation_fn_params = inspect.signature(evaluation_fn).parameters
+ if "pipeline_directory" in evaluation_fn_params:
+ directory_params.append(pipeline_directory)
+ if "previous_pipeline_directory" in evaluation_fn_params:
+ directory_params.append(previous_pipeline_directory)
+
+ try:
+ result = evaluation_fn(*directory_params, **config)
+ except Exception as e:
+ logger.error(f"An error occurred evaluating config '{config_id}': {config}.")
+ logger.exception(e)
+ result = "error"
+ else:
+ # Ensure the result has the correct format so it can be used by other functions
+ if isinstance(result, Mapping):
+ result = dict(result)
+ if "loss" not in result:
+ raise KeyError("The 'loss' should be provided in the evaluation result")
+ loss = result["loss"]
+ else:
+ loss = result
+ result = {}
+
+ try:
+ result["loss"] = float(loss)
+ except (TypeError, ValueError) as e:
+ raise ValueError(
+ "The evaluation result should be a dictionary or a float but got"
+ f" a `{type(loss)}` with value of {loss}"
+ ) from e
+
+ time_end = time.time()
+ return result, time_end
+
+
+def _try_remove_corrupted_configs(
+ refs: Iterable[Trial.Disk],
+ logger: logging.Logger,
+) -> None:
+ # If there are corrupted configs, we should remove them with a warning
+ for ref in refs:
+ logger.warning(f"Removing corrupted config {ref.id}")
+ try:
+ shutil.rmtree(ref.pipeline_dir)
+ except Exception as e:
+ logger.exception(e)
+
+
+def _worker_should_continue(
+ max_evaluations_total: int | None,
+ continue_until_max_evaluation_completed: bool,
+ refs: Mapping[Trial.State, list[Trial.Disk]],
+ logger: logging.Logger,
+) -> bool:
+ # Check if we have reached the total amount of configurations to evaluate
+ # (possibly including pending evaluations)
+ if max_evaluations_total is None:
+ return True
+
+ logger.debug("Checking if max evaluations is reached")
+
+ n_evaluated = len(refs[Trial.State.COMPLETE])
+ n_inprogress = len(refs[Trial.State.IN_PROGRESS])
+
+ n_counter = (
+ n_evaluated
+ if continue_until_max_evaluation_completed
+ else n_evaluated + n_inprogress
+ )
+ return n_counter < max_evaluations_total
+
+
+def launch_runtime(
+ evaluation_fn: Callable[..., float | Mapping[str, Any]],
+ sampler: BaseOptimizer,
+ optimizer_info: dict,
+ optimization_dir: Path | str,
+ max_evaluations_total: int | None = None,
+ max_evaluations_per_run: int | None = None,
+ continue_until_max_evaluation_completed: bool = False,
+ logger: logging.Logger | None = None,
+ post_evaluation_hook: POST_EVAL_HOOK_SIGNATURE | None = None,
+ overwrite_optimization_dir: bool = False,
+ pre_load_hooks: Iterable[Callable[[BaseOptimizer], BaseOptimizer]] | None = None,
+) -> None:
+ # NOTE(eddiebergman): This was deprecated a while ago and called at
+ # evaluate, now we just crash immediately instead. Should probably
+ # promote this check closer to the user, i.e. `neps.run()`
+ evaluation_fn_params = inspect.signature(evaluation_fn).parameters
+ if "previous_working_directory" in evaluation_fn_params:
+ raise RuntimeError(
+ "the argument: 'previous_working_directory' was deprecated. "
+ f"In the function: '{evaluation_fn.__name__}', please, "
+ "use 'previous_pipeline_directory' instead. "
+ )
+ if "working_directory" in evaluation_fn_params:
+ raise RuntimeError(
+ "the argument: 'working_directory' was deprecated. "
+ f"In the function: '{evaluation_fn.__name__}', please, "
+ "use 'pipeline_directory' instead. 
" + ) + + if logger is None: + logger = logging.getLogger("neps") + + optimization_dir = Path(optimization_dir) + + # TODO(eddiebergman): Not sure how overwriting works with multiple workers.... + if overwrite_optimization_dir and optimization_dir.exists(): + logger.warning("Overwriting working_directory") + shutil.rmtree(optimization_dir) + + shared_state = SharedState(optimization_dir, create_dirs=True) + + _poll = float(os.environ.get(ENVIRON_STATE_POLL_KEY, DEFAULT_STATE_POLL)) + _timeout = os.environ.get(ENVIRON_STATE_TIMEOUT_KEY, DEFAULT_STATE_TIMEOUT) + if _timeout is not None: + _timeout = float(_timeout) + + with shared_state.lock(poll=_poll, timeout=_timeout): + if not shared_state.optimizer_info_file.exists(): + serialize(optimizer_info, shared_state.optimizer_info_file, sort_keys=False) + else: + shared_state.check_optimizer_info_on_disk_matches(optimizer_info) + + evaluations_in_this_run = 0 + while True: + if ( + max_evaluations_per_run is not None + and evaluations_in_this_run >= max_evaluations_per_run + ): + logger.info("Maximum evaluations per run is reached, shutting down") + break + + with shared_state.lock(poll=_poll, timeout=_timeout): + refs = shared_state.trial_refs() + + _try_remove_corrupted_configs(refs[Trial.State.CORRUPTED], logger) + + if not _worker_should_continue( + max_evaluations_total, + continue_until_max_evaluation_completed, + refs, + logger, + ): + logger.info("Maximum total evaluations is reached, shutting down") + break + + # TODO(eddiebergman): I assume we should skip sampling and just go evaluate + # pending configs? + if any(refs[Trial.State.PENDING]): + logger.warning( + f"There are {len(refs[Trial.State.PENDING])} configs that" + " were sampled, but have no worker assigned. Sometimes this is due to" + " a delay in the filesystem communication, but most likely some configs" + " crashed during their execution or a jobtime-limit was reached." + ) + + # While we have the decision lock, we will now sample with the optimizer in this process + with sampler.using_state(shared_state.optimizer_state_file): + if sampler.budget is not None and sampler.used_budget >= sampler.budget: + logger.info("Maximum budget reached, shutting down") + break + + logger.debug("Sampling a new configuration") + if pre_load_hooks is not None: + for hook in pre_load_hooks: + sampler = hook(sampler) + + sampler.load_results( + previous_results={ + ref.id: ref.to_result(config_transform=sampler.load_config) + for ref in refs[Trial.State.COMPLETE] + }, + pending_evaluations={ + ref.id: sampler.load_config(ref.config()) + for ref in refs[Trial.State.IN_PROGRESS] + }, + ) + + # TODO(eddiebergman): If we have some unified `Trial` like object, + # we can just have them return this instead. 
+ config, config_id, prev_config_id = sampler.get_config_and_ids()
+
+ trial = Trial(
+ id=config_id,
+ config=config,
+ pipeline_dir=shared_state.results_dir / f"config_{config_id}",
+ prev_config_id=prev_config_id,
+ metadata={"time_sampled": time.time()},
+ )
+ trial.write_to_disk()
+ logger.debug(f"Sampled config {config_id}")
+
+ # Inform the global state of this process that we are evaluating this trial
+ _set_in_progress_trial(trial)
+
+ # Obtain the lock on this trial and evaluate it,
+ # otherwise go back to sampling a new one
+ with trial.disk.lock.try_lock() as acquired:
+ if not acquired:
+ continue
+
+ # NOTE: A bit of an extra safety check, ensure the trial is not already complete
+ if trial.disk.state == Trial.State.COMPLETE:
+ continue
+
+ result, time_end = _evaluate_config(trial, evaluation_fn, logger)
+ meta: dict[str, Any] = {"time_end": time_end}
+
+ # If this is set, it means we update the optimizer with the used
+ # budget once we write the trial to disk and mark it as complete
+ account_for_cost: bool = False
+ eval_cost: float | None = None
+
+ if result == "error":
+ # TODO(eddiebergman): We should probably do something here...
+ pass
+ elif "cost" not in result and sampler.budget is not None:
+ raise ValueError(
+ "The evaluation function result should contain "
+ f"a 'cost' field when used with a budget. Got {result}"
+ )
+ elif "cost" in result:
+ eval_cost = float(result["cost"])
+ account_for_cost = result.get("account_for_cost", True)
+ meta["budget"] = {
+ "max": sampler.budget,
+ "used": sampler.used_budget,
+ "eval_cost": eval_cost,
+ "account_for_cost": account_for_cost,
+ }
+
+ trial.results = result
+ trial.metadata.update(meta)
+
+ with shared_state.lock(poll=_poll, timeout=_timeout):
+ trial.write_to_disk()
+ if account_for_cost:
+ assert eval_cost is not None
+ with sampler.using_state(shared_state.optimizer_state_file):
+ sampler.used_budget += eval_cost
+
+ # 3.
Anything the user might want to do after the evaluation + if post_evaluation_hook is not None: + post_evaluation_hook( + trial.config, + trial.id, + trial.pipeline_dir, + trial.results, + logger, + ) + + logger.info(f"Finished evaluating config {config_id}") + + evaluations_in_this_run += 1 diff --git a/neps/search_spaces/search_space.py b/neps/search_spaces/search_space.py index 0280f14c..4f19e98c 100644 --- a/neps/search_spaces/search_space.py +++ b/neps/search_spaces/search_space.py @@ -1,12 +1,12 @@ from __future__ import annotations -import collections.abc import pprint import random from collections import OrderedDict from copy import deepcopy from itertools import product from pathlib import Path +from typing import Mapping, Any import ConfigSpace as CS import numpy as np @@ -165,7 +165,8 @@ def pipeline_space_from_yaml( elif param_type in ("const", "constant"): # Constant parameter pipeline_space[name] = ConstantParameter( - value=details["value"], is_fidelity=details.get("is_fidelity", False) + value=details["value"], + is_fidelity=details.get("is_fidelity", False), ) else: # Handle unknown parameter type @@ -182,7 +183,7 @@ def pipeline_space_from_yaml( return pipeline_space -class SearchSpace(collections.abc.Mapping): +class SearchSpace(Mapping[str, Any]): def __init__(self, **hyperparameters): self.hyperparameters = OrderedDict() @@ -478,7 +479,7 @@ def get_search_space_grid(self, grid_step_size: int = 10): def serialize(self): return {key: hp.serialize() for key, hp in self.hyperparameters.items()} - def load_from(self, config: dict): + def load_from(self, config: Mapping[str, Any]): for name in config.keys(): self.hyperparameters[name].load_from(config[name]) diff --git a/neps/status/status.py b/neps/status/status.py index e7bc99d3..8fbc3b13 100644 --- a/neps/status/status.py +++ b/neps/status/status.py @@ -1,20 +1,20 @@ from __future__ import annotations import logging -import time from pathlib import Path from typing import Any import pandas as pd -from ..metahyper import ConfigResult, read -from ..metahyper._locker import Locker from ..search_spaces.search_space import SearchSpace from ..utils.result_utils import get_loss +from neps.utils._locker import Locker +from neps.runtime import ConfigResult, SharedState, Trial def get_summary_dict( - root_directory: str | Path, add_details: bool = False + root_directory: str | Path, + add_details: bool = False, ) -> dict[str, Any]: """Create a dict that summarizes a run. 
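For orientation, a rough sketch of how the summary produced here might be consumed
after a run (the run directory path is illustrative, not part of this patch):

    from neps.status.status import get_summary_dict

    summary = get_summary_dict("path/to/root_directory", add_details=False)
    print(summary["num_evaluated_configs"], summary["best_loss"], summary["best_config_id"])
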
@@ -27,35 +27,39 @@ def get_summary_dict(
 summary_dict: Information summarizing a run
 """
 root_directory = Path(root_directory)
- previous_results, pending_configs, pending_configs_free = read(
- root_directory, None, logging.getLogger("neps.status")
- )
+ shared_state = SharedState(root_directory)
+
+ # NOTE: We don't lock the shared state since we are just reading and don't need to make
+ # decisions based on the state
+ trial_refs = shared_state.trial_refs()
+
+ evaluated = [r.to_result() for r in trial_refs[Trial.State.COMPLETE]]
+ pending = [r.load() for r in trial_refs[Trial.State.PENDING]]
+ in_progress = [r.load() for r in trial_refs[Trial.State.IN_PROGRESS]]
+
 summary = dict()
 if add_details:
- summary["previous_results"] = previous_results
- summary["pending_configs"] = pending_configs
- summary["pending_configs_free"] = pending_configs_free
-
- summary["num_evaluated_configs"] = len(previous_results)
- summary["num_pending_configs"] = len(pending_configs)
- summary["num_pending_configs_with_worker"] = len(pending_configs) - len(
- pending_configs_free
- )
+ summary["previous_results"] = {c.id: c for c in evaluated}
+ summary["pending_configs"] = {c.id: c for c in in_progress + pending}
+ summary["pending_configs_free"] = {c.id: c for c in pending}
+
+ summary["num_evaluated_configs"] = len(evaluated)
+ summary["num_pending_configs"] = len(in_progress) + len(pending)
+ summary["num_pending_configs_with_worker"] = len(in_progress)
 summary["best_loss"] = float("inf")
 summary["best_config_id"] = None
 summary["best_config_metadata"] = None
 summary["best_config"] = None
 summary["num_error"] = 0
- for config_id, evaluation in previous_results.items():
+ for evaluation in evaluated:
 if evaluation.result == "error":
 summary["num_error"] += 1
 loss = get_loss(evaluation.result, ignore_errors=True)
 if isinstance(loss, float) and loss < summary["best_loss"]:
 summary["best_loss"] = get_loss(evaluation.result)
 summary["best_config"] = evaluation.config
- summary["best_config_id"] = config_id
+ summary["best_config_id"] = evaluation.id
 summary["best_config_metadata"] = evaluation.metadata

 return summary
@@ -151,9 +155,7 @@ def _initiate_summary_csv(
 csv_config_data = summary_csv_directory / "config_data.csv"
 csv_run_data = summary_csv_directory / "run_status.csv"

- csv_lock_file = summary_csv_directory / ".csv_lock"
- csv_lock_file.touch(exist_ok=True)
- csv_locker = Locker(csv_lock_file, logger.getChild("_locker"))
+ csv_locker = Locker(summary_csv_directory / ".csv_lock")

 return (
 csv_config_data,
@@ -227,7 +229,9 @@ def _get_dataframes_from_summary(
 )

 # Concatenate the two DataFrames
- df_config_data = pd.concat([df_previous, df_pending], join="outer", ignore_index=True)
+ df_config_data = pd.concat(
+ [df_previous, df_pending], join="outer", ignore_index=True
+ )

 # Create a dataframe with the specified additional summary data
 additional_data = {
@@ -267,55 +271,46 @@ def _save_data_to_csv(
 This function saves data to CSV files while acquiring a lock to prevent
 concurrent writes. If the lock is acquired, it writes the data to the CSV
 files and releases the lock.
""" - should_break = False - while not should_break: - if locker.acquire_lock(): - try: - pending_configs = run_data_df.loc["num_pending_configs", "value"] - pending_configs_with_worker = run_data_df.loc[ - "num_pending_configs_with_worker", "value" + with locker(poll=2): + try: + pending_configs = run_data_df.loc["num_pending_configs", "value"] + pending_configs_with_worker = run_data_df.loc[ + "num_pending_configs_with_worker", "value" + ] + # Represents the last worker + if int(pending_configs) == 0 and int(pending_configs_with_worker) == 0: + config_data_df = config_data_df.sort_values( + by="result.loss", ascending=True + ) + config_data_df.to_csv(config_data_file_path, index=False, mode="w") + run_data_df.to_csv(run_data_file_path, index=True, mode="w") + + if run_data_file_path.exists(): + prev_run_data_df = pd.read_csv(run_data_file_path) + prev_run_data_df.set_index("description", inplace=True) + + num_evaluated_configs_csv = prev_run_data_df.loc[ + "num_evaluated_configs", "value" ] - # Represents the last worker - if int(pending_configs) == 0 and int(pending_configs_with_worker) == 0: - config_data_df = config_data_df.sort_values( - by="result.loss", ascending=True - ) - config_data_df.to_csv(config_data_file_path, index=False, mode="w") - run_data_df.to_csv(run_data_file_path, index=True, mode="w") - - if run_data_file_path.exists(): - prev_run_data_df = pd.read_csv(run_data_file_path) - prev_run_data_df.set_index("description", inplace=True) - - num_evaluated_configs_csv = prev_run_data_df.loc[ - "num_evaluated_configs", "value" - ] - num_evaluated_configs_run = run_data_df.loc[ - run_data_df.index == "num_evaluated_configs", "value" - ] - # checks if the current worker has more evaluated configs than the previous - if int(num_evaluated_configs_csv) < num_evaluated_configs_run.iloc[0]: - config_data_df = config_data_df.sort_values( - by="result.loss", ascending=True - ) - config_data_df.to_csv( - config_data_file_path, index=False, mode="w" - ) - run_data_df.to_csv(run_data_file_path, index=True, mode="w") - # Represents the first worker to be evaluated - else: + num_evaluated_configs_run = run_data_df.loc[ + run_data_df.index == "num_evaluated_configs", "value" + ] + # checks if the current worker has more evaluated configs than the previous + if int(num_evaluated_configs_csv) < num_evaluated_configs_run.iloc[0]: config_data_df = config_data_df.sort_values( by="result.loss", ascending=True ) config_data_df.to_csv(config_data_file_path, index=False, mode="w") run_data_df.to_csv(run_data_file_path, index=True, mode="w") - except Exception as e: - raise RuntimeError(f"Error during data saving: {e}") from e - finally: - locker.release_lock() - should_break = True - else: - time.sleep(2) + # Represents the first worker to be evaluated + else: + config_data_df = config_data_df.sort_values( + by="result.loss", ascending=True + ) + config_data_df.to_csv(config_data_file_path, index=False, mode="w") + run_data_df.to_csv(run_data_file_path, index=True, mode="w") + except Exception as e: + raise RuntimeError(f"Error during data saving: {e}") from e def post_run_csv(root_directory: str | Path, logger=None) -> None: diff --git a/neps/types.py b/neps/types.py new file mode 100644 index 00000000..2aef4bb8 --- /dev/null +++ b/neps/types.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Mapping, Any, Literal, Callable +from typing_extensions import TypeAlias +from dataclasses import dataclass +import logging + +# NOTE: `SearchSpace` is 
also one +ConfigLike: TypeAlias = Mapping[str, Any] + +# TODO(eddiebergman): We can turn this to an enum at some +# point to prevent having to isinstance and str match +ERROR: TypeAlias = Literal["error"] + +POST_EVAL_HOOK_SIGNATURE: TypeAlias = Callable[ + [ + ConfigLike, + str, + Path, + dict[str, Any] | ERROR, + logging.Logger, + ], + None, +] + + +# TODO(eddiebergman): Ideally, use `Trial` objects which can carry a lot more +# useful information to optimizers than the below dataclass. Would be a follow up +# refactor. +@dataclass +class ConfigResult: + id: str + config: ConfigLike + result: dict + metadata: dict + + +class AttrDict(dict): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.__dict__ = self diff --git a/neps/utils/__init__.py b/neps/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/neps/utils/_locker.py b/neps/utils/_locker.py new file mode 100644 index 00000000..dd2cab35 --- /dev/null +++ b/neps/utils/_locker.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from contextlib import contextmanager +from typing import Iterator, IO +from pathlib import Path + +import portalocker as pl + +EXCLUSIVE_NONE_BLOCKING = pl.LOCK_EX | pl.LOCK_NB + + +class Locker: + FailedToAcquireLock = pl.exceptions.LockException + + def __init__(self, lock_path: Path): + self.lock_path = lock_path + self.lock_path.touch(exist_ok=True) + + @contextmanager + def try_lock(self) -> Iterator[bool]: + try: + with self.acquire(fail_when_locked=True): + yield True + except self.FailedToAcquireLock: + yield False + + def is_locked(self) -> bool: + with self.try_lock() as acquired_lock: + return not acquired_lock + + @contextmanager + def __call__( + self, + poll: float = 1, + *, + timeout: float | None = None, + fail_when_locked: bool = False, + ) -> Iterator[IO]: + with pl.Lock( + self.lock_path, + check_interval=poll, + timeout=timeout, + flags=EXCLUSIVE_NONE_BLOCKING, + fail_when_locked=fail_when_locked, + ) as fh: + yield fh # We almost never use it but nothing better to yield + + @contextmanager + def acquire( + self, + check_interval: float = 1.0, + *, + timeout: float | None = None, + fail_when_locked: bool = False, + ) -> Iterator[IO]: + with self( + check_interval, + timeout=timeout, + fail_when_locked=fail_when_locked, + ) as fh: + yield fh diff --git a/neps/utils/common.py b/neps/utils/common.py index bcc3066e..95562925 100644 --- a/neps/utils/common.py +++ b/neps/utils/common.py @@ -10,8 +10,10 @@ import torch import yaml -from ..metahyper.api import ConfigInRun -from ..optimizers.info import SearcherConfigs +import inspect +from functools import partial + +from neps.runtime import get_in_progress_trial def load_checkpoint( @@ -39,11 +41,16 @@ def load_checkpoint( # Check if the user did not provide a specific pipeline directory # or if the provided pipeline directory does not exist. if directory is None: - # If not provided, use the pipeline directory from ConfigInRun - directory = ConfigInRun.previous_pipeline_directory + # If not provided, use the pipeline directory of the current trial. + trial = get_in_progress_trial() + + # If the pipeline directory remains None, return None. + if trial is None: + return None - # If the pipeline directory remains None even in ConfigInRun, return None. - # Otherwise, create a Path object using the provided or ConfigInRun value. + directory = trial.disk.previous_pipeline_dir + + # Otherwise, create a Path object using the provided or current trial. 
if directory: directory = Path(directory) @@ -88,7 +95,15 @@ def save_checkpoint( is "checkpoint.pth". """ if directory is None: - directory = ConfigInRun.pipeline_directory + in_progress_trial = get_in_progress_trial() + + if in_progress_trial is None: + raise ValueError( + "No current trial was found to save the checkpoint! This should not happen." + " Please report this issue and in the meantime you may provide a directory" + " manually." + ) + directory = in_progress_trial.pipeline_dir directory = Path(directory) checkpoint_path = f"{directory}/{checkpoint_name}.pth" @@ -129,7 +144,9 @@ def load_lightning_checkpoint( are found in the directory. """ if previous_pipeline_directory is None: - previous_pipeline_directory = ConfigInRun.previous_pipeline_directory + trial = get_in_progress_trial() + if trial is not None: + previous_pipeline_directory = trial.disk.previous_pipeline_dir if previous_pipeline_directory: # Search for possible checkpoints to continue training @@ -166,11 +183,19 @@ def get_initial_directory(pipeline_directory: Path | str | None = None) -> Path: Returns: Path: The initial directory. """ - if pipeline_directory is None: - pipeline_directory = ConfigInRun.pipeline_directory - - pipeline_directory = Path(pipeline_directory) + if pipeline_directory is not None: + pipeline_directory = Path(pipeline_directory) + else: + trial = get_in_progress_trial() + if trial is None: + raise ValueError( + "No current trial was found to get the initial directory! This should not happen." + " Please report this issue and in the meantime you may provide a directory" + " manually." + ) + pipeline_directory = trial.pipeline_dir + # Recursively find the initial directory while True: # Get the id of the previous directory previous_pipeline_directory_id = pipeline_directory / "previous_config.id" @@ -218,6 +243,8 @@ def get_searcher_data(searcher: str, searcher_path: Path | str | None = None) -> parent_directory = os.path.join(script_directory, os.pardir) resource_path = os.path.join(parent_directory, folder_path, f"{searcher}.yaml") + from neps.optimizers.info import SearcherConfigs + searchers = SearcherConfigs.get_searchers() if not os.path.exists(resource_path): @@ -281,7 +308,96 @@ def set_rnd_state(state: dict): ) -class AttrDict(dict): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.__dict__ = self +class MissingDependencyError(Exception): + def __init__(self, dep: str, cause: Exception, *args: Any): + super().__init__(dep, cause, *args) + self.dep = dep + self.__cause__ = cause # This is what `raise a from b` does + + def __str__(self) -> str: + return ( + f"Some required dependency-({self.dep}) to use this optional feature is " + f"missing. Please, include neps[experimental] dependency group in your " + f"installation of neps to be able to use all the optional features." + f" Otherwise, just install ({self.dep})" + ) + + +def is_partial_class(obj): + """Check if the object is a (partial) class, or an instance""" + if isinstance(obj, partial): + obj = obj.func + return inspect.isclass(obj) + + +def instance_from_map( + mapping: dict[str, Any], + request: str | list | tuple | Any, + name: str = "mapping", + allow_any: bool = True, + as_class: bool = False, + kwargs: dict | None = None, +): + """Get an instance of an class from a mapping. + + Arguments: + mapping: Mapping from string keys to classes or instances + request: A key from the mapping. If allow_any is True, could also be an + object or a class, to use a custom object. 
+ name: Name of the mapping used in error messages + allow_any: If set to True, allows using custom classes/objects. + as_class: If the class should be returned without beeing instanciated + kwargs: Arguments used for the new instance, if created. Its purpose is + to serve at default arguments if the user doesn't built the object. + + Raises: + ValueError: if the request is invalid (not a string if allow_any is False), + or invalid key. + """ + + # Split arguments of the form (request, kwargs) + args_dict = kwargs or {} + if isinstance(request, tuple) or isinstance(request, list): + if len(request) != 2: + raise ValueError( + "When building an instance and specifying arguments, " + "you should give a pair (class, arguments)" + ) + request, req_args_dict = request + if not isinstance(req_args_dict, dict): + raise ValueError("The arguments should be given as a dictionary") + args_dict = {**args_dict, **req_args_dict} + + # Then, get the class/instance from the request + if isinstance(request, str): + if request in mapping: + instance = mapping[request] + else: + raise ValueError(f"{request} doesn't exists for {name}") + elif allow_any: + instance = request + else: + raise ValueError(f"Object {request} invalid key for {name}") + + if isinstance(instance, MissingDependencyError): + raise instance + + # Check if the request is a class if it is mandatory + if (args_dict or as_class) and not is_partial_class(instance): + raise ValueError( + f"{instance} is not a class and can't be used with additional arguments" + ) + + # Give the arguments to the class + if args_dict: + instance = partial(instance, **args_dict) + + # Return the class / instance + if as_class: + return instance + if is_partial_class(instance): + try: + instance = instance() + except TypeError as e: + raise TypeError(f"{e} when calling {instance} with {args_dict}") from e + return instance diff --git a/neps/utils/data_loading.py b/neps/utils/data_loading.py index 996d1583..f562b88d 100644 --- a/neps/utils/data_loading.py +++ b/neps/utils/data_loading.py @@ -5,11 +5,12 @@ import re from itertools import chain from typing import Any +from pathlib import Path import numpy as np import yaml -from ..metahyper import read +from neps.runtime import SharedState, Trial from .result_utils import get_loss @@ -38,8 +39,12 @@ def read_tasks_and_dev_stages_from_disk( if not is_valid_dev_path(dev_dir_path): continue dev_id = get_id_from_path(dev_dir_path) - # TODO: Perhaps use 2nd and 3rd argument as well - result, _, _ = read(dev_dir_path) + + state = SharedState(Path(dev_dir_path)) + with state.lock(poll=1, timeout=None): + refs = state.trial_refs() + + result = {ref.id: ref.to_result() for ref in refs[Trial.State.COMPLETE]} results[task_id][dev_id] = result return results @@ -64,7 +69,11 @@ def read_user_prior_results_from_disk(path: str): continue # get name of the directory name = os.path.basename(prior_dir_path) - results[name], _, _ = read(prior_dir_path) + state = SharedState(Path(prior_dir_path)) + with state.lock(poll=1, timeout=None): + refs = state.trial_refs() + + results[name] = {ref.id: ref.to_result() for ref in refs[Trial.State.COMPLETE]} return results @@ -162,7 +171,13 @@ def summarize_results( # TOOD: only use IDs if provided final_results = results[final_task_id][final_dev_id] else: - final_results, _, _ = read(seed_dir_path) + state = SharedState(Path(seed_dir_path)) + with state.lock(poll=1, timeout=None): + refs = state.trial_refs() + + final_results = { + ref.id: ref.to_result() for ref in refs[Trial.State.COMPLETE] + } 
# This part is copied from neps.status() best_loss = float("inf") diff --git a/neps/utils/files.py b/neps/utils/files.py new file mode 100644 index 00000000..cab7992f --- /dev/null +++ b/neps/utils/files.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import glob +from pathlib import Path +from typing import Any + +import yaml + +from typing import Iterable, Mapping + + +def get_data_representation(data: Any): + """Common data representations. Other specific types should be handled + by the user in his Parameter class.""" + if hasattr(data, "serialize"): + return get_data_representation(data.serialize()) + + if isinstance(data, Mapping): + return {key: get_data_representation(val) for key, val in data.items()} + + if not isinstance(data, str) and isinstance(data, Iterable): + return [get_data_representation(val) for val in data] + + if type(data).__module__ in ["numpy", "torch"]: + data = data.tolist() # type: ignore + if type(data).__module__ == "numpy": + data = data.item() + return get_data_representation(data) + + return data + + +def serialize(data: Any, path: Path | str, sort_keys: bool = True) -> None: + data = get_data_representation(data) + path = Path(path) + with path.open("w") as file_stream: + try: + return yaml.safe_dump(data, file_stream, sort_keys=sort_keys) + except yaml.representer.RepresenterError as e: + raise TypeError( + "Could not serialize to yaml! The object " + f"{e.args[1]} of type {type(e.args[1])} is not." + ) from e + + +def deserialize(path: Path | str) -> dict[str, Any]: + with Path(path).open("r") as file_stream: + return yaml.full_load(file_stream) + + +def empty_file(file_path: Path) -> bool: + return not file_path.exists() or file_path.stat().st_size <= 0 + + +def find_files( + directory: Path, + files: Iterable[str], + any_suffix: bool = False, + check_nonempty: bool = False, +) -> list[Path]: + found_paths = [] + for file_name in files: + pattern = f"{directory.absolute()}/**/{file_name}" + if any_suffix: + pattern += "*" + for f_path in glob.glob(pattern, recursive=True): + path_found = Path(f_path) + if path_found.is_file(): + if check_nonempty and empty_file(path_found): + continue + found_paths.append(path_found) + return found_paths diff --git a/pyproject.toml b/pyproject.toml index 66be36bb..474c853e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,25 +3,30 @@ name = "neural-pipeline-search" version = "v0.11.1" description = "Neural Pipeline Search helps deep learning experts find the best neural pipeline." 
authors = [ - "Danny Stoll ", - "Neeratyoy Mallik ", - "Simon Schrodi", - "Maciej Janowski", - "Samir Garibov", - "Tarek Abou Chakra", - "Carl Hvarfner", - "Eddie Bergman", - "Binxin Ru", - "Nils Kober", - "Théophane Vallaeys", - "Frank Hutter", + "Danny Stoll ", + "Neeratyoy Mallik ", + "Simon Schrodi", + "Maciej Janowski", + "Samir Garibov", + "Tarek Abou Chakra", + "Carl Hvarfner", + "Eddie Bergman", + "Binxin Ru", + "Nils Kober", + "Théophane Vallaeys", + "Frank Hutter", ] readme = "README.md" license = "Apache-2.0" homepage = "https://github.com/automl/neps" repository = "https://github.com/automl/neps" documentation = "https://automl.github.io/neps/" -keywords = ["Neural Pipeline Search", "Neural Architecture Search", "Hyperparameter Optimization", "AutoML"] +keywords = [ + "Neural Pipeline Search", + "Neural Architecture Search", + "Hyperparameter Optimization", + "AutoML", +] classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", @@ -38,11 +43,7 @@ classifiers = [ "Topic :: Scientific/Engineering :: Artificial Intelligence", "Topic :: System :: Distributed Computing", ] -packages = [ - { include = "neps" }, - { include = "metahyper", from = "neps"}, - { include = "neps_examples" }, -] +packages = [{ include = "neps" }, { include = "neps_examples" }] [tool.poetry.dependencies] @@ -56,7 +57,7 @@ nltk = "^3.6.4" #path = "^16.2.0" #termcolor = "^1.1.0" scipy = "^1" -torch = ">=1.7.0,<=2.1, !=2.0.1, !=2.1.0" # fix from: https://stackoverflow.com/a/76647180 +torch = ">=1.7.0,<=2.1, !=2.0.1, !=2.1.0" # fix from: https://stackoverflow.com/a/76647180 # torch = [ # {version = ">=1.7.0,<=2.1", markers = "sys_platform == 'darwin'"}, # Segfaults for macOS on github actions # {version = ">=1.7.0,<=2.1", markers = "sys_platform != 'darwin'"}, @@ -80,7 +81,7 @@ typing-extensions = "^4" # jahs-bench = {git = "https://github.com/automl/jahs_bench_201.git", rev = "v1.0.2"} mkdocs-material = "^8.1.3" mike = "^1.1.2" -torchvision = "<0.16.0" # Used in examples +torchvision = "<0.16.0" # Used in examples [tool.poetry.group.experimental] @@ -103,7 +104,14 @@ line_length = 90 [tool.pytest.ini_options] addopts = "--basetemp ./tests_tmpdir -m 'neps_api or core_examples'" -markers = ["all_examples", "core_examples", "regression_all", "metahyper", "neps_api", "summary_csv"] +markers = [ + "all_examples", + "core_examples", + "regression_all", + "runtime", + "neps_api", + "summary_csv", +] filterwarnings = "ignore::DeprecationWarning:torch.utils.tensorboard.*:" [tool.mypy] @@ -128,51 +136,203 @@ ignore_errors = true [tool.pylint.messages_control] disable = 'all' enable = [ - 'invalid-characters-in-docstring','wrong-spelling-in-comment','wrong-spelling-in-docstring','not-in-loop', - 'function-redefined','continue-in-finally','abstract-class-instantiated','star-needs-assignment-target', - 'duplicate-argument-name','return-in-init','too-many-star-expressions','nonlocal-and-global', - 'return-outside-function','return-arg-in-generator','invalid-star-assignment-target','bad-reversed-sequence', - 'nonexistent-operator','yield-outside-function','init-is-generator','nonlocal-without-binding','lost-exception', - 'assert-on-tuple','dangerous-default-value','duplicate-key','useless-else-on-loop','expression-not-assigned', - 'confusing-with-statement','unnecessary-lambda','pointless-statement','unnecessary-pass','unreachable','eval-used', - 'exec-used','using-constant-test','deprecated-lambda','blacklisted-name','misplaced-comparison-constant', - 
'singleton-comparison','unneeded-not','ßconsider-iterating-dictionary','consider-using-enumerate','empty-docstring', - 'unidiomatic-typecheck','condition-evals-to-constant','consider-using-generator','nan-comparison', - 'consider-using-min-max-builtin','consider-using-with','invalid-all-format','consider-using-dict-items', - 'deprecated-decorator','forgotten-debug-statement','useless-with-lock', - 'use-implicit-booleaness-not-comparison','not-async-context-manager','yield-inside-async-function', - 'await-outside-async','invalid-unary-operand-type','unsupported-binary-operation','not-callable', - 'redundant-keyword-arg','assignment-from-no-return','assignment-from-none','not-context-manager', - 'repeated-keyword','missing-kwoa','no-value-for-parameter','invalid-sequence-index','invalid-slice-index', - 'unexpected-keyword-arg','unsupported-membership-test','unsubscriptable-object','bad-except-order', - 'catching-non-exception','bad-exception-context','notimplemented-raised','raising-bad-type','raising-non-exception', - 'misplaced-bare-raise','duplicate-except','nonstandard-exception','binary-op-exception','bare-except', - 'raise-missing-from','consider-using-namedtuple-or-dataclass','consider-using-tuple','bad-open-mode', - 'redundant-unittest-assert','boolean-datetime','deprecated-methodimport-error','import-self','reimported', - 'relative-import','deprecated-module','wildcard-import','misplaced-future','cyclic-import','wrong-import-position', - 'ungrouped-imports','multiple-imports','simplifiable-condition','len-as-condition', - 'unpacking-non-sequence','invalid-all-object','unbalanced-tuple-unpacking','undefined-variable', - 'undefined-all-variable','used-before-assignment','cell-var-from-loop','global-variable-undefined', - 'redefined-builtin','redefine-in-handler','unused-import','unused-argument','unused-wildcard-import', - 'unused-variable','global-variable-not-assigned','undefined-loop-variable','global-statement', - 'global-at-module-level','format-needs-mapping','truncated-format-string','missing-format-string-key', - 'mixed-format-string','too-few-format-args','bad-str-strip-call','too-many-format-args','bad-format-character', - 'format-combined-specification','bad-format-string-key','bad-format-string','missing-format-attribute', - 'missing-format-argument-key','unused-format-string-argument','unused-format-string-key','invalid-format-index', - 'f-string-without-interpolation','use-maxsplit-arg','anomalous-unicode-escape-in-string', - 'anomalous-backslash-in-string','redundant-u-string-prefix','format-string-without-interpolation', - 'simplifiable-if-statement','logging-format-truncated','logging-too-few-args','logging-too-many-args', - 'logging-unsupported-format','not-an-iterable','not-a-mapping','use-sequence-for-iteration','bad-indentation', - 'unnecessary-semicolon','missing-final-newline','mixed-line-endings','multiple-statements','trailing-newlines', - 'trailing-whitespace','unexpected-line-ending-format','superfluous-parens','access-member-before-definition', - 'method-hidden','assigning-non-slot','duplicate-bases','inconsistent-mro','inherit-non-class','invalid-slots', - 'invalid-slots-object','no-method-argument','no-self-argument','unexpected-special-method-signature', - 'non-iterator-returned','invalid-length-returned','protected-access','attribute-defined-outside-init', - 'abstract-method','bad-staticmethod-argument','non-parent-init-called','super-init-not-called', - 'no-classmethod-decorator','no-staticmethod-decorator','no-self-use','bad-classmethod-argument', - 
'bad-mcs-classmethod-argument','bad-mcs-method-argument','method-check-failed','invalid-bool-returned', - 'invalid-index-returned','invalid-repr-returned','invalid-str-returned','invalid-bytes-returned', - 'invalid-hash-returned','invalid-length-hint-returned','invalid-format-returned','invalid-getnewargs-returned', - 'invalid-getnewargs-ex-returned','super-with-arguments','deprecated-class','invalid-class-object', - 'unused-private-member', + 'invalid-characters-in-docstring', + 'wrong-spelling-in-comment', + 'wrong-spelling-in-docstring', + 'not-in-loop', + 'function-redefined', + 'continue-in-finally', + 'abstract-class-instantiated', + 'star-needs-assignment-target', + 'duplicate-argument-name', + 'return-in-init', + 'too-many-star-expressions', + 'nonlocal-and-global', + 'return-outside-function', + 'return-arg-in-generator', + 'invalid-star-assignment-target', + 'bad-reversed-sequence', + 'nonexistent-operator', + 'yield-outside-function', + 'init-is-generator', + 'nonlocal-without-binding', + 'lost-exception', + 'assert-on-tuple', + 'dangerous-default-value', + 'duplicate-key', + 'useless-else-on-loop', + 'expression-not-assigned', + 'confusing-with-statement', + 'unnecessary-lambda', + 'pointless-statement', + 'unnecessary-pass', + 'unreachable', + 'eval-used', + 'exec-used', + 'using-constant-test', + 'deprecated-lambda', + 'blacklisted-name', + 'misplaced-comparison-constant', + 'singleton-comparison', + 'unneeded-not', + 'ßconsider-iterating-dictionary', + 'consider-using-enumerate', + 'empty-docstring', + 'unidiomatic-typecheck', + 'condition-evals-to-constant', + 'consider-using-generator', + 'nan-comparison', + 'consider-using-min-max-builtin', + 'consider-using-with', + 'invalid-all-format', + 'consider-using-dict-items', + 'deprecated-decorator', + 'forgotten-debug-statement', + 'useless-with-lock', + 'use-implicit-booleaness-not-comparison', + 'not-async-context-manager', + 'yield-inside-async-function', + 'await-outside-async', + 'invalid-unary-operand-type', + 'unsupported-binary-operation', + 'not-callable', + 'redundant-keyword-arg', + 'assignment-from-no-return', + 'assignment-from-none', + 'not-context-manager', + 'repeated-keyword', + 'missing-kwoa', + 'no-value-for-parameter', + 'invalid-sequence-index', + 'invalid-slice-index', + 'unexpected-keyword-arg', + 'unsupported-membership-test', + 'unsubscriptable-object', + 'bad-except-order', + 'catching-non-exception', + 'bad-exception-context', + 'notimplemented-raised', + 'raising-bad-type', + 'raising-non-exception', + 'misplaced-bare-raise', + 'duplicate-except', + 'nonstandard-exception', + 'binary-op-exception', + 'bare-except', + 'raise-missing-from', + 'consider-using-namedtuple-or-dataclass', + 'consider-using-tuple', + 'bad-open-mode', + 'redundant-unittest-assert', + 'boolean-datetime', + 'deprecated-methodimport-error', + 'import-self', + 'reimported', + 'relative-import', + 'deprecated-module', + 'wildcard-import', + 'misplaced-future', + 'cyclic-import', + 'wrong-import-position', + 'ungrouped-imports', + 'multiple-imports', + 'simplifiable-condition', + 'len-as-condition', + 'unpacking-non-sequence', + 'invalid-all-object', + 'unbalanced-tuple-unpacking', + 'undefined-variable', + 'undefined-all-variable', + 'used-before-assignment', + 'cell-var-from-loop', + 'global-variable-undefined', + 'redefined-builtin', + 'redefine-in-handler', + 'unused-import', + 'unused-argument', + 'unused-wildcard-import', + 'unused-variable', + 'global-variable-not-assigned', + 'undefined-loop-variable', + 
'global-statement', + 'global-at-module-level', + 'format-needs-mapping', + 'truncated-format-string', + 'missing-format-string-key', + 'mixed-format-string', + 'too-few-format-args', + 'bad-str-strip-call', + 'too-many-format-args', + 'bad-format-character', + 'format-combined-specification', + 'bad-format-string-key', + 'bad-format-string', + 'missing-format-attribute', + 'missing-format-argument-key', + 'unused-format-string-argument', + 'unused-format-string-key', + 'invalid-format-index', + 'f-string-without-interpolation', + 'use-maxsplit-arg', + 'anomalous-unicode-escape-in-string', + 'anomalous-backslash-in-string', + 'redundant-u-string-prefix', + 'format-string-without-interpolation', + 'simplifiable-if-statement', + 'logging-format-truncated', + 'logging-too-few-args', + 'logging-too-many-args', + 'logging-unsupported-format', + 'not-an-iterable', + 'not-a-mapping', + 'use-sequence-for-iteration', + 'bad-indentation', + 'unnecessary-semicolon', + 'missing-final-newline', + 'mixed-line-endings', + 'multiple-statements', + 'trailing-newlines', + 'trailing-whitespace', + 'unexpected-line-ending-format', + 'superfluous-parens', + 'access-member-before-definition', + 'method-hidden', + 'assigning-non-slot', + 'duplicate-bases', + 'inconsistent-mro', + 'inherit-non-class', + 'invalid-slots', + 'invalid-slots-object', + 'no-method-argument', + 'no-self-argument', + 'unexpected-special-method-signature', + 'non-iterator-returned', + 'invalid-length-returned', + 'protected-access', + 'attribute-defined-outside-init', + 'abstract-method', + 'bad-staticmethod-argument', + 'non-parent-init-called', + 'super-init-not-called', + 'no-classmethod-decorator', + 'no-staticmethod-decorator', + 'no-self-use', + 'bad-classmethod-argument', + 'bad-mcs-classmethod-argument', + 'bad-mcs-method-argument', + 'method-check-failed', + 'invalid-bool-returned', + 'invalid-index-returned', + 'invalid-repr-returned', + 'invalid-str-returned', + 'invalid-bytes-returned', + 'invalid-hash-returned', + 'invalid-length-hint-returned', + 'invalid-format-returned', + 'invalid-getnewargs-returned', + 'invalid-getnewargs-ex-returned', + 'super-with-arguments', + 'deprecated-class', + 'invalid-class-object', + 'unused-private-member', ] diff --git a/tests/test_metahyper/test_locking.py b/tests/test_metahyper/test_locking.py index dfdfcb05..252c17fa 100644 --- a/tests/test_metahyper/test_locking.py +++ b/tests/test_metahyper/test_locking.py @@ -22,7 +22,7 @@ def launch_example_processes(n_workers: int = 3) -> list: return processes -@pytest.mark.metahyper +@pytest.mark.runtime def test_filelock() -> None: """Test that the filelocking method of parallelization works as intended.""" # Note: Not using tmpdir
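
The file locking exercised by this test is provided by the new
`neps.utils._locker.Locker` shown earlier in this patch. A minimal usage sketch
(the lock path is illustrative):

    from pathlib import Path

    from neps.utils._locker import Locker

    lock = Locker(Path("run_dir/.decision_lock"))  # illustrative path

    # Blocking acquire: poll every 0.5s, giving up after roughly 10s.
    with lock(poll=0.5, timeout=10):
        ...  # read or write shared state while holding the exclusive lock

    # Non-blocking attempt, e.g. to claim a trial for evaluation.
    with lock.try_lock() as acquired:
        if acquired:
            ...  # this worker owns the lock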