diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py index 079372d..b268f70 100644 --- a/mmf_sa/Forecaster.py +++ b/mmf_sa/Forecaster.py @@ -72,6 +72,11 @@ def __init__( self.run_date = datetime.now() def set_mlflow_experiment(self): + """ + Sets the MLflow experiment using the provided experiment path and returns the experiment id. + Parameters: self (Forecaster): A Forecaster object. + Returns: experiment_id (str): A string specifying the experiment id. + """ mlflow.set_experiment(self.conf["experiment_path"]) experiment_id = ( MlflowClient() @@ -81,6 +86,11 @@ def set_mlflow_experiment(self): return experiment_id def resolve_source(self, key: str) -> DataFrame: + """ + Resolve a data source using the provided key and return Spark DataFrame. + Parameters: self (Forecaster): A Forecaster object. key (str): A string specifying the key. + Returns: DataFrame: A Spark DataFrame object. + """ if self.data_conf: df_val = self.data_conf.get(key) if df_val is not None and isinstance(df_val, pd.DataFrame): @@ -91,6 +101,16 @@ def resolve_source(self, key: str) -> DataFrame: return self.spark.read.table(self.conf[key]) def prepare_data_for_global_model(self, mode: str = None): + """ + Prepares data for a global model by resolving the training data source, performing data quality checks, + optionally merging scoring data, and converting the DataFrame to a pandas DataFrame. + Parameters: + self (Forecaster): A Forecaster object. + mode (str, optional): A string specifying the mode. Default is None. + Returns: + src_df (pd.DataFrame): A pandas DataFrame. + removed (List[str]): A list of strings specifying the removed groups. + """ src_df = self.resolve_source("train_data") src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run() if (mode == "scoring") \ @@ -103,8 +123,16 @@ def prepare_data_for_global_model(self, mode: str = None): return src_df, removed def split_df_train_val(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: - """Splits df into train and data, based on backtest months and prediction length. - Data before backtest will be train, and data from backtest (at most prediction length days) will be val data.""" + """ + Splits df into train and validation data, based on backtest months and prediction length. + Data before backtest will be "train", and data after the backtest (at most prediction length days) will be "val". + Parameters: + self (Forecaster): A Forecaster object. + df (pd.DataFrame): A pandas DataFrame. + Returns: + train_df (pd.DataFrame): A pandas DataFrame. + val_df (pd.DataFrame): A pandas DataFrame. + """ # Train with data before the backtest months in conf train_df = df[ df[self.conf["date_col"]] @@ -121,6 +149,14 @@ def split_df_train_val(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFra def evaluate_score( self, evaluate: bool = True, score: bool = True) -> str: + """ + Evaluates and scores the models using the provided configuration. + Parameters: + self (Forecaster): A Forecaster object. + evaluate (bool, optional): A boolean specifying whether to evaluate the models. Default is True. + score (bool, optional): A boolean specifying whether to score the models. Default is True. + Returns: run_id (str): A string specifying the run id. + """ print("Starting evaluate_score") if evaluate: self.evaluate_models() @@ -132,6 +168,7 @@ def evaluate_score( def evaluate_models(self): """ Trains and evaluates all models from the active models list. + Parameters: self (Forecaster): A Forecaster object. 
""" print("Starting evaluate_models") for model_name in self.model_registry.get_active_model_keys(): @@ -153,8 +190,17 @@ def evaluate_models(self): print("Finished evaluate_models") def evaluate_local_model(self, model_conf): + """ + Evaluates a local model using the provided model configuration. It applies the Pandas UDF to the training data. + It then logs the aggregated metrics and a few tags to MLflow. + Parameters: + self (Forecaster): A Forecaster object. + model_conf (dict): A dictionary specifying the model configuration. + """ src_df = self.resolve_source("train_data") src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run() + + # Specifying the output schema for Pandas UDF output_schema = StructType( [ StructField( @@ -170,7 +216,7 @@ def evaluate_local_model(self, model_conf): ) model = self.model_registry.get_model(model_conf["name"]) - # Use Pandas UDF to forecast + # Use Pandas UDF to forecast individual group evaluate_one_local_model_fn = functools.partial( Forecaster.evaluate_one_local_model, model=model ) @@ -178,6 +224,8 @@ def evaluate_local_model(self, model_conf): src_df.groupby(self.conf["group_id"]) .applyInPandas(evaluate_one_local_model_fn, schema=output_schema) ) + + # Write evaluation result to a delta table if self.conf.get("evaluation_output", None) is not None: ( res_sdf.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType())) @@ -189,6 +237,8 @@ def evaluate_local_model(self, model_conf): .write.mode("append") .saveAsTable(self.conf.get("evaluation_output")) ) + + # Compute aggregated metrics res_df = ( res_sdf.groupby(["metric_name"]) .mean("metric_value") @@ -210,6 +260,14 @@ def evaluate_local_model(self, model_conf): def evaluate_one_local_model( pdf: pd.DataFrame, model: ForecastingRegressor ) -> pd.DataFrame: + """ + A static method that evaluates a single local model using the provided pandas DataFrame and model. + If the evaluation for a single group fails, it returns an empty DataFrame without failing the entire process. + Parameters: + pdf (pd.DataFrame): A pandas DataFrame. + model (ForecastingRegressor): A ForecastingRegressor object. + Returns: metrics_df (pd.DataFrame): A pandas DataFrame. + """ pdf[model.params["date_col"]] = pd.to_datetime(pdf[model.params["date_col"]]) pdf.sort_values(by=model.params["date_col"], inplace=True) split_date = pdf[model.params["date_col"]].max() - pd.DateOffset( @@ -228,7 +286,6 @@ def evaluate_one_local_model( exc_info=err, stack_info=True, ) - # raise Exception(f"Error evaluating group {group_id}: {err}") return pd.DataFrame( columns=[ model.params["group_id"], @@ -242,17 +299,25 @@ def evaluate_one_local_model( ) def evaluate_global_model(self, model_conf): + """ + Evaluate a global model using the provided model configuration. Trains and registers the model. + Parameters: + self (Forecaster): A Forecaster object. + model_conf (dict): A dictionary specifying the model configuration. + """ with mlflow.start_run(experiment_id=self.experiment_id) as run: model_name = model_conf["name"] hist_df, removed = self.prepare_data_for_global_model("evaluating") train_df, val_df = self.split_df_train_val(hist_df) - # First, we train the model on the entire history (train_df, val_df) - # and then register this model as our final model in Unity Catalog + # First, we train the model on the entire history (train_df, val_df). + # Then we register this model as our final model in Unity Catalog. 
final_model = self.model_registry.get_model(model_name) final_model.fit(pd.concat([train_df, val_df])) input_example = train_df[train_df[self.conf['group_id']] == train_df[self.conf['group_id']] \ .unique()[0]].sort_values(by=[self.conf['date_col']]) + + # Prepare model signature for model registry input_schema = infer_signature(model_input=input_example).inputs output_schema = Schema( [ @@ -263,6 +328,8 @@ def evaluate_global_model(self, model_conf): ] ) signature = ModelSignature(inputs=input_schema, outputs=output_schema) + + # Register the model model_info = mlflow.sklearn.log_model( final_model, "model", @@ -286,7 +353,6 @@ def evaluate_global_model(self, model_conf): model_uri=model_info.model_uri, # This model_uri is from the final model write=True, ) - print(f"Finished training {model_conf.get('name')}") def backtest_global_model( self, @@ -296,12 +362,24 @@ def backtest_global_model( model_uri: str, write: bool = True, ): + """ + Performs detailed backtesting of a global model using the provided model, training DataFrame, + validation DataFrame, model URI, and write parameter. + Parameters: + self (Forecaster): A Forecaster object. + model (ForecastingRegressor): A ForecastingRegressor object. + train_df (pd.DataFrame): A pandas DataFrame. + val_df (pd.DataFrame): A pandas DataFrame. + model_uri (str): A string specifying the model URI. + write (bool, optional): A boolean specifying whether to write the results to a table. Default is True. + Returns: metric_value (float): A float specifying the mean metric value. + """ res_pdf = ( model.backtest( pd.concat([train_df, val_df]), start=train_df[self.conf["date_col"]].max(), spark=self.spark, - #backtest_retrain=self.conf["backtest_retrain"], + # backtest_retrain=self.conf["backtest_retrain"], )) group_id_dtype = IntegerType() \ @@ -320,6 +398,7 @@ def backtest_global_model( ) res_sdf = self.spark.createDataFrame(res_pdf, schema) + # Write evaluation results to a delta table if write: if self.conf.get("evaluation_output", None): ( @@ -333,6 +412,7 @@ def backtest_global_model( .saveAsTable(self.conf.get("evaluation_output")) ) + # Compute aggregated metrics res_df = ( res_sdf.groupby(["metric_name"]) .mean("metric_value") @@ -343,6 +423,7 @@ def backtest_global_model( metric_name = None metric_value = None + # Log metrics to MLFlow for rec in res_df.values: metric_name, metric_value = rec if write: @@ -351,6 +432,12 @@ def backtest_global_model( return metric_value def evaluate_foundation_model(self, model_conf): + """ + Evaluates a foundation model using the provided model configuration. Registers the model. + Parameters: + self (Forecaster): A Forecaster object. + model_conf (dict): A dictionary specifying the model configuration. + """ with mlflow.start_run(experiment_id=self.experiment_id) as run: model_name = model_conf["name"] model = self.model_registry.get_model(model_name) @@ -373,9 +460,12 @@ def evaluate_foundation_model(self, model_conf): mlflow.set_tag("model_name", model.params["name"]) mlflow.set_tag("run_id", self.run_id) mlflow.log_params(model.get_params()) - print(f"Finished evaluating {model_conf.get('name')}") def score_models(self): + """ + Scores the models using the provided model configuration. + Parameters: self (Forecaster): A Forecaster object. 
+ """ print("Starting run_scoring") for model_name in self.model_registry.get_active_model_keys(): model_conf = self.model_registry.get_model_conf(model_name) @@ -390,16 +480,24 @@ def score_models(self): print("Finished run_scoring") def score_local_model(self, model_conf): + """ + Scores a local model using the provided model configuration and writes the results to a delta table. + Parameters: + self (Forecaster): A Forecaster object. + model_conf (dict): A dictionary specifying the model configuration. + """ src_df = self.resolve_source("train_data") src_df, removed = DataQualityChecks(src_df, self.conf, self.spark).run() - # Check if external regressors are provided and framework is statsforecast - # External regressors are supported only with statsforecast and neuralforecast models + # Check if external regressors are provided and if the framework is statsforecast. + # External regressors are supported only with statsforecast models. if (self.conf["scoring_data"])\ and (self.conf["train_data"] != self.conf["scoring_data"])\ and (model_conf["framework"] == "StatsForecast"): score_df = self.resolve_source("scoring_data") score_df = score_df.where(~col(self.conf["group_id"]).isin(removed)) src_df = src_df.unionByName(score_df, allowMissingColumns=True) + + # Specify output schema for Pandas UDF output_schema = StructType( [ StructField( @@ -411,11 +509,15 @@ def score_local_model(self, model_conf): ] ) model = self.model_registry.get_model(model_conf["name"]) + + # Use Pandas UDF to distribute scoring score_one_local_model_fn = functools.partial(Forecaster.score_one_local_model, model=model) res_sdf = ( src_df.groupby(self.conf["group_id"]) .applyInPandas(score_one_local_model_fn, schema=output_schema) ) + + # Write the results to a delta table ( res_sdf.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType())) .withColumn("run_id", lit(self.run_id)) @@ -431,6 +533,15 @@ def score_local_model(self, model_conf): def score_one_local_model( pdf: pd.DataFrame, model: ForecastingRegressor ) -> pd.DataFrame: + """ + A static method that scores a single local model using the provided pandas DataFrame and model. + If the scoring for one time series fails, it returns an empty DataFrame instead of failing the + entire process. + Parameters: + pdf (pd.DataFrame): A pandas DataFrame. + model (ForecastingRegressor): A ForecastingRegressor object. + Returns: res_df (pd.DataFrame): A pandas DataFrame. + """ pdf[model.params["date_col"]] = pd.to_datetime(pdf[model.params["date_col"]]) pdf.sort_values(by=model.params["date_col"], inplace=True) group_id = pdf[model.params["group_id"]].iloc[0] @@ -482,6 +593,12 @@ def score_global_model(self, model_conf): ) def score_foundation_model(self, model_conf): + """ + Scores a global model using the provided model configuration. Writes the results to a delta table. + Parameters: + self (Forecaster): A Forecaster object. + model_conf (dict): A dictionary specifying the model configuration. + """ print(f"Running scoring for {model_conf['name']}...") model_name = model_conf["name"] _, model_uri = self.get_model_for_scoring(model_conf) @@ -502,6 +619,13 @@ def score_foundation_model(self, model_conf): ) def get_model_for_scoring(self, model_conf): + """ + Gets a model for scoring using the provided model configuration. + Parameters: + self (Forecaster): A Forecaster object. + model_conf (dict): A dictionary specifying the model configuration. + Returns: model (Model): A model object. model_uri (str): A string specifying the model URI. 
+ """ client = MlflowClient() registered_name = f"{self.conf['model_output']}.{model_conf['name']}_{self.conf['use_case_name']}" model_info = self.get_latest_model_info(client, registered_name) @@ -517,10 +641,17 @@ def get_model_for_scoring(self, model_conf): @staticmethod def get_latest_model_info(client, registered_name): + """ + Gets the latest model info using the provided MLflow client and registered name. + Parameters: + client (MlflowClient): An MLflowClient object. + registered_name (str): A string specifying the registered name. + Returns: model_info (ModelVersion): A ModelVersion object. + """ latest_version = 1 for mv in client.search_model_versions(f"name='{registered_name}'"): version_int = int(mv.version) if version_int > latest_version: latest_version = version_int - return client.get_model_version(registered_name, latest_version) + return client.get_model_version(registered_name, str(latest_version)) diff --git a/mmf_sa/__init__.py b/mmf_sa/__init__.py index e7c1a2d..fec9b94 100644 --- a/mmf_sa/__init__.py +++ b/mmf_sa/__init__.py @@ -32,14 +32,53 @@ def run_forecast( dynamic_future: List[str] = None, dynamic_historical: List[str] = None, active_models: List[str] = None, - accelerator: str = None, + accelerator: str = "cpu", backtest_retrain: bool = None, train_predict_ratio: int = None, data_quality_check: bool = None, experiment_path: str = None, - conf: Union[str, Dict[str, Any], OmegaConf] = None, run_id: str = None, + conf: Union[str, Dict[str, Any], OmegaConf] = None, ) -> str: + + """ + The function creates a Forecaster object with the provided configuration. + And then calls the evaluate_score method to perform the evaluation and forecasting. + The function returns the run id. + + Parameters: + spark (SparkSession): A SparkSession object. + train_data (Union[str, pd.DataFrame, DataFrame]): Training data as a string of delta table name, pandas DataFrame, or Spark DataFrame. + group_id (str): A string specifying the column name that groups the training data into individual time series. + date_col (str): A string specifying the column name that stores the date variable. + target (str): A string specifying the column name of the target variable. + freq (str): A string specifying the frequency. Currently, "D" for daily and "M" for monthly are supported. + prediction_length (int): An integer specifying the prediction length: i.e. forecasting horizon. + backtest_months (int): An integer specifying the number of backtest months. + stride (int): An integer specifying the stride length. + metric (str): A string specifying the metric to use for evaluation. Default is smape. + resample (bool): A boolean specifying whether to back-fill skipped entries with 0. Default is False. + scoring_data (Union[str, pd.DataFrame, DataFrame]): Scoring data as a string of delta table name, pandas DataFrame, or Spark DataFrame. + scoring_output (str): A string specifying the output table name for scoring. + evaluation_output (str): A string specifying the output table name for evaluation. + model_output (str): A string specifying the output path for the model. + use_case_name (str): A string specifying the use case name. + static_features (List[str]): A list of strings specifying the static features. + dynamic_future (List[str]): A list of strings specifying the dynamic future features. + dynamic_historical (List[str]): A list of strings specifying the dynamic historical features. + active_models (List[str]): A list of strings specifying the active models. 
+ accelerator (str): A string specifying the accelerator to use: cpu or gpu. Default is cpu. + backtest_retrain (bool): A boolean specifying whether to retrain the model during backtesting. Currently not supported. + train_predict_ratio (int): An integer specifying the train predict ratio. + data_quality_check (bool): A boolean specifying whether to check the data quality. + experiment_path (str): A string specifying the experiment path. + run_id (str): A string specifying the run id. If not provided, a random string is generated and assigned to each run. + conf (Union[str, Dict[str, Any], OmegaConf]): A configuration object. + + Returns: + run_id (str): A string specifying the run id. + """ + if isinstance(conf, dict): _conf = OmegaConf.create(conf) elif isinstance(conf, str): @@ -69,7 +108,6 @@ def run_forecast( _conf["stride"] = stride _conf["metric"] = metric _conf["resample"] = resample - run_evaluation = True run_scoring = False if scoring_data is not None and scoring_output is not None: @@ -112,7 +150,9 @@ def run_forecast( spark=spark, run_id=run_id, ) + run_id = f.evaluate_score(evaluate=run_evaluation, score=run_scoring) + return run_id diff --git a/mmf_sa/data_quality_checks.py b/mmf_sa/data_quality_checks.py index af2f77f..1c73b56 100644 --- a/mmf_sa/data_quality_checks.py +++ b/mmf_sa/data_quality_checks.py @@ -1,15 +1,9 @@ import functools -from abc import ABC, abstractmethod -from argparse import ArgumentParser -from logging import Logger from typing import Dict, Any, Union -import yaml -import pathlib -from omegaconf import OmegaConf, DictConfig +from omegaconf import DictConfig import pandas as pd import pyspark from pyspark.sql import SparkSession -import sys class DataQualityChecks: @@ -28,8 +22,9 @@ def __init__( def _backtest_length_check(self): """ - Checks if the backtest interval contains at least one prediction length. - Mandatory check irrespective of data_quality_check being True or False. + Checks if backtest_months contains at least one prediction_length. + Mandatory check regardless of whether data_quality_check is set to True or False. + Parameters: self (DataQualityChecks): A DataQualityChecks object. """ backtest_days = self.conf["backtest_months"] * 30 prediction_length_days = ( @@ -42,13 +37,14 @@ def _backtest_length_check(self): def _external_regressors_check(self): """ - Checks if the resampling is turned off when an external regressor is given. - Mandatory check irrespective of data_quality_check being True or False. + Checks if the resampling is turned off when an exogenous regressor is given. + Mandatory check regardless of whether data_quality_check is set to True or False. + Parameters: self (DataQualityChecks): A DataQualityChecks object. """ if ( - self.conf.get("static_categoricals", None) - or self.conf.get("dynamic_categoricals", None) - or self.conf.get("dynamic_reals", None) + self.conf.get("static_features", None) + or self.conf.get("dynamic_future", None) + or self.conf.get("dynamic_historical", None) ): if self.conf.get("resample"): raise Exception( @@ -60,16 +56,21 @@ def _multiple_checks( _df: pd.DataFrame, conf: Dict[str, Any], max_date: pd.Timestamp ) -> pd.DataFrame: """ - Run 3 checks on the subset dataset grouped by group_id. - Optional checks only when data_quality_check is True. - 1. Check if any of external regressor provided contains null. If it does, this time series is removed. - 2. Check if the training period is longer than the requirement (train_predict_ratio). - 3. Check for missing entries.
If the time series has a missing entry and the resampling is disabled, - this time series is removed. If the time series has too many missing entries (more than 0.2 of the + Runs 4 checks on the subset dataset grouped by group_id. + These optional checks run only when data_quality_check is True. + 1. Checks if any of the external regressors provided contains nulls. If so, the time series is removed. + 2. Checks if the training period is longer than the requirement (train_predict_ratio). + 3. Checks for missing entries. If the time series has a missing entry and the resampling is disabled, + it is removed. If the time series has too many missing entries (more than 0.2 of the entire duration), it is removed even when resampling is enabled. 4. If the time series has too many negative entries (more than 0.2 of the entire duration), it is removed. - :return: - pandas DataFrame with time series not meeting the requirement removed. + + Parameters: + _df (pd.DataFrame): A pandas DataFrame. + conf (Dict[str, Any]): A dictionary specifying the configuration. + max_date (pd.Timestamp): A pandas Timestamp object. + + Returns: _df (pd.DataFrame): A pandas DataFrame with time series not meeting the requirements removed. """ group_id = _df[conf["group_id"]].iloc[0] @@ -135,8 +136,11 @@ def _multiple_checks( def run(self) -> tuple[Union[pd.DataFrame, pyspark.sql.DataFrame], list]: """ - Main method of the job. - :return: + Runs the data quality checks. + Parameters: self (DataQualityChecks): A DataQualityChecks object. + Returns: + clean_df (Union[pd.DataFrame, pyspark.sql.DataFrame]): A pandas DataFrame or a PySpark DataFrame. + removed (list): A list of group ids that are removed. """ print(f"Running data quality checks...") self.df[self.conf["date_col"]] = pd.to_datetime(self.df[self.conf["date_col"]]) diff --git a/mmf_sa/jobs/forecasting_job.py b/mmf_sa/jobs/forecasting_job.py index 0c108eb..9afd1a8 100644 --- a/mmf_sa/jobs/forecasting_job.py +++ b/mmf_sa/jobs/forecasting_job.py @@ -1,6 +1,5 @@ import mlflow from mlflow.tracking import MlflowClient - from mmf_sa.Forecaster import Forecaster from mmf_sa.common import Job @@ -17,7 +16,7 @@ def launch(self): ) forecaster = Forecaster(self.conf, self.spark, experiment_id) - forecaster.train_eval_score(export_metrics=False, scoring=False) + forecaster.evaluate_score() self.logger.info("Forecasting Job finished!") diff --git a/mmf_sa/jobs/retraining_evaluation_job.py b/mmf_sa/jobs/retraining_evaluation_job.py deleted file mode 100644 index 18bab51..0000000 --- a/mmf_sa/jobs/retraining_evaluation_job.py +++ /dev/null @@ -1,28 +0,0 @@ -import mlflow -from mlflow.tracking import MlflowClient - -from mmf_sa.Forecaster import Forecaster -from mmf_sa.common import Job - - -class RetrainingEvaluationJob(Job): - def launch(self): - self.logger.info("Launching Retraining/Evaluation Job") - - mlflow.set_experiment(self.conf["experiment_path"]) - experiment_id = ( - MlflowClient() - .get_experiment_by_name(self.conf["experiment_path"]) - .experiment_id - ) - - forecaster = Forecaster(self.conf, self.spark, experiment_id) - forecaster.train_evaluate_models() # Train and evaluate models - # promote best model to production - - self.logger.info("Retraining/Evaluation Job finished!") - - -if __name__ == "__main__": - job = RetrainingEvaluationJob() - job.launch() diff --git a/mmf_sa/models/abstract_model.py b/mmf_sa/models/abstract_model.py index af03175..6b14153 100644 --- a/mmf_sa/models/abstract_model.py +++ b/mmf_sa/models/abstract_model.py @@ -32,12 +32,10 @@ def
fit(self, x, y=None): @abstractmethod def predict(self, x, y=None): - # TODO Shouldn't X be optional if we have a trainable model and provide a prediction length pass @abstractmethod def forecast(self, x, spark=None): - # TODO Shouldn't X be optional if we have a trainable model and provide a prediction length pass def backtest( @@ -46,9 +44,20 @@ def backtest( start: pd.Timestamp, group_id: Union[str, int] = None, stride: int = None, - backtest_retrain: bool = False, + # backtest_retrain: bool = False, spark=None, ) -> pd.DataFrame: + """ + Performs backtesting using the provided pandas DataFrame, start timestamp, group id, stride and SparkSession. + Parameters: + self (Forecaster): A Forecaster object. + df (pd.DataFrame): A pandas DataFrame. + start (pd.Timestamp): A pandas Timestamp object. + group_id (Union[str, int], optional): A string or an integer specifying the group id. Default is None. + stride (int, optional): An integer specifying the stride. Default is None. + spark (SparkSession, optional): A SparkSession object. Default is None. + Returns: res_df (pd.DataFrame): A pandas DataFrame. + """ if stride is None: stride = int(self.params.get("stride", 7)) stride_offset = ( @@ -59,12 +68,12 @@ def backtest( df = df.copy().sort_values(by=[self.params["date_col"]]) end_date = df[self.params["date_col"]].max() curr_date = start + self.one_ts_offset - #print("end_date = ", end_date) + # print("end_date = ", end_date) results = [] while curr_date + self.prediction_length_offset <= end_date + self.one_ts_offset: - #print("start_date = ", curr_date) + # print("start_date = ", curr_date) _df = df[df[self.params["date_col"]] < np.datetime64(curr_date)] actuals_df = df[ (df[self.params["date_col"]] >= np.datetime64(curr_date)) @@ -74,8 +83,8 @@ def backtest( )] # backtest_retrain for global models is currently not supported - if backtest_retrain and self.params["model_type"] == "global": - self.fit(_df) + # if backtest_retrain and self.params["model_type"] == "global": + # self.fit(_df) metrics = self.calculate_metrics(_df, actuals_df, curr_date, spark) @@ -113,6 +122,16 @@ def backtest( def calculate_metrics( self, hist_df: pd.DataFrame, val_df: pd.DataFrame, curr_date, spark=None ) -> Dict[str, Union[str, float, bytes]]: + """ + Calculates the metrics using the provided historical DataFrame, validation DataFrame, current date, and SparkSession. + Parameters: + self (Forecaster): A Forecaster object. + hist_df (pd.DataFrame): A pandas DataFrame. + val_df (pd.DataFrame): A pandas DataFrame. + curr_date: A pandas Timestamp object. + spark (SparkSession, optional): A SparkSession object. Default is None. + Returns: metrics (Dict[str, Union[str, float, bytes]]): A dictionary specifying the metrics. 
+ """ pred_df, model_fitted = self.predict(hist_df, val_df) smape = mean_absolute_percentage_error( val_df[self.params["target"]], diff --git a/mmf_sa/models/chronosforecast/ChronosPipeline.py b/mmf_sa/models/chronosforecast/ChronosPipeline.py index 4c55f72..2777db1 100644 --- a/mmf_sa/models/chronosforecast/ChronosPipeline.py +++ b/mmf_sa/models/chronosforecast/ChronosPipeline.py @@ -100,7 +100,7 @@ def predict(self, ) # Todo - #forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) + # forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) return forecast_df, self.model def forecast(self, df: pd.DataFrame, spark=None): diff --git a/mmf_sa/models/models_conf.yaml b/mmf_sa/models/models_conf.yaml index 3b4be45..33f48a1 100644 --- a/mmf_sa/models/models_conf.yaml +++ b/mmf_sa/models/models_conf.yaml @@ -52,9 +52,8 @@ models: framework: StatsForecast model_type: local model_spec: - season_length: - approximation: - level: + season_length: 7 + approximation: false StatsForecastAutoETS: module: mmf_sa.models.statsforecast.StatsFcForecastingPipeline @@ -62,8 +61,8 @@ models: framework: StatsForecast model_type: local model_spec: - season_length: - model_type: "ZZZ" + season_length: 7 + model: "ZZZ" StatsForecastAutoCES: module: mmf_sa.models.statsforecast.StatsFcForecastingPipeline @@ -71,8 +70,8 @@ models: framework: StatsForecast model_type: local model_spec: - season_length: 1 - model_type: "Z" + season_length: 7 + model: "Z" StatsForecastAutoTheta: module: mmf_sa.models.statsforecast.StatsFcForecastingPipeline @@ -80,7 +79,7 @@ models: framework: StatsForecast model_type: local model_spec: - season_length: 1 + season_length: 7 decomposition_type: "multiplicative" StatsForecastTSB: @@ -133,7 +132,7 @@ models: framework: RFable model_type: local model_spec: - season_length: + season_length: 7 RFableETS: module: mmf_sa.models.r_fable.RFableForecastingPipeline @@ -141,7 +140,7 @@ models: framework: RFable model_type: local model_spec: - season_length: + season_length: 7 RFableNNETAR: module: mmf_sa.models.r_fable.RFableForecastingPipeline @@ -149,7 +148,7 @@ models: framework: RFable model_type: local model_spec: - season_length: + season_length: 7 RFableEnsemble: module: mmf_sa.models.r_fable.RFableForecastingPipeline diff --git a/mmf_sa/models/moiraiforecast/MoiraiPipeline.py b/mmf_sa/models/moiraiforecast/MoiraiPipeline.py index b770427..54b20b4 100644 --- a/mmf_sa/models/moiraiforecast/MoiraiPipeline.py +++ b/mmf_sa/models/moiraiforecast/MoiraiPipeline.py @@ -101,7 +101,7 @@ def predict(self, ) # Todo - #forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) + # forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) return forecast_df, self.model def forecast(self, df: pd.DataFrame, spark=None): diff --git a/mmf_sa/models/momentforecast/MomentPipeline.py b/mmf_sa/models/momentforecast/MomentPipeline.py index b421089..1768a4b 100644 --- a/mmf_sa/models/momentforecast/MomentPipeline.py +++ b/mmf_sa/models/momentforecast/MomentPipeline.py @@ -99,7 +99,7 @@ def predict(self, ) # Todo - #forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) + # forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) return forecast_df, self.model def forecast(self, df: pd.DataFrame, spark=None): diff --git a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py index 19c1e9b..38c061a 100644 --- 
a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py +++ b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py @@ -126,7 +126,7 @@ def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None): return forecast_df, self.model - def forecast(self, df: pd.DataFrame): + def forecast(self, df: pd.DataFrame, spark=None): _df = df[df[self.params.target].notnull()] _df = self.prepare_data(_df) _last_date = _df["ds"].max() diff --git a/mmf_sa/models/r_fable/RFableForecastingPipeline.py b/mmf_sa/models/r_fable/RFableForecastingPipeline.py index 13dc7a9..23194f1 100644 --- a/mmf_sa/models/r_fable/RFableForecastingPipeline.py +++ b/mmf_sa/models/r_fable/RFableForecastingPipeline.py @@ -11,8 +11,6 @@ from mmf_sa.models.abstract_model import ForecastingRegressor -# make sure R and the fable package are installed on your system -# TODO: Above comment should be moved to README at some point base = importr("base") tsibble = importr("tsibble") fabletools = importr("fabletools") @@ -55,7 +53,6 @@ def prepare_training_data(self, df: pd.DataFrame) -> pd.DataFrame: return rts def prepare_forecast_data(self, df: pd.DataFrame) -> pd.DataFrame: - # TODO Combine the prepare_data method to one single method df_rfable = df[ [self.params.group_id, self.params.date_col] + [xreg for xreg in self.params.model_spec.xreg] @@ -99,7 +96,7 @@ def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None): ) return forecast_df, self.model - def forecast(self, df: pd.DataFrame): + def forecast(self, df: pd.DataFrame, spark=None): return self.predict(df) def _get_model_definition(self): diff --git a/mmf_sa/models/sktime/SKTimeForecastingPipeline.py b/mmf_sa/models/sktime/SKTimeForecastingPipeline.py index e2d1e8c..11b2bc0 100644 --- a/mmf_sa/models/sktime/SKTimeForecastingPipeline.py +++ b/mmf_sa/models/sktime/SKTimeForecastingPipeline.py @@ -78,7 +78,7 @@ def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None): forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) return forecast_df, self.model - def forecast(self, x): + def forecast(self, x, spark=None): return self.predict(x) @@ -92,14 +92,14 @@ def create_model(self) -> BaseForecaster: ( "deseasonalise", ConditionalDeseasonalizer( - model=self.model_spec.get("deseasonalise_model", "additive"), - sp=int(self.model_spec.get("season_length", 1)) + model=self.model_spec.get("deseasonalise_model"), + sp=int(self.model_spec.get("season_length")) ) ), ( "detrend", Detrender( - forecaster=PolynomialTrendForecaster(degree=int(self.model_spec.get("detrend_poly_degree", 1))) + forecaster=PolynomialTrendForecaster(degree=int(self.model_spec.get("detrend_poly_degree"))) ) ), ( @@ -134,9 +134,9 @@ def __init__(self, params): def create_model(self) -> BaseForecaster: model = TBATS( - sp=int(self.model_spec.get("season_length", 7)), - use_trend=self.model_spec.get("use_trend", True), - use_box_cox=self.model_spec.get("box_cox", True), + sp=int(self.model_spec.get("season_length")), + use_trend=self.model_spec.get("use_trend"), + use_box_cox=self.model_spec.get("box_cox"), n_jobs=-1, ) return model diff --git a/mmf_sa/models/statsforecast/StatsFcForecastingPipeline.py b/mmf_sa/models/statsforecast/StatsFcForecastingPipeline.py index d9060f4..665ade0 100644 --- a/mmf_sa/models/statsforecast/StatsFcForecastingPipeline.py +++ b/mmf_sa/models/statsforecast/StatsFcForecastingPipeline.py @@ -100,7 +100,7 @@ def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None): forecast_df[self.params.target] = 
forecast_df[self.params.target].clip(0.01) return forecast_df, self.model - def forecast(self, df: pd.DataFrame): + def forecast(self, df: pd.DataFrame, spark=None): _df = df[df[self.params.target].notnull()] _df = self.prepare_data(_df) self.fit(_df) @@ -138,131 +138,108 @@ def forecast(self, df: pd.DataFrame): return forecast_df, self.model -class StatsFcAutoArima(StatsFcForecaster): +class StatsFcBaselineWindowAverage(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = AutoARIMA( - season_length=self.params.model_spec.season_length - if self.params.model_spec.season_length - else 1, - approximation=self.params.model_spec.approximation - if self.params.model_spec.approximation - else False, + self.model_spec = WindowAverage( + window_size=self.params.model_spec.window_size, ) -class StatsFcAutoETS(StatsFcForecaster): +class StatsFcBaselineSeasonalWindowAverage(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = AutoETS( - season_length=self.params.model_spec.season_length - if self.params.model_spec.season_length - else 1, - model=self.params.model_spec.model - if self.params.model_spec.model - else "ZNZ", + self.model_spec = SeasonalWindowAverage( + season_length=self.params.model_spec.season_length, + window_size=self.params.model_spec.window_size, ) -class StatsFcAutoCES(StatsFcForecaster): +class StatsFcBaselineNaive(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = AutoCES( - season_length=self.params.model_spec.season_length - if self.params.model_spec.season_length - else 1, - model=self.params.model_spec.model - if self.params.model_spec.model - else "Z", - ) + self.model_spec = Naive() -class StatsFcAutoTheta(StatsFcForecaster): +class StatsFcBaselineSeasonalNaive(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = AutoTheta( - season_length=self.params.model_spec.season_length - if self.params.model_spec.season_length - else 1, - decomposition_type=self.params.model_spec.decomposition_type - if self.params.model_spec.decomposition_type - else "multiplicative", + self.model_spec = SeasonalNaive( + season_length=self.params.model_spec.season_length, ) -class StatsFcADIDA(StatsFcForecaster): +class StatsFcAutoArima(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = ADIDA() + self.model_spec = AutoARIMA( + season_length=self.params.model_spec.season_length, + approximation=self.params.model_spec.approximation, + ) -class StatsFcIMAPA(StatsFcForecaster): +class StatsFcAutoETS(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = IMAPA() + self.model_spec = AutoETS( + season_length=self.params.model_spec.season_length, + model=self.params.model_spec.model, + ) -class StatsFcTSB(StatsFcForecaster): +class StatsFcAutoCES(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = TSB( - alpha_d=float(params.get("alpha_d", 0.2)), - alpha_p=float(params.get("alpha_p", 0.2)), + self.model_spec = AutoCES( + season_length=self.params.model_spec.season_length, + model=self.params.model_spec.model, ) -class StatsFcCrostonClassic(StatsFcForecaster): +class StatsFcAutoTheta(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = CrostonClassic() + self.model_spec = AutoTheta( + season_length=self.params.model_spec.season_length, + 
decomposition_type=self.params.model_spec.decomposition_type, + ) -class StatsFcCrostonOptimized(StatsFcForecaster): +class StatsFcTSB(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = CrostonOptimized() + self.model_spec = TSB( + alpha_d=self.params.model_spec.alpha_d, + alpha_p=self.params.model_spec.alpha_p, + ) -class StatsFcCrostonSBA(StatsFcForecaster): +class StatsFcADIDA(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = CrostonSBA() + self.model_spec = ADIDA() -class StatsFcBaselineWindowAverage(StatsFcForecaster): +class StatsFcIMAPA(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = WindowAverage( - window_size=self.params.model_spec.window_size - if self.params.model_spec.window_size - else 7, - ) + self.model_spec = IMAPA() -class StatsFcBaselineSeasonalWindowAverage(StatsFcForecaster): +class StatsFcCrostonClassic(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = SeasonalWindowAverage( - season_length=self.params.model_spec.season_length - if self.params.model_spec.season_length - else 7, - window_size=self.params.model_spec.window_size - if self.params.model_spec.window_size - else 7, - ) + self.model_spec = CrostonClassic() -class StatsFcBaselineNaive(StatsFcForecaster): +class StatsFcCrostonOptimized(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = Naive() + self.model_spec = CrostonOptimized() -class StatsFcBaselineSeasonalNaive(StatsFcForecaster): +class StatsFcCrostonSBA(StatsFcForecaster): def __init__(self, params): super().__init__(params) - self.model_spec = SeasonalNaive( - season_length=self.params.model_spec.season_length - if self.params.model_spec.season_length - else 7, - ) + self.model_spec = CrostonSBA() + diff --git a/mmf_sa/models/timesfmforecast/TimesFMPipeline.py b/mmf_sa/models/timesfmforecast/TimesFMPipeline.py index 4b79277..6f07b1c 100644 --- a/mmf_sa/models/timesfmforecast/TimesFMPipeline.py +++ b/mmf_sa/models/timesfmforecast/TimesFMPipeline.py @@ -54,7 +54,7 @@ def predict(self, } ) # Todo - #forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) + # forecast_df[self.params.target] = forecast_df[self.params.target].clip(0.01) return forecast_df, self.model def forecast(self, df: pd.DataFrame, spark=None):
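Editor's note: the mmf_sa/__init__.py hunk earlier in this diff documents the run_forecast entry point. A minimal usage sketch under stated assumptions — the table names, column names, use case name, and experiment path below are illustrative placeholders, not taken from the diff; the model keys come from models_conf.yaml in this PR, and spark is assumed to be an active SparkSession:

# Hypothetical usage sketch of run_forecast as documented in this diff.
# Table/column names and the experiment path are illustrative placeholders.
from mmf_sa import run_forecast

run_id = run_forecast(
    spark=spark,                                   # an active SparkSession
    train_data="dev.demo.daily_train",             # delta table name, pandas or Spark DataFrame
    scoring_data="dev.demo.daily_train",
    scoring_output="dev.demo.forecast_output",
    evaluation_output="dev.demo.evaluation_output",
    group_id="store_id",
    date_col="date",
    target="sales",
    freq="D",                                      # "D" and "M" are supported
    prediction_length=14,
    backtest_months=1,
    stride=7,
    metric="smape",
    train_predict_ratio=2,
    data_quality_check=True,
    resample=False,
    active_models=["StatsForecastAutoETS", "StatsForecastAutoTheta"],
    experiment_path="/Shared/mmf_experiment",
    use_case_name="daily_demo",
    accelerator="cpu",
)
print(run_id)  # run_forecast returns the run id as a string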
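The evaluate_local_model and score_local_model hunks distribute per-series work with a grouped Pandas UDF and an explicit output schema. A minimal, self-contained sketch of that pattern, independent of the Forecaster class — the column names and the toy metric are illustrative, not part of mmf_sa:

# Minimal sketch of the groupBy().applyInPandas() pattern used in
# evaluate_local_model / score_local_model. Column names and the toy
# "mean_y" metric are illustrative placeholders.
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame(
    pd.DataFrame({"group_id": ["a", "a", "b", "b"], "y": [1.0, 2.0, 3.0, 5.0]})
)

# The output schema must be declared up front, as with output_schema in the diff.
output_schema = StructType(
    [
        StructField("group_id", StringType()),
        StructField("metric_name", StringType()),
        StructField("metric_value", FloatType()),
    ]
)

def evaluate_one_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Runs once per group on the executors; errors here can be caught and an
    # empty DataFrame returned so one bad series does not fail the whole job.
    return pd.DataFrame(
        {
            "group_id": [pdf["group_id"].iloc[0]],
            "metric_name": ["mean_y"],
            "metric_value": [float(pdf["y"].mean())],
        }
    )

res_sdf = sdf.groupBy("group_id").applyInPandas(evaluate_one_group, schema=output_schema)
res_sdf.show()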
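The models_conf.yaml and StatsFcForecastingPipeline hunks remove the hard-coded `if ... else` fallbacks from the Python wrappers, so hyperparameters such as season_length now come directly from the YAML (e.g., season_length: 7). A small sketch of that flow — the inline config literal mirrors the YAML shape but is written out here only for illustration:

# Sketch of how a models_conf.yaml entry now feeds a StatsForecast
# constructor directly (values mirror the YAML in this diff; the inline
# config literal is illustrative).
from omegaconf import OmegaConf
from statsforecast.models import AutoETS

conf = OmegaConf.create(
    {
        "model_spec": {
            "season_length": 7,  # previously defaulted inside the Python wrapper
            "model": "ZZZ",
        }
    }
)

# With the fallbacks removed, these values must be present in models_conf.yaml
# rather than being silently replaced by hard-coded Python defaults.
model_spec = AutoETS(
    season_length=conf.model_spec.season_length,
    model=conf.model_spec.model,
)
print(model_spec)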