
SH, HB using both MFObservedData and observed_configs
karibbov committed Apr 8, 2024
1 parent 98ee210 commit eb46e35
Showing 4 changed files with 324 additions and 170 deletions.
45 changes: 22 additions & 23 deletions neps/optimizers/multi_fidelity/dyhpo.py
@@ -6,12 +6,10 @@

import numpy as np
import pandas as pd
import time


from ...utils.common import EvaluationData, SimpleCSVWriter
from ...metahyper import ConfigResult, instance_from_map
from ...search_spaces.search_space import FloatParameter, IntegerParameter, SearchSpace
from ...utils.common import EvaluationData, SimpleCSVWriter
from ..base_optimizer import BaseOptimizer
from ..bayesian_optimization.acquisition_functions import AcquisitionMapping
from ..bayesian_optimization.acquisition_functions.base_acquisition import BaseAcquisition
@@ -28,11 +26,15 @@

class AcqWriter(SimpleCSVWriter):
def set_data(self, sample_configs: pd.Series, acq_vals: pd.Series):
config_vals = pd.DataFrame([config.hp_values() for config in sample_configs], index=sample_configs.index)
config_vals = pd.DataFrame(
[config.hp_values() for config in sample_configs], index=sample_configs.index
)
if isinstance(acq_vals, pd.Series):
acq_vals.name = "Acq Value"
# pylint: disable=attribute-defined-outside-init
self.df = config_vals.join(acq_vals)
self.df = self.df.sort_values(by="Acq Value")
# pylint: enable=attribute-defined-outside-init


class MFEIBO(BaseOptimizer):
@@ -118,8 +120,11 @@ def __init__(
self.total_fevals: int = 0

self.observed_configs = MFObservedData(
columns=["config", "perf", "learning_curves"],
index_names=["config_id", "budget_id"],
config_id="config_id",
budget_id="budget_id",
config_col="config",
perf_col="perf",
learning_curve_col="learning_curves",
)

# Preparing model
@@ -304,10 +309,12 @@ def load_results(
previous_results (dict[str, ConfigResult]): [description]
pending_evaluations (dict[str, ConfigResult]): [description]
"""
start = time.time()
self.observed_configs = MFObservedData(
columns=["config", "perf", "learning_curves"],
index_names=["config_id", "budget_id"],
config_id="config_id",
budget_id="budget_id",
config_col="config",
perf_col="perf",
learning_curve_col="learning_curves",
)
# previous optimization run exists and needs to be loaded
self._load_previous_observations(previous_results)
@@ -328,9 +335,6 @@ def load_results(
init_phase = self.is_init_phase()
if not init_phase:
self._fit_models()
# print("-" * 50)
# print(f"| Total time for `load_results()`: {time.time()-start:.2f}s")
# print("-" * 50)

@classmethod
def _get_config_id_split(cls, config_id: str) -> tuple[str, str]:
@@ -444,29 +448,21 @@ def get_config_and_ids(  # pylint: disable=no-self-use
else:
if self.count == 0:
self.logger.info("\nPartial learning curves as initial design:\n")
self.logger.info(f"{self.observed_configs.get_learning_curves()}\n")
self.logger.info(f"{self.observed_configs.get_trajectories()}\n")
self.count += 1
# main acquisition call here after initial design is turned off
self.logger.info("acquiring...")
# generates candidate samples for acquisition calculation
start = time.time()
samples = self.acquisition_sampler.sample(
set_new_sample_fidelity=self.pipeline_space.fidelity.lower
) # fidelity values here should be the observations or min. fidelity
# print("-" * 50)
# print(f"| Total time for acq. sampling: {time.time()-start:.2f}s")
# print("-" * 50)

start = time.time()
# calculating acquisition function values for the candidate samples
acq, _samples = self.acquisition.eval( # type: ignore[attr-defined]
x=samples, asscalar=True
)
acq = pd.Series(acq, index=_samples.index)

# print("-" * 50)
# print(f"| Total time for acq. eval: {time.time()-start:.2f}s")
# print("-" * 50)
# maximizing acquisition function
best_idx = acq.sort_values().index[-1]
# extracting the config ID for the selected maximizer
@@ -519,8 +515,11 @@ def get_config_and_ids(  # pylint: disable=no-self-use
if best_idx > max(self.observed_configs.seen_config_ids)
else (
self.get_budget_value(
self.observed_configs.get_max_observed_fidelity_level_per_config().loc[best_idx]
) + self.step_size # ONE-STEP FIDELITY QUERY
self.observed_configs.get_max_observed_fidelity_level_per_config().loc[
best_idx
]
)
+ self.step_size # ONE-STEP FIDELITY QUERY
)
)
# generating correct IDs
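The `MFObservedData` constructor calls above replace the old `columns=`/`index_names=` pair with explicitly named roles for each column. Below is a minimal sketch of what the new signature might look like, assuming the class still wraps a pandas DataFrame keyed by a `(config_id, budget_id)` MultiIndex; the class body is not part of this diff, so everything beyond the keyword arguments visible in the calls is hypothetical:

from __future__ import annotations

import pandas as pd


class MFObservedData:
    """Sketch only: one row per (config_id, budget_id) observation."""

    def __init__(
        self,
        config_id: str = "config_id",
        budget_id: str = "budget_id",
        config_col: str = "config",
        perf_col: str = "perf",
        learning_curve_col: str | None = None,
        auxiliary_cols: list[str] | None = None,
    ):
        # column order mirrors the row layout used by the callers:
        # [config, perf, <learning curve>, <auxiliary cols such as "rung">]
        columns = [config_col, perf_col]
        if learning_curve_col is not None:
            columns.append(learning_curve_col)
        columns.extend(auxiliary_cols or [])
        index = pd.MultiIndex.from_arrays([[], []], names=[config_id, budget_id])
        self.df = pd.DataFrame([], columns=columns, index=index)

    def add_data(self, data: list[list], index: list[tuple]) -> None:
        # append rows keyed by (config_id, budget_id); newer entries win on clashes
        new = pd.DataFrame(
            data,
            columns=self.df.columns,
            index=pd.MultiIndex.from_tuples(index, names=self.df.index.names),
        )
        old = self.df[~self.df.index.isin(new.index)]
        self.df = new.sort_index() if old.empty else pd.concat([old, new]).sort_index()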
7 changes: 4 additions & 3 deletions neps/optimizers/multi_fidelity/hyperband.py
@@ -4,10 +4,9 @@

import typing
from copy import deepcopy
from typing import Any
from typing import Any, Literal

import numpy as np
from typing_extensions import Literal

from ...metahyper import ConfigResult
from ...search_spaces.search_space import SearchSpace
@@ -100,6 +99,8 @@ def _update_sh_bracket_state(self) -> None:
# TODO: can we avoid copying full observation history
bracket = self.sh_brackets[self.current_sh_bracket] # type: ignore
bracket.observed_configs = self.observed_configs.copy()
# TODO: Do we NEED to copy here instead?
bracket.MFobserved_configs = self.MFobserved_configs

# pylint: disable=no-self-use
def clear_old_brackets(self):
@@ -302,7 +303,7 @@ def __init__(
prior_confidence=prior_confidence,
random_interleave_prob=random_interleave_prob,
sample_default_first=sample_default_first,
sample_default_at_target=sample_default_at_target
sample_default_at_target=sample_default_at_target,
)
self.sampling_args = {
"inc": None,
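The TODO in `_update_sh_bracket_state` above flags that `bracket.MFobserved_configs = self.MFobserved_configs` aliases the object, whereas the `observed_configs.copy()` line just before it hands each bracket an independent snapshot. A small, self-contained illustration of the difference (generic pandas, not the project's API):

import pandas as pd

shared = pd.DataFrame({"perf": [0.5]})

alias = shared            # second name for the same frame
snapshot = shared.copy()  # independent copy of the data

shared.loc[0, "perf"] = 0.1
assert alias.loc[0, "perf"] == 0.1     # the alias sees the mutation
assert snapshot.loc[0, "perf"] == 0.5  # the snapshot does not

If the brackets only ever read the shared store, aliasing avoids copying the full observation history on every update; if any bracket mutates it, a copy is needed, which is exactly what the TODO asks.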
151 changes: 109 additions & 42 deletions neps/optimizers/multi_fidelity/successive_halving.py
@@ -5,10 +5,10 @@
import random
import typing
from copy import deepcopy
from typing import Literal

import numpy as np
import pandas as pd
from typing_extensions import Literal

from ...metahyper import ConfigResult
from ...search_spaces.hyperparameters.categorical import (
@@ -22,6 +22,7 @@
from ..base_optimizer import BaseOptimizer
from .promotion_policy import AsyncPromotionPolicy, SyncPromotionPolicy
from .sampling_policy import FixedPriorPolicy, RandomUniformPolicy
from .utils import MFObservedData

CUSTOM_FLOAT_CONFIDENCE_SCORES = FLOAT_CONFIDENCE_SCORES.copy()
CUSTOM_FLOAT_CONFIDENCE_SCORES.update({"ultra": 0.05})
@@ -102,8 +103,7 @@ def __init__(
# the parameter is exposed to allow HB to call SH with different stopping rates
self.early_stopping_rate = early_stopping_rate
self.sampling_policy = sampling_policy(
pipeline_space=self.pipeline_space,
logger=self.logger
pipeline_space=self.pipeline_space, logger=self.logger
)
self.promotion_policy = promotion_policy(self.eta)

@@ -132,6 +132,15 @@ def __init__(
self.sampling_args: dict = {}

self.fidelities = list(self.rung_map.values())

self.MFobserved_configs = MFObservedData(
config_id="config_id",
budget_id="budget_id",
config_col="config",
perf_col="perf",
auxiliary_cols=["rung"],
)
# TODO: replace with MFobserved_configs
# stores the observations made and the corresponding fidelity explored
# crucial data structure used for determining promotion candidates
self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))
@@ -164,6 +173,10 @@ def _get_rung_trace(cls, rung_map: dict, config_map: dict) -> list[int]:
return rung_trace

def get_incumbent_score(self):
# budget_perf = self.MFobserved_configs.get_best_performance_for_each_budget()
# y_star = budget_perf[budget_perf.index.max]

# TODO: replace this with existing method
y_star = np.inf # minimizing optimizer
if len(self.observed_configs):
y_star = self.observed_configs.perf.values.min()
@@ -219,52 +232,88 @@ def _get_config_id_split(cls, config_id: str) -> tuple[str, str]:
def _load_previous_observations(
self, previous_results: dict[str, ConfigResult]
) -> None:
for config_id, config_val in previous_results.items():
def index_data_split(config_id: str, config_val):
_config, _rung = self._get_config_id_split(config_id)
perf = self.get_loss(config_val.result)
if int(_config) in self.observed_configs.index:
# config already recorded in dataframe
rung_recorded = self.observed_configs.at[int(_config), "rung"]
if rung_recorded < int(_rung):
# config recorded for a lower rung but higher rung eval available
self.observed_configs.at[int(_config), "config"] = config_val.config
self.observed_configs.at[int(_config), "rung"] = int(_rung)
self.observed_configs.at[int(_config), "perf"] = perf
else:
_df = pd.DataFrame(
[[config_val.config, int(_rung), perf]],
columns=self.observed_configs.columns,
index=pd.Series(int(_config)), # key for config_id
)
self.observed_configs = pd.concat(
(self.observed_configs, _df)
).sort_index()
# for efficiency, redefining the function to have the
# `rung_histories` assignment inside the for loop
# rung histories are collected only for `previous` and not `pending` configs
self.rung_histories[int(_rung)]["config"].append(int(_config))
self.rung_histories[int(_rung)]["perf"].append(perf)
index = int(_config), int(_rung)
_data = [config_val.config, perf, int(_rung)]
return index, _data

if len(previous_results) > 0:
index_row = [
tuple(index_data_split(config_id, config_val))
for config_id, config_val in previous_results.items()
]
indices, rows = zip(*index_row)
self.MFobserved_configs.add_data(data=list(rows), index=list(indices))
# TODO: replace this with new optimized method
# for config_id, config_val in previous_results.items():
# _config, _rung = self._get_config_id_split(config_id)
# perf = self.get_loss(config_val.result)
# if int(_config) in self.observed_configs.index:
# # config already recorded in dataframe
# rung_recorded = self.observed_configs.at[int(_config), "rung"]
# if rung_recorded < int(_rung):
# # config recorded for a lower rung but higher rung eval available
# self.observed_configs.at[int(_config), "config"] = config_val.config
# self.observed_configs.at[int(_config), "rung"] = int(_rung)
# self.observed_configs.at[int(_config), "perf"] = perf
# else:
# _df = pd.DataFrame(
# [[config_val.config, int(_rung), perf]],
# columns=self.observed_configs.columns,
# index=pd.Series(int(_config)), # key for config_id
# )
# self.observed_configs = pd.concat(
# (self.observed_configs, _df)
# ).sort_index()
# # for efficiency, redefining the function to have the
# # `rung_histories` assignment inside the for loop
# # rung histories are collected only for `previous` and not `pending` configs
# self.rung_histories[int(_rung)]["config"].append(int(_config))
# self.rung_histories[int(_rung)]["perf"].append(perf)
return

def _handle_pending_evaluations(
self, pending_evaluations: dict[str, ConfigResult]
) -> None:
def index_data_split(config_id: str, config_val):
_config, _rung = self._get_config_id_split(config_id)
# perf = self.get_loss(config_val.result)
index = int(_config), int(_rung)
_data = [
# use `config_val` instead of `config_val.config`
# unlike `previous_results` case
config_val,
np.nan,
int(_rung),
]
return index, _data

if len(pending_evaluations) > 0:
index_row = [
tuple(index_data_split(config_id, config_val))
for config_id, config_val in pending_evaluations.items()
]
indices, rows = zip(*index_row)
self.MFobserved_configs.add_data(data=list(rows), index=list(indices))
# TODO: replace this
# iterates over all pending evaluations and updates the list of observed
# configs with the rung and performance as None
for config_id, config in pending_evaluations.items():
_config, _rung = self._get_config_id_split(config_id)
if int(_config) not in self.observed_configs.index:
_df = pd.DataFrame(
[[config, int(_rung), np.nan]],
columns=self.observed_configs.columns,
index=pd.Series(int(_config)), # key for config_id
)
self.observed_configs = pd.concat(
(self.observed_configs, _df)
).sort_index()
else:
self.observed_configs.at[int(_config), "rung"] = int(_rung)
self.observed_configs.at[int(_config), "perf"] = np.nan
# for config_id, config in pending_evaluations.items():
# _config, _rung = self._get_config_id_split(config_id)
# if int(_config) not in self.observed_configs.index:
# _df = pd.DataFrame(
# [[config, int(_rung), np.nan]],
# columns=self.observed_configs.columns,
# index=pd.Series(int(_config)), # key for config_id
# )
# self.observed_configs = pd.concat(
# (self.observed_configs, _df)
# ).sort_index()
# else:
# self.observed_configs.at[int(_config), "rung"] = int(_rung)
# self.observed_configs.at[int(_config), "perf"] = np.nan
return

def clean_rung_information(self):
@@ -290,6 +339,7 @@ def _get_rungs_state(self, observed_configs=None):
# iterates over the list of explored configs and buckets them to respective
# rungs depending on the highest fidelity it was evaluated at
self.clean_rung_information()
# TODO: create a new method for this
for _rung in observed_configs.rung.unique():
idxs = observed_configs.rung == _rung
self.rung_members[_rung] = observed_configs.index[idxs].values
@@ -331,7 +381,15 @@ def load_results(
for rung in range(self.min_rung, self.max_rung + 1)
}

self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))
self.MFobserved_configs = MFObservedData(
config_id="config_id",
budget_id="budget_id",
config_col="config",
perf_col="perf",
auxiliary_cols=["rung"],
)

# self.observed_configs = pd.DataFrame([], columns=("config", "rung", "perf"))

# previous optimization run exists and needs to be loaded
self._load_previous_observations(previous_results)
Expand All @@ -340,6 +398,12 @@ def load_results(
# account for pending evaluations
self._handle_pending_evaluations(pending_evaluations)

# TODO: change this after testing
# Copy data into old format
self.observed_configs = self.MFobserved_configs.copy_df(
df=self.MFobserved_configs.reduce_to_max_seen_budgets()
)

# process optimization state and bucket observations per rung
self._get_rungs_state()

@@ -374,7 +438,9 @@ def sample_new_config(
return config

def _generate_new_config_id(self):
return self.observed_configs.index.max() + 1 if len(self.observed_configs) else 0
return self.MFobserved_configs.next_config_id()
# TODO: replace this with existing
# return self.observed_configs.index.max() + 1 if len(self.observed_configs) else 0

def get_default_configuration(self):
pass
@@ -403,6 +469,7 @@ def get_config_and_ids(  # pylint: disable=no-self-use
rung_to_promote = self.is_promotable()
if rung_to_promote is not None:
# promotes the first recorded promotable config in the argsort-ed rung
# TODO: What to do with this?
row = self.observed_configs.iloc[self.rung_promotions[rung_to_promote][0]]
config = deepcopy(row["config"])
rung = rung_to_promote + 1
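The new `load_results` above bridges the MultiIndex store back into the legacy one-row-per-config `observed_configs` frame via `reduce_to_max_seen_budgets()`. Neither that method nor `copy_df` is defined in this diff, so the following is only a sketch of the reduction, under the assumption that the store is a DataFrame with a `(config_id, budget_id)` MultiIndex:

import pandas as pd


def reduce_to_max_seen_budgets(df: pd.DataFrame) -> pd.DataFrame:
    # for every config, keep the row at the highest budget evaluated so far,
    # mirroring the old ("config", "rung", "perf") one-row-per-config layout
    max_budget = (
        df.reset_index(level="budget_id")
        .groupby(level="config_id")["budget_id"]
        .max()
    )
    rows = list(zip(max_budget.index, max_budget.values))
    return df.loc[rows].droplevel("budget_id")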
