diff --git a/README.md b/README.md index 2b347c6..fc80a6b 100644 --- a/README.md +++ b/README.md @@ -73,13 +73,8 @@ run_forecast( train_predict_ratio=2, data_quality_check=True, resample=False, - ensemble=True, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output", active_models=active_models, - experiment_path=f"/Shared/mmf_experiment", + experiment_path="/Shared/mmf_experiment", use_case_name="m4_daily", ) ``` @@ -99,12 +94,7 @@ run_forecast( - ```stride``` is the number of steps in which you update your backtesting trial start date when going from one trial to the next. - ```train_predict_ratio``` specifies the minimum length required for your training dataset with respect to ```prediction_length```. If ```train_predict_ratio```=2, you need to have training dataset that is at least twice as long as ```prediciton_length```. - ```data_quality_check``` checks the quality of the input data if set to True. See [data_quality_checks.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/data_quality_checks.py) for the full details of the checks. -- ```resample``` backfills empty entries with 0 if set to True. -- ```ensemble```: if you have forecasts from multiple models, you can take a simple mean, min and max of these values and generate an ensemble forecast. -- ```ensemble_metric``` is smape (symmetric mean absolute percentage error) by default. You can or add your own metrics at the main core of the forecasting_sa package or you simply can use ```evaluation_output``` to calculate any metric of your choice. -- ```ensemble_metric_avg``` sets the maximum for the avg smape from each model, above which we exclude from ensembling. -- ```ensemble_metric_max``` sets the maximum for the smape of each model, above which we exclude from ensembling. -- ```ensemble_scoring_output``` is a delta table where you write the ensembled forecasts. +- ```resample``` backfills skipped entries with 0 if set to True. Default is False. - ```active_models``` is a list of models you want to use. - ```experiment_path``` to keep metrics under the MLFlow. - ```use_case_name``` a new column will be created under the delta Table, in case you save multiple trials under 1 table. @@ -170,13 +160,8 @@ run_forecast( train_predict_ratio=2, data_quality_check=True, resample=False, - ensemble=True, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output", active_models=[model], - experiment_path=f"/Shared/mmf_experiment", + experiment_path="/Shared/mmf_experiment", use_case_name="m4_daily", accelerator="gpu", ) diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py index f2ff733..079372d 100644 --- a/mmf_sa/Forecaster.py +++ b/mmf_sa/Forecaster.py @@ -43,6 +43,7 @@ def __init__( data_conf: Dict[str, Any], spark: SparkSession, experiment_id: str = None, + run_id: str = None, ): if isinstance(conf, BaseContainer): self.conf = conf @@ -53,8 +54,10 @@ def __init__( self.conf = OmegaConf.create(_yaml_conf) else: raise Exception("No configuration provided!") - - self.run_id = str(uuid.uuid4()) + if run_id: + self.run_id = run_id + else: + self.run_id = str(uuid.uuid4()) self.data_conf = data_conf self.model_registry = ModelRegistry(self.conf) self.spark = spark @@ -66,7 +69,6 @@ def __init__( raise Exception( "Please set 'experiment_path' parameter in the configuration file!" 
) - self.selection_metric = self.conf["selection_metric"] self.run_date = datetime.now() def set_mlflow_experiment(self): @@ -118,15 +120,12 @@ def split_df_train_val(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFra return train_df, val_df def evaluate_score( - self, evaluating: bool = True, scoring: bool = True, ensemble: bool = True - ) -> str: + self, evaluate: bool = True, score: bool = True) -> str: print("Starting evaluate_score") - if evaluating: + if evaluate: self.evaluate_models() - if scoring: + if score: self.score_models() - if ensemble: - self.ensemble() print("Finished evaluate_score") return self.run_id @@ -525,78 +524,3 @@ def get_latest_model_info(client, registered_name): latest_version = version_int return client.get_model_version(registered_name, latest_version) - def ensemble(self): - if self.conf.get("ensemble") and self.conf["ensemble_scoring_output"]: - metrics_df = ( - self.spark.table(self.conf["evaluation_output"]) - .where(col("run_id").eqNullSafe(lit(self.run_id))) - .where( - col("metric_name").eqNullSafe( - lit(self.conf.get("ensemble_metric", "smape")) - ) - ) - ) - models_df = ( - metrics_df.groupby(self.conf["group_id"], "model") - .agg( - avg("metric_value").alias("metric_avg"), - min("metric_value").alias("metric_min"), - max("metric_value").alias("metric_max"), - ) - .where( - col("metric_avg") < lit(self.conf.get("ensemble_metric_avg", 0.2)) - ) - .where( - col("metric_max") < lit(self.conf.get("ensemble_metric_max", 0.5)) - ) - .where(col("metric_min") > lit(0.01)) - ) - df = ( - self.spark.table(self.conf["scoring_output"]) - .where(col("run_id").eqNullSafe(lit(self.run_id))) - .join( - models_df.select(self.conf["group_id"], "model"), - on=[self.conf["group_id"], "model"], - ) - ) - - left = df.select( - self.conf["group_id"], "run_id", "run_date", "use_case", "model", - posexplode(self.conf["date_col"]) - ).withColumnRenamed('col', self.conf["date_col"]) - - right = df.select( - self.conf["group_id"], "run_id", "run_date", "use_case", "model", - posexplode(self.conf["target"]) - ).withColumnRenamed('col', self.conf["target"]) - - merged = left.join(right, [ - self.conf["group_id"], 'run_id', 'run_date', 'use_case', 'model', - 'pos'], 'inner').drop("pos") - - aggregated_df = merged.groupby( - self.conf["group_id"], self.conf["date_col"] - ).agg( - avg(self.conf["target"]).alias(self.conf["target"] + "_avg"), - min(self.conf["target"]).alias(self.conf["target"] + "_min"), - max(self.conf["target"]).alias(self.conf["target"] + "_max"), - ) - - aggregated_df = aggregated_df.orderBy(self.conf["group_id"], self.conf["date_col"])\ - .groupBy(self.conf["group_id"]).agg( - collect_list(self.conf["date_col"]).alias(self.conf["date_col"]), - collect_list(self.conf["target"] + "_avg").alias(self.conf["target"] + "_avg"), - collect_list(self.conf["target"] + "_min").alias(self.conf["target"] + "_min"), - collect_list(self.conf["target"] + "_max").alias(self.conf["target"] + "_max") - ) - - ( - aggregated_df.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType())) - .withColumn("run_id", lit(self.run_id)) - .withColumn("run_date", lit(self.run_date)) - .withColumn("use_case", lit(self.conf["use_case_name"])) - .withColumn("model", lit("ensemble")) - .write.format("delta") - .mode("append") - .saveAsTable(self.conf["ensemble_scoring_output"]) - ) diff --git a/mmf_sa/__init__.py b/mmf_sa/__init__.py index 50d1f57..e7c1a2d 100644 --- a/mmf_sa/__init__.py +++ b/mmf_sa/__init__.py @@ -14,37 +14,31 @@ def run_forecast( spark: 
SparkSession, train_data: Union[str, pd.DataFrame, DataFrame], - group_id: str = "unique_id", - date_col: str = "date", - target: str = "y", - freq: str = "D", - prediction_length: int = 10, - backtest_months: int = 1, - stride: int = 10, + group_id: str, + date_col: str, + target: str, + freq: str, + prediction_length: int, + backtest_months: int, + stride: int, metric: str = "smape", resample: bool = False, scoring_data: Union[str, pd.DataFrame, DataFrame] = None, scoring_output: str = None, evaluation_output: str = None, model_output: str = None, - ensemble: bool = None, - ensemble_metric: str = None, - ensemble_metric_avg: float = None, - ensemble_metric_max: float = None, - ensemble_scoring_output: str = None, use_case_name: str = None, static_features: List[str] = None, dynamic_future: List[str] = None, dynamic_historical: List[str] = None, active_models: List[str] = None, accelerator: str = None, - scoring_model_stage: str = None, - selection_metric: str = None, backtest_retrain: bool = None, train_predict_ratio: int = None, data_quality_check: bool = None, experiment_path: str = None, conf: Union[str, Dict[str, Any], OmegaConf] = None, + run_id: str = None, ) -> str: if isinstance(conf, dict): _conf = OmegaConf.create(conf) @@ -57,7 +51,7 @@ def run_forecast( _conf = OmegaConf.create() base_conf = OmegaConf.create( - pkg_resources.read_text(sys.modules[__name__], "base_forecasting_conf.yaml") + pkg_resources.read_text(sys.modules[__name__], "forecasting_conf.yaml") ) _conf = OmegaConf.merge(base_conf, _conf) @@ -66,7 +60,6 @@ def run_forecast( _data_conf["train_data"] = train_data else: _conf["train_data"] = train_data - _conf["group_id"] = group_id _conf["date_col"] = date_col _conf["target"] = target @@ -86,18 +79,12 @@ def run_forecast( _data_conf["scoring_data"] = scoring_data else: _conf["scoring_data"] = scoring_data - run_ensemble = True - if use_case_name is not None: _conf["use_case_name"] = use_case_name if active_models is not None: _conf["active_models"] = active_models if accelerator is not None: _conf["accelerator"] = accelerator - if scoring_model_stage is not None: - _conf["scoring_model_stage"] = scoring_model_stage - if selection_metric is not None: - _conf["selection_metric"] = selection_metric if backtest_retrain is not None: _conf["backtest_retrain"] = backtest_retrain if train_predict_ratio is not None: @@ -108,16 +95,6 @@ def run_forecast( _conf["evaluation_output"] = evaluation_output if model_output is not None: _conf["model_output"] = model_output - if ensemble is not None: - _conf["ensemble"] = ensemble - if ensemble_metric is not None: - _conf["ensemble_metric"] = ensemble_metric - if ensemble_metric_avg is not None: - _conf["ensemble_metric_avg"] = ensemble_metric_avg - if ensemble_metric_max is not None: - _conf["ensemble_metric_max"] = ensemble_metric_max - if ensemble_scoring_output is not None: - _conf["ensemble_scoring_output"] = ensemble_scoring_output if data_quality_check is not None: _conf["data_quality_check"] = data_quality_check if static_features is not None: @@ -126,13 +103,16 @@ def run_forecast( _conf["dynamic_future"] = dynamic_future if dynamic_historical is not None: _conf["dynamic_historical"] = dynamic_historical + if run_id is not None: + _conf["run_id"] = run_id - f = Forecaster(conf=_conf, data_conf=_data_conf, spark=spark) - run_id = f.evaluate_score( - evaluating=run_evaluation, - scoring=run_scoring, - ensemble=run_ensemble, + f = Forecaster( + conf=_conf, + data_conf=_data_conf, + spark=spark, + run_id=run_id, ) + run_id = 
f.evaluate_score(evaluate=run_evaluation, score=run_scoring) return run_id diff --git a/mmf_sa/base_forecasting_conf.yaml b/mmf_sa/forecasting_conf.yaml similarity index 98% rename from mmf_sa/base_forecasting_conf.yaml rename to mmf_sa/forecasting_conf.yaml index 554a37d..d594c9c 100644 --- a/mmf_sa/base_forecasting_conf.yaml +++ b/mmf_sa/forecasting_conf.yaml @@ -1,11 +1,7 @@ -output_format: "delta" -accelerator: cpu temp_path: /Volumes/solacc_uc/mmf/partitions - metric: smape -selection_metric: smape_eval backtest_retrain: false - +accelerator: cpu resample: false train_predict_ratio: 4 freq: D diff --git a/mmf_sa/models/pytorch_forecasting/PyTorchForecastingModel.py b/mmf_sa/models/pytorch_forecasting/PyTorchForecastingModel.py deleted file mode 100644 index fb59ff1..0000000 --- a/mmf_sa/models/pytorch_forecasting/PyTorchForecastingModel.py +++ /dev/null @@ -1,191 +0,0 @@ -import pandas as pd -import numpy as np -import pytorch_lightning as pl -from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor -from pytorch_lightning.loggers import TensorBoardLogger -import torch - -from pytorch_forecasting import Baseline, TemporalFusionTransformer, TimeSeriesDataSet -from pytorch_forecasting.data import GroupNormalizer -from pytorch_forecasting.metrics import SMAPE, PoissonLoss, QuantileLoss -from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin -from sktime.performance_metrics.forecasting import mean_absolute_percentage_error - - -class PyTorchForecastingRegressor(BaseEstimator, RegressorMixin): - def __init__(self, params): - self.params = params - - def create_training_dataset(self, X, y=None): - return TimeSeriesDataSet( - X, - time_idx="time_idx", - target=self.params["target"], - group_ids=self.params["static_categoricals"], - min_encoder_length=self.params["min_encoder_length"], - max_encoder_length=self.params["max_encoder_length"], - min_prediction_length=self.params["min_prediction_length"], - max_prediction_length=self.params["max_prediction_length"], - static_categoricals=self.params["static_categoricals"], - static_reals=self.params["static_reals"], - time_varying_known_categoricals=self.params[ - "time_varying_known_categoricals" - ], - time_varying_known_reals=self.params["time_varying_known_reals"], - time_varying_unknown_categoricals=self.params[ - "time_varying_unknown_categoricals" - ], - time_varying_unknown_reals=self.params["time_varying_unknown_reals"], - target_normalizer=GroupNormalizer( - groups=self.params["static_categoricals"], - method=self.params["target_normalizer_method"], - transformation=self.params["target_normalizer_transformation"], - center=self.params["target_normalizer_center"], - ), # use softplus with beta=1.0 and normalize by group - add_relative_time_idx=self.params["add_relative_time_idx"], - add_target_scales=self.params["add_target_scales"], - add_encoder_length=self.params["add_encoder_length"], - allow_missing_timesteps=self.params["allow_missing_timesteps"], - # categorical_encoders={"weekofyear": NaNLabelEncoder(add_nan=True).fit(df.ts_key)}, - ) - - def create_validation_dataset(self, X, training_dataset, y=None): - return TimeSeriesDataSet.from_dataset( - training_dataset, X, predict=True, stop_randomization=True - ) - - def train_validation_split(self, X, y=None): - _train = X[ - X.Date <= X.Date.max() - pd.DateOffset(months=self.params["val_month"]) - ] - print(_train.columns) - training_dataset = self.create_training_dataset(_train) - validation_dataset = self.create_validation_dataset(X, 
training_dataset) - return training_dataset, validation_dataset - - def create_model(self, training_dataset): - callbacks_list = [] - logger = TensorBoardLogger( - "/tmp/lightning_logs" - ) # logging results to a tensorboard - - if self.params["early_stopping"]: - early_stop_callback = EarlyStopping( - monitor=self.params["early_stop_monitor"], - min_delta=self.params["early_stop_min_delta"], - patience=self.params["early_stop_patience"], - verbose=True, - mode=self.params["early_stop_mode"], - ) - lr_logger = LearningRateMonitor() # log the learning rate - callbacks_list = [lr_logger, early_stop_callback] - - trainer = pl.Trainer( - max_epochs=self.params["max_epochs"], - gpus=self.params["gpus"], - weights_summary=self.params["weights_summary"], - gradient_clip_val=self.params["gradient_clip_val"], - # limit_train_batches=self.params['limit_train_batches'], - fast_dev_run=self.params["fast_dev_run"], - callbacks=callbacks_list, - logger=logger, - ) - - tft = TemporalFusionTransformer.from_dataset( - training_dataset, - learning_rate=self.params["learning_rate"], - hidden_size=self.params["hidden_size"], - attention_head_size=self.params["attention_head_size"], - dropout=self.params["dropout"], # between 0.1 and 0.3 are good values - hidden_continuous_size=self.params[ - "hidden_continuous_size" - ], # set to <= hidden_size - output_size=self.params["output_size"], - loss=QuantileLoss(), - reduce_on_plateau_patience=self.params["reduce_on_plateau_patience"], - ) - return trainer, tft - - def prepare_data(self, df): - df[self.params["date_col"]] = pd.to_datetime(df[self.params["date_col"]]) - df = df.sort_values( - [self.params["group_id"], self.params["date_col"]], ascending=True - ) - df["time_idx"] = ( - df[self.params["date_col"]].dt.year * 12 * 31 - + df[self.params["date_col"]].dt.month * 31 - + df[self.params["date_col"]].dt.day - ) - df["time_idx"] -= df["time_idx"].min() - return df - - def fit(self, X, y=None): - _df = self.prepare_data(X) - training_dataset, validation_dataset = self.train_validation_split(_df) - train_dataloader = training_dataset.to_dataloader( - train=True, - batch_size=self.params["batch_size"], - num_workers=self.params["num_workers"], - ) - val_dataloader = validation_dataset.to_dataloader( - train=False, - batch_size=self.params["batch_size"], - num_workers=self.params["num_workers"], - ) - - _trainer, _tft = self.create_model(training_dataset) - self.trainer = _trainer - self.tft = _tft - - self.trainer.fit( - self.tft, - train_dataloader=train_dataloader, - val_dataloaders=val_dataloader, - ) - actuals = torch.cat([y[0] for x, y in iter(val_dataloader)]) - predictions = self.tft.predict(val_dataloader) - self.smape_train = mean_absolute_percentage_error( - actuals.numpy(), predictions.numpy(), symmetric=True - ) - return self - - def fillin_prediction_values(self, df): - keys = df[self.params["group_id"]].unique() - dfs = [] - for key in keys: - k_df = df[df.Store == key] - train_df = k_df[df.Sales.notnull()] - pred_df = k_df[df.Sales.isnull()].sort_values(by=self.params["date_col"]) - pred_df = pred_df[: self.params["prediction_length"]] - for col in self.params["time_varying_unknown_reals"]: - _max = train_df.loc[train_df.Date.idxmax(), col] - pred_df[col] = _max - dfs.append(pd.concat([train_df, pred_df])) - new_df = pd.concat(dfs) - return new_df - - def predict(self, X, y=None): - df = self.prepare_data(X) - prediction_start_date = df[df[self.params["target"]].isnull()][ - self.params["date_col"] - ].min() - df = self.fillin_prediction_values(df) - 
self.tft.eval() - prediction, idx_df = self.tft.predict(df, mode="prediction", return_index=True) - dfs = [] - for i in range(len(prediction)): - date_idx = pd.date_range( - prediction_start_date, periods=self.params["prediction_length"] - ) - _df = pd.DataFrame( - data=prediction[i], index=date_idx, columns=[self.params["target"]] - ) - for col in idx_df.columns: - if col not in ["time_idx"]: - _df[col] = idx_df[col].iloc[i] - dfs.append(_df) - res_df = pd.concat(dfs) - res_df.reset_index(inplace=True) - res_df.rename(columns={"index": self.params["date_col"]}, inplace=True) - - return res_df diff --git a/mmf_sa/models/pytorch_forecasting/PyTorchForecastingPipeline.py b/mmf_sa/models/pytorch_forecasting/PyTorchForecastingPipeline.py deleted file mode 100644 index 5173dcd..0000000 --- a/mmf_sa/models/pytorch_forecasting/PyTorchForecastingPipeline.py +++ /dev/null @@ -1,85 +0,0 @@ -import pandas as pd -from sklearn.base import BaseEstimator, TransformerMixin -from sklearn.pipeline import Pipeline - -from mmf_sa.models.abstract_model import ForecastingRegressor -from mmf_sa.models.pytorch_forecasting.PyTorchForecastingModel import ( - PyTorchForecastingRegressor, -) - - -class PyTorchForecastingPipelineRegressor(ForecastingRegressor): - def __init__(self, params): - super().__init__(params) - self.params = params - self.pipeline = self.create_model(params) - - def create_model(self, params): - pipeline = Pipeline( - steps=[ - ("date_transformer", DateTransformer(params["date_col"])), - ("convert_to_cat", ConvertToCategorical(params["categorical_columns"])), - ("imputation", FillNaN(params["columns_to_fillna"])), - ("ts_features_generator", FeatureGenerator(params["date_col"])), - ("tft_forecast", PyTorchForecastingRegressor(params)), - ] - ) - return pipeline - - def fit(self, X, y=None): - return self.pipeline.fit(X, y) - - def predict(self, X): - return self.pipeline.predict(X) - - -class DateTransformer(BaseEstimator, TransformerMixin): - def __init__(self, date_col): - self.date_col = date_col - - def fit(self, X, y=None): - return self - - def transform(self, X, y=None): - X["Date"] = pd.to_datetime(X["Date"]) - X["year"] = X.Date.dt.year.astype(str).astype("category") - X["month"] = X.Date.dt.month.astype(str).astype("category") - return X - - -class ConvertToCategorical(BaseEstimator, TransformerMixin): - def __init__(self, columns): - self.columns = columns - - def fit(self, X, y=None): - return self - - def transform(self, X, y=None): - X[self.columns] = X[self.columns].astype(str).astype("category") - return X - - -class FillNaN(BaseEstimator, TransformerMixin): - def __init__(self, columns): - self.columns = columns - - def fit(self, X, y=None): - return self - - def transform(self, X, y=None): - X[self.columns] = X[self.columns].fillna(value=-1) - return X - - -class FeatureGenerator(BaseEstimator, TransformerMixin): - def __init__(self, data_column): - self.data_column = data_column - - def fit(self, X, y=None): - return self - - def transform(self, X, y=None): - X["Week"] = X[self.data_column].dt.week - X["YearDay"] = X[self.data_column].dt.dayofyear - X["MonthDay"] = X[self.data_column].dt.day - return X diff --git a/mmf_sa/models/pytorch_forecasting/__init__.py b/mmf_sa/models/pytorch_forecasting/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/notebooks/demo_foundation_daily.py b/notebooks/demo_foundation_daily.py index 3ec24d5..c4a7219 100644 --- a/notebooks/demo_foundation_daily.py +++ b/notebooks/demo_foundation_daily.py @@ -27,6 +27,7 @@ 
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) from datasetsforecast.m4 import M4 +import uuid # COMMAND ---------- @@ -110,11 +111,13 @@ def transform_group(df): # COMMAND ---------- +run_id = str(uuid.uuid4()) + for model in active_models: dbutils.notebook.run( "run_daily", timeout_seconds=0, - arguments={"catalog": catalog, "db": db, "model": model}) + arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id}) # COMMAND ---------- @@ -136,15 +139,6 @@ def transform_group(df): # COMMAND ---------- -# MAGIC %md ### Ensemble Output -# MAGIC In the final ensemble output table, we store the averaged forecast. The models which meet the threshold defined using the ensembling parameters are taken into consideration - -# COMMAND ---------- - -# MAGIC %sql select * from solacc_uc.mmf.daily_ensemble_output order by unique_id, model, ds - -# COMMAND ---------- - # MAGIC %md ### Delete Tables # COMMAND ---------- @@ -154,7 +148,3 @@ def transform_group(df): # COMMAND ---------- # MAGIC %sql delete from solacc_uc.mmf.daily_scoring_output - -# COMMAND ---------- - -# MAGIC %sql delete from solacc_uc.mmf.daily_ensemble_output diff --git a/notebooks/demo_foundation_monthly.py b/notebooks/demo_foundation_monthly.py index 852b5d8..af388aa 100644 --- a/notebooks/demo_foundation_monthly.py +++ b/notebooks/demo_foundation_monthly.py @@ -22,12 +22,12 @@ import pathlib import pandas as pd -from mmf_sa import run_forecast import logging logger = spark._jvm.org.apache.log4j logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) from datasetsforecast.m4 import M4 +import uuid # COMMAND ---------- @@ -120,11 +120,13 @@ def transform_group(df): # COMMAND ---------- +run_id = str(uuid.uuid4()) + for model in active_models: dbutils.notebook.run( "run_monthly", timeout_seconds=0, - arguments={"catalog": catalog, "db": db, "model": model}) + arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id}) # COMMAND ---------- @@ -146,15 +148,6 @@ def transform_group(df): # COMMAND ---------- -# MAGIC %md ### Ensemble Output -# MAGIC In the final ensemble output table, we store the averaged forecast. 
The models which meet the threshold defined using the ensembling parameters are taken into consideration - -# COMMAND ---------- - -# MAGIC %sql select * from solacc_uc.mmf.monthly_ensemble_output order by unique_id, model, date - -# COMMAND ---------- - # MAGIC %md ### Delete Tables # COMMAND ---------- @@ -164,7 +157,3 @@ def transform_group(df): # COMMAND ---------- # MAGIC %sql delete from solacc_uc.mmf.monthly_scoring_output - -# COMMAND ---------- - -# MAGIC %sql delete from solacc_uc.mmf.monthly_ensemble_output diff --git a/notebooks/demo_global_daily.py b/notebooks/demo_global_daily.py index e426203..80fcfe9 100644 --- a/notebooks/demo_global_daily.py +++ b/notebooks/demo_global_daily.py @@ -27,6 +27,7 @@ logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) from datasetsforecast.m4 import M4 +import uuid # COMMAND ---------- @@ -111,11 +112,13 @@ def transform_group(df): # COMMAND ---------- +run_id = str(uuid.uuid4()) + for model in active_models: dbutils.notebook.run( "run_daily", timeout_seconds=0, - arguments={"catalog": catalog, "db": db, "model": model}) + arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id}) # COMMAND ---------- @@ -137,15 +140,6 @@ def transform_group(df): # COMMAND ---------- -# MAGIC %md ### Ensemble Output -# MAGIC In the final ensemble output table, we store the averaged forecast. The models which meet the threshold defined using the ensembling parameters are taken into consideration - -# COMMAND ---------- - -# MAGIC %sql select * from solacc_uc.mmf.daily_ensemble_output order by unique_id, model, ds - -# COMMAND ---------- - # MAGIC %md ### Delete Tables # COMMAND ---------- @@ -155,7 +149,3 @@ def transform_group(df): # COMMAND ---------- # MAGIC %sql delete from solacc_uc.mmf.daily_scoring_output - -# COMMAND ---------- - -# MAGIC %sql delete from solacc_uc.mmf.daily_ensemble_output diff --git a/notebooks/demo_global_external_regressors_daily.py b/notebooks/demo_global_external_regressors_daily.py index c5c9c28..ddba67b 100644 --- a/notebooks/demo_global_external_regressors_daily.py +++ b/notebooks/demo_global_external_regressors_daily.py @@ -8,6 +8,7 @@ logger = spark._jvm.org.apache.log4j logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) +import uuid # COMMAND ---------- @@ -85,11 +86,13 @@ # COMMAND ---------- +run_id = str(uuid.uuid4()) + for model in active_models: dbutils.notebook.run( "run_external_regressors_daily", timeout_seconds=0, - arguments={"catalog": catalog, "db": db, "model": model}) + arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id}) # COMMAND ---------- @@ -101,10 +104,6 @@ # COMMAND ---------- -# MAGIC %sql select * from solacc_uc.mmf.rossmann_daily_ensemble_output order by Store - -# COMMAND ---------- - # MAGIC %md ### Delete Tables # COMMAND ---------- @@ -114,7 +113,3 @@ # COMMAND ---------- # MAGIC %sql delete from solacc_uc.mmf.rossmann_daily_scoring_output - -# COMMAND ---------- - -# MAGIC %sql delete from solacc_uc.mmf.rossmann_daily_ensemble_output diff --git a/notebooks/demo_global_monthly.py b/notebooks/demo_global_monthly.py index beabe0a..7a9ef69 100644 --- a/notebooks/demo_global_monthly.py +++ b/notebooks/demo_global_monthly.py @@ -22,12 +22,12 @@ import pathlib import pandas as pd -from mmf_sa import run_forecast import logging logger = spark._jvm.org.apache.log4j 
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) from datasetsforecast.m4 import M4 +import uuid # COMMAND ---------- @@ -121,11 +121,13 @@ def transform_group(df): # COMMAND ---------- +run_id = str(uuid.uuid4()) + for model in active_models: dbutils.notebook.run( "run_monthly", timeout_seconds=0, - arguments={"catalog": catalog, "db": db, "model": model}) + arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id}) # COMMAND ---------- @@ -147,15 +149,6 @@ def transform_group(df): # COMMAND ---------- -# MAGIC %md ### Ensemble Output -# MAGIC In the final ensemble output table, we store the averaged forecast. The models which meet the threshold defined using the ensembling parameters are taken into consideration - -# COMMAND ---------- - -# MAGIC %sql select * from solacc_uc.mmf.monthly_ensemble_output order by unique_id, model, date - -# COMMAND ---------- - # MAGIC %md ### Delete Tables # COMMAND ---------- @@ -165,7 +158,3 @@ def transform_group(df): # COMMAND ---------- # MAGIC %sql delete from solacc_uc.mmf.monthly_scoring_output - -# COMMAND ---------- - -# MAGIC %sql delete from solacc_uc.mmf.monthly_ensemble_output diff --git a/notebooks/demo_local_univariate_daily.py b/notebooks/demo_local_univariate_daily.py index 56b3139..3f9cd74 100644 --- a/notebooks/demo_local_univariate_daily.py +++ b/notebooks/demo_local_univariate_daily.py @@ -137,11 +137,6 @@ def transform_group(df): train_predict_ratio=2, data_quality_check=True, resample=False, - ensemble=True, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output", active_models=active_models, experiment_path=f"/Shared/mmf_experiment", use_case_name="m4_daily", @@ -167,15 +162,6 @@ def transform_group(df): # COMMAND ---------- -# MAGIC %md ### Ensemble Output -# MAGIC In the final ensemble output table, we store the averaged forecast. 
The models which meet the threshold defined using the ensembling parameters are taken into consideration - -# COMMAND ---------- - -# MAGIC %sql select * from solacc_uc.mmf.daily_ensemble_output order by unique_id, model, ds - -# COMMAND ---------- - # MAGIC %md ### Delete Tables # COMMAND ---------- @@ -185,7 +171,3 @@ def transform_group(df): # COMMAND ---------- # MAGIC %sql delete from solacc_uc.mmf.daily_scoring_output - -# COMMAND ---------- - -# MAGIC %sql delete from solacc_uc.mmf.daily_ensemble_output diff --git a/notebooks/demo_local_univariate_external_regressors_daily.py b/notebooks/demo_local_univariate_external_regressors_daily.py index 56f794d..eb68dca 100644 --- a/notebooks/demo_local_univariate_external_regressors_daily.py +++ b/notebooks/demo_local_univariate_external_regressors_daily.py @@ -103,11 +103,6 @@ active_models=active_models, data_quality_check=True, resample=False, - ensemble=True, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output=f"{catalog}.{db}.rossmann_daily_ensemble_output", experiment_path=f"/Shared/mmf_rossmann", use_case_name="rossmann_daily", ) @@ -123,10 +118,6 @@ # COMMAND ---------- -# MAGIC %sql select * from solacc_uc.mmf.rossmann_daily_ensemble_output order by Store - -# COMMAND ---------- - # MAGIC %md ### Delete Tables # COMMAND ---------- @@ -136,7 +127,3 @@ # COMMAND ---------- # MAGIC %sql delete from solacc_uc.mmf.rossmann_daily_scoring_output - -# COMMAND ---------- - -# MAGIC %sql delete from solacc_uc.mmf.rossmann_daily_ensemble_output diff --git a/notebooks/demo_local_univariate_monthly.py b/notebooks/demo_local_univariate_monthly.py index 9a3932a..1705407 100644 --- a/notebooks/demo_local_univariate_monthly.py +++ b/notebooks/demo_local_univariate_monthly.py @@ -147,11 +147,6 @@ def transform_group(df): train_predict_ratio=2, data_quality_check=True, resample=False, - ensemble=True, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output=f"{catalog}.{db}.monthly_ensemble_output", active_models=active_models, experiment_path=f"/Shared/mmf_experiment_monthly", use_case_name="m4_monthly", @@ -177,15 +172,6 @@ def transform_group(df): # COMMAND ---------- -# MAGIC %md ### Ensemble Output -# MAGIC In the final ensemble output table, we store the averaged forecast. 
The models which meet the threshold defined using the ensembling parameters are taken into consideration - -# COMMAND ---------- - -# MAGIC %sql select * from solacc_uc.mmf.monthly_ensemble_output order by unique_id, model, date - -# COMMAND ---------- - # MAGIC %md ### Delete Tables # COMMAND ---------- @@ -195,7 +181,3 @@ def transform_group(df): # COMMAND ---------- # MAGIC %sql delete from solacc_uc.mmf.monthly_scoring_output - -# COMMAND ---------- - -# MAGIC %sql delete from solacc_uc.mmf.monthly_ensemble_output diff --git a/notebooks/run_daily.py b/notebooks/run_daily.py index 98dd9bc..314b59a 100644 --- a/notebooks/run_daily.py +++ b/notebooks/run_daily.py @@ -7,10 +7,12 @@ dbutils.widgets.text("catalog", "") dbutils.widgets.text("db", "") dbutils.widgets.text("model", "") +dbutils.widgets.text("run_id", "") catalog = dbutils.widgets.get("catalog") db = dbutils.widgets.get("db") model = dbutils.widgets.get("model") +run_id = dbutils.widgets.get("run_id") # COMMAND ---------- @@ -38,13 +40,9 @@ train_predict_ratio=2, data_quality_check=True, resample=False, - ensemble=True, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output", active_models=[model], experiment_path=f"/Shared/mmf_experiment", use_case_name="m4_daily", + run_id=run_id, accelerator="gpu", ) diff --git a/notebooks/run_external_regressors_daily.py b/notebooks/run_external_regressors_daily.py index d3f52e1..844cbeb 100644 --- a/notebooks/run_external_regressors_daily.py +++ b/notebooks/run_external_regressors_daily.py @@ -7,10 +7,12 @@ dbutils.widgets.text("catalog", "") dbutils.widgets.text("db", "") dbutils.widgets.text("model", "") +dbutils.widgets.text("run_id", "") catalog = dbutils.widgets.get("catalog") db = dbutils.widgets.get("db") model = dbutils.widgets.get("model") +run_id = dbutils.widgets.get("run_id") # COMMAND ---------- @@ -40,12 +42,8 @@ active_models=[model], data_quality_check=True, resample=False, - ensemble=True, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output=f"{catalog}.{db}.rossmann_daily_ensemble_output", experiment_path=f"/Shared/mmf_rossmann", use_case_name="rossmann_daily", + run_id=run_id, accelerator="gpu", ) diff --git a/notebooks/run_monthly.py b/notebooks/run_monthly.py index d8f23d3..73ae9d3 100644 --- a/notebooks/run_monthly.py +++ b/notebooks/run_monthly.py @@ -7,10 +7,12 @@ dbutils.widgets.text("catalog", "") dbutils.widgets.text("db", "") dbutils.widgets.text("model", "") +dbutils.widgets.text("run_id", "") catalog = dbutils.widgets.get("catalog") db = dbutils.widgets.get("db") model = dbutils.widgets.get("model") +run_id = dbutils.widgets.get("run_id") # COMMAND ---------- @@ -38,13 +40,9 @@ train_predict_ratio=2, data_quality_check=True, resample=False, - ensemble=True, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output=f"{catalog}.{db}.monthly_ensemble_output", active_models=[model], experiment_path=f"/Shared/mmf_experiment_monthly", use_case_name="m4_monthly", + run_id=run_id, accelerator="gpu", ) diff --git a/tests/integration_test.py b/tests/integration_test.py index 6675a37..57099a2 100644 --- a/tests/integration_test.py +++ b/tests/integration_test.py @@ -62,35 +62,10 @@ def create_m4_df(): m4_df.createOrReplaceTempView("mmf_train") active_models = [ - #"StatsForecastBaselineWindowAverage", - #"StatsForecastBaselineSeasonalWindowAverage", - #"StatsForecastBaselineNaive", 
- #"StatsForecastBaselineSeasonalNaive", - "StatsForecastAutoArima", - #"StatsForecastAutoETS", - #"StatsForecastAutoCES", - #"StatsForecastAutoTheta", - #"StatsForecastTSB", - #"StatsForecastADIDA", - #"StatsForecastIMAPA", - #"StatsForecastCrostonClassic", - #"StatsForecastCrostonOptimized", - #"StatsForecastCrostonSBA", - #"RFableArima", - #"RFableETS", - #"RFableNNETAR", - #"RFableEnsemble", - #"RDynamicHarmonicRegression", - #"SKTimeLgbmDsDt", - #"SKTimeTBats", - #"NeuralForecastRNN", - #"NeuralForecastLSTM", - #"NeuralForecastNBEATSx", - #"NeuralForecastNHITS", - #"NeuralForecastAutoRNN", - #"NeuralForecastAutoLSTM", - #"NeuralForecastAutoNBEATSx", - #"NeuralForecastAutoNHITS", + "StatsForecastBaselineWindowAverage", + "StatsForecastBaselineSeasonalWindowAverage", + "StatsForecastBaselineNaive", + "StatsForecastBaselineSeasonalNaive", ] # COMMAND ---------- @@ -115,11 +90,6 @@ def create_m4_df(): train_predict_ratio=2, data_quality_check=True, resample=False, - ensemble=True, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output=f"{catalog}.{db}.daily_ensemble_output", active_models=active_models, experiment_path=f"/Shared/mmf_experiment", use_case_name="mmf", @@ -133,9 +103,3 @@ def create_m4_df(): # MAGIC %sql drop table main.mmf.daily_scoring_output -# COMMAND ---------- - -# MAGIC %sql drop table main.mmf.daily_ensemble_output - -# COMMAND ---------- - diff --git a/tests/unit/test_exogenous_regressors_pipeline.py b/tests/unit/test_exogenous_regressors_pipeline.py index 0493241..060253b 100644 --- a/tests/unit/test_exogenous_regressors_pipeline.py +++ b/tests/unit/test_exogenous_regressors_pipeline.py @@ -31,6 +31,9 @@ def test_exogenous_regressors(base_config, m4_df_exogenous): res_df = model.predict(_hist_df, _val_df) print( model.backtest( - _df, start=_df.ds.max() - pd.DateOffset(days=35), stride=10, retrain=True + _df, + start=_df.ds.max() - pd.DateOffset(days=35), + stride=10, + #retrain=True ) ) diff --git a/tests/unit/test_pipelines.py b/tests/unit/test_pipelines.py index 0b37497..fe9c693 100644 --- a/tests/unit/test_pipelines.py +++ b/tests/unit/test_pipelines.py @@ -21,23 +21,17 @@ def test_api_func(temp_dir, spark_session, m4_df, m4_df_exogenous): train_data="train", scoring_data="train", scoring_output="scoring_out", - metrics_output="metrics", + evaluation_output="evaluation_output", group_id="unique_id", date_col="ds", target="y", freq="D", - dynamic_reals=[], dynamic_future=[], dynamic_historical=[], static_features=[], train_predict_ratio=2, active_models=active_models, data_quality_check=True, - ensemble=True, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output="ensemble_output", experiment_path=f"{str(temp_dir)}/fsa_experiment", use_case_name="fsa", ) @@ -60,23 +54,17 @@ def test_api_func(temp_dir, spark_session, m4_df, m4_df_exogenous): train_data="train", scoring_data="score", scoring_output="scoring_out", - metrics_output="metrics", + evaluation_output="evaluation_output", group_id="unique_id", date_col="ds", target="y", freq="D", - dynamic_reals=["feature1", "feature2"], - dynamic_future=[], + dynamic_future=["feature1", "feature2"], dynamic_historical=[], static_features=[], train_predict_ratio=2, active_models=active_models, data_quality_check=True, - ensemble=False, - ensemble_metric="smape", - ensemble_metric_avg=0.3, - ensemble_metric_max=0.5, - ensemble_scoring_output="ensemble_output", experiment_path=f"{str(temp_dir)}/fsa_experiment", 
use_case_name="fsa", ) diff --git a/tests/unit/test_sktime_pipeline.py b/tests/unit/test_sktime_pipeline.py index b82dc7e..4893f8e 100644 --- a/tests/unit/test_sktime_pipeline.py +++ b/tests/unit/test_sktime_pipeline.py @@ -28,7 +28,10 @@ def test_sktime_lgbm_ds_dt(base_config, m4_df): res_df = model.predict(_df) print( model.backtest( - _df, start=_df.ds.max() - pd.DateOffset(days=35), stride=10, retrain=True + _df, + start=_df.ds.max() - pd.DateOffset(days=35), + stride=10, + #retrain=True ) ) @@ -41,6 +44,9 @@ def test_sktime_tbats(base_config, m4_df): res_df = model.predict(_df) print( model.backtest( - _df, start=_df.ds.max() - pd.DateOffset(days=35), stride=10, retrain=False + _df, + start=_df.ds.max() - pd.DateOffset(days=35), + stride=10, + #retrain=False ) )