
Commit

removed ensemble
ryuta-yoshimatsu committed Jun 4, 2024
1 parent 294692e commit fa57f11
Showing 22 changed files with 77 additions and 609 deletions.
21 changes: 3 additions & 18 deletions README.md
@@ -73,13 +73,8 @@ run_forecast(
train_predict_ratio=2,
data_quality_check=True,
resample=False,
ensemble=True,
ensemble_metric="smape",
ensemble_metric_avg=0.3,
ensemble_metric_max=0.5,
ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output",
active_models=active_models,
experiment_path=f"/Shared/mmf_experiment",
experiment_path="/Shared/mmf_experiment",
use_case_name="m4_daily",
)
```
@@ -99,12 +94,7 @@ run_forecast(
- ```stride``` is the number of steps by which the backtesting trial start date advances from one trial to the next.
- ```train_predict_ratio``` specifies the minimum length required for your training dataset with respect to ```prediction_length```. If ```train_predict_ratio```=2, you need a training dataset that is at least twice as long as ```prediction_length```.
- ```data_quality_check``` checks the quality of the input data if set to True. See [data_quality_checks.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/data_quality_checks.py) for the full details of the checks.
- ```resample``` backfills empty entries with 0 if set to True.
- ```ensemble```: if you have forecasts from multiple models, you can take a simple mean, min and max of these values and generate an ensemble forecast.
- ```ensemble_metric``` is smape (symmetric mean absolute percentage error) by default. You can add your own metrics in the core of the forecasting_sa package, or you can simply use ```evaluation_output``` to calculate any metric of your choice.
- ```ensemble_metric_avg``` sets the maximum for the average smape of each model, above which the model is excluded from ensembling.
- ```ensemble_metric_max``` sets the maximum for the smape of each model, above which the model is excluded from ensembling.
- ```ensemble_scoring_output``` is a delta table where you write the ensembled forecasts.
- ```resample``` backfills skipped entries with 0 if set to True. Default is False.
- ```active_models``` is a list of models you want to use.
- ```experiment_path``` is the MLflow experiment path under which metrics are tracked.
- ```use_case_name``` a new column with this value will be created in the output delta table, in case you save multiple trials in one table. A minimal example call using these parameters is sketched below.
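
Putting the parameters above together, the following is a minimal sketch of a call with the updated signature (ensemble arguments removed). It assumes `run_forecast` is imported from `mmf_sa` and that `spark`, a training DataFrame `train_df`, and `active_models` are already defined; output-table arguments such as `evaluation_output` and `scoring_output` are omitted for brevity.

```python
from mmf_sa import run_forecast

run_forecast(
    spark=spark,                 # active SparkSession
    train_data=train_df,         # pandas or Spark DataFrame with the training series
    group_id="unique_id",
    date_col="date",
    target="y",
    freq="D",
    prediction_length=10,
    backtest_months=1,
    stride=10,
    train_predict_ratio=2,
    data_quality_check=True,
    resample=False,
    active_models=active_models,
    experiment_path="/Shared/mmf_experiment",
    use_case_name="m4_daily",
)
```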
@@ -170,13 +160,8 @@ run_forecast(
train_predict_ratio=2,
data_quality_check=True,
resample=False,
ensemble=True,
ensemble_metric="smape",
ensemble_metric_avg=0.3,
ensemble_metric_max=0.5,
ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output",
active_models=[model],
experiment_path=f"/Shared/mmf_experiment",
experiment_path="/Shared/mmf_experiment",
use_case_name="m4_daily",
accelerator="gpu",
)
92 changes: 8 additions & 84 deletions mmf_sa/Forecaster.py
@@ -43,6 +43,7 @@ def __init__(
data_conf: Dict[str, Any],
spark: SparkSession,
experiment_id: str = None,
run_id: str = None,
):
if isinstance(conf, BaseContainer):
self.conf = conf
@@ -53,8 +54,10 @@ def __init__(
self.conf = OmegaConf.create(_yaml_conf)
else:
raise Exception("No configuration provided!")

self.run_id = str(uuid.uuid4())
if run_id:
self.run_id = run_id
else:
self.run_id = str(uuid.uuid4())
self.data_conf = data_conf
self.model_registry = ModelRegistry(self.conf)
self.spark = spark
@@ -66,7 +69,6 @@ def __init__(
raise Exception(
"Please set 'experiment_path' parameter in the configuration file!"
)
self.selection_metric = self.conf["selection_metric"]
self.run_date = datetime.now()

def set_mlflow_experiment(self):
@@ -118,15 +120,12 @@ def split_df_train_val(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFra
return train_df, val_df

def evaluate_score(
self, evaluating: bool = True, scoring: bool = True, ensemble: bool = True
) -> str:
self, evaluate: bool = True, score: bool = True) -> str:
print("Starting evaluate_score")
if evaluating:
if evaluate:
self.evaluate_models()
if scoring:
if score:
self.score_models()
if ensemble:
self.ensemble()
print("Finished evaluate_score")
return self.run_id

@@ -525,78 +524,3 @@ def get_latest_model_info(client, registered_name):
latest_version = version_int
return client.get_model_version(registered_name, latest_version)

def ensemble(self):
if self.conf.get("ensemble") and self.conf["ensemble_scoring_output"]:
metrics_df = (
self.spark.table(self.conf["evaluation_output"])
.where(col("run_id").eqNullSafe(lit(self.run_id)))
.where(
col("metric_name").eqNullSafe(
lit(self.conf.get("ensemble_metric", "smape"))
)
)
)
models_df = (
metrics_df.groupby(self.conf["group_id"], "model")
.agg(
avg("metric_value").alias("metric_avg"),
min("metric_value").alias("metric_min"),
max("metric_value").alias("metric_max"),
)
.where(
col("metric_avg") < lit(self.conf.get("ensemble_metric_avg", 0.2))
)
.where(
col("metric_max") < lit(self.conf.get("ensemble_metric_max", 0.5))
)
.where(col("metric_min") > lit(0.01))
)
df = (
self.spark.table(self.conf["scoring_output"])
.where(col("run_id").eqNullSafe(lit(self.run_id)))
.join(
models_df.select(self.conf["group_id"], "model"),
on=[self.conf["group_id"], "model"],
)
)

left = df.select(
self.conf["group_id"], "run_id", "run_date", "use_case", "model",
posexplode(self.conf["date_col"])
).withColumnRenamed('col', self.conf["date_col"])

right = df.select(
self.conf["group_id"], "run_id", "run_date", "use_case", "model",
posexplode(self.conf["target"])
).withColumnRenamed('col', self.conf["target"])

merged = left.join(right, [
self.conf["group_id"], 'run_id', 'run_date', 'use_case', 'model',
'pos'], 'inner').drop("pos")

aggregated_df = merged.groupby(
self.conf["group_id"], self.conf["date_col"]
).agg(
avg(self.conf["target"]).alias(self.conf["target"] + "_avg"),
min(self.conf["target"]).alias(self.conf["target"] + "_min"),
max(self.conf["target"]).alias(self.conf["target"] + "_max"),
)

aggregated_df = aggregated_df.orderBy(self.conf["group_id"], self.conf["date_col"])\
.groupBy(self.conf["group_id"]).agg(
collect_list(self.conf["date_col"]).alias(self.conf["date_col"]),
collect_list(self.conf["target"] + "_avg").alias(self.conf["target"] + "_avg"),
collect_list(self.conf["target"] + "_min").alias(self.conf["target"] + "_min"),
collect_list(self.conf["target"] + "_max").alias(self.conf["target"] + "_max")
)

(
aggregated_df.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType()))
.withColumn("run_id", lit(self.run_id))
.withColumn("run_date", lit(self.run_date))
.withColumn("use_case", lit(self.conf["use_case_name"]))
.withColumn("model", lit("ensemble"))
.write.format("delta")
.mode("append")
.saveAsTable(self.conf["ensemble_scoring_output"])
)
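
Taken together, the Forecaster.py changes above remove the ensembling step, add an optional `run_id` to the constructor, and rename the `evaluate_score` keywords from `evaluating`/`scoring` to `evaluate`/`score`. A rough sketch of how the class is now driven, assuming `Forecaster` is importable from `mmf_sa.Forecaster` and that `conf`, `data_conf`, and `spark` are already prepared:

```python
import uuid

from mmf_sa.Forecaster import Forecaster  # assumed import path, per the file shown above

# Supply an external run_id to group results; if it is omitted (None),
# the constructor falls back to generating a fresh uuid4 string.
shared_run_id = str(uuid.uuid4())

forecaster = Forecaster(
    conf=conf,             # dict, YAML string, or OmegaConf container, as before
    data_conf=data_conf,   # e.g. {"train_data": train_df}
    spark=spark,
    run_id=shared_run_id,  # new optional argument introduced by this commit
)

# The ensemble flag is gone; only evaluation and scoring remain.
returned_run_id = forecaster.evaluate_score(evaluate=True, score=True)
assert returned_run_id == shared_run_id
```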
54 changes: 17 additions & 37 deletions mmf_sa/__init__.py
@@ -14,37 +14,31 @@
def run_forecast(
spark: SparkSession,
train_data: Union[str, pd.DataFrame, DataFrame],
group_id: str = "unique_id",
date_col: str = "date",
target: str = "y",
freq: str = "D",
prediction_length: int = 10,
backtest_months: int = 1,
stride: int = 10,
group_id: str,
date_col: str,
target: str,
freq: str,
prediction_length: int,
backtest_months: int,
stride: int,
metric: str = "smape",
resample: bool = False,
scoring_data: Union[str, pd.DataFrame, DataFrame] = None,
scoring_output: str = None,
evaluation_output: str = None,
model_output: str = None,
ensemble: bool = None,
ensemble_metric: str = None,
ensemble_metric_avg: float = None,
ensemble_metric_max: float = None,
ensemble_scoring_output: str = None,
use_case_name: str = None,
static_features: List[str] = None,
dynamic_future: List[str] = None,
dynamic_historical: List[str] = None,
active_models: List[str] = None,
accelerator: str = None,
scoring_model_stage: str = None,
selection_metric: str = None,
backtest_retrain: bool = None,
train_predict_ratio: int = None,
data_quality_check: bool = None,
experiment_path: str = None,
conf: Union[str, Dict[str, Any], OmegaConf] = None,
run_id: str = None,
) -> str:
if isinstance(conf, dict):
_conf = OmegaConf.create(conf)
@@ -57,7 +51,7 @@ def run_forecast(
_conf = OmegaConf.create()

base_conf = OmegaConf.create(
pkg_resources.read_text(sys.modules[__name__], "base_forecasting_conf.yaml")
pkg_resources.read_text(sys.modules[__name__], "forecasting_conf.yaml")
)
_conf = OmegaConf.merge(base_conf, _conf)

@@ -66,7 +60,6 @@ def run_forecast(
_data_conf["train_data"] = train_data
else:
_conf["train_data"] = train_data

_conf["group_id"] = group_id
_conf["date_col"] = date_col
_conf["target"] = target
@@ -86,18 +79,12 @@ def run_forecast(
_data_conf["scoring_data"] = scoring_data
else:
_conf["scoring_data"] = scoring_data
run_ensemble = True

if use_case_name is not None:
_conf["use_case_name"] = use_case_name
if active_models is not None:
_conf["active_models"] = active_models
if accelerator is not None:
_conf["accelerator"] = accelerator
if scoring_model_stage is not None:
_conf["scoring_model_stage"] = scoring_model_stage
if selection_metric is not None:
_conf["selection_metric"] = selection_metric
if backtest_retrain is not None:
_conf["backtest_retrain"] = backtest_retrain
if train_predict_ratio is not None:
@@ -108,16 +95,6 @@ def run_forecast(
_conf["evaluation_output"] = evaluation_output
if model_output is not None:
_conf["model_output"] = model_output
if ensemble is not None:
_conf["ensemble"] = ensemble
if ensemble_metric is not None:
_conf["ensemble_metric"] = ensemble_metric
if ensemble_metric_avg is not None:
_conf["ensemble_metric_avg"] = ensemble_metric_avg
if ensemble_metric_max is not None:
_conf["ensemble_metric_max"] = ensemble_metric_max
if ensemble_scoring_output is not None:
_conf["ensemble_scoring_output"] = ensemble_scoring_output
if data_quality_check is not None:
_conf["data_quality_check"] = data_quality_check
if static_features is not None:
@@ -126,13 +103,16 @@ def run_forecast(
_conf["dynamic_future"] = dynamic_future
if dynamic_historical is not None:
_conf["dynamic_historical"] = dynamic_historical
if run_id is not None:
_conf["run_id"] = run_id

f = Forecaster(conf=_conf, data_conf=_data_conf, spark=spark)
run_id = f.evaluate_score(
evaluating=run_evaluation,
scoring=run_scoring,
ensemble=run_ensemble,
f = Forecaster(
conf=_conf,
data_conf=_data_conf,
spark=spark,
run_id=run_id,
)
run_id = f.evaluate_score(evaluate=run_evaluation, score=run_scoring)
return run_id


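
For callers, the net effect of the __init__.py changes is that `group_id`, `date_col`, `target`, `freq`, `prediction_length`, `backtest_months`, and `stride` no longer have defaults, the `ensemble_*` arguments are gone, and an optional `run_id` is passed through to the `Forecaster`. One way this can be used (a sketch, reusing the variables from the README examples) is to record several single-model runs under one identifier:

```python
import uuid

from mmf_sa import run_forecast

shared_run_id = str(uuid.uuid4())

for model in active_models:  # active_models as defined in the README examples
    run_forecast(
        spark=spark,
        train_data=train_df,
        group_id="unique_id",
        date_col="date",
        target="y",
        freq="D",
        prediction_length=10,
        backtest_months=1,
        stride=10,
        active_models=[model],
        experiment_path="/Shared/mmf_experiment",
        use_case_name="m4_daily",
        run_id=shared_run_id,  # every iteration is written under the same run_id
    )
```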
@@ -1,11 +1,7 @@
output_format: "delta"
accelerator: cpu
temp_path: /Volumes/solacc_uc/mmf/partitions

metric: smape
selection_metric: smape_eval
backtest_retrain: false

accelerator: cpu
resample: false
train_predict_ratio: 4
freq: D
Expand Down
