Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 0.6.9
- separate ml model types so not all are required in installation and import
- make stats in mpc dashboard more robust


## 0.6.8
- #55 improved numerics of casadi model simulation by adding initial guess for outputs

Expand Down
2 changes: 1 addition & 1 deletion agentlib_mpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
from .modules import MODULE_TYPES
from .models import MODEL_TYPES

__version__ = "0.6.8"
__version__ = "0.6.9"
15 changes: 0 additions & 15 deletions agentlib_mpc/data_structures/ml_model_datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from pathlib import Path
from typing import Literal

import keras.callbacks
import pandas as pd
import pydantic
from enum import Enum
Expand Down Expand Up @@ -136,17 +135,3 @@ def name_with_lag(name: str, lag: int) -> str:
if lag == 0:
return name
return f"{name}_{lag}"


class EarlyStoppingCallback(pydantic.BaseModel):
patience: int = (1000,)
verbose: Literal[0, 1] = 0
restore_best_weights: bool = True
activate: bool = False

def callback(self):
return keras.callbacks.EarlyStopping(
patience=self.patience,
verbose=self.verbose,
restore_best_weights=self.restore_best_weights,
)
3 changes: 1 addition & 2 deletions agentlib_mpc/models/casadi_ml_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
from agentlib_mpc.data_structures import ml_model_datatypes
from agentlib_mpc.data_structures.ml_model_datatypes import OutputType, name_with_lag

from agentlib_mpc.models.casadi_predictor import CasadiPredictor
from agentlib_mpc.models.casadi_predictor.casadi_predictor import CasadiPredictor
from agentlib_mpc.models.casadi_model import (
CasadiModel,
CasadiModelConfig,
CasadiState,
CasadiOutput,
CasadiTypes,
)
from agentlib_mpc.models.serialized_ml_model import (
SerializedMLModel,
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,197 +1,10 @@
import abc

import casadi as ca
import numpy as np

from enum import Enum
from keras import layers

from typing import Union, TYPE_CHECKING

from agentlib_mpc.models.serialized_ml_model import (
SerializedMLModel,
SerializedLinReg,
SerializedGPR,
SerializedANN,
MLModels,
)

if TYPE_CHECKING:
from keras import Sequential
from agentlib_mpc.models.serialized_ml_model import CustomGPR
from sklearn.linear_model import LinearRegression


class CasadiPredictor(abc.ABC):
"""
Protocol for generic Casadi implementation of various ML-Model-based predictors.

Attributes:
serialized_model: Serialized model which will be translated to a casadi model.
predictor_model: Predictor model from other libraries, which are translated to
casadi syntax.
sym_input: Symbolical input of predictor. Has the necessary shape of the input.
prediction_function: Symbolical casadi prediction function of the given model.
"""

class Config:
arbitrary_types_allowed = True

def __init__(self, serialized_model: SerializedMLModel) -> None:
"""Initialize Predictor class."""
self.serialized_model: SerializedMLModel = serialized_model
self.predictor_model: Union[Sequential, CustomGPR, LinearRegression] = (
serialized_model.deserialize()
)
self.sym_input: ca.MX = self._get_sym_input()
self.prediction_function: ca.Function = self._build_prediction_function()

@classmethod
def from_serialized_model(cls, serialized_model: SerializedMLModel):
"""Initialize sub predictor class."""
model_type = serialized_model.model_type
# todo return type[cls]
return casadi_predictors[model_type](serialized_model)

@property
@abc.abstractmethod
def input_shape(self) -> tuple[int, int]:
"""Input shape of Predictor."""
pass

@property
def output_shape(self) -> tuple[int, int]:
"""Output shape of Predictor."""
return 1, len(self.serialized_model.output)

def _get_sym_input(self):
"""Returns symbolical input object in the required shape."""
return ca.MX.sym("input", 1, self.input_shape[1])

@abc.abstractmethod
def _build_prediction_function(self) -> ca.Function:
"""Build the prediction function with casadi and a symbolic input."""
pass

def predict(self, x: Union[np.ndarray, ca.MX]) -> Union[ca.DM, ca.MX]:
"""
Evaluate prediction function with input data.
Args:
x: input data.
Returns:
results of evaluation of prediction function with input data.
"""
return self.prediction_function(x)


class CasadiLinReg(CasadiPredictor):
"""
Generic Casadi implementation of scikit-learn LinerRegression.
"""

def __init__(self, serialized_model: SerializedLinReg) -> None:
"""
Initializes CasadiLinReg predictor.
Args:
serialized_model: SerializedLinReg object.
"""
super().__init__(serialized_model)

@property
def input_shape(self) -> tuple[int, int]:
"""Input shape of Predictor."""
return 1, self.predictor_model.coef_.shape[1]

def _build_prediction_function(self) -> ca.Function:
"""Build the prediction function with casadi and a symbolic input."""
intercept = self.predictor_model.intercept_
coef = self.predictor_model.coef_
function = intercept + ca.mtimes(self.sym_input, coef.T)
return ca.Function("forward", [self.sym_input], [function])


class CasadiGPR(CasadiPredictor):
"""
Generic implementation of scikit-learn Gaussian Process Regressor.
"""

def __init__(self, serialized_model: SerializedGPR) -> None:
super().__init__(serialized_model)

@property
def input_shape(self) -> tuple[int, int]:
"""Input shape of Predictor."""
return 1, self.predictor_model.X_train_.shape[1]

def _build_prediction_function(self) -> ca.Function:
"""Build the prediction function with casadi and a symbolic input."""
normalize = self.predictor_model.data_handling.normalize
scale = self.predictor_model.data_handling.scale
alpha = self.predictor_model.alpha_
if normalize:
normalized_inp = self._normalize(self.sym_input)
k_star = self._kernel(normalized_inp)
else:
k_star = self._kernel(self.sym_input)
f_mean = ca.mtimes(k_star.T, alpha) * scale
return ca.Function("forward", [self.sym_input], [f_mean])

def _kernel(
self,
x_test: ca.MX,
) -> ca.MX:
"""
Calculates the kernel with regard to mpc and testing data.
If x_train is None the internal mpc data is used.

shape(x_test) = (n_samples, n_features)
shape(x_train) = (n_samples, n_features)
"""

square_distance = self._square_distance(x_test)
length_scale = self.predictor_model.kernel_.k1.k2.length_scale
constant_value = self.predictor_model.kernel_.k1.k1.constant_value
return np.exp((-square_distance / (2 * length_scale**2))) * constant_value

def _square_distance(self, inp: ca.MX):
"""
Calculates the square distance from x_train to x_test.

shape(x_test) = (n_test_samples, n_features)
shape(x_train) = (n_train_samples, n_features)
"""

x_train = self.predictor_model.X_train_

self._check_shapes(inp, x_train)

a = ca.sum2(inp**2)

b = ca.np.sum(x_train**2, axis=1, dtype=float).reshape(-1, 1)

c = -2 * ca.mtimes(x_train, inp.T)

return a + b + c

def _normalize(self, x: ca.MX):
mean = self.predictor_model.data_handling.mean
std = self.predictor_model.data_handling.std

if mean is None and std is not None:
raise ValueError("Mean and std are not valid.")

return (x - ca.DM(mean).T) / ca.DM(std).T

def _check_shapes(self, x_test: Union[ca.MX, np.ndarray], x_train: np.ndarray):
if x_test.shape[1] != x_train.shape[1]:
raise ValueError(
f"The shape of x_test {x_test.shape}[1] and x_train {x_train.shape}[1] must match."
)

import casadi as ca
from keras.api import layers

###################################
### ANN ###
###################################
from agentlib_mpc.models.casadi_predictor.casadi_predictor import CasadiPredictor
from agentlib_mpc.models.serialized_ml_model.serialized_ann import SerializedANN


class ANNLayerTypes(str, Enum):
Expand Down Expand Up @@ -482,9 +295,3 @@ def _build_prediction_function(self) -> ca.Function:
ANNLayerTypes.BATCHNORMALIZATION: BatchNormalization,
ANNLayerTypes.LSTM: LSTM,
}

casadi_predictors = {
MLModels.ANN: CasadiANN,
MLModels.GPR: CasadiGPR,
MLModels.LINREG: CasadiLinReg,
}
86 changes: 86 additions & 0 deletions agentlib_mpc/models/casadi_predictor/casadi_gpr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from typing import Union

import casadi as ca
import numpy as np

from agentlib_mpc.models.casadi_predictor.casadi_predictor import CasadiPredictor
from agentlib_mpc.models.serialized_ml_model.serialized_gpr import SerializedGPR


class CasadiGPR(CasadiPredictor):
"""
Generic implementation of scikit-learn Gaussian Process Regressor.
"""

def __init__(self, serialized_model: SerializedGPR) -> None:
super().__init__(serialized_model)

@property
def input_shape(self) -> tuple[int, int]:
"""Input shape of Predictor."""
return 1, self.predictor_model.X_train_.shape[1]

def _build_prediction_function(self) -> ca.Function:
"""Build the prediction function with casadi and a symbolic input."""
normalize = self.predictor_model.data_handling.normalize
scale = self.predictor_model.data_handling.scale
alpha = self.predictor_model.alpha_
if normalize:
normalized_inp = self._normalize(self.sym_input)
k_star = self._kernel(normalized_inp)
else:
k_star = self._kernel(self.sym_input)
f_mean = ca.mtimes(k_star.T, alpha) * scale
return ca.Function("forward", [self.sym_input], [f_mean])

def _kernel(
self,
x_test: ca.MX,
) -> ca.MX:
"""
Calculates the kernel with regard to mpc and testing data.
If x_train is None the internal mpc data is used.

shape(x_test) = (n_samples, n_features)
shape(x_train) = (n_samples, n_features)
"""

square_distance = self._square_distance(x_test)
length_scale = self.predictor_model.kernel_.k1.k2.length_scale
constant_value = self.predictor_model.kernel_.k1.k1.constant_value
return np.exp((-square_distance / (2 * length_scale**2))) * constant_value

def _square_distance(self, inp: ca.MX):
"""
Calculates the square distance from x_train to x_test.

shape(x_test) = (n_test_samples, n_features)
shape(x_train) = (n_train_samples, n_features)
"""

x_train = self.predictor_model.X_train_

self._check_shapes(inp, x_train)

a = ca.sum2(inp**2)

b = ca.np.sum(x_train**2, axis=1, dtype=float).reshape(-1, 1)

c = -2 * ca.mtimes(x_train, inp.T)

return a + b + c

def _normalize(self, x: ca.MX):
mean = self.predictor_model.data_handling.mean
std = self.predictor_model.data_handling.std

if mean is None and std is not None:
raise ValueError("Mean and std are not valid.")

return (x - ca.DM(mean).T) / ca.DM(std).T

def _check_shapes(self, x_test: Union[ca.MX, np.ndarray], x_train: np.ndarray):
if x_test.shape[1] != x_train.shape[1]:
raise ValueError(
f"The shape of x_test {x_test.shape}[1] and x_train {x_train.shape}[1] must match."
)
30 changes: 30 additions & 0 deletions agentlib_mpc/models/casadi_predictor/casadi_linreg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import casadi as ca

from agentlib_mpc.models.casadi_predictor.casadi_predictor import CasadiPredictor
from agentlib_mpc.models.serialized_ml_model.serialized_linreg import SerializedLinReg


class CasadiLinReg(CasadiPredictor):
"""
Generic Casadi implementation of scikit-learn LinerRegression.
"""

def __init__(self, serialized_model: SerializedLinReg) -> None:
"""
Initializes CasadiLinReg predictor.
Args:
serialized_model: SerializedLinReg object.
"""
super().__init__(serialized_model)

@property
def input_shape(self) -> tuple[int, int]:
"""Input shape of Predictor."""
return 1, self.predictor_model.coef_.shape[1]

def _build_prediction_function(self) -> ca.Function:
"""Build the prediction function with casadi and a symbolic input."""
intercept = self.predictor_model.intercept_
coef = self.predictor_model.coef_
function = intercept + ca.mtimes(self.sym_input, coef.T)
return ca.Function("forward", [self.sym_input], [function])
Loading
Loading