From 4a34ea5e30f6cc171c48a29b6290d24ca733b803 Mon Sep 17 00:00:00 2001
From: ppalud
Date: Tue, 7 Jan 2025 12:24:49 +0100
Subject: [PATCH] Simplify the __init__ of the MyScaler class

Fixes #3
---
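Note (kept out of the commit message): the scaler is now built directly from
the normalization parameters (mean_, std_, list_is_log) instead of a
simulation grid. A minimal usage sketch of the new constructor; the numerical
values below are made up for illustration:

    import numpy as np
    from beetroots.space_transform.transform import MyScaler

    D = 3  # scaling factor kappa + 2 physical parameters
    list_is_log = [True] * D

    # mean / std of the D - 1 non-kappa parameters, on the log10 scale
    mean_ = np.array([1.5, 4.0])
    std_ = np.array([0.3, 0.8])

    # mean_.size == len(list_is_log) - 1, so __init__ prepends the kappa
    # entries (mean = 0, std = 1 / np.log(10)) automatically
    scaler = MyScaler(mean_, std_, list_is_log)
    assert scaler.D == D

    # round trip: linear scale -> sampling scale -> linear scale
    Theta_lin = 10.0 ** np.random.normal(0.0, 1.0, size=(10, D))
    Theta_scaled = scaler.from_lin_to_scaled(Theta_lin)
    assert np.allclose(scaler.from_scaled_to_lin(Theta_scaled), Theta_lin)
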
+ """ + + __slots__ = ("D", "mean_", "std_", "list_is_log") LOG_10 = np.log(10.0) def __init__( self, - Theta_grid_lin: Optional[np.ndarray] = None, - D_no_kappa: Optional[int] = None, - mean_=None, - std_=None, - list_is_log: Optional[List[bool]] = None, + mean_: np.ndarray, + std_: np.ndarray, + list_is_log: List[bool], ): - r""" - - Parameters - ---------- - Theta_grid_lin : np.ndarray of shape (-1, D) - grid of simulations - D_no_kappa : int - number of physical parameters that require a standard scaler - """ - if mean_ is not None and std_ is not None and list_is_log is not None: - self.D_no_kappa = mean_.size * 1 - r"""int: number of physical parameters that require a standard scaler""" - self.D = self.D_no_kappa + 1 - r"""int: total number of physical parameters that require a standard scaler, including the scaling factor :math:`\kappa`""" - self.mean_ = mean_ - r"""np.ndarray of shape (D,): mean of the D components :math:`\theta_d`, used in the data normalization""" - self.std_ = std_ - r"""np.ndarray of shape (D,): standard deviation of the D components :math:`\theta_d`, used in the data normalization""" - - assert len(list_is_log) == self.D, f"{self.D}, {len(list_is_log)}" - self.list_is_log = list_is_log + assert mean_.shape == std_.shape + assert mean_.size in [len(list_is_log), len(list_is_log) - 1] - else: - self.D = Theta_grid_lin.shape[1] - self.D_no_kappa = D_no_kappa if D_no_kappa is not None else self.D - assert self.D_no_kappa <= self.D + # if there is a kappa in the set of parameters + # (if there is no kappa, then each parameter should have an associated + # mean and std in the scaler) + if mean_.size == len(list_is_log) - 1: + # kappa: mean = 0 and std = 1 / np.log(10) + mean_ = np.array([0.0] + list(mean_)) + std_ = np.array([1.0 / self.LOG_10] + list(std_)) - assert isinstance(list_is_log, list) and len(list_is_log) == self.D - self.list_is_log = list_is_log + self.D = mean_.size + r"""int: total number of physical parameters that require a standard scaler, including the scaling factor :math:`\kappa`""" - raise NotImplementedError() + self.mean_ = mean_ + r"""np.ndarray of shape (D,): mean of the D components :math:`\theta_d`, used in the data normalization""" - # TODO : correct this condition - log10_grid_theta = np.log10(Theta_grid_lin[:, (self.D - self.D_no_kappa) :]) - self.mean_ = log10_grid_theta.mean(axis=0) # (D,) - self.std_ = log10_grid_theta.std(axis=0) # (D,) + self.std_ = std_ + r"""np.ndarray of shape (D,): standard deviation of the D components :math:`\theta_d`, used in the data normalization""" - assert self.mean_.shape == ( - self.D_no_kappa, - ), f"{self.D_no_kappa}, {self.mean_.shape}" - assert self.std_.shape == ( - self.D_no_kappa, - ), f"{self.D_no_kappa}, {self.std_.shape}" + self.list_is_log = list_is_log + r"""list of bool of length D: whether the normalization should be applied on the log10 scale or in the linear scale""" def from_scaled_to_lin(self, Theta_scaled: np.ndarray) -> np.ndarray: assert len(Theta_scaled.shape) == 2, Theta_scaled.shape @@ -135,22 +100,21 @@ def from_scaled_to_lin(self, Theta_scaled: np.ndarray) -> np.ndarray: Theta_scaled, self.mean_, self.std_, + self.list_is_log, self.D, - self.D_no_kappa, self.LOG_10, - self.list_is_log, ) return Theta_linscale def from_lin_to_scaled(self, Theta_linscale: np.ndarray) -> np.ndarray: assert len(Theta_linscale.shape) == 2, Theta_linscale.shape assert Theta_linscale.shape[1] == self.D, Theta_linscale.shape + Theta_scaled = _from_lin_to_scaled( Theta_linscale, self.mean_, 
             self.std_,
-            self.D,
-            self.D_no_kappa,
             self.list_is_log,
+            self.D,
         )
         return Theta_scaled
diff --git a/tests/space_transform/test_transform.py b/tests/space_transform/test_transform.py
index 1cfb338..ad04dc3 100644
--- a/tests/space_transform/test_transform.py
+++ b/tests/space_transform/test_transform.py
@@ -4,61 +4,47 @@
 
 N_grid = 500_000
 D = 3
-D_no_kappa = 2
-Theta_grid_lin = np.zeros((N_grid, D))
-Theta_grid_lin[:, D - D_no_kappa :] = np.exp(
-    np.log(10) * np.random.normal(0, 1, size=(N_grid, D_no_kappa))
-)
-Theta_grid_lin[:, : D - D_no_kappa] = 1
-
-# with normal distributed in scaled, the standard scaler should have no effect
+
+# grid without kappa
+Theta_grid_lin = np.zeros((N_grid, D - 1))
+Theta_grid_lin = np.exp(np.log(10) * np.random.normal(0, 1, size=(N_grid, D - 1)))
+
+list_is_log = [True for _ in range(D)]
+
+mean_ = np.mean(np.log10(Theta_grid_lin), axis=0)  # (D-1,)
+std_ = np.std(np.log10(Theta_grid_lin), axis=0)  # (D-1,)
+assert mean_.size == D - 1
+
 N_test = 10
-normal_samples = np.random.normal(0, 1, size=(N_test, D_no_kappa))
+normal_samples = np.random.normal(0, 3.0, size=(N_test, D))
 
-Theta_test_lin = np.zeros((N_test, D))
-Theta_test_lin[:, D - D_no_kappa :] = 10**normal_samples
-Theta_test_lin[:, : D - D_no_kappa] = 1
+Theta_test_lin = np.exp(np.log(10) * normal_samples)
 
-Theta_test_scaled = np.zeros((N_test, D))
-Theta_test_scaled[:, D - D_no_kappa :] = normal_samples
-Theta_test_scaled[:, : D - D_no_kappa] = 0
+Theta_test_scaled = normal_samples
 
-mean_ = np.mean(Theta_test_scaled[:, D - D_no_kappa :], axis=0)
-std_ = np.std(Theta_test_scaled[:, D - D_no_kappa :], axis=0)
-list_is_log = [True for _ in range(D)]
+Theta_test_scaled[:, 0] *= np.log(10)  # divide by std = 1/log(10)
+
+Theta_test_scaled[:, 1:] = (Theta_test_scaled[:, 1:] - mean_[None, :]) / std_[None, :]
+
+scaler = MyScaler(mean_, std_, list_is_log)
 
-Theta_test_scaled[:, D - D_no_kappa :] = (
-    Theta_test_scaled[:, D - D_no_kappa :] - mean_[None, :]
-) / std_[None, :]
 
-scaler = MyScaler(
-    Theta_grid_lin, D_no_kappa, mean_=mean_, std_=std_, list_is_log=list_is_log
-)
+def test_init():
+    assert np.isclose(scaler.mean_[0], 0.0)
+    assert np.isclose(scaler.std_[0], 1 / np.log(10))
+    assert scaler.mean_.size == D
+    assert scaler.std_.size == D
 
 
 def test_from_scaled_to_lin():
     Theta_lin = scaler.from_scaled_to_lin(Theta_test_scaled)
     assert Theta_lin.shape == Theta_test_scaled.shape
-    assert np.allclose(
-        Theta_lin[:, :-D_no_kappa], Theta_test_lin[:, :-D_no_kappa]
-    )  # kappa
-    assert np.allclose(
-        Theta_lin[:, -D_no_kappa:],
-        Theta_test_lin[:, -D_no_kappa:],
-        rtol=2e-2,
-        atol=1e-1,
-    )  # theta
+    assert np.allclose(Theta_lin[:, 0], Theta_test_lin[:, 0])  # kappa
+    assert np.allclose(Theta_lin[:, 1:], Theta_test_lin[:, 1:])  # theta
 
 
 def test_from_lin_to_scaled():
     Theta_scaled = scaler.from_lin_to_scaled(Theta_test_lin)
     assert Theta_scaled.shape == Theta_test_scaled.shape
-    assert np.allclose(
-        Theta_scaled[:, :-D_no_kappa], Theta_test_scaled[:, :-D_no_kappa]
-    )  # kappa
-    assert np.allclose(
-        Theta_scaled[:, -D_no_kappa:],
-        Theta_test_scaled[:, -D_no_kappa:],
-        rtol=2e-2,
-        atol=1e-1,
-    )  # theta
+    assert np.allclose(Theta_scaled[:, 0], Theta_test_scaled[:, 0])  # kappa
+    assert np.allclose(Theta_scaled[:, 1:], Theta_test_scaled[:, 1:])  # theta
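
Note (outside the patch): the choice std = 1 / np.log(10) for kappa can be
sanity-checked with a short computation. With mean 0 on the log10 scale, the
scaled kappa is exactly ln(kappa), so a [0.1, 10] validity interval on the
linear scale maps to about [-2.3, 2.3]. A quick sketch, with illustrative
values only:

    import numpy as np

    kappa = np.array([0.1, 1.0, 10.0])
    scaled = (np.log10(kappa) - 0.0) / (1.0 / np.log(10.0))
    # scaled equals np.log(kappa): [-2.3026, 0.0, 2.3026]
    assert np.allclose(scaled, np.log(kappa))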