From eb46e3524aae80e42686590385a9e7688612c6e7 Mon Sep 17 00:00:00 2001 From: karibbov Date: Mon, 8 Apr 2024 15:05:32 +0200 Subject: [PATCH] SH, HB using both MFObservedData and observed_configs --- neps/optimizers/multi_fidelity/dyhpo.py | 45 ++- neps/optimizers/multi_fidelity/hyperband.py | 7 +- .../multi_fidelity/successive_halving.py | 151 ++++++--- neps/optimizers/multi_fidelity/utils.py | 291 ++++++++++++------ 4 files changed, 324 insertions(+), 170 deletions(-) diff --git a/neps/optimizers/multi_fidelity/dyhpo.py b/neps/optimizers/multi_fidelity/dyhpo.py index 5e540bc3..b23f82ba 100755 --- a/neps/optimizers/multi_fidelity/dyhpo.py +++ b/neps/optimizers/multi_fidelity/dyhpo.py @@ -6,12 +6,10 @@ import numpy as np import pandas as pd -import time - -from ...utils.common import EvaluationData, SimpleCSVWriter from ...metahyper import ConfigResult, instance_from_map from ...search_spaces.search_space import FloatParameter, IntegerParameter, SearchSpace +from ...utils.common import EvaluationData, SimpleCSVWriter from ..base_optimizer import BaseOptimizer from ..bayesian_optimization.acquisition_functions import AcquisitionMapping from ..bayesian_optimization.acquisition_functions.base_acquisition import BaseAcquisition @@ -28,11 +26,15 @@ class AcqWriter(SimpleCSVWriter): def set_data(self, sample_configs: pd.Series, acq_vals: pd.Series): - config_vals = pd.DataFrame([config.hp_values() for config in sample_configs], index=sample_configs.index) + config_vals = pd.DataFrame( + [config.hp_values() for config in sample_configs], index=sample_configs.index + ) if isinstance(acq_vals, pd.Series): acq_vals.name = "Acq Value" + # pylint: disable=attribute-defined-outside-init self.df = config_vals.join(acq_vals) self.df = self.df.sort_values(by="Acq Value") + # pylint: enable=attribute-defined-outside-init class MFEIBO(BaseOptimizer): @@ -118,8 +120,11 @@ def __init__( self.total_fevals: int = 0 self.observed_configs = MFObservedData( - columns=["config", "perf", "learning_curves"], - index_names=["config_id", "budget_id"], + config_id="config_id", + budget_id="budget_id", + config_col="config", + perf_col="perf", + learning_curve_col="learning_curves", ) # Preparing model @@ -304,10 +309,12 @@ def load_results( previous_results (dict[str, ConfigResult]): [description] pending_evaluations (dict[str, ConfigResult]): [description] """ - start = time.time() self.observed_configs = MFObservedData( - columns=["config", "perf", "learning_curves"], - index_names=["config_id", "budget_id"], + config_id="config_id", + budget_id="budget_id", + config_col="config", + perf_col="perf", + learning_curve_col="learning_curves", ) # previous optimization run exists and needs to be loaded self._load_previous_observations(previous_results) @@ -328,9 +335,6 @@ def load_results( init_phase = self.is_init_phase() if not init_phase: self._fit_models() - # print("-" * 50) - # print(f"| Total time for `load_results()`: {time.time()-start:.2f}s") - # print("-" * 50) @classmethod def _get_config_id_split(cls, config_id: str) -> tuple[str, str]: @@ -444,29 +448,21 @@ def get_config_and_ids( # pylint: disable=no-self-use else: if self.count == 0: self.logger.info("\nPartial learning curves as initial design:\n") - self.logger.info(f"{self.observed_configs.get_learning_curves()}\n") + self.logger.info(f"{self.observed_configs.get_trajectories()}\n") self.count += 1 # main acquisition call here after initial design is turned off self.logger.info("acquiring...") # generates candidate samples for acquisition calculation - 
start = time.time() samples = self.acquisition_sampler.sample( set_new_sample_fidelity=self.pipeline_space.fidelity.lower ) # fidelity values here should be the observations or min. fidelity - # print("-" * 50) - # print(f"| Total time for acq. sampling: {time.time()-start:.2f}s") - # print("-" * 50) - start = time.time() # calculating acquisition function values for the candidate samples acq, _samples = self.acquisition.eval( # type: ignore[attr-defined] x=samples, asscalar=True ) acq = pd.Series(acq, index=_samples.index) - # print("-" * 50) - # print(f"| Total time for acq. eval: {time.time()-start:.2f}s") - # print("-" * 50) # maximizing acquisition function best_idx = acq.sort_values().index[-1] # extracting the config ID for the selected maximizer @@ -519,8 +515,11 @@ def get_config_and_ids( # pylint: disable=no-self-use if best_idx > max(self.observed_configs.seen_config_ids) else ( self.get_budget_value( - self.observed_configs.get_max_observed_fidelity_level_per_config().loc[best_idx] - ) + self.step_size # ONE-STEP FIDELITY QUERY + self.observed_configs.get_max_observed_fidelity_level_per_config().loc[ + best_idx + ] + ) + + self.step_size # ONE-STEP FIDELITY QUERY ) ) # generating correct IDs diff --git a/neps/optimizers/multi_fidelity/hyperband.py b/neps/optimizers/multi_fidelity/hyperband.py index 86ff2f5f..1278cbee 100644 --- a/neps/optimizers/multi_fidelity/hyperband.py +++ b/neps/optimizers/multi_fidelity/hyperband.py @@ -4,10 +4,9 @@ import typing from copy import deepcopy -from typing import Any +from typing import Any, Literal import numpy as np -from typing_extensions import Literal from ...metahyper import ConfigResult from ...search_spaces.search_space import SearchSpace @@ -100,6 +99,8 @@ def _update_sh_bracket_state(self) -> None: # TODO: can we avoid copying full observation history bracket = self.sh_brackets[self.current_sh_bracket] # type: ignore bracket.observed_configs = self.observed_configs.copy() + # TODO: Do we NEED to copy here instead? 
+ bracket.MFobserved_configs = self.MFobserved_configs # pylint: disable=no-self-use def clear_old_brackets(self): @@ -302,7 +303,7 @@ def __init__( prior_confidence=prior_confidence, random_interleave_prob=random_interleave_prob, sample_default_first=sample_default_first, - sample_default_at_target=sample_default_at_target + sample_default_at_target=sample_default_at_target, ) self.sampling_args = { "inc": None, diff --git a/neps/optimizers/multi_fidelity/successive_halving.py b/neps/optimizers/multi_fidelity/successive_halving.py index a3145dc2..94bdaeb5 100644 --- a/neps/optimizers/multi_fidelity/successive_halving.py +++ b/neps/optimizers/multi_fidelity/successive_halving.py @@ -5,10 +5,10 @@ import random import typing from copy import deepcopy +from typing import Literal import numpy as np import pandas as pd -from typing_extensions import Literal from ...metahyper import ConfigResult from ...search_spaces.hyperparameters.categorical import ( @@ -22,6 +22,7 @@ from ..base_optimizer import BaseOptimizer from .promotion_policy import AsyncPromotionPolicy, SyncPromotionPolicy from .sampling_policy import FixedPriorPolicy, RandomUniformPolicy +from .utils import MFObservedData CUSTOM_FLOAT_CONFIDENCE_SCORES = FLOAT_CONFIDENCE_SCORES.copy() CUSTOM_FLOAT_CONFIDENCE_SCORES.update({"ultra": 0.05}) @@ -102,8 +103,7 @@ def __init__( # the parameter is exposed to allow HB to call SH with different stopping rates self.early_stopping_rate = early_stopping_rate self.sampling_policy = sampling_policy( - pipeline_space=self.pipeline_space, - logger=self.logger + pipeline_space=self.pipeline_space, logger=self.logger ) self.promotion_policy = promotion_policy(self.eta) @@ -132,6 +132,15 @@ def __init__( self.sampling_args: dict = {} self.fidelities = list(self.rung_map.values()) + + self.MFobserved_configs = MFObservedData( + config_id="config_id", + budget_id="budget_id", + config_col="config", + perf_col="perf", + auxiliary_cols=["rung"], + ) + # TODO: replace with MFobserved_configs # stores the observations made and the corresponding fidelity explored # crucial data structure used for determining promotion candidates self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf")) @@ -164,6 +173,10 @@ def _get_rung_trace(cls, rung_map: dict, config_map: dict) -> list[int]: return rung_trace def get_incumbent_score(self): + # budget_perf = self.MFobserved_configs.get_best_performance_for_each_budget() + # y_star = budget_perf[budget_perf.index.max] + + # TODO: replace this with existing method y_star = np.inf # minimizing optimizer if len(self.observed_configs): y_star = self.observed_configs.perf.values.min() @@ -219,52 +232,88 @@ def _get_config_id_split(cls, config_id: str) -> tuple[str, str]: def _load_previous_observations( self, previous_results: dict[str, ConfigResult] ) -> None: - for config_id, config_val in previous_results.items(): + def index_data_split(config_id: str, config_val): _config, _rung = self._get_config_id_split(config_id) perf = self.get_loss(config_val.result) - if int(_config) in self.observed_configs.index: - # config already recorded in dataframe - rung_recorded = self.observed_configs.at[int(_config), "rung"] - if rung_recorded < int(_rung): - # config recorded for a lower rung but higher rung eval available - self.observed_configs.at[int(_config), "config"] = config_val.config - self.observed_configs.at[int(_config), "rung"] = int(_rung) - self.observed_configs.at[int(_config), "perf"] = perf - else: - _df = pd.DataFrame( - [[config_val.config, 
int(_rung), perf]], - columns=self.observed_configs.columns, - index=pd.Series(int(_config)), # key for config_id - ) - self.observed_configs = pd.concat( - (self.observed_configs, _df) - ).sort_index() - # for efficiency, redefining the function to have the - # `rung_histories` assignment inside the for loop - # rung histories are collected only for `previous` and not `pending` configs - self.rung_histories[int(_rung)]["config"].append(int(_config)) - self.rung_histories[int(_rung)]["perf"].append(perf) + index = int(_config), int(_rung) + _data = [config_val.config, perf, int(_rung)] + return index, _data + + if len(previous_results) > 0: + index_row = [ + tuple(index_data_split(config_id, config_val)) + for config_id, config_val in previous_results.items() + ] + indices, rows = zip(*index_row) + self.MFobserved_configs.add_data(data=list(rows), index=list(indices)) + # TODO: replace this with new optimized method + # for config_id, config_val in previous_results.items(): + # _config, _rung = self._get_config_id_split(config_id) + # perf = self.get_loss(config_val.result) + # if int(_config) in self.observed_configs.index: + # # config already recorded in dataframe + # rung_recorded = self.observed_configs.at[int(_config), "rung"] + # if rung_recorded < int(_rung): + # # config recorded for a lower rung but higher rung eval available + # self.observed_configs.at[int(_config), "config"] = config_val.config + # self.observed_configs.at[int(_config), "rung"] = int(_rung) + # self.observed_configs.at[int(_config), "perf"] = perf + # else: + # _df = pd.DataFrame( + # [[config_val.config, int(_rung), perf]], + # columns=self.observed_configs.columns, + # index=pd.Series(int(_config)), # key for config_id + # ) + # self.observed_configs = pd.concat( + # (self.observed_configs, _df) + # ).sort_index() + # # for efficiency, redefining the function to have the + # # `rung_histories` assignment inside the for loop + # # rung histories are collected only for `previous` and not `pending` configs + # self.rung_histories[int(_rung)]["config"].append(int(_config)) + # self.rung_histories[int(_rung)]["perf"].append(perf) return def _handle_pending_evaluations( self, pending_evaluations: dict[str, ConfigResult] ) -> None: + def index_data_split(config_id: str, config_val): + _config, _rung = self._get_config_id_split(config_id) + # perf = self.get_loss(config_val.result) + index = int(_config), int(_rung) + _data = [ + # use `config_val` instead of `config_val.config` + # unlike `previous_results` case + config_val, + np.nan, + int(_rung), + ] + return index, _data + + if len(pending_evaluations) > 0: + index_row = [ + tuple(index_data_split(config_id, config_val)) + for config_id, config_val in pending_evaluations.items() + ] + indices, rows = zip(*index_row) + self.MFobserved_configs.add_data(data=list(rows), index=list(indices)) + # TODO: replace this # iterates over all pending evaluations and updates the list of observed # configs with the rung and performance as None - for config_id, config in pending_evaluations.items(): - _config, _rung = self._get_config_id_split(config_id) - if int(_config) not in self.observed_configs.index: - _df = pd.DataFrame( - [[config, int(_rung), np.nan]], - columns=self.observed_configs.columns, - index=pd.Series(int(_config)), # key for config_id - ) - self.observed_configs = pd.concat( - (self.observed_configs, _df) - ).sort_index() - else: - self.observed_configs.at[int(_config), "rung"] = int(_rung) - self.observed_configs.at[int(_config), "perf"] = np.nan + # for 
config_id, config in pending_evaluations.items(): + # _config, _rung = self._get_config_id_split(config_id) + # if int(_config) not in self.observed_configs.index: + # _df = pd.DataFrame( + # [[config, int(_rung), np.nan]], + # columns=self.observed_configs.columns, + # index=pd.Series(int(_config)), # key for config_id + # ) + # self.observed_configs = pd.concat( + # (self.observed_configs, _df) + # ).sort_index() + # else: + # self.observed_configs.at[int(_config), "rung"] = int(_rung) + # self.observed_configs.at[int(_config), "perf"] = np.nan return def clean_rung_information(self): @@ -290,6 +339,7 @@ def _get_rungs_state(self, observed_configs=None): # iterates over the list of explored configs and buckets them to respective # rungs depending on the highest fidelity it was evaluated at self.clean_rung_information() + # TODO: create a new method for this for _rung in observed_configs.rung.unique(): idxs = observed_configs.rung == _rung self.rung_members[_rung] = observed_configs.index[idxs].values @@ -331,7 +381,15 @@ def load_results( for rung in range(self.min_rung, self.max_rung + 1) } - self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf")) + self.MFobserved_configs = MFObservedData( + config_id="config_id", + budget_id="budget_id", + config_col="config", + perf_col="perf", + auxiliary_cols=["rung"], + ) + + # self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf")) # previous optimization run exists and needs to be loaded self._load_previous_observations(previous_results) @@ -340,6 +398,12 @@ def load_results( # account for pending evaluations self._handle_pending_evaluations(pending_evaluations) + # TODO: change this after testing + # Copy data into old format + self.observed_configs = self.MFobserved_configs.copy_df( + df=self.MFobserved_configs.reduce_to_max_seen_budgets() + ) + # process optimization state and bucket observations per rung self._get_rungs_state() @@ -374,7 +438,9 @@ def sample_new_config( return config def _generate_new_config_id(self): - return self.observed_configs.index.max() + 1 if len(self.observed_configs) else 0 + return self.MFobserved_configs.next_config_id() + # TODO: replace this with existing + # return self.observed_configs.index.max() + 1 if len(self.observed_configs) else 0 def get_default_configuration(self): pass @@ -403,6 +469,7 @@ def get_config_and_ids( # pylint: disable=no-self-use rung_to_promote = self.is_promotable() if rung_to_promote is not None: # promotes the first recorded promotable config in the argsort-ed rung + # TODO: What to do with this? 
row = self.observed_configs.iloc[self.rung_promotions[rung_to_promote][0]] config = deepcopy(row["config"]) rung = rung_to_promote + 1 diff --git a/neps/optimizers/multi_fidelity/utils.py b/neps/optimizers/multi_fidelity/utils.py index 80a6a230..cbc456db 100644 --- a/neps/optimizers/multi_fidelity/utils.py +++ b/neps/optimizers/multi_fidelity/utils.py @@ -1,18 +1,19 @@ # type: ignore from __future__ import annotations +from copy import deepcopy from typing import Any, Sequence import numpy as np import pandas as pd -import time import torch -from copy import deepcopy - from ...optimizers.utils import map_real_hyperparameters_from_tabular_ids from ...search_spaces.search_space import SearchSpace +# from neps.optimizers.utils import map_real_hyperparameters_from_tabular_ids +# from neps.search_spaces.search_space import SearchSpace + def continuous_to_tabular( config: SearchSpace, categorical_space: SearchSpace @@ -62,32 +63,32 @@ class MFObservedData: def __init__( self, - columns: list[str] | None = None, - index_names: list[str] | None = None, + config_id: str | None = None, + budget_id: str | None = None, + config_col: str | None = None, + perf_col: str | None = None, + learning_curve_col: str | None = None, + auxiliary_cols: list[str] | None = None, ): - if columns is None: - columns = [self.default_config_col, self.default_perf_col] - if index_names is None: - index_names = [self.default_config_idx, self.default_budget_idx] + self.config_col = self.default_config_col if config_col is None else config_col + self.perf_col = self.default_perf_col if perf_col is None else perf_col - self.config_col = columns[0] - self.perf_col = columns[1] + self.config_idx = self.default_config_idx if config_id is None else config_id + self.budget_idx = self.default_budget_idx if budget_id is None else budget_id - if len(columns) > 2: - self.lc_col_name = columns[2] - else: - self.lc_col_name = self.default_lc_col + self.lc_col_name = learning_curve_col - if len(index_names) == 1: - index_names += ["budget_id"] + auxiliary_cols = [] if auxiliary_cols is None else auxiliary_cols - self.config_idx = index_names[0] - self.budget_idx = index_names[1] - self.index_names = index_names + self.index_names = [self.config_idx, self.budget_idx] + col_names = [self.config_col, self.perf_col, self.lc_col_name] + auxiliary_cols + self.columns = [col_name for col_name in col_names if col_name is not None] - index = pd.MultiIndex.from_tuples([], names=index_names) + index = pd.MultiIndex.from_tuples([], names=self.index_names) - self.df = pd.DataFrame([], columns=columns, index=index) + self.df = pd.DataFrame([], columns=self.columns, index=index) + + self.mutable_columns = [self.config_col, self.lc_col_name] @property def pending_condition(self): @@ -116,6 +117,13 @@ def next_config_id(self) -> int: else: return 0 + @staticmethod + def __validate_index(index_list): + """Extends single indices to multi-index case""" + if all([isinstance(idx, int) for idx in index_list]): + index_list = list(zip(index_list, [0] * len(index_list))) + return index_list + def add_data( self, data: list[Any] | list[list[Any]], @@ -125,7 +133,6 @@ def add_data( """ Add data only if none of the indices are already existing in the DataFrame """ - # TODO: If index is only config_id extend it if not isinstance(index, list): index_list = [index] data_list = [data] @@ -133,6 +140,8 @@ def add_data( index_list = index data_list = data + index_list = self.__validate_index(index_list) + if not self.df.index.isin(index_list).any(): index = 
pd.MultiIndex.from_tuples(index_list, names=self.index_names) _df = pd.DataFrame(data_list, columns=self.df.columns, index=index) @@ -157,6 +166,9 @@ def update_data( index_list = [index] else: index_list = index + + index_list = self.__validate_index(index_list) + if self.df.index.isin(index_list).sum() == len(index_list): column_names, data = zip(*data_dict.items()) data = list(zip(*data)) @@ -169,7 +181,7 @@ def update_data( f"Given indices: {index_list}" ) - def get_learning_curves(self): + def get_trajectories(self): return self.df.pivot_table( index=self.df.index.names[0], columns=self.df.index.names[1], @@ -186,11 +198,11 @@ def get_incumbents_for_budgets(self, maximize: bool = False): Note: this will always map the best lowest ID if two configurations have the same performance at the same fidelity """ - learning_curves = self.get_learning_curves() + trajectories = self.get_trajectories() if maximize: - config_ids = learning_curves.idxmax(axis=0) + config_ids = trajectories.idxmax(axis=0) else: - config_ids = learning_curves.idxmin(axis=0) + config_ids = trajectories.idxmin(axis=0) indices = list(zip(config_ids.values.tolist(), config_ids.index.to_list())) partial_configs = self.df.loc[indices, self.config_col].to_list() @@ -203,17 +215,16 @@ def get_best_performance_for_each_budget(self, maximize: bool = False): Note: this will always map the best lowest ID if two configurations has the same performance at the same fidelity """ - learning_curves = self.get_learning_curves() + trajectories = self.get_trajectories() if maximize: - performance = learning_curves.max(axis=0) + performance = trajectories.max(axis=0) else: - performance = learning_curves.min(axis=0) + performance = trajectories.min(axis=0) return performance def get_budget_level_for_best_performance(self, maximize: bool = False) -> int: - """Returns the lowest budget level at which the highest performance was recorded. 
- """ + """Returns the lowest budget level at which the highest performance was recorded.""" perf_per_z = self.get_best_performance_for_each_budget(maximize=maximize) y_star = self.get_best_seen_performance(maximize=maximize) # uses the minimum of the budget that see the maximum obseved score @@ -228,29 +239,62 @@ def get_best_learning_curve_id(self, maximize: bool = False): Note: this will always return the single best lowest ID if two configurations has the same performance """ - learning_curves = self.get_learning_curves() + trajectories = self.get_trajectories() if maximize: - return learning_curves.max(axis=1).idxmax() + return trajectories.max(axis=1).idxmax() else: - return learning_curves.min(axis=1).idxmin() + return trajectories.min(axis=1).idxmin() def get_best_seen_performance(self, maximize: bool = False): - learning_curves = self.get_learning_curves() + trajectories = self.get_trajectories() if maximize: - return learning_curves.max(axis=1).max() + return trajectories.max(axis=1).max() else: - return learning_curves.min(axis=1).min() + return trajectories.min(axis=1).min() def add_budget_column(self): - combined_df = self.df.reset_index(level=1) - combined_df.set_index( - keys=[self.budget_idx], drop=False, append=True, inplace=True - ) - return combined_df + pass + # budget_column = self.df.index.get_level_values(1) + # self.df[self.budget_idx] = budget_column + # combined_df = self.df.reset_index(level=1) + # combined_df.set_index( + # keys=[self.budget_idx], drop=False, append=True, inplace=True + # ) + # return combined_df + + def copy_df(self, df: pd.DataFrame | None = None): + """ + Use this function to copy df if you are going to + perform some operations on its elements. + + DataFrames are not meant for mutable data-types, + nevertheless we do put mutable SearchSpace objects into the config_col of the DF + In order not to change the values stored objects in the DF we deepcopy all + mutable columns here. + + self.mutable_columns must keep track of + the mutable columns at all times. 
+ """ + if df is None: + df = self.df + new_df = pd.DataFrame() + new_df.index = df.index.copy(deep=True) + + for column in df.columns: + if column in self.mutable_columns: + new_column = [deepcopy(value) for value in df[column].values] + new_df[column] = new_column + else: + new_df[column] = df[column].copy(deep=True) + + return new_df def reduce_to_max_seen_budgets(self): self.df.sort_index(inplace=True) - combined_df = self.add_budget_column() + budget_column = self.df.index.get_level_values(1) + combined_df = self.df.copy(deep=True) + combined_df[self.budget_idx] = budget_column + # combined_df = self.copy_df(df=combined_df) return combined_df.groupby(level=0).last() def get_partial_configs_at_max_seen(self): @@ -262,7 +306,9 @@ def extract_learning_curve( if budget_id is None: # budget_id only None when predicting # extract full observed learning curve for prediction pipeline - budget_id = max(self.df.loc[config_id].index.get_level_values("budget_id").values) + 1 + budget_id = ( + max(self.df.loc[config_id].index.get_level_values("budget_id").values) + 1 + ) # For the first epoch we have no learning curve available if budget_id == 0: @@ -273,14 +319,13 @@ def extract_learning_curve( if self.lc_col_name in self.df.columns: lc = self.df.loc[(config_id, budget_id), self.lc_col_name] else: - lcs = self.get_learning_curves() - lc = lcs.loc[config_id, :budget_id].values.flatten().tolist() + trajectories = self.get_trajectories() + lc = trajectories.loc[config_id, :budget_id].values.flatten().tolist() return deepcopy(lc) def get_training_data_4DyHPO( self, df: pd.DataFrame, pipeline_space: SearchSpace | None = None ): - start = time.time() configs = [] learning_curves = [] performance = [] @@ -295,30 +340,27 @@ def get_training_data_4DyHPO( configs.append(row[self.config_col]) performance.append(row[self.perf_col]) learning_curves.append(self.extract_learning_curve(config_id, budget_id)) - # print("-" * 50) - # print(f"| Time for `get_training_data_4DyHPO()`: {time.time()-start:.2f}s") - # print("-" * 50) return configs, learning_curves, performance def get_best_performance_per_config(self, maximize: bool = False) -> pd.Series: - """Returns the best score recorded per config across fidelities seen. - """ + """Returns the best score recorded per config across fidelities seen.""" op = np.max if maximize else np.min perf = ( - self.df - .sort_values("budget_id", ascending=False) # sorts with largest budget first + self.df.sort_values( + "budget_id", ascending=False + ) # sorts with largest budget first .groupby("config_id") # retains only config_id .first() # retrieves the largest budget seen for each config_id - .learning_curves # extracts all values seen till largest budget for a config - .apply(op) # finds the minimum over per-config learning curve + .learning_curves.apply( # extracts all values seen till largest budget for a config + op + ) # finds the minimum over per-config learning curve ) return perf def get_max_observed_fidelity_level_per_config(self) -> pd.Series: - """Returns the highest fidelity level recorded per config seen. 
- """ + """Returns the highest fidelity level recorded per config seen.""" max_z_observed = { - _id: self.df.loc[_id,:].index.sort_values()[-1] + _id: self.df.loc[_id, :].index.sort_values()[-1] for _id in self.df.index.get_level_values("config_id").sort_values() } return pd.Series(max_z_observed) @@ -353,48 +395,93 @@ def token_ids(self) -> np.ndarray: if __name__ == "__main__": # TODO: Either delete these or convert them to tests (karibbov) + + def multi_index_parallel(): + data = MFObservedData( + config_id="config_id", + budget_id="budget_id", + config_col="config", + perf_col="perf", + ) + + # When adding multiple indices data should be list of rows(lists) and the index should be list of tuples + data.add_data( + [["conf1", 0.5], ["conf2", 0.7], ["conf1", 0.6], ["conf2", 0.4]], + index=[(0, 0), (1, 1), (0, 3), (1, 0)], + ) + # print(data.df) + + data.add_data( + [["conf1", 0.5], ["conf2", 0.10], ["conf1", 0.11]], + index=[(0, 2), (1, 2), (0, 1)], + ) + + print(data.df) + # print(data.get_trajectories()) + # print( + # "Mapping of budget IDs into best performing configurations at each fidelity:\n", + # data.get_incumbents_for_budgets(), + # ) + # print( + # "Best Performance at each budget level:\n", + # data.get_best_performance_for_each_budget(), + # ) + # print( + # "Configuration ID of the best observed performance so far: ", + # data.get_best_learning_curve_id(), + # ) + # print(data.extract_learning_curve(0, 2)) + # # data.df.sort_index(inplace=True) + # print(data.get_partial_configs_at_max_seen()) + # + # # When updating multiple indices at a time both the values in the data dictionary and the indices should be lists + data.update_data({"perf": [1.8, 1.5]}, index=[(1, 1), (0, 0)]) + print(data.df) + + def multi_index_single(): + data = MFObservedData( + config_id="config_id", + budget_id="budget_id", + config_col="config", + perf_col="perf", + ) + + # when adding a single row second level list is not necessary + data.add_data(["conf1", 0.5], index=(0, 0)) + data.add_data(["conf1", 0.8], index=(1, 0)) + print(data.df) + + data.update_data({"perf": [1.8], "budget_col": [5]}, index=(0, 0)) + print(data.df) + + def single_index_parallel(): + data = MFObservedData( + config_id="config_id", + budget_id="budget_id", + config_col="config", + perf_col="perf", + ) + + # When adding multiple indices data should be list of rows(lists) and the index should be list of tuples + data.add_data( + [["conf1", 0.5], ["conf2", 0.7]], + index=[(0), (1)], + ) + print(data.df) + + data.add_data( + [["conf1", 0.5], ["conf2", 0.10]], + index=[(2), (3)], + ) + + print(data.df) + + data.update_data({"perf": [1.8, 1.5]}, index=[(1), (0)]) + print(data.df) + """ Here are a few examples of how to manage data with this class: """ - data = MFObservedData(["config", "perf"], index_names=["config_id", "budget_id"]) - - # When adding multiple indices data should be list of rows(lists) and the index should be list of tuples - data.add_data( - [["conf1", 0.5], ["conf2", 0.7], ["conf1", 0.6], ["conf2", 0.4]], - index=[(0, 0), (1, 1), (0, 3), (1, 0)], - ) - data.add_data( - [["conf1", 0.5], ["conf2", 0.10], ["conf1", 0.11]], - index=[(0, 2), (1, 2), (0, 1)], - ) - - print(data.df) - print(data.get_learning_curves()) - print( - "Mapping of budget IDs into best performing configurations at each fidelity:\n", - data.get_incumbents_for_budgets(), - ) - print( - "Best Performance at each budget level:\n", - data.get_best_performance_for_each_budget(), - ) - print( - "Configuration ID of the best observed performance so 
far: ",
-        data.get_best_learning_curve_id(),
-    )
-    print(data.extract_learning_curve(0, 2))
-    # data.df.sort_index(inplace=True)
-    print(data.get_partial_configs_at_max_seen())
-
-    # When updating multiple indices at a time both the values in the data dictionary and the indices should be lists
-    data.update_data({"perf": [1.8, 1.5]}, index=[(1, 1), (0, 0)])
-    print(data.df)
-
-    data = MFObservedData(["config", "perf"], index_names=["config_id", "budget_id"])
-
-    # when adding a single row second level list is not necessary
-    data.add_data(["conf1", 0.5], index=(0, 0))
-    print(data.df)
-
-    data.update_data({"perf": [1.8], "budget_col": [5]}, index=(0, 0))
-    print(data.df)
+    multi_index_parallel()
+    multi_index_single()
+    single_index_parallel()
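
A minimal usage sketch of the data flow this patch wires into SuccessiveHalving.load_results(): evaluations are recorded per (config_id, rung) pair, and the legacy per-config observed_configs frame is rebuilt from the highest rung seen via reduce_to_max_seen_budgets() and copy_df(). This is for illustration only; the plain dicts below are placeholders for the SearchSpace configurations used by the real optimizer, while the column and index names follow this patch.

    # illustration only: dict configs stand in for SearchSpace objects
    from neps.optimizers.multi_fidelity.utils import MFObservedData

    observed = MFObservedData(
        config_id="config_id",
        budget_id="budget_id",
        config_col="config",
        perf_col="perf",
        auxiliary_cols=["rung"],
    )

    # finished evaluations: rows are [config, perf, rung], indexed by (config_id, rung)
    observed.add_data(
        data=[[{"lr": 0.1}, 0.42, 0], [{"lr": 0.01}, 0.35, 1]],
        index=[(0, 0), (1, 1)],
    )
    # pending evaluations are recorded with a NaN performance
    observed.add_data(data=[[{"lr": 0.3}, float("nan"), 0]], index=[(2, 0)])

    # one row per config at its highest rung seen; copy_df() deep-copies the
    # mutable config column so downstream code cannot mutate the stored objects
    legacy_view = observed.copy_df(df=observed.reduce_to_max_seen_budgets())
    print(legacy_view)

Deep-copying only the mutable columns keeps the rebuilt frame safe to hand to the existing rung/promotion bookkeeping while both representations coexist.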