Simplify the __init__ of the MyScaler class (Fixes #3)
ppalud committed Jan 7, 2025
1 parent 557787a commit 4a34ea5
Showing 2 changed files with 67 additions and 117 deletions.
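In short, the commit replaces the old dual-mode `__init__` (either a simulation grid plus `D_no_kappa`, or explicit statistics) with a single explicit signature. A minimal sketch of the resulting contract (not part of the diff; the example values are made up, and it assumes the `beetroots` package is installed):

```python
import numpy as np

from beetroots.space_transform.transform import MyScaler

# Example statistics for D - 1 = 2 non-kappa parameters, on the log10 scale.
mean_ = np.array([0.5, -1.0])
std_ = np.array([1.2, 0.8])

# list_is_log has one more entry than mean_, so the scaler treats the first
# parameter as kappa and prepends (mean=0, std=1/ln(10)) automatically.
scaler = MyScaler(mean_, std_, [True, True, True])
assert scaler.D == 3
```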
114 changes: 39 additions & 75 deletions beetroots/space_transform/transform.py
@@ -1,7 +1,6 @@
 r"""Contains a class that defines the transition between the sampling scale and the user-friendly / interpretable scale
 """
 # from sklearn.preprocessing import StandardScaler
-from typing import List, Optional
+from typing import List

 import numba
 import numpy as np
@@ -14,23 +13,14 @@ def _from_scaled_to_lin(
     Theta_scaled: np.ndarray,
     mean_: np.ndarray,
     std_: np.ndarray,
+    list_is_log: List[bool],
     D: int,
-    D_no_kappa: int,
     LOG_10: float,
-    list_is_log: List[bool],
 ) -> np.ndarray:
     # theta : rescale log and go back to linear scale
     Theta_linscale = np.zeros_like(Theta_scaled)

-    # kappa : go back to linear scale
-    if list_is_log[0]:
-        Theta_linscale[:, 0] = np.exp(Theta_scaled[:, 0])
-    else:
-        Theta_linscale[:, 0] = Theta_scaled[:, 0] * 1
-
-    # other params
-    for d in range(1, D):
-        rescaled = std_[d - 1] * Theta_scaled[:, d] + mean_[d - 1]
+    for d in range(D):
+        rescaled = std_[d] * Theta_scaled[:, d] + mean_[d]
         if list_is_log[d]:
             Theta_linscale[:, d] = np.exp(LOG_10 * rescaled)
         else:
@@ -44,88 +34,63 @@ def _from_lin_to_scaled(
     Theta_linscale: np.ndarray,
     mean_: np.ndarray,
     std_: np.ndarray,
-    D: int,
-    D_no_kappa: int,
     list_is_log: List[bool],
+    D: int,
 ) -> np.ndarray:
     # theta
     Theta_scaled = np.zeros_like(Theta_linscale)

-    # kappa
-    if list_is_log[0]:
-        Theta_scaled[:, 0] = np.log(Theta_linscale[:, 0])
-    else:
-        Theta_scaled[:, 0] = Theta_linscale[:, 0] * 1
-
-    # other params
-    for d in range(1, D):
+    for d in range(D):
         if list_is_log[d]:
             scaled = np.log10(Theta_linscale[:, d])
         else:
             scaled = Theta_linscale[:, d] * 1

-        Theta_scaled[:, d] = (scaled - mean_[d - 1]) / std_[d - 1]
+        Theta_scaled[:, d] = (scaled - mean_[d]) / std_[d]

     return Theta_scaled


 class MyScaler(Scaler):
-    r"""Defines the scale used during sampling and the transforms to navigate from one scale to the other"""
+    r"""Defines the scale used during sampling and the transforms to navigate from one scale to the other. The transformation is a normalization (defined with a mean `mean_` and a standard deviation `std_`) for each physical parameter, applied on the log10 scale or on the linear scale depending on `list_is_log`.
+
+    .. note::
+
+        If one of the physical parameters is the scaling factor :math:`\kappa`, its mean is set to 0 and its std to 1 / np.log(10), so that it is not normalized, regardless of its sampling scale (log10 or linear).
+
+        The std = 1 / np.log(10) for kappa yields a scaled validity interval of about [-2.3, 2.3] (for a [0.1, 10] true validity interval), i.e., comparable to that of the other normalized parameters.
+    """

-    __slots__ = ("D", "D_no_kappa", "mean_", "std_")
+    __slots__ = ("D", "mean_", "std_", "list_is_log")
     LOG_10 = np.log(10.0)

     def __init__(
         self,
-        Theta_grid_lin: Optional[np.ndarray] = None,
-        D_no_kappa: Optional[int] = None,
-        mean_=None,
-        std_=None,
-        list_is_log: Optional[List[bool]] = None,
+        mean_: np.ndarray,
+        std_: np.ndarray,
+        list_is_log: List[bool],
     ):
-        r"""
-        Parameters
-        ----------
-        Theta_grid_lin : np.ndarray of shape (-1, D)
-            grid of simulations
-        D_no_kappa : int
-            number of physical parameters that require a standard scaler
-        """
-        if mean_ is not None and std_ is not None and list_is_log is not None:
-            self.D_no_kappa = mean_.size * 1
-            r"""int: number of physical parameters that require a standard scaler"""
-            self.D = self.D_no_kappa + 1
-            r"""int: total number of physical parameters that require a standard scaler, including the scaling factor :math:`\kappa`"""
-            self.mean_ = mean_
-            r"""np.ndarray of shape (D,): mean of the D components :math:`\theta_d`, used in the data normalization"""
-            self.std_ = std_
-            r"""np.ndarray of shape (D,): standard deviation of the D components :math:`\theta_d`, used in the data normalization"""
-
-            assert len(list_is_log) == self.D, f"{self.D}, {len(list_is_log)}"
-            self.list_is_log = list_is_log
-
-        else:
-            self.D = Theta_grid_lin.shape[1]
-            self.D_no_kappa = D_no_kappa if D_no_kappa is not None else self.D
-            assert self.D_no_kappa <= self.D
-
-            assert isinstance(list_is_log, list) and len(list_is_log) == self.D
-            self.list_is_log = list_is_log
-
-            raise NotImplementedError()
-
-            # TODO : correct this condition
-            log10_grid_theta = np.log10(Theta_grid_lin[:, (self.D - self.D_no_kappa) :])
-            self.mean_ = log10_grid_theta.mean(axis=0)  # (D,)
-            self.std_ = log10_grid_theta.std(axis=0)  # (D,)
-
-            assert self.mean_.shape == (
-                self.D_no_kappa,
-            ), f"{self.D_no_kappa}, {self.mean_.shape}"
-            assert self.std_.shape == (
-                self.D_no_kappa,
-            ), f"{self.D_no_kappa}, {self.std_.shape}"
+        assert mean_.shape == std_.shape
+        assert mean_.size in [len(list_is_log), len(list_is_log) - 1]
+
+        # if there is a kappa in the set of parameters
+        # (if there is no kappa, then each parameter should have an associated
+        # mean and std in the scaler)
+        if mean_.size == len(list_is_log) - 1:
+            # kappa: mean = 0 and std = 1 / np.log(10)
+            mean_ = np.array([0.0] + list(mean_))
+            std_ = np.array([1.0 / self.LOG_10] + list(std_))
+
+        self.D = mean_.size
+        r"""int: total number of physical parameters that require a standard scaler, including the scaling factor :math:`\kappa`"""
+
+        self.mean_ = mean_
+        r"""np.ndarray of shape (D,): mean of the D components :math:`\theta_d`, used in the data normalization"""
+
+        self.std_ = std_
+        r"""np.ndarray of shape (D,): standard deviation of the D components :math:`\theta_d`, used in the data normalization"""
+
+        self.list_is_log = list_is_log
+        r"""list of bool of length D: whether the normalization should be applied on the log10 scale or on the linear scale"""

     def from_scaled_to_lin(self, Theta_scaled: np.ndarray) -> np.ndarray:
         assert len(Theta_scaled.shape) == 2, Theta_scaled.shape
@@ -135,22 +100,21 @@ def from_scaled_to_lin(self, Theta_scaled: np.ndarray) -> np.ndarray:
             Theta_scaled,
             self.mean_,
             self.std_,
+            self.list_is_log,
             self.D,
-            self.D_no_kappa,
             self.LOG_10,
-            self.list_is_log,
         )
         return Theta_linscale

     def from_lin_to_scaled(self, Theta_linscale: np.ndarray) -> np.ndarray:
         assert len(Theta_linscale.shape) == 2, Theta_linscale.shape
         assert Theta_linscale.shape[1] == self.D, Theta_linscale.shape

         Theta_scaled = _from_lin_to_scaled(
             Theta_linscale,
             self.mean_,
             self.std_,
-            self.D,
-            self.D_no_kappa,
             self.list_is_log,
+            self.D,
         )
         return Theta_scaled
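For illustration (not part of the commit), here is a hedged round-trip sketch of the simplified class: with `list_is_log` all True, `from_lin_to_scaled` computes `(log10(theta) - mean) / std` per column, and the kappa column reduces to `ln(kappa)` thanks to the padded (0, 1/ln(10)) statistics. The example values are made up, and it assumes the `beetroots` package is installed:

```python
import numpy as np

from beetroots.space_transform.transform import MyScaler

rng = np.random.default_rng(0)

D = 3  # kappa + 2 other physical parameters, all sampled on the log10 scale
mean_ = np.array([0.5, -1.0])  # made-up log10-scale means of the non-kappa parameters
std_ = np.array([1.2, 0.8])  # made-up log10-scale stds of the non-kappa parameters
scaler = MyScaler(mean_, std_, [True] * D)

# strictly positive linear-scale samples, shape (N, D)
Theta_lin = 10.0 ** rng.normal(size=(5, D))
Theta_scaled = scaler.from_lin_to_scaled(Theta_lin)

# kappa column: (log10(kappa) - 0) / (1 / ln(10)) == ln(kappa)
assert np.allclose(Theta_scaled[:, 0], np.log(Theta_lin[:, 0]))

# the two transforms are inverses of each other
assert np.allclose(scaler.from_scaled_to_lin(Theta_scaled), Theta_lin)
```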
70 changes: 28 additions & 42 deletions tests/space_transform/test_transform.py
@@ -4,61 +4,47 @@

 N_grid = 500_000
 D = 3
-D_no_kappa = 2
-Theta_grid_lin = np.zeros((N_grid, D))
-Theta_grid_lin[:, D - D_no_kappa :] = np.exp(
-    np.log(10) * np.random.normal(0, 1, size=(N_grid, D_no_kappa))
-)
-Theta_grid_lin[:, : D - D_no_kappa] = 1

 # with normally distributed samples in the scaled space, the standard scaler should have no effect

+# grid without kappa
+Theta_grid_lin = np.zeros((N_grid, D - 1))
+Theta_grid_lin = np.exp(np.log(10) * np.random.normal(0, 1, size=(N_grid, D - 1)))
+
+list_is_log = [True for _ in range(D)]
+
+mean_ = np.mean(np.log10(Theta_grid_lin), axis=0)  # (D-1,)
+std_ = np.std(np.log10(Theta_grid_lin), axis=0)  # (D-1,)
+assert mean_.size == D - 1

 N_test = 10
-normal_samples = np.random.normal(0, 1, size=(N_test, D_no_kappa))
+normal_samples = np.random.normal(0, 3.0, size=(N_test, D))

-Theta_test_lin = np.zeros((N_test, D))
-Theta_test_lin[:, D - D_no_kappa :] = 10**normal_samples
-Theta_test_lin[:, : D - D_no_kappa] = 1
+Theta_test_lin = np.exp(np.log(10) * normal_samples)

-Theta_test_scaled = np.zeros((N_test, D))
-Theta_test_scaled[:, D - D_no_kappa :] = normal_samples
-Theta_test_scaled[:, : D - D_no_kappa] = 0
+Theta_test_scaled = normal_samples
+Theta_test_scaled[:, 0] *= np.log(10)  # divide by std = 1/log(10)

-mean_ = np.mean(Theta_test_scaled[:, D - D_no_kappa :], axis=0)
-std_ = np.std(Theta_test_scaled[:, D - D_no_kappa :], axis=0)
-list_is_log = [True for _ in range(D)]
+Theta_test_scaled[:, 1:] = (Theta_test_scaled[:, 1:] - mean_[None, :]) / std_[None, :]

-Theta_test_scaled[:, D - D_no_kappa :] = (
-    Theta_test_scaled[:, D - D_no_kappa :] - mean_[None, :]
-) / std_[None, :]
+scaler = MyScaler(mean_, std_, list_is_log)

-scaler = MyScaler(
-    Theta_grid_lin, D_no_kappa, mean_=mean_, std_=std_, list_is_log=list_is_log
-)

+def test_init():
+    assert np.isclose(scaler.mean_[0], 0.0)
+    assert np.isclose(scaler.std_[0], 1 / np.log(10))
+    assert scaler.mean_.size == D
+    assert scaler.std_.size == D


 def test_from_scaled_to_lin():
     Theta_lin = scaler.from_scaled_to_lin(Theta_test_scaled)
     assert Theta_lin.shape == Theta_test_scaled.shape
-    assert np.allclose(
-        Theta_lin[:, :-D_no_kappa], Theta_test_lin[:, :-D_no_kappa]
-    )  # kappa
-    assert np.allclose(
-        Theta_lin[:, -D_no_kappa:],
-        Theta_test_lin[:, -D_no_kappa:],
-        rtol=2e-2,
-        atol=1e-1,
-    )  # theta
+    assert np.allclose(Theta_lin[:, 0], Theta_test_lin[:, 0])  # kappa
+    assert np.allclose(Theta_lin[:, 1:], Theta_test_lin[:, 1:])  # theta


 def test_from_lin_to_scaled():
     Theta_scaled = scaler.from_lin_to_scaled(Theta_test_lin)
     assert Theta_scaled.shape == Theta_test_scaled.shape
-    assert np.allclose(
-        Theta_scaled[:, :-D_no_kappa], Theta_test_scaled[:, :-D_no_kappa]
-    )  # kappa
-    assert np.allclose(
-        Theta_scaled[:, -D_no_kappa:],
-        Theta_test_scaled[:, -D_no_kappa:],
-        rtol=2e-2,
-        atol=1e-1,
-    )  # theta
+    assert np.allclose(Theta_scaled[:, 0], Theta_test_scaled[:, 0])  # kappa
+    assert np.allclose(Theta_scaled[:, 1:], Theta_test_scaled[:, 1:])  # theta
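As a quick numeric cross-check of the note in the class docstring (a sketch, assuming kappa is sampled on the log10 scale with the padded mean 0 and std 1/np.log(10)):

```python
import numpy as np

LOG_10 = np.log(10.0)

# kappa's normalization reduces to (log10(kappa) - 0) / (1 / LOG_10) = ln(kappa),
# so a [0.1, 10] linear-scale validity interval maps to about [-2.3, 2.3].
for kappa in (0.1, 10.0):
    scaled = np.log10(kappa) * LOG_10  # equals np.log(kappa)
    print(f"kappa={kappa:>4}: scaled={scaled:+.3f}")  # prints -2.303 and +2.303
```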