diff --git a/mmf_sa/Forecaster.py b/mmf_sa/Forecaster.py index 85cc4e0..fac4a96 100644 --- a/mmf_sa/Forecaster.py +++ b/mmf_sa/Forecaster.py @@ -378,6 +378,9 @@ def backtest_global_model( StructField("model_pickle", BinaryType()), ] ) + # Convert to Python-native types before converting to pyspark dataframe + res_pdf['forecast'] = res_pdf['forecast'].apply(lambda x: [float(i) for i in x]) + res_pdf['actual'] = res_pdf['actual'].apply(lambda x: [float(i) for i in x]) res_sdf = self.spark.createDataFrame(res_pdf, schema) # Write evaluation results to a delta table if write: diff --git a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py index 12697f4..d9c1d2e 100644 --- a/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py +++ b/mmf_sa/models/neuralforecast/NeuralForecastPipeline.py @@ -217,9 +217,9 @@ def calculate_metrics( else: raise Exception(f"Metric {self.params['metric']} not supported!") for key in keys: - actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]] + actual = val_df[val_df[self.params["group_id"]] == key][self.params["target"]].reset_index(drop=True) forecast = pred_df[pred_df[self.params["group_id"]] == key][self.params["target"]].\ iloc[-self.params["prediction_length"]:] + iloc[-self.params["prediction_length"]:].reset_index(drop=True) try: if metric_name == "smape": smape = MeanAbsolutePercentageError(symmetric=True)