diff --git a/README.md b/README.md index 092fbc6..381bb0e 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Get started now! - Jan 2025: [TimesFM](https://github.com/google-research/timesfm) is available for univariate and covariate forecasting. Try the notebooks: [univariate](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_daily.py) and [covariate](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_external_regressors_daily.py). - Jan 2025: [Chronos Bolt](https://github.com/amazon-science/chronos-forecasting) models are available for univariate forecasting. Try the [notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_daily.py). - Jan 2025: [Moirai MoE](https://github.com/SalesforceAIResearch/uni2ts) models are available for univariate forecasting. Try the [notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_daily.py). +- Jan 2025: Added support for weekly (`freq="W"`) time series. See the notebooks for [local](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/local_univariate_weekly.py), [global](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/global_weekly.py) and [foundation](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_weekly.py) models. ## Getting started @@ -95,7 +96,7 @@ run_forecast( - ```group_id``` is a column storing the unique id that groups your dataset to each time series. - ```date_col``` is your time column name. - ```target``` is your target column name. -- ```freq``` is your prediction frequency. Currently, "D" for daily and "M" for monthly are supported. Note that ```freq``` supported is as per the model basis, hence check the model documentation carefully. Monthly forecasting expects the timestamp column in ```train_data``` and ```scoring_output``` to be the last day of the month. +- ```freq``` is your prediction frequency. Currently, "D" for daily, "W" for weekly and "M" for monthly are supported. Note that the supported ```freq``` values vary by model, so check each model's documentation carefully. Monthly forecasting expects the timestamp column in ```train_data``` and ```scoring_output``` to be the last day of the month. - ```prediction_length``` is your forecasting horizon in the number of steps. - ```backtest_months``` specifies how many previous months you use for backtesting. - ```stride``` is the number of steps in which you update your backtesting trial start date when going from one trial to the next. diff --git a/examples/foundation_daily.py b/examples/foundation_daily.py index e6d12c3..cf154aa 100644 --- a/examples/foundation_daily.py +++ b/examples/foundation_daily.py @@ -108,7 +108,7 @@ def transform_group(df): # COMMAND ---------- # MAGIC %md ### Models -# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). 
Look for the models where `model_type: foundation`; these are the foundation models we import from [chronos](https://github.com/amazon-science/chronos-forecasting/tree/main), [uni2ts](https://github.com/SalesforceAIResearch/uni2ts) and [moment](https://github.com/moment-timeseries-foundation-model/moment). Check their documentation for the detailed description of each model. +# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: foundation`; these are the foundation models we install from [chronos](https://pypi.org/project/chronos-forecasting/), [uni2ts](https://pypi.org/project/uni2ts/) and [timesfm](https://pypi.org/project/timesfm/). Check their documentation for the detailed description of each model. # MAGIC # MAGIC Foundation time series models are pretrained on millions or billions of time series. These models can produce analysis (i.e. forecasting, anomaly detection, classfication) on an unforeseen time series without training or tuning. You can modify the hyperparameters in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml) or overwrite the default values in [mmf_sa/forecasting_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/forecasting_conf.yaml). You can also introduce new hyperparameters that are supported by the base models. To do this, first add those hyperparameters under the model specification in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Then, include these hyperparameters inside the model instantiation which happens in the model pipeline script: e.g. `ChronosT5Tiny` class in [mmf_sa/models/chronosforecast/ChronosPipeline.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/chronosforecast/ChronosPipeline.py). diff --git a/examples/foundation_monthly.py b/examples/foundation_monthly.py index dd40016..6bbf154 100644 --- a/examples/foundation_monthly.py +++ b/examples/foundation_monthly.py @@ -121,7 +121,7 @@ def transform_group(df): # COMMAND ---------- # MAGIC %md ### Models -# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: foundation`; these are the foundation models we import from [chronos](https://github.com/amazon-science/chronos-forecasting/tree/main), [uni2ts](https://github.com/SalesforceAIResearch/uni2ts) and [moment](https://github.com/moment-timeseries-foundation-model/moment). Check their documentation for the detailed description of each model. +# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. 
A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: foundation`; these are the foundation models we install from [chronos](https://pypi.org/project/chronos-forecasting/), [uni2ts](https://pypi.org/project/uni2ts/) and [timesfm](https://pypi.org/project/timesfm/). Check their documentation for the detailed description of each model. # COMMAND ---------- diff --git a/examples/foundation_weekly.py b/examples/foundation_weekly.py new file mode 100644 index 0000000..3dfd3b4 --- /dev/null +++ b/examples/foundation_weekly.py @@ -0,0 +1,192 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Many Models Forecasting Demo +# MAGIC This notebook showcases how to run MMF with foundation models on multiple time series of weekly resolution. We will use [M4 competition](https://www.sciencedirect.com/science/article/pii/S0169207019301128#sec5) data. The descriptions here are mostly the same as the case with the [daily resolution](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_daily.py), so we will skip the redundant parts and focus only on the essentials. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Cluster setup +# MAGIC +# MAGIC We recommend using a cluster with [Databricks Runtime 14.3 LTS for ML](https://docs.databricks.com/en/release-notes/runtime/14.3lts-ml.html) or above. The cluster should be single-node with one or more GPU instances: e.g. [g4dn.12xlarge [T4]](https://aws.amazon.com/ec2/instance-types/g4/) on AWS or [Standard_NC64as_T4_v3](https://learn.microsoft.com/en-us/azure/virtual-machines/nct4-v3-series) on Azure. MMF leverages [Pandas UDF](https://docs.databricks.com/en/udf/pandas.html) for distributing the inference tasks and utilizing all the available resources. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Install and import packages +# MAGIC Check out [requirements.txt](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/requirements.txt) if you're interested in the libraries we use. + +# COMMAND ---------- + +# MAGIC %pip install datasetsforecast==0.0.8 --quiet +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +import logging +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + +# COMMAND ---------- + +import uuid +import pathlib +import pandas as pd +from datasetsforecast.m4 import M4 + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Prepare data +# MAGIC We are using the [`datasetsforecast`](https://github.com/Nixtla/datasetsforecast/tree/main/) package to download M4 data. 
+ + # COMMAND ---------- + + # Number of time series +n = 100 + + +def create_m4_weekly(): + y_df, _, _ = M4.load(directory=str(pathlib.Path.home()), group="Weekly") + _ids = [f"W{i}" for i in range(1, n)] + y_df = ( + y_df.groupby("unique_id") + .filter(lambda x: x.unique_id.iloc[0] in _ids) + .groupby("unique_id") + .apply(transform_group) + .reset_index(drop=True) + ) + return y_df + + +def transform_group(df): + unique_id = df.unique_id.iloc[0] + if len(df) > 260: + df = df.iloc[-260:] + _start = pd.Timestamp("2020-01-01") + _end = _start + pd.DateOffset(days=int(7*len(df))) + date_idx = pd.date_range(start=_start, end=_end, freq="W", name="ds") + res_df = pd.DataFrame(data=[], index=date_idx).reset_index() + res_df["unique_id"] = unique_id + res_df["y"] = df.y.values + return res_df + +# COMMAND ---------- + +# MAGIC %md +# MAGIC We are going to save this data in a delta lake table. Provide catalog and database names where you want to store the data. + +# COMMAND ---------- + +catalog = "mmf" # Name of the catalog we use to manage our assets +db = "m4" # Name of the schema we use to manage our assets (e.g. datasets) +user = spark.sql('select current_user() as user').collect()[0]['user'] # User email address + +# COMMAND ---------- + +# Making sure that the catalog and the schema exist +_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") +_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") + +( + spark.createDataFrame(create_m4_weekly()) + .write.format("delta").mode("overwrite") + .saveAsTable(f"{catalog}.{db}.m4_weekly_train") +) + +# COMMAND ---------- + +# MAGIC %md Let's take a peek at the dataset: + +# COMMAND ---------- + +display(spark.sql(f"select unique_id, count(ds) as count from {catalog}.{db}.m4_weekly_train group by unique_id order by unique_id")) + +# COMMAND ---------- + +display( + spark.sql(f"select * from {catalog}.{db}.m4_weekly_train where unique_id in ('W1', 'W2', 'W3', 'W4', 'W5') order by unique_id, ds") + ) + +# COMMAND ---------- + +# MAGIC %md ### Models +# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: foundation`; these are the foundation models we install from [chronos](https://pypi.org/project/chronos-forecasting/), [uni2ts](https://pypi.org/project/uni2ts/) and [timesfm](https://pypi.org/project/timesfm/). Check their documentation for the detailed description of each model. + +# COMMAND ---------- + +active_models = [ + "ChronosT5Tiny", + "ChronosT5Mini", + "ChronosT5Small", + "ChronosT5Base", + "ChronosT5Large", + "ChronosBoltTiny", + "ChronosBoltMini", + "ChronosBoltSmall", + "ChronosBoltBase", + "MoiraiSmall", + "MoiraiBase", + "MoiraiLarge", + "MoiraiMoESmall", + "MoiraiMoEBase", + "TimesFM_1_0_200m", + "TimesFM_2_0_500m", +] + +# COMMAND ---------- + +# MAGIC %md ### Run MMF +# MAGIC +# MAGIC Now, we can run the evaluation and forecasting using `run_forecast` function defined in [mmf_sa/models/__init__.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/__init__.py). Refer to [README.md](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/README.md#parameters-description) for a comprehensive description of each parameter. Make sure to set `freq="W"` in `run_forecast` function called in [examples/run_weekly.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_weekly.py). 
+ + # COMMAND ---------- + +# The same run_id will be assigned to all the models. This makes it easier to run the post evaluation analysis later. +run_id = str(uuid.uuid4()) + +for model in active_models: + dbutils.notebook.run( + "run_weekly", + timeout_seconds=0, + arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id, "user": user}) + +# COMMAND ---------- + +# MAGIC %md ### Evaluate +# MAGIC In `evaluation_output` table, we store all evaluation results for all backtesting trials from all models. This information can be used to understand which models performed well on which time series and in which backtesting periods. This is very important for selecting the final model for forecasting or models for ensembling. Let's take a look at the table: + +# COMMAND ---------- + +display(spark.sql(f""" + select * from {catalog}.{db}.weekly_evaluation_output + where unique_id = 'W1' + order by unique_id, model, backtest_window_start_date + """)) + +# COMMAND ---------- + +# MAGIC %md ### Forecast +# MAGIC In `scoring_output` table, forecasts for each time series from each model are stored. + +# COMMAND ---------- + +display(spark.sql(f""" + select * from {catalog}.{db}.weekly_scoring_output + where unique_id = 'W1' + order by unique_id, model, ds + """)) + +# COMMAND ---------- + +# MAGIC %md ### Delete Tables +# MAGIC Let's clean up the tables. + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.weekly_evaluation_output")) + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.weekly_scoring_output")) \ No newline at end of file diff --git a/examples/global_weekly.py b/examples/global_weekly.py new file mode 100644 index 0000000..4302fd8 --- /dev/null +++ b/examples/global_weekly.py @@ -0,0 +1,186 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Many Models Forecasting Demo +# MAGIC This notebook showcases how to run MMF with global models on multiple time series of weekly resolution. We will use [M4 competition](https://www.sciencedirect.com/science/article/pii/S0169207019301128#sec5) data. The descriptions here are mostly the same as the case with the [daily resolution](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/global_daily.py), so we will skip the redundant parts and focus only on the essentials. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Cluster setup +# MAGIC +# MAGIC We recommend using a cluster with [Databricks Runtime 14.3 LTS for ML](https://docs.databricks.com/en/release-notes/runtime/14.3lts-ml.html) or above. The cluster should be single-node with one or more GPU instances: e.g. [g4dn.12xlarge [T4]](https://aws.amazon.com/ec2/instance-types/g4/) on AWS or [Standard_NC64as_T4_v3](https://learn.microsoft.com/en-us/azure/virtual-machines/nct4-v3-series) on Azure. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Install and import packages +# MAGIC Check out [requirements.txt](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/requirements.txt) if you're interested in the libraries we use. 
+ + # COMMAND ---------- + +# MAGIC %pip install datasetsforecast==0.0.8 --quiet +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +import logging +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + +# COMMAND ---------- + +import uuid +import pathlib +import pandas as pd +from datasetsforecast.m4 import M4 + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Prepare data +# MAGIC We are using the [`datasetsforecast`](https://github.com/Nixtla/datasetsforecast/tree/main/) package to download M4 data. + +# COMMAND ---------- + +# Number of time series +n = 100 + + +def create_m4_weekly(): + y_df, _, _ = M4.load(directory=str(pathlib.Path.home()), group="Weekly") + _ids = [f"W{i}" for i in range(1, n)] + y_df = ( + y_df.groupby("unique_id") + .filter(lambda x: x.unique_id.iloc[0] in _ids) + .groupby("unique_id") + .apply(transform_group) + .reset_index(drop=True) + ) + return y_df + + +def transform_group(df): + unique_id = df.unique_id.iloc[0] + if len(df) > 260: + df = df.iloc[-260:] + _start = pd.Timestamp("2020-01-01") + _end = _start + pd.DateOffset(days=int(7*len(df))) + date_idx = pd.date_range(start=_start, end=_end, freq="W", name="ds") + res_df = pd.DataFrame(data=[], index=date_idx).reset_index() + res_df["unique_id"] = unique_id + res_df["y"] = df.y.values + return res_df + +# COMMAND ---------- + +# MAGIC %md +# MAGIC We are going to save this data in a delta lake table. Provide catalog and database names where you want to store the data. + +# COMMAND ---------- + +catalog = "mmf" # Name of the catalog we use to manage our assets +db = "m4" # Name of the schema we use to manage our assets (e.g. datasets) +user = spark.sql('select current_user() as user').collect()[0]['user'] # User email address + +# COMMAND ---------- + +# Making sure that the catalog and the schema exist +_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") +_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") + +( + spark.createDataFrame(create_m4_weekly()) + .write.format("delta").mode("overwrite") + .saveAsTable(f"{catalog}.{db}.m4_weekly_train") +) + +# COMMAND ---------- + +# MAGIC %md Let's take a peek at the dataset: + +# COMMAND ---------- + +display(spark.sql(f"select unique_id, count(ds) as count from {catalog}.{db}.m4_weekly_train group by unique_id order by unique_id")) + +# COMMAND ---------- + +display( + spark.sql(f"select * from {catalog}.{db}.m4_weekly_train where unique_id in ('W1', 'W2', 'W3', 'W4', 'W5') order by unique_id, ds") + ) + +# COMMAND ---------- + +# MAGIC %md ### Models +# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: global`; these are the global models we import from [neuralforecast](https://github.com/Nixtla/neuralforecast). Check their documentation for the detailed description of each model. 
+ + # COMMAND ---------- + +active_models = [ + "NeuralForecastRNN", + "NeuralForecastLSTM", + "NeuralForecastNBEATSx", + "NeuralForecastNHITS", + "NeuralForecastAutoRNN", + "NeuralForecastAutoLSTM", + "NeuralForecastAutoNBEATSx", + "NeuralForecastAutoNHITS", + "NeuralForecastAutoTiDE", + "NeuralForecastAutoPatchTST", +] + +# COMMAND ---------- + +# MAGIC %md ### Run MMF +# MAGIC +# MAGIC Now, we can run the evaluation and forecasting using `run_forecast` function defined in [mmf_sa/models/__init__.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/__init__.py). Refer to [README.md](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/README.md#parameters-description) for a comprehensive description of each parameter. Make sure to set `freq="W"` in `run_forecast` function called in [examples/run_weekly.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_weekly.py). + +# COMMAND ---------- + +# The same run_id will be assigned to all the models. This makes it easier to run the post evaluation analysis later. +run_id = str(uuid.uuid4()) + +for model in active_models: + dbutils.notebook.run( + "run_weekly", + timeout_seconds=0, + arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id, "user": user}) + +# COMMAND ---------- + +# MAGIC %md ### Evaluate +# MAGIC In `evaluation_output` table, we store all evaluation results for all backtesting trials from all models. + +# COMMAND ---------- + +display(spark.sql(f""" + select * from {catalog}.{db}.weekly_evaluation_output + where unique_id = 'W1' + order by unique_id, model, backtest_window_start_date + """)) + +# COMMAND ---------- + +# MAGIC %md ### Forecast +# MAGIC In `scoring_output` table, forecasts for each time series from each model are stored. + +# COMMAND ---------- + +display(spark.sql(f""" + select * from {catalog}.{db}.weekly_scoring_output + where unique_id = 'W1' + order by unique_id, model, ds + """)) + +# COMMAND ---------- + +# MAGIC %md ### Delete Tables +# MAGIC Let's clean up the tables. + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.weekly_evaluation_output")) + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.weekly_scoring_output")) \ No newline at end of file diff --git a/examples/local_univariate_weekly.py b/examples/local_univariate_weekly.py new file mode 100644 index 0000000..fca1002 --- /dev/null +++ b/examples/local_univariate_weekly.py @@ -0,0 +1,224 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Many Models Forecasting (MMF) +# MAGIC This notebook showcases how to run MMF with local models on multiple univariate time series of weekly resolution. We will use [M4 competition](https://www.sciencedirect.com/science/article/pii/S0169207019301128#sec5) data. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Cluster setup +# MAGIC +# MAGIC We recommend using a cluster with [Databricks Runtime 14.3 LTS for ML](https://docs.databricks.com/en/release-notes/runtime/14.3lts-ml.html) or above. The cluster can be either a single-node or multi-node CPU cluster. MMF leverages [Pandas UDF](https://docs.databricks.com/en/udf/pandas.html) under the hood and utilizes all the available resources. 
Make sure to set the following Spark configurations before you start your cluster: [`spark.sql.execution.arrow.enabled true`](https://spark.apache.org/docs/3.0.1/sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas) and [`spark.sql.adaptive.enabled false`](https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution). You can do this by specifying [Spark configuration](https://docs.databricks.com/en/compute/configure.html#spark-configuration) in the advanced options on the cluster creation page. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Install and import packages +# MAGIC Check out [requirements.txt](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/requirements.txt) if you're interested in the libraries we use. + +# COMMAND ---------- + +# DBTITLE 1,Install the necessary libraries +# MAGIC %pip install -r ../requirements.txt --quiet +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +import logging +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + +# COMMAND ---------- + +import pathlib +import pandas as pd +from datasetsforecast.m4 import M4 +from mmf_sa import run_forecast + +# COMMAND ---------- + +# MAGIC %md +# MAGIC #### Install R packages +# MAGIC If you want to use the R fable models, you need to [install the R dependencies](https://docs.databricks.com/en/libraries/index.html#r-library-support). See [RUNME.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/RUNME.py) for the full list of required R libraries and their versions. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Prepare data +# MAGIC We are using the [`datasetsforecast`](https://github.com/Nixtla/datasetsforecast/tree/main/) package to download M4 data. The M4 dataset contains a set of time series which we use for testing MMF. Below we have written a number of custom functions to convert M4 time series to an expected format. + +# COMMAND ---------- + +# Number of time series +n = 100 + + +def create_m4_weekly(): + y_df, _, _ = M4.load(directory=str(pathlib.Path.home()), group="Weekly") + _ids = [f"W{i}" for i in range(1, n)] + y_df = ( + y_df.groupby("unique_id") + .filter(lambda x: x.unique_id.iloc[0] in _ids) + .groupby("unique_id") + .apply(transform_group) + .reset_index(drop=True) + ) + return y_df + + +def transform_group(df): + unique_id = df.unique_id.iloc[0] + if len(df) > 260: + df = df.iloc[-260:] + _start = pd.Timestamp("2020-01-01") + _end = _start + pd.DateOffset(days=int(7*len(df))) + date_idx = pd.date_range(start=_start, end=_end, freq="W", name="ds") + res_df = pd.DataFrame(data=[], index=date_idx).reset_index() + res_df["unique_id"] = unique_id + res_df["y"] = df.y.values + return res_df + +# COMMAND ---------- + +# MAGIC %md +# MAGIC We are going to save this data in a delta lake table. Provide catalog and database names where you want to store the data. + +# COMMAND ---------- + +catalog = "mmf" # Name of the catalog we use to manage our assets +db = "m4" # Name of the schema we use to manage our assets (e.g. 
datasets) +user = spark.sql('select current_user() as user').collect()[0]['user'] # User email address + +# COMMAND ---------- + +# Making sure that the catalog and the schema exist +_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") +_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}") + +( + spark.createDataFrame(create_m4_weekly()) + .write.format("delta").mode("overwrite") + .saveAsTable(f"{catalog}.{db}.m4_weekly_train") +) + +# COMMAND ---------- + +# MAGIC %md Let's take a peek at the dataset: + +# COMMAND ---------- + +display( + spark.sql(f"select * from {catalog}.{db}.m4_weekly_train where unique_id in ('W1', 'W2', 'W3', 'W4', 'W5') order by unique_id, ds") + ) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC If the number of time series is larger than the number of total cores, we set `spark.sql.shuffle.partitions` to the number of cores (can also be a multiple) so that we don't under-utilize the resources. + +# COMMAND ---------- + +if n > sc.defaultParallelism: + sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism) + +# COMMAND ---------- + +# MAGIC %md ### Models +# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast), [r fable](https://cran.r-project.org/web/packages/fable/vignettes/fable.html) and [sktime](https://github.com/sktime/sktime). Check their documentation for the description of each model. + +# COMMAND ---------- + +active_models = [ + "StatsForecastBaselineWindowAverage", + "StatsForecastBaselineSeasonalWindowAverage", + "StatsForecastBaselineNaive", + "StatsForecastBaselineSeasonalNaive", + "StatsForecastAutoArima", + "StatsForecastAutoETS", + "StatsForecastAutoCES", + "StatsForecastAutoTheta", + "StatsForecastTSB", + "StatsForecastADIDA", + "StatsForecastIMAPA", + "StatsForecastCrostonClassic", + "StatsForecastCrostonOptimized", + "StatsForecastCrostonSBA", + "RFableArima", + "RFableETS", + "RFableNNETAR", + "RFableEnsemble", + "RDynamicHarmonicRegression", + "SKTimeTBats", + "SKTimeLgbmDsDt", + ] + +# COMMAND ---------- + +# MAGIC %md ### Run MMF +# MAGIC +# MAGIC Now, we can run the evaluation and forecasting using `run_forecast` function defined in [mmf_sa/models/__init__.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/__init__.py). Make sure to set `freq="W"` in `run_forecast` function. + +# COMMAND ---------- + +run_forecast( + spark=spark, + train_data=f"{catalog}.{db}.m4_weekly_train", + scoring_data=f"{catalog}.{db}.m4_weekly_train", + scoring_output=f"{catalog}.{db}.weekly_scoring_output", + evaluation_output=f"{catalog}.{db}.weekly_evaluation_output", + group_id="unique_id", + date_col="ds", + target="y", + freq="W", + prediction_length=4, + backtest_months=3, + stride=1, + metric="smape", + train_predict_ratio=1, + data_quality_check=True, + resample=False, + active_models=active_models, + experiment_path=f"/Users/{user}/mmf/m4_weekly", + use_case_name="m4_weekly", +) + +# COMMAND ---------- + +# MAGIC %md ### Evaluate +# MAGIC In `evaluation_output` table, we store all evaluation results for all backtesting trials from all models. 
+ +# COMMAND ---------- + +display( + spark.sql(f""" + select * from {catalog}.{db}.weekly_evaluation_output + where unique_id = 'W1' + order by unique_id, model, backtest_window_start_date + """)) + +# COMMAND ---------- + +# MAGIC %md ### Forecast +# MAGIC In `scoring_output` table, forecasts for each time series from each model are stored. + +# COMMAND ---------- + +display(spark.sql(f""" + select * from {catalog}.{db}.weekly_scoring_output + where unique_id = 'W1' + order by unique_id, model, ds + """)) + +# COMMAND ---------- + +# MAGIC %md ### Delete Tables +# MAGIC Let's clean up the tables. + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.weekly_evaluation_output")) + +# COMMAND ---------- + +#display(spark.sql(f"delete from {catalog}.{db}.weekly_scoring_output")) \ No newline at end of file diff --git a/examples/run_weekly.py b/examples/run_weekly.py new file mode 100644 index 0000000..47f02cf --- /dev/null +++ b/examples/run_weekly.py @@ -0,0 +1,51 @@ +# Databricks notebook source +# MAGIC %pip install -r ../requirements-global.txt --quiet +# MAGIC dbutils.library.restartPython() + +# COMMAND ---------- + +dbutils.widgets.text("catalog", "") +dbutils.widgets.text("db", "") +dbutils.widgets.text("model", "") +dbutils.widgets.text("run_id", "") +dbutils.widgets.text("user", "") + +catalog = dbutils.widgets.get("catalog") +db = dbutils.widgets.get("db") +model = dbutils.widgets.get("model") +run_id = dbutils.widgets.get("run_id") +user = dbutils.widgets.get("user") + +# COMMAND ---------- + +from mmf_sa import run_forecast +import logging +logger = spark._jvm.org.apache.log4j +logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR) +logging.getLogger("py4j.clientserver").setLevel(logging.ERROR) + + +run_forecast( + spark=spark, + train_data=f"{catalog}.{db}.m4_weekly_train", + scoring_data=f"{catalog}.{db}.m4_weekly_train", + scoring_output=f"{catalog}.{db}.weekly_scoring_output", + evaluation_output=f"{catalog}.{db}.weekly_evaluation_output", + model_output=f"{catalog}.{db}", + group_id="unique_id", + date_col="ds", + target="y", + freq="W", + prediction_length=4, + backtest_months=3, + stride=1, + metric="smape", + train_predict_ratio=1, + data_quality_check=True, + resample=False, + active_models=[model], + experiment_path=f"/Users/{user}/mmf/m4_weekly", + use_case_name="m4_weekly", + run_id=run_id, + accelerator="gpu", +) \ No newline at end of file diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py index eb37abd..5d97801 100644 --- a/mmf_sa/Forecaster.py +++ b/mmf_sa/Forecaster.py @@ -640,4 +640,3 @@ def get_latest_model_info(client, registered_name): if version_int > latest_version: latest_version = version_int return client.get_model_version(registered_name, str(latest_version)) - diff --git a/mmf_sa/__init__.py b/mmf_sa/__init__.py index ab72ac5..c59614a 100644 --- a/mmf_sa/__init__.py +++ b/mmf_sa/__init__.py @@ -54,7 +54,7 @@ def run_forecast( group_id (str): A string specifying the column name that groups the training data into individual time series. date_col (str): A string specifying the column name that stores the date variable. target (str): A string specifying the column name of the target variable. - freq (str): A string specifying the frequency. Currently, "D" for daily and "M" for monthly are supported. + freq (str): A string specifying the frequency. Currently, "D" for daily, "W" for weekly and "M" for monthly are supported. prediction_length (int): An integer specifying the prediction length: i.e. forecasting horizon. 
backtest_months (int): An integer specifying the number of backtest months. stride (int): An integer specifying the stride length. diff --git a/mmf_sa/data_quality_checks.py b/mmf_sa/data_quality_checks.py index 31592ec..8d6b869 100644 --- a/mmf_sa/data_quality_checks.py +++ b/mmf_sa/data_quality_checks.py @@ -28,9 +28,9 @@ def _backtest_length_check(self): """ backtest_days = self.conf["backtest_months"] * 30 prediction_length_days = ( - self.conf["prediction_length"] - if self.conf["freq"] == "D" - else self.conf["prediction_length"] * 30 + self.conf["prediction_length"] if self.conf["freq"] == "D" else + self.conf["prediction_length"] * 7 if self.conf["freq"] == "W" else + self.conf["prediction_length"] * 30 ) if backtest_days < prediction_length_days: raise Exception(f"Backtesting interval is shorter than prediction length!") diff --git a/mmf_sa/models/abstract_model.py b/mmf_sa/models/abstract_model.py index fe7350d..c1bc1fe 100644 --- a/mmf_sa/models/abstract_model.py +++ b/mmf_sa/models/abstract_model.py @@ -18,12 +18,16 @@ def __init__(self, params): self.params = params self.freq = params["freq"].upper()[0] self.one_ts_offset = ( - pd.offsets.MonthEnd(1) if self.freq == "M" else pd.DateOffset(days=1) + pd.offsets.MonthEnd(1) if self.freq == "M" else + pd.DateOffset(weeks=1) if self.freq == "W" else + pd.DateOffset(days=1) if self.freq == "D" else + None ) self.prediction_length_offset = ( - pd.offsets.MonthEnd(params["prediction_length"]) - if self.freq == "M" - else pd.DateOffset(days=params["prediction_length"]) + pd.offsets.MonthEnd(params["prediction_length"]) if self.freq == "M" else + pd.DateOffset(weeks=params["prediction_length"]) if self.freq == "W" else + pd.DateOffset(days=params["prediction_length"]) if self.freq == "D" else + None ) @abstractmethod @@ -62,9 +66,10 @@ def backtest( """ stride = int(self.params["stride"]) # Read in stride stride_offset = ( - pd.offsets.MonthEnd(stride) - if self.freq == "M" - else pd.DateOffset(days=stride) + pd.offsets.MonthEnd(stride) if self.freq == "M" else + pd.DateOffset(weeks=stride) if self.freq == "W" else + pd.DateOffset(days=stride) if self.freq == "D" else + None ) df = df.copy().sort_values(by=[self.params["date_col"]]) end_date = df[self.params["date_col"]].max() # Last date from the training data @@ -164,4 +169,3 @@ def calculate_metrics( "forecast": pred_df[self.params["target"]].to_numpy("float"), "actual": val_df[self.params["target"]].to_numpy(), "model_pickle": cloudpickle.dumps(model_fitted)} - diff --git a/mmf_sa/models/timesfmforecast/TimesFMPipeline.py b/mmf_sa/models/timesfmforecast/TimesFMPipeline.py index 09aa03c..f238f6b 100644 --- a/mmf_sa/models/timesfmforecast/TimesFMPipeline.py +++ b/mmf_sa/models/timesfmforecast/TimesFMPipeline.py @@ -94,7 +94,7 @@ def predict(self, dynamic_covariates = self.prepare_data(val_df, future=True) df_union = pd.concat([df, dynamic_covariates], axis=0, join='outer', ignore_index=True) forecast_input = [group['y'].values for _, group in df.groupby('unique_id')] - freq_index = 0 if self.params.freq in ("H", "D", "B", "U") else 1 + freq_index = 0 if self.params.freq == "D" else 1 # 0 for daily (high-frequency) data, 1 for weekly/monthly dynamic_numerical_covariates = {} if 'dynamic_future_numerical' in self.params.keys(): for var in self.params.dynamic_future_numerical: @@ -280,4 +280,4 @@ def predict(self, context, input_df, params=None): value_name=self.params.target, # Column name in the DataFrame containing the values to forecast. num_jobs=-1, # Number of parallel jobs to run, set to -1 to use all available processors. 
) - return forecast_df # Return the forecast DataFrame \ No newline at end of file + return forecast_df # Return the forecast DataFrame