PandasUDF with AutoTS #76
Replies: 3 comments 6 replies
-
Instead of using a Spark cluster, just use a bigger VM. You can fit a lot of time series into 120 GB (or more) of RAM! You can also manually split your time series into chunks and run AutoTS on each chunk on a different node. You could also use joblib's Dask backend to spread n_jobs across a Dask cluster for many of the models. I'm not a big fan of Spark, so it's unlikely I'll be adding support myself anytime soon.
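The manual chunking idea above can be sketched with plain pandas: split the set of series ids into fixed-size chunks, then run AutoTS (or any per-chunk job) on each chunk independently on a different node. This is an illustrative helper, not part of AutoTS; the `series_id` column name and the chunk size are assumptions.

```python
import pandas as pd

def chunk_series(df: pd.DataFrame, id_col: str = "series_id", chunk_size: int = 500):
    """Yield sub-DataFrames, each holding at most `chunk_size` distinct series.

    Each chunk can then be shipped to a different node/process and fed to
    AutoTS on its own (multivariate models then only see series in that chunk).
    """
    ids = sorted(df[id_col].unique())
    for start in range(0, len(ids), chunk_size):
        subset = ids[start:start + chunk_size]
        yield df[df[id_col].isin(subset)]

# toy example: 1200 series, 2 rows each -> 3 chunks of at most 500 series
toy = pd.DataFrame({
    "series_id": [f"s{i}" for i in range(1200) for _ in range(2)],
    "value": 1.0,
})
chunks = list(chunk_series(toy))
print([c["series_id"].nunique() for c in chunks])  # [500, 500, 200]
```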
-
I needed to run some code in Databricks. Here is a script that worked for me:

```python
from pyspark.sql.functions import to_date, substring, lit
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
import pandas as pd
from autots import AutoTS

# {load your df from source}
# rename the source data columns to something more generic
df = (
    df.withColumnRenamed("id", "series_id")
    .withColumnRenamed("MonthDateKey", "datetime")
    .withColumnRenamed("UnitSales", "value")
)
df = df.withColumn("datetime", to_date(df.datetime, "yyyy-MM-dd"))
df = df.withColumn("value", df.value.cast("float"))
# there is no product category column here, but the first few characters of the
# product id form a group of sorts (by manufacturer).
# AutoTS runs on each chunk, which allows multivariate/global models within that
# chunk only, so choose a reasonable chunk of similar series.
# The grouping lets a big cluster distribute the work; if you are only using one
# node, set it to a constant: lit("single_group")
df = df.withColumn("prod_group", substring("ProductID", 0, 3))
df.describe().show()
display(df)
df.printSchema()

# equivalent StructType schema; had issues trying to make datetime come out as a
# date type, so the string schema below is what actually gets passed in
schema = StructType([
    StructField("series_id", StringType(), True),
    StructField("datetime", DateType(), True),
    StructField("value", FloatType(), True),
    StructField("prediction_interval", StringType(), True),
])
schema_text = "datetime string, series_id string, value float, prediction_interval string"

def forecast_func(df_long: pd.DataFrame) -> pd.DataFrame:
    col_order = ["datetime", "series_id", "value", "prediction_interval"]
    forecast_length = 6
    try:
        model = AutoTS(
            forecast_length=forecast_length,
            frequency="infer",
            model_list="superfast",  # or fast_parallel_no_arima
            transformer_list="fast",
            ensemble=["horizontal-max"],
            validation_method="backwards",
            max_generations=20,
            num_validations=1,  # the sample data here has pretty short history, FYI
            no_negatives=True,
            verbose=0,
        )
        model = model.fit(df_long, date_col="datetime", value_col="value", id_col="series_id")
        prediction = model.predict(forecast_length=forecast_length)
        forecasts_df = prediction.long_form_results(
            id_name="series_id", value_name="value", interval_name="prediction_interval"
        )
        forecasts_df.index.name = "datetime"
        forecasts_df = forecasts_df.reset_index(drop=False)
        forecasts_df["datetime"] = forecasts_df["datetime"].dt.strftime("%Y-%m-%d")
        return forecasts_df[col_order]
    except Exception as e:
        print(repr(e))
        return pd.DataFrame(columns=col_order)

# you could also group by series_id if you only want univariate models/forecasts
# and for the work to be highly distributed
out_df = df.groupby("prod_group").applyInPandas(forecast_func, schema=schema_text)
display(out_df)
```
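To debug the grouped-UDF plumbing without a cluster (or without waiting on an AutoTS search), the same pattern can be exercised with plain pandas: a grouped apply with a naive stand-in forecaster that just repeats each series' last value. The column names match the script above; the naive model, monthly frequency, and fixed `'50%'` interval label are placeholder assumptions for testing the column order and date formatting only.

```python
import pandas as pd

def naive_forecast_func(df_long: pd.DataFrame, forecast_length: int = 6) -> pd.DataFrame:
    """Stand-in for forecast_func: repeat each series' last observed value.

    Returns the same long-form columns the Spark string schema expects, so the
    plumbing (column order, dtypes, date formatting) can be checked locally.
    """
    col_order = ["datetime", "series_id", "value", "prediction_interval"]
    out = []
    for sid, grp in df_long.groupby("series_id"):
        grp = grp.sort_values("datetime")
        last_date = pd.to_datetime(grp["datetime"].iloc[-1])
        # next `forecast_length` month starts after the last observation
        future = pd.date_range(last_date, periods=forecast_length + 1, freq="MS")[1:]
        out.append(pd.DataFrame({
            "datetime": future.strftime("%Y-%m-%d"),
            "series_id": sid,
            "value": float(grp["value"].iloc[-1]),
            "prediction_interval": "50%",  # placeholder label
        }))
    return pd.concat(out, ignore_index=True)[col_order]

# two short monthly series
toy = pd.DataFrame({
    "series_id": ["a"] * 3 + ["b"] * 3,
    "datetime": ["2023-01-01", "2023-02-01", "2023-03-01"] * 2,
    "value": [1.0, 2.0, 3.0, 10.0, 20.0, 30.0],
})
fc = toy.groupby("series_id", group_keys=False).apply(naive_forecast_func)
print(fc.shape)  # (12, 4): 2 series x 6 steps, 4 columns
```

Once the output matches the declared schema here, swapping the naive function for the real AutoTS `forecast_func` inside `applyInPandas` is a small change.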
-
Does anyone have an example of using AutoTS with Snowpark?
-
Hello,
I would like to use AutoTS with larger datasets. Does anyone know whether AutoTS works with pandas UDFs, and has anyone already written working example code?
I am currently struggling to do it myself, as I get a lot of error messages along the way.
Thank you very much!