Merge pull request #85 from databricks-industry-solutions/timesfm-covariate-support

TimesFM covariate support
ryuta-yoshimatsu authored Jan 27, 2025
2 parents ab0e195 + 22509d0 commit cc87b97
Showing 6 changed files with 294 additions and 50 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -12,7 +12,7 @@ Get started now!

## What's New

-- Jan 2025: [TimesFM](https://github.com/google-research/timesfm) is available for univariate forecasting. Try the [notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_daily.py).
+- Jan 2025: [TimesFM](https://github.com/google-research/timesfm) is available for univariate and covariate forecasting. Try the notebooks: [univariate](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_daily.py) and [covariate](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_external_regressors_daily.py).
 - Jan 2025: [Chronos Bolt](https://github.com/amazon-science/chronos-forecasting) models are available for univariate forecasting. Try the [notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_daily.py).
 - Jan 2025: [Moirai MoE](https://github.com/SalesforceAIResearch/uni2ts) models are available for univariate forecasting. Try the [notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_daily.py).

164 changes: 164 additions & 0 deletions examples/foundation_external_regressors_daily.py
@@ -0,0 +1,164 @@
# Databricks notebook source
# MAGIC %md
# MAGIC # Many Models Forecasting Demo
# MAGIC
# MAGIC This notebook showcases how to run MMF with foundation models on multiple time series of daily resolution using exogenous regressors. We will use [Rossmann Store](https://www.kaggle.com/competitions/rossmann-store-sales/data) data. To be able to run this notebook, you need to register on [Kaggle](https://www.kaggle.com/) and download the dataset. The descriptions here are mostly the same as the case [without exogenous regressors](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/foundation_daily.py), so we will skip the redundant parts and focus only on the essentials.

# COMMAND ----------

# MAGIC %md
# MAGIC ### Cluster setup
# MAGIC
# MAGIC We recommend using a cluster with [Databricks Runtime 14.3 LTS for ML](https://docs.databricks.com/en/release-notes/runtime/14.3lts-ml.html) or above. The cluster should be single-node with one or more GPU instances: e.g. [g4dn.12xlarge [T4]](https://aws.amazon.com/ec2/instance-types/g4/) on AWS or [Standard_NC64as_T4_v3](https://learn.microsoft.com/en-us/azure/virtual-machines/nct4-v3-series) on Azure.

# COMMAND ----------

# MAGIC %md
# MAGIC ### Install and import packages
# MAGIC Check out [requirements.txt](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/requirements.txt) if you're interested in the libraries we use.

# COMMAND ----------

# MAGIC %pip install datasetsforecast==0.0.8 --quiet

# COMMAND ----------

import logging
from tqdm.autonotebook import tqdm
logger = spark._jvm.org.apache.log4j
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)
logging.getLogger("py4j.clientserver").setLevel(logging.ERROR)
import uuid

# COMMAND ----------

# MAGIC %md
# MAGIC ### Prepare data
# MAGIC Before running this notebook, download the dataset from [Kaggle](https://www.kaggle.com/competitions/rossmann-store-sales/data) and store the CSV files in Unity Catalog in a [volume](https://docs.databricks.com/en/connect/unity-catalog/volumes.html).

# COMMAND ----------

catalog = "mmf" # Name of the catalog we use to manage our assets
db = "rossmann" # Name of the schema we use to manage our assets (e.g. datasets)
volume = "csv" # Name of the volume where you have your rossmann dataset csv sotred
user = spark.sql('select current_user() as user').collect()[0]['user'] # User email address

# COMMAND ----------

# Make sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
_ = spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{db}.{volume}")

# COMMAND ----------
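
# MAGIC %md
# MAGIC If you downloaded the Kaggle CSVs to the driver's local disk, copying them into the volume can look like the following minimal sketch (the `/tmp/rossmann` source path is an assumption; point it at wherever your files actually live):

# COMMAND ----------

# Hypothetical local download location; adjust to your environment
dbutils.fs.cp("file:/tmp/rossmann/train.csv", f"/Volumes/{catalog}/{db}/{volume}/train.csv")
dbutils.fs.cp("file:/tmp/rossmann/test.csv", f"/Volumes/{catalog}/{db}/{volume}/test.csv")

# COMMAND ----------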

# Randomly select a sample of stores to forecast
import random
random.seed(7)

# Number of time series to sample
sample = True
size = 1000
stores = sorted(random.sample(range(0, 1000), size))

train = spark.read.csv(f"/Volumes/{catalog}/{db}/{volume}/train.csv", header=True, inferSchema=True)
test = spark.read.csv(f"/Volumes/{catalog}/{db}/{volume}/test.csv", header=True, inferSchema=True)

if sample:
    train = train.filter(train.Store.isin(stores))
    test = test.filter(test.Store.isin(stores))

# COMMAND ----------

# MAGIC %md
# MAGIC We are going to save this data in a Delta Lake table. Provide the catalog and schema names where you want to store the data.

# COMMAND ----------

train.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(f"{catalog}.{db}.rossmann_daily_train")
test.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(f"{catalog}.{db}.rossmann_daily_test")

# COMMAND ----------

# MAGIC %md Let's take a peek at the dataset:

# COMMAND ----------

display(spark.sql(f"select * from {catalog}.{db}.rossmann_daily_train where Store=49 order by Date"))
display(spark.sql(f"select * from {catalog}.{db}.rossmann_daily_test where Store=49 order by Date"))

# COMMAND ----------

# MAGIC %md
# MAGIC Note that `rossmann_daily_train` contains our target variable `Sales`, while `rossmann_daily_test` does not. This is because `rossmann_daily_test` will be used as our `scoring_data`, which stores the `dynamic_future_categorical` variables for the future dates. When you adapt this notebook to your use case, make sure your datasets comply with these formats. See TimesFM's [documentation](https://github.com/google-research/timesfm/blob/master/notebooks/covariates.ipynb) for more detail on exogenous regressors.

# COMMAND ----------
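
# MAGIC %md
# MAGIC As a quick sanity check, you can confirm that the target shows up only in the training table; a minimal sketch (for the Rossmann data, the difference should be `Sales` and `Customers`):

# COMMAND ----------

train_cols = set(spark.table(f"{catalog}.{db}.rossmann_daily_train").columns)
test_cols = set(spark.table(f"{catalog}.{db}.rossmann_daily_test").columns)
print(train_cols - test_cols)  # expect {'Sales', 'Customers'} for this dataset

# COMMAND ----------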

# MAGIC %md ### Models
# MAGIC Let's configure the list of models we will apply to our time series for evaluation and forecasting. Exogenous regressors are currently supported only for [TimesFM](https://github.com/google-research/timesfm/blob/master/notebooks/covariates.ipynb) models (i.e., `TimesFM_1_0_200m` and `TimesFM_2_0_500m`). Including unsupported models (e.g., `ChronosT5Tiny`) in the active model list does no harm: models that can't use exogenous regressors simply ignore them.

# COMMAND ----------

active_models = [
    "ChronosT5Tiny",
    "MoiraiSmall",
    "TimesFM_1_0_200m",
    "TimesFM_2_0_500m",
]

# COMMAND ----------

# MAGIC %md ### Run MMF
# MAGIC
# MAGIC Now we run the evaluation and forecasting using the `run_forecast` function, providing the training and scoring table names. If `scoring_data` is not provided, or if it is the same as `train_data`, the models ignore the `dynamic_future_numerical` and `dynamic_future_categorical` regressors. Note that this time we provide a covariate field (i.e., `dynamic_future_categorical`) to the `run_forecast` function called in [examples/run_external_regressors_daily.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_external_regressors_daily.py); a sketch of that call follows the next cell. There are other covariate fields you can provide as well, namely `static_features`, `dynamic_historical_numerical`, and `dynamic_historical_categorical`. Read more about these covariates in [TimesFM's documentation](https://github.com/google-research/timesfm/blob/master/notebooks/covariates.ipynb).

# COMMAND ----------

# The same run_id will be assigned to all the models. This makes it easier to run the post-evaluation analysis later.
run_id = str(uuid.uuid4())

for model in active_models:
    dbutils.notebook.run(
        "run_external_regressors_daily",
        timeout_seconds=0,
        arguments={"catalog": catalog, "db": db, "model": model, "run_id": run_id, "user": user})

# COMMAND ----------
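
# MAGIC %md
# MAGIC For reference, here is a hedged sketch of the kind of `run_forecast` call that [examples/run_external_regressors_daily.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_external_regressors_daily.py) makes. The parameter names mirror the covariate fields described above, but the concrete values (covariate columns, prediction length, backtest settings) are illustrative assumptions, not the script's exact configuration.

# COMMAND ----------

from mmf_sa import run_forecast

run_forecast(
    spark=spark,
    train_data=f"{catalog}.{db}.rossmann_daily_train",
    scoring_data=f"{catalog}.{db}.rossmann_daily_test",
    scoring_output=f"{catalog}.{db}.rossmann_daily_scoring_output",
    evaluation_output=f"{catalog}.{db}.rossmann_daily_evaluation_output",
    group_id="Store",
    date_col="Date",
    target="Sales",
    freq="D",
    prediction_length=10,  # assumed horizon
    backtest_length=30,    # assumed backtest settings
    stride=10,
    active_models=active_models,
    dynamic_future_categorical=["Open", "Promo", "SchoolHoliday"],  # illustrative pick of Rossmann columns
    experiment_path=f"/Users/{user}/mmf/rossmann_daily",
    use_case_name="rossmann_daily",
    run_id=run_id,
)

# COMMAND ----------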

# MAGIC %md ### Evaluate
# MAGIC The `evaluation_output` table stores the evaluation results for all backtesting trials from all models.

# COMMAND ----------

display(
    spark.sql(f"""
    select * from {catalog}.{db}.rossmann_daily_evaluation_output
    where Store=49
    order by Store, model, backtest_window_start_date
    """))

# COMMAND ----------
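
# MAGIC %md
# MAGIC To compare models at a glance, you can average the backtest metrics per model. A minimal sketch, assuming (as in other MMF examples) that the evaluation table carries `metric_name` and `metric_value` columns:

# COMMAND ----------

display(spark.sql(f"""
    select model, metric_name, avg(metric_value) as avg_metric_value
    from {catalog}.{db}.rossmann_daily_evaluation_output
    group by model, metric_name
    order by model, metric_name
    """))

# COMMAND ----------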

# MAGIC %md ### Forecast
# MAGIC The `scoring_output` table stores the forecasts for each time series from each model.

# COMMAND ----------

display(spark.sql(f"""
select * from {catalog}.{db}.rossmann_daily_scoring_output
where Store=49
order by Store, model
"""))

# COMMAND ----------

# MAGIC %md ### Delete Tables
# MAGIC Let's clean up the tables.

# COMMAND ----------

#display(spark.sql(f"delete from {catalog}.{db}.rossmann_daily_evaluation_output"))

# COMMAND ----------

#display(spark.sql(f"delete from {catalog}.{db}.rossmann_daily_scoring_output"))
1 change: 0 additions & 1 deletion examples/global_external_regressors_daily.py
@@ -111,7 +111,6 @@
"NeuralForecastAutoNBEATSx",
"NeuralForecastAutoNHITS",
"NeuralForecastAutoTiDE",
"NeuralForecastAutoPatchTST",
]

# COMMAND ----------
4 changes: 2 additions & 2 deletions mmf_sa/Forecaster.py
@@ -589,8 +589,8 @@ def score_foundation_model(self, model_conf):
         model_name = model_conf["name"]
         _, model_uri = self.get_model_for_scoring(model_conf)
         model = self.model_registry.get_model(model_name)
-        hist_df, removed = self.prepare_data_for_global_model()
-        prediction_df, model_pretrained = model.forecast(hist_df, spark=self.spark)
+        score_df, removed = self.prepare_data_for_global_model("scoring")
+        prediction_df, model_pretrained = model.forecast(score_df, spark=self.spark)
         sdf = self.spark.createDataFrame(prediction_df).drop('index')
         (
             sdf.withColumn(self.conf["group_id"], col(self.conf["group_id"]).cast(StringType()))
49 changes: 26 additions & 23 deletions mmf_sa/models/neuralforecast/NeuralForecastPipeline.py
@@ -148,16 +148,19 @@ def fit(self, x, y=None):
         static_pdf = self.prepare_static_features(x)
         self.model.fit(df=pdf, static_df=static_pdf)
 
-    def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None):
-        _df = self.prepare_data(hist_df)
-        _dynamic_future = self.prepare_data(val_df, future=True)
-        if _dynamic_future.empty:
-            _dynamic_future = None
-        _static_df = self.prepare_static_features(hist_df)
+    def predict(self,
+                hist_df: pd.DataFrame,
+                val_df: pd.DataFrame = None):
+
+        df = self.prepare_data(hist_df)
+        dynamic_covariates = self.prepare_data(val_df, future=True)
+        if dynamic_covariates.empty:
+            dynamic_covariates = None
+        static_df = self.prepare_static_features(hist_df)
         forecast_df = self.model.predict(
-            df=_df,
-            static_df=_static_df,
-            futr_df=_dynamic_future
+            df=df,
+            static_df=static_df,
+            futr_df=dynamic_covariates
         )
         target = [col for col in forecast_df.columns.to_list()
                   if col not in ["unique_id", "ds"]][0]
@@ -172,25 +172,25 @@ def predict(self, hist_df: pd.DataFrame, val_df: pd.DataFrame = None):
         return forecast_df, self.model
 
     def forecast(self, df: pd.DataFrame, spark=None):
-        _df = df[df[self.params.target].notnull()]
-        _df = self.prepare_data(_df)
-        _last_date = _df["ds"].max()
-        _future_df = df[
-            (df[self.params["date_col"]] > np.datetime64(_last_date))
+        hist_df = df[df[self.params.target].notnull()]
+        hist_df = self.prepare_data(hist_df)
+        last_date = hist_df["ds"].max()
+        future_df = df[
+            (df[self.params["date_col"]] > np.datetime64(last_date))
             & (df[self.params["date_col"]]
-               <= np.datetime64(_last_date + self.prediction_length_offset))
+               <= np.datetime64(last_date + self.prediction_length_offset))
         ]
-        _dynamic_future = self.prepare_data(_future_df, future=True)
-        _dynamic_future = None if _dynamic_future.empty else _dynamic_future
-        _static_df = self.prepare_static_features(_future_df)
+        dynamic_future = self.prepare_data(future_df, future=True)
+        dynamic_future = None if dynamic_future.empty else dynamic_future
+        static_df = self.prepare_static_features(future_df)
 
         # Check if dynamic futures for all unique_id are provided.
         # If not, drop unique_id without dynamic futures from scoring.
-        if (_dynamic_future is not None) and \
-                (not set(_df["unique_id"].unique().flatten())
-                 .issubset(set(_dynamic_future["unique_id"].unique().flatten()))):
-            _df = _df[_df["unique_id"].isin(list(_dynamic_future["unique_id"].unique()))]
-        forecast_df = self.model.predict(df=_df, static_df=_static_df, futr_df=_dynamic_future)
+        if (dynamic_future is not None) and \
+                (not set(hist_df["unique_id"].unique().flatten())
+                 .issubset(set(dynamic_future["unique_id"].unique().flatten()))):
+            hist_df = hist_df[hist_df["unique_id"].isin(list(dynamic_future["unique_id"].unique()))]
+        forecast_df = self.model.predict(df=hist_df, static_df=static_df, futr_df=dynamic_future)
         target = [col for col in forecast_df.columns.to_list()
                   if col not in ["unique_id", "ds"]][0]
         forecast_df = forecast_df.reset_index(drop=False).rename(