
Commit

Merge pull request #55 from databricks-industry-solutions/m5
Added M5 example notebooks
ryuta-yoshimatsu authored Jun 9, 2024
2 parents 1b8d668 + 9f6eed3 commit d703d26
Showing 26 changed files with 509 additions and 41 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -136,3 +136,4 @@ dmypy.json

# Lightning Logs
examples/lightning_logs
examples/m5-examples/lightning_logs
2 changes: 1 addition & 1 deletion examples/foundation-model-examples/chronos-example.py
@@ -62,7 +62,7 @@ def get_horizon_timestamps(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
horizon_timestamps = []
for i in range(prediction_length):
timestamp = timestamp + one_ts_offset
horizon_timestamps.append(timestamp)
horizon_timestamps.append(timestamp.to_numpy())
barch_horizon_timestamps.append(np.array(horizon_timestamps))
yield pd.Series(barch_horizon_timestamps)
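For context (not part of the commit): `pandas.Timestamp.to_numpy()` converts each horizon timestamp to `numpy.datetime64`, so the array built from `horizon_timestamps` gets a proper `datetime64[ns]` dtype instead of holding Python objects. A minimal, self-contained illustration with hypothetical values (the real `prediction_length` and `one_ts_offset` are defined elsewhere in the notebook):

```python
import numpy as np
import pandas as pd

prediction_length = 3                    # hypothetical values, for illustration only
one_ts_offset = pd.DateOffset(days=1)
timestamp = pd.Timestamp("2020-01-01")

as_objects, as_datetime64 = [], []
for _ in range(prediction_length):
    timestamp = timestamp + one_ts_offset
    as_objects.append(timestamp)                 # pandas.Timestamp objects
    as_datetime64.append(timestamp.to_numpy())   # numpy.datetime64 values

print(np.array(as_objects).dtype)      # object
print(np.array(as_datetime64).dtype)   # datetime64[ns]
```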

2 changes: 2 additions & 0 deletions examples/foundation-model-examples/data_preparation.py
@@ -51,6 +51,8 @@ def create_m4_daily():

def transform_group_daily(df):
unique_id = df.unique_id.iloc[0]
if len(df) > 1020:
df = df.iloc[-1020:]
_start = pd.Timestamp("2020-01-01")
_end = _start + pd.DateOffset(days=int(df.count()[0]) - 1)
date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds")
2 changes: 1 addition & 1 deletion examples/foundation-model-examples/moirai-example.py
@@ -64,7 +64,7 @@ def get_horizon_timestamps(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
horizon_timestamps = []
for i in range(prediction_length):
timestamp = timestamp + one_ts_offset
horizon_timestamps.append(timestamp)
horizon_timestamps.append(timestamp.to_numpy())
barch_horizon_timestamps.append(np.array(horizon_timestamps))
yield pd.Series(barch_horizon_timestamps)

2 changes: 1 addition & 1 deletion examples/foundation-model-examples/moment-example.py
@@ -63,7 +63,7 @@ def get_horizon_timestamps(batch_iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
horizon_timestamps = []
for i in range(prediction_length):
timestamp = timestamp + one_ts_offset
horizon_timestamps.append(timestamp)
horizon_timestamps.append(timestamp.to_numpy())
barch_horizon_timestamps.append(np.array(horizon_timestamps))
yield pd.Series(barch_horizon_timestamps)

17 changes: 13 additions & 4 deletions examples/foundation_daily.py
@@ -45,7 +45,7 @@
# COMMAND ----------

# Number of time series
n = 100
n = 1000


def create_m4_daily():
@@ -63,6 +63,8 @@ def create_m4_daily():

def transform_group(df):
unique_id = df.unique_id.iloc[0]
if len(df) > 1020:
df = df.iloc[-1020:]
_start = pd.Timestamp("2020-01-01")
_end = _start + pd.DateOffset(days=int(df.count()[0]) - 1)
date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds")
@@ -71,7 +73,6 @@ def transform_group(df):
res_df["y"] = df.y.values
return res_df


# COMMAND ----------

# MAGIC %md
@@ -82,6 +83,8 @@ def transform_group(df):
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

# COMMAND ----------

# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -133,7 +136,7 @@ def transform_group(df):
# MAGIC
# MAGIC If you are interested in how MMF achieves distributed inference on these foundation models using Pandas UDF, have a look at the model pipeline scripts: e.g. [mmf_sa/models/chronosforecast/ChronosPipeline.py](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/chronosforecast/ChronosPipeline.py).
# MAGIC
# MAGIC One small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function written in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function.
# MAGIC One small difference from the local model case when running `run_forecast` is that we have to iterate through the `active_models` and call the function written in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This avoids CUDA out-of-memory errors by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to the `run_forecast` function. Also, set the parameters `data_quality_check=True` and `resample=False`, or provide a complete dataset without missing entries, to avoid issues with skipped dates.
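For context (not part of the commit), a minimal sketch of what that per-model loop could look like; the notebook path and widget names below are illustrative assumptions, not the actual contents of `run_daily.py`:

```python
# Hedged sketch: run each foundation model in its own child notebook so its GPU
# memory is released when the child run ends. Paths and arguments are illustrative.
for model in active_models:
    dbutils.notebook.run(
        "run_daily",                                     # the separate notebook wrapping run_forecast (assumed relative path)
        0,                                               # timeout_seconds; 0 means no timeout
        {"catalog": catalog, "db": db, "model": model},  # illustrative widget arguments
    )
```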

# COMMAND ----------

@@ -153,7 +156,9 @@ def transform_group(df):

# COMMAND ----------

display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date"))
display(
spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date")
)

# COMMAND ----------

@@ -189,3 +194,7 @@ def transform_group(df):
# COMMAND ----------

display(spark.sql(f"delete from {catalog}.{db}.daily_scoring_output"))

# COMMAND ----------


2 changes: 1 addition & 1 deletion examples/foundation_monthly.py
@@ -45,7 +45,7 @@
# COMMAND ----------

# Number of time series
n = 100
n = 1000


def create_m4_monthly():
8 changes: 6 additions & 2 deletions examples/global_daily.py
@@ -46,7 +46,7 @@
# COMMAND ----------

# Number of time series
n = 100
n = 1000


def create_m4_daily():
@@ -64,6 +64,8 @@ def create_m4_daily():

def transform_group(df):
unique_id = df.unique_id.iloc[0]
if len(df) > 1020:
df = df.iloc[-1020:]
_start = pd.Timestamp("2020-01-01")
_end = _start + pd.DateOffset(days=int(df.count()[0]) - 1)
date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds")
@@ -83,6 +85,8 @@ def transform_group(df):
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

# COMMAND ----------

# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -137,7 +141,7 @@ def transform_group(df):
# MAGIC
# MAGIC If you are interested in how MMF achieves distributed training and inference, have a look at the two methods `evaluate_global_model` and `evaluate_global_model` defined in the source code [`Forecaster.py`](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/Forecaster.py).
# MAGIC
# MAGIC One small difference here in running `run_forecast` from the local model case is that we have to iterate through the `active_models` and call the function written in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This is to avoid the CUDA out of memory issue by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to `run_forecast` function.
# MAGIC A small difference from the local model case when running `run_forecast` is that we have to iterate through the `active_models` and call the function in a [separate notebook](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/examples/run_daily.py). This avoids CUDA out-of-memory errors by freeing up the GPU memory after each model. Make sure to provide `accelerator="gpu"` as an input parameter to the `run_forecast` function. Also, set the parameter `data_quality_check=True`, or provide a complete dataset without missing entries, to avoid issues with skipped dates.
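For context (not part of the commit): running each model in its own child notebook is what actually releases the GPU between models. If you instead ran several PyTorch-based models inside a single process, you would have to reclaim the memory manually between them; a hedged sketch:

```python
import gc
import torch

# Placeholder cleanup between models; `model` stands for the finished model object.
del model
gc.collect()                 # drop Python-side references first
torch.cuda.empty_cache()     # release memory held by PyTorch's CUDA caching allocator
```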

# COMMAND ----------

2 changes: 2 additions & 0 deletions examples/global_external_regressors_daily.py
@@ -50,6 +50,8 @@
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)
volume = "rossmann" # Name of the volume where you have your rossmann dataset csv stored

# COMMAND ----------

# Make sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
8 changes: 7 additions & 1 deletion examples/global_monthly.py
@@ -45,7 +45,7 @@
# COMMAND ----------

# Number of time series
n = 100
n = 1000


def create_m4_monthly():
@@ -87,6 +87,8 @@ def transform_group(df):
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

# COMMAND ----------

# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -178,3 +180,7 @@ def transform_group(df):
# COMMAND ----------

display(spark.sql(f"delete from {catalog}.{db}.monthly_scoring_output"))

# COMMAND ----------


27 changes: 21 additions & 6 deletions examples/local_univariate_daily.py
@@ -51,7 +51,7 @@
# COMMAND ----------

# Number of time series
n = 100
n = 1000


def create_m4_daily():
@@ -69,6 +69,8 @@ def create_m4_daily():

def transform_group(df):
unique_id = df.unique_id.iloc[0]
if len(df) > 1020:
df = df.iloc[-1020:]
_start = pd.Timestamp("2020-01-01")
_end = _start + pd.DateOffset(days=int(df.count()[0]) - 1)
date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds")
@@ -77,7 +79,6 @@ def transform_group(df):
res_df["y"] = df.y.values
return res_df


# COMMAND ----------

# MAGIC %md
@@ -88,6 +89,8 @@ def transform_group(df):
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

# COMMAND ----------

# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -110,6 +113,16 @@ def transform_group(df):

# COMMAND ----------

# MAGIC %md
# MAGIC If the number of time series is larger than the total number of cores, we set `spark.sql.shuffle.partitions` to the number of cores (or a multiple of it) so that we don't under-utilize the cluster.

# COMMAND ----------

if n > sc.defaultParallelism:
sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism)
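For context (not part of the commit): `sqlContext.setConf` works on Databricks, but the same setting can also be written through the SparkSession config; a hedged equivalent, assuming the usual `spark` and `sc` notebook globals:

```python
# Equivalent to the sqlContext.setConf call above, using the SparkSession config.
if n > sc.defaultParallelism:
    spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)
```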

# COMMAND ----------

# MAGIC %md ### Models
# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast), [r fable](https://cran.r-project.org/web/packages/fable/vignettes/fable.html) and [sktime](https://github.com/sktime/sktime). Check their documentation for a detailed description of each model.
# MAGIC
@@ -139,7 +152,7 @@ def transform_group(df):
"RDynamicHarmonicRegression",
"SKTimeTBats",
"SKTimeLgbmDsDt",
]
]

# COMMAND ----------

@@ -168,8 +181,8 @@ def transform_group(df):
prediction_length=10,
backtest_months=1,
stride=10,
train_predict_ratio=2,
data_quality_check=True,
train_predict_ratio=1,
data_quality_check=False,
resample=False,
active_models=active_models,
experiment_path=f"/Shared/mmf_experiment",
@@ -183,7 +196,9 @@ def transform_group(df):

# COMMAND ----------

display(spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date"))
display(
spark.sql(f"select * from {catalog}.{db}.daily_evaluation_output order by unique_id, model, backtest_window_start_date")
)

# COMMAND ----------

17 changes: 13 additions & 4 deletions examples/local_univariate_external_regressors_daily.py
@@ -42,6 +42,8 @@
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)
volume = "rossmann" # Name of the volume where you have your rossmann dataset csv stored

# COMMAND ----------

# Make sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -55,7 +57,7 @@

# Number of time series to sample
sample = True
size = 100
size = 1000
stores = sorted(random.sample(range(0, 1000), size))

train = spark.read.csv(f"/Volumes/{catalog}/{db}/{volume}/train.csv", header=True, inferSchema=True)
@@ -91,6 +93,11 @@

# COMMAND ----------

if sample and size > sc.defaultParallelism:
sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism)

# COMMAND ----------

# MAGIC %md ### Models
# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast), [r fable](https://cran.r-project.org/web/packages/fable/vignettes/fable.html) and [sktime](https://github.com/sktime/sktime). Check their documentation for a description of each model.
# MAGIC
@@ -144,9 +151,9 @@
prediction_length=10,
backtest_months=1,
stride=10,
train_predict_ratio=2,
train_predict_ratio=1,
active_models=active_models,
data_quality_check=True,
data_quality_check=False,
resample=False,
experiment_path=f"/Shared/mmf_rossmann",
use_case_name="rossmann_daily",
@@ -159,7 +166,9 @@

# COMMAND ----------

display(spark.sql(f"select * from {catalog}.{db}.rossmann_daily_evaluation_output order by Store, model, backtest_window_start_date"))
display(
spark.sql(f"select * from {catalog}.{db}.rossmann_daily_evaluation_output order by Store, model, backtest_window_start_date")
)

# COMMAND ----------

21 changes: 16 additions & 5 deletions examples/local_univariate_monthly.py
@@ -52,7 +52,7 @@
# COMMAND ----------

# Number of time series
n = 100
n = 1000


def create_m4_monthly():
@@ -94,6 +94,8 @@ def transform_group(df):
catalog = "solacc_uc" # Name of the catalog we use to manage our assets
db = "mmf" # Name of the schema we use to manage our assets (e.g. datasets)

# COMMAND ----------

# Making sure that the catalog and the schema exist
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
@@ -110,7 +112,9 @@ def transform_group(df):

# COMMAND ----------

display(spark.sql(f"select unique_id, count(date) as count from {catalog}.{db}.m4_monthly_train group by unique_id order by unique_id"))
display(
spark.sql(f"select unique_id, count(date) as count from {catalog}.{db}.m4_monthly_train group by unique_id order by unique_id")
)

# COMMAND ----------

@@ -120,6 +124,11 @@ def transform_group(df):

# COMMAND ----------

if n > sc.defaultParallelism:
sqlContext.setConf("spark.sql.shuffle.partitions", sc.defaultParallelism)

# COMMAND ----------

# MAGIC %md ### Models
# MAGIC Let's configure a list of models we are going to apply to our time series for evaluation and forecasting. A comprehensive list of all supported models is available in [mmf_sa/models/models_conf.yaml](https://github.com/databricks-industry-solutions/many-model-forecasting/blob/main/mmf_sa/models/models_conf.yaml). Look for the models where `model_type: local`; these are the local models we import from [statsforecast](https://github.com/Nixtla/statsforecast), [r fable](https://cran.r-project.org/web/packages/fable/vignettes/fable.html) and [sktime](https://github.com/sktime/sktime). Check their documentation for a description of each model.

@@ -170,8 +179,8 @@ def transform_group(df):
prediction_length=3,
backtest_months=12,
stride=1,
train_predict_ratio=2,
data_quality_check=True,
train_predict_ratio=1,
data_quality_check=False,
resample=False,
active_models=active_models,
experiment_path=f"/Shared/mmf_experiment_monthly",
@@ -185,7 +194,9 @@ def transform_group(df):

# COMMAND ----------

display(spark.sql(f"select * from {catalog}.{db}.monthly_evaluation_output order by unique_id, model, backtest_window_start_date"))
display(
spark.sql(f"select * from {catalog}.{db}.monthly_evaluation_output order by unique_id, model, backtest_window_start_date")
)

# COMMAND ----------
