diff --git a/docs/modules/datasets.rst b/docs/modules/datasets.rst index b795df85b..4f8977fff 100644 --- a/docs/modules/datasets.rst +++ b/docs/modules/datasets.rst @@ -47,10 +47,10 @@ The following functions are used to make synthetic functional datasets: .. autosummary:: :toctree: autosummary - skfda.datasets.euler_maruyama skfda.datasets.make_gaussian skfda.datasets.make_gaussian_process skfda.datasets.make_sinusoidal_process skfda.datasets.make_multimodal_samples skfda.datasets.make_multimodal_landmarks skfda.datasets.make_random_warping + skfda.datasets.make_sde_trajectories \ No newline at end of file diff --git a/examples/plot_langevin_dynamics.py b/examples/plot_langevin_dynamics.py new file mode 100644 index 000000000..9e66407ef --- /dev/null +++ b/examples/plot_langevin_dynamics.py @@ -0,0 +1,287 @@ +""" +SDE simulation: Langevin dynamics +======================================================================= + +This example shows how to use numeric SDE solvers to simulate solutions of +Stochastic Differential Equations (SDEs). +""" + +# Author: Pablo Soto Martín +# License: MIT +# sphinx_gallery_thumbnail_number = 1 + +import matplotlib.pyplot as plt +import numpy as np +from matplotlib import rc +from matplotlib.animation import FuncAnimation +from scipy.stats import multivariate_normal + +from skfda.datasets import make_sde_trajectories + +# %% +# Langevin dynamics is a mathematical model used to describe the behaviour of +# particles in a fluid, particularly in the context of statistical mechanic +# and molecular dynamics. It was initally formulated by French physicist Paul +# Langevin. In Langevin dynamics, the motion of particles is influenced by both +# determinist and stochastic forces. The ideas presented by Paul Langevin can +# be applied in various disciplines to simulate the behaviour of particles in +# complex environments. In our case, we will use them to produce samples from +# a probability distribution: a 2-d Gaussian mixture. +# +# Langevin dynamics enable us to sample from probability distributions from +# which a non-normalised pdf is known. This is possible thanks to the use +# of the score function. Given a probability density function +# :math:`p(\mathbf{x}),` the score function is defined as the gradient of its +# logarithm +# +# .. math:: +# +# \nabla_\mathbf{x} \log p(\mathbf{x}). +# +# For example, if :math:`p(\mathbf{x}) = \frac{q(\mathbf{x})}{Z}`, where +# :math:`q(\mathbf{x}) \geq 0` is known but :math:`Z` is a not known +# normalising constant, then the score of :math:`p` is +# +# .. math:: +# +# \nabla_\mathbf{x} \log p(\mathbf{x}) = \nabla_\mathbf{x} \log q(\mathbf{x}) +# - \nabla_\mathbf{x} \log Z = \nabla_\mathbf{x} \log q(\mathbf{x}), +# +# which is known. +# +# Once we know the score function, we can sample from the +# probability distribution :math:`p(\mathbf{x})` using a dynamic driven by +# SDEs. The idea is to define an SDE whose stationary distribution is +# :math:`p(\mathbf{x})`. If we evolve the SDE +# +# .. math:: +# +# d\mathbf{X}(t) = \nabla_\mathbf{x} \log p(\mathbf{X}(t))dt + +# \sqrt{2}d\mathbf{W}(t) +# +# from an arbitrary, sufficiently smooth initial distribution +# :math:`\mathbf{X}(0) \sim \pi_0(\mathbf{x})`, we get that for +# :math:`t \rightarrow \infty`, the marginal probability distribution of the +# process converges to the distribution :math:`p(\mathbf{x})`. The initial +# distribution :math:`\pi_0(\mathbf{x})` could be any sufficiently smooth +# distribution. +# +# We will use scikit-fda to simulate this process. 
We will exemplify this
+# use case by sampling from a 2-d Gaussian mixture, but the same steps can be
+# applied to other distributions.
+#
+# We will start by fixing some notation. The Gaussian mixture is composed
+# of :math:`N` Gaussians with means :math:`\mu_n` and covariance matrices
+# :math:`\Sigma_n`. For the sake of simplicity, we will suppose that the
+# covariance matrices are diagonal, and we denote by :math:`\sigma_n` the
+# corresponding vectors of standard deviations. Each Gaussian is weighted by
+# :math:`\omega_n`, with :math:`\sum_{n=1}^N \omega_n = 1`. So, if
+# :math:`p_n(x)` is the pdf of the n-th Gaussian, the pdf of the mixture is
+# :math:`p(x) = \sum_{n=1}^{N}\omega_n p_n(x)`.
+#
+# To compute the score, we can use the chain rule:
+#
+# .. math::
+#
+#   \nabla_x \log p(x) = \frac{\nabla_x p(x)}{p(x)} =
+#   \frac{\sum_{n=1}^{N}\omega_n\nabla_x p_n(x)}{\sum_{n=1}^{N}\omega_n p_n(x)}
+#   = -\frac{\sum_{n=1}^{N}\omega_n p_n(x) \frac{x - \mu_n}{\sigma_n^2}}
+#   {\sum_{n=1}^{N}\omega_n p_n(x)},
+#
+# where the division by :math:`\sigma_n^2` is understood elementwise.
+#
+# We start by defining functions that compute the pdf, log-pdf and score of
+# the distribution.
+
+means = np.array([[-1, -1], [3, 2], [0, 2]])
+cov_matrices = np.array(
+    [
+        [[0.4, 0], [0, 0.4]],
+        [[0.5, 0], [0, 0.5]],
+        [[0.2, 0], [0, 0.2]],
+    ],
+)
+
+probabilities = np.array([0.3, 0.6, 0.1])
+
+
+def pdf_gaussian_mixture(
+    x: np.ndarray,
+    weight: np.ndarray,
+    mean: np.ndarray,
+    cov: np.ndarray,
+) -> np.ndarray:
+    """Pdf of a 2-d Gaussian mixture of N components."""
+    n_gaussians = len(mean)
+    return np.sum(
+        [
+            weight[n] * multivariate_normal.pdf(x, mean[n], cov[n])
+            for n in range(n_gaussians)
+        ],
+        axis=0,
+    )
+
+
+def log_pdf_gaussian_mixture(
+    x: np.ndarray,
+    weight: np.ndarray,
+    mean: np.ndarray,
+    cov: np.ndarray,
+) -> np.ndarray:
+    """Log-pdf of a 2-d Gaussian mixture of N components."""
+    return np.log(pdf_gaussian_mixture(x, weight, mean, cov))
+
+
+def score_gaussian_mixture(
+    x: np.ndarray,
+    weight: np.ndarray,
+    mean: np.ndarray,
+    cov: np.ndarray,
+) -> np.ndarray:
+    """Score of a 2-d Gaussian mixture of N components."""
+    n_gaussians = len(mean)
+    score = np.zeros_like(x)
+    pdf = pdf_gaussian_mixture(x, weight, mean, cov)
+
+    for n in range(n_gaussians):
+        # The gradient of each Gaussian pdf is -p_n(x) (x - mu_n) / sigma_n^2
+        # (elementwise, since the covariance is diagonal).
+        score_weight = weight[n] * (x - mean[n]) / np.diag(cov[n])
+        score += (
+            score_weight
+            * multivariate_normal.pdf(x, mean[n], cov[n])[:, np.newaxis]
+        )
+
+    return -score / pdf[:, np.newaxis]
+
+# %%
+# Once we have defined the pdf and the score of the distribution, we can
+# visualize them with a contour plot of the log-probability and the vector
+# field given by the score. Before plotting, we perform a quick numerical
+# check of the score.
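+
+# A quick optional sanity check (not needed for the rest of the example):
+# the analytical score should agree with a centred finite-difference
+# gradient of the log-pdf at a few arbitrary test points.
+test_points = np.array([[0.0, 0.0], [1.0, 1.0], [-2.0, 3.0]])
+eps = 1e-5
+numerical_score = np.zeros_like(test_points)
+for d in range(2):
+    shift = np.zeros(2)
+    shift[d] = eps
+    numerical_score[:, d] = (
+        log_pdf_gaussian_mixture(
+            test_points + shift, probabilities, means, cov_matrices,
+        )
+        - log_pdf_gaussian_mixture(
+            test_points - shift, probabilities, means, cov_matrices,
+        )
+    ) / (2 * eps)
+
+analytical_score = score_gaussian_mixture(
+    test_points, probabilities, means, cov_matrices,
+)
+print(
+    "Maximum difference between analytical and numerical score:",
+    np.max(np.abs(analytical_score - numerical_score)),
+)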
+ + +x_range = np.linspace(-4, 6, 100) +y_range = np.linspace(-4, 6, 100) +x_score_range = np.linspace(-4, 6, 15) +y_score_range = np.linspace(-4, 6, 15) + +X, Y = np.meshgrid(x_range, y_range) +coords = np.c_[X.ravel(), Y.ravel()] + +Z = log_pdf_gaussian_mixture(coords, probabilities, means, cov_matrices) +Z = Z.reshape(X.shape) + +X_score, Y_score = np.meshgrid(x_score_range, y_score_range) +coords_score = np.c_[X_score.ravel(), Y_score.ravel()] + +score = score_gaussian_mixture( + coords_score, + probabilities, + means, + cov_matrices, +) +score = score.reshape(X_score.shape + (2,)) +score_x_coord = score[:, :, 0] +score_y_coord = score[:, :, 1] + +plt.contour(X, Y, Z, levels=25, cmap='autumn') +plt.quiver(X_score, Y_score, score_x_coord, score_y_coord, scale=200) +plt.xticks([]) +plt.yticks([]) +plt.title("Score of a Gaussian mixture", y=1.02) +plt.show() + +# %% +# As we can see in the image, the score function is a vector which points +# in the direction in which :math:`\log p(\mathbf{x})` grows faster. Also, if +# :math:`\mathbf{X}(t)` is in an low probability region, +# :math:`\nabla_\mathbf{x} \log p(\mathbf{X}(t))` will have a big norm, which +# means that points which are far away from the "common" areas will tend +# faster towards more probable ones. In regions with high probability, +# :math:`\nabla_\mathbf{x} \log p(\mathbf{X}(t))` will have a small norm, +# which means that the majority of the samples will remain in that area. +# +# We can now proceed to define the parameters for the SDE simulation. For +# this example we have chosen than the starting data follow a uniform +# distribution, but any other distribution is equally valid. + + +def langevin_drift( + t: float, + x: np.ndarray, +) -> np.ndarray: + """Drift term of the Langevin dynamics.""" + return score_gaussian_mixture(x, probabilities, means, cov_matrices) + + +def langevin_diffusion( + t: float, + x: np.ndarray, +) -> np.ndarray: + """Diffusion term of the Langevin dynamics.""" + return np.sqrt(2) * np.eye(x.shape[-1]) + + +rnd_state = np.random.RandomState(1) + + +def initial_distribution( + size: int, + random_state: np.random.RandomState, +) -> np.ndarray: + """Uniform initial distribution""" + return random_state.uniform(-4, 6, (size, 2)) + + +n_samples = 400 +n_grid_points = 200 +grid_points_per_frame = 10 +frames = 20 +# %% +# We use :func:`skfda.datasets.make_sde_trajectories` method of the datasets +# module to simulate solutions of the SDE. More information on how to use it +# can be found in the example +# :ref:`sphx_glr_auto_examples_plot_sde_simulation.py`. +t_0 = 0 +t_n = 3.0 + +fd = make_sde_trajectories( + initial_condition=initial_distribution, + n_grid_points=n_grid_points, + n_samples=n_samples, + start=t_0, + stop=t_n, + drift=langevin_drift, + diffusion=langevin_diffusion, + random_state=rnd_state, +) + +# %% +# We can visualize how the samples start from a random distribution and +# gradually move to regions of higher mass probability pushed by the score +# drift. The final result is an approximate sample from the target +# distribution. 
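+
+# A rough optional check (not needed for the animation below): the sample
+# mean at the final time should be roughly comparable to the mean of the
+# target mixture, sum_n omega_n mu_n, although with a finite time horizon
+# some discrepancy is expected.
+final_samples = fd.data_matrix[:, -1, :]
+mixture_mean = probabilities @ means
+print("Empirical mean of the final samples:", np.mean(final_samples, axis=0))
+print("Mean of the target mixture:", mixture_mean)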
+
+points = fd.data_matrix
+fig, ax = plt.subplots()
+
+plt.contour(X, Y, Z, levels=25, cmap='autumn')
+plt.quiver(X_score, Y_score, score_x_coord, score_y_coord, scale=200)
+rc('animation', html='jshtml')
+scatter = None
+
+
+def update(frame: int) -> None:
+    """Creation of each frame of the animation."""
+    global scatter
+
+    if scatter:
+        scatter.remove()
+
+    ax.set_xlim(-4, 6)
+    ax.set_ylim(-4, 6)
+    ax.set_xticks([])
+    ax.set_yticks([])
+    x = points[:, grid_points_per_frame * frame, 0]
+    y = points[:, grid_points_per_frame * frame, 1]
+    scatter = ax.scatter(x, y, s=5, c='dodgerblue')
+
+
+animation = FuncAnimation(fig, update, frames=frames, interval=500)
+plt.close()
+animation
diff --git a/examples/plot_sde_simulation.py b/examples/plot_sde_simulation.py
new file mode 100644
index 000000000..205d02f5b
--- /dev/null
+++ b/examples/plot_sde_simulation.py
@@ -0,0 +1,523 @@
+"""
+SDE simulation: creating synthetic datasets using SDEs
+=======================================================================
+
+This example shows how to use numeric SDE solvers to simulate solutions of
+Stochastic Differential Equations (SDEs).
+"""
+
+# Author: Pablo Soto Martín
+# License: MIT
+# sphinx_gallery_thumbnail_number = 2
+import matplotlib.pyplot as plt
+import numpy as np
+from matplotlib import rc
+from matplotlib.animation import FuncAnimation
+from scipy.stats import multivariate_normal, norm
+from sklearn.neighbors import KernelDensity
+
+from skfda.datasets import make_sde_trajectories
+
+# %%
+# SDEs are a fundamental mathematical framework for modelling systems subject
+# to both deterministic and random influences. They are a generalisation of
+# ordinary differential equations in which a diffusion, or stochastic, term
+# is added. They are well suited to modelling phenomena with uncertain
+# factors and are widely used in many scientific areas such as financial
+# mathematics, quantum mechanics, engineering, etc.
+#
+# Mathematically, we can represent an SDE with the following formula:
+#
+# .. math::
+#
+#     d\mathbf{X}(t) = \mathbf{F}(t, \mathbf{X}(t))dt + \mathbf{G}(t,
+#     \mathbf{X}(t))d\mathbf{W}(t),
+#
+# where :math:`\mathbf{X}` is the vector-valued random variable we want to
+# compute. The term :math:`\mathbf{F}` is called the **drift** of the process,
+# and the term :math:`\mathbf{G}` the **diffusion**. The diffusion is a matrix
+# which multiplies the vector :math:`d\mathbf{W}(t)`, where
+# :math:`\mathbf{W}(t)` represents a `Wiener process `_.
+#
+# To simulate SDEs in practice, there exist various numerical integrators
+# that approximate the solution with different degrees of precision.
+# scikit-fda implements two of them: Euler-Maruyama and Milstein. In this
+# example we will use the Euler-Maruyama scheme due to its simplicity, but
+# the results can also be reproduced with the Milstein scheme.
+#
+# The example is divided into two parts:
+#
+# - In the first part, trajectories of an SDE are simulated.
+# - In the second part, the marginal probability flow of a stochastic process
+#   is visualized.
+
+# %%
+# Simulation of trajectories of an Ornstein-Uhlenbeck process
+# ------------------------------------------------------------
+#
+# Using numeric SDE solvers we can simulate the evolution of a stochastic
+# process. One common SDE found in the literature is the Ornstein-Uhlenbeck
+# process (OU). 
The OU process is particularly useful for modelling phenomena +# where a system tends to return to a central value or equilibrium point over +# time, exhibiting a form of stochastic stability. In this case, the process +# can be modelled with the equation: +# +# .. math:: +# +# d\mathbf{X}(t) = -\mathbf{A}(\mathbf{X}(t) - \mathbf{\mu})dt + \mathbf{B} +# d\mathbf{W}(t) +# +# +# where :math:`\mathbf{W}` is the Brownian motion. The parameter :math:`\mu` +# represents the equilibrium point of the process, :math:`\mathbf{A}` +# represents the rate at which the process reverts to the mean and +# :math:`\mathbf{B}` represents the volatility of the processs. +# +# To illustrate the example, we will define some concrete values for the +# parameters: + +A = 1 +mu = 3 +B = 0.5 + + +def ou_drift( + t: float, + x: np.ndarray, +) -> np.ndarray: + """Drift of the Ornstein-Uhlenbeck process.""" + return -A * (x - mu) + +# %% +# For the first example, we will consider that all samples start from the same +# initial state :math:`X_0`. We can simulate the trajectories using the +# Euler - Maruyama method. The trajectories of the SDE calculated by +# :func:`skfda.datasets.make_sde_trajectories` are stored in an FDataGrid. +# Then, we can plot them using scikit-fda integrated functional data plots. + + +X_0 = 1.0 + +fd_ou = make_sde_trajectories( + initial_condition=X_0, + n_samples=30, + drift=ou_drift, + diffusion=B, + start=0.0, + stop=5.0, + random_state=np.random.RandomState(1), +) + +grid_points = fd_ou.grid_points[0] +fd_ou.plot(alpha=0.8) +mean_ou = np.mean(fd_ou.data_matrix, axis=0)[:, 0] +std_ou = np.std(fd_ou.data_matrix, axis=0)[:, 0] +plt.fill_between( + grid_points, + mean_ou + 2 * std_ou, + mean_ou - 2 * std_ou, + alpha=0.25, + color='gray', +) +plt.plot( + grid_points, + mean_ou, + linewidth=2, + color='k', + label="empirical mean", +) +plt.plot( + grid_points, + mu * np.ones_like(grid_points), + linewidth=2, + label="stationary mean", + color='b', + linestyle='dashed', +) +plt.xlabel("t") +plt.ylabel("X(t)") +plt.title("Ornstein-Uhlenbeck simulation from an initial value") +plt.legend() +plt.show() + +# %% +# In the previous image we can observe 30 simulated trajectories. We can see +# how the empirical mean approaches the theoretical stationary mean of the +# process. +# +# The initial point :math:`X_0` from which we compute the simulation does not +# need to be always the same value. In the following code, we compute +# trajectories for the Ornstein-Uhlenbeck process for a range of initial +# points. + +X_0 = np.linspace(1, 11, 30) + +fd_ou = make_sde_trajectories( + initial_condition=X_0, + drift=ou_drift, + diffusion=B, + start=0.0, + stop=5.0, + random_state=np.random.RandomState(1), +) + +grid_points = fd_ou.grid_points[0] +fd_ou.plot(alpha=0.8) +mean_ou = np.mean(fd_ou.data_matrix, axis=0)[:, 0] +std_ou = np.std(fd_ou.data_matrix, axis=0)[:, 0] +plt.fill_between( + grid_points, + mean_ou + 2 * std_ou, + mean_ou - 2 * std_ou, + alpha=0.25, + color='gray', +) +plt.plot( + grid_points, + mean_ou, + linewidth=2, + color='k', + label="empirical mean", +) +plt.plot( + grid_points, + mu * np.ones_like(grid_points), + linewidth=2, + label="stationary mean", + color='b', + linestyle='dashed', +) +plt.xlabel("t") +plt.ylabel("X(t)") +plt.title("Ornstein-Uhlenbeck simulation from a range of initial values") +plt.legend() +plt.show() + +# %% +# In the Ornstein-Uhlenbeck process, regardless the initial value, the +# trajectories tend towards the stationary distribution. 
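+
+# A quick optional check (not needed for the rest of the example): for this
+# SDE the stationary distribution is Gaussian with mean mu and standard
+# deviation B / sqrt(2 * A), so the values at the final time should have
+# moments close to these.
+final_values = fd_ou.data_matrix[:, -1, 0]
+print("Empirical mean at the final time:", np.mean(final_values))
+print("Stationary mean:", mu)
+print("Empirical std at the final time:", np.std(final_values))
+print("Stationary std:", B / np.sqrt(2 * A))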
+ +# %% +# Probability flow +# --------------------------------------------------- +# +# In this section we exemplify how to visualize the evolution of the marginal +# probability density, i.e. probability flow, of a stochastic differential +# equation. At a given time t, the solution of an SDE is not a real-valued +# vector; it is a random variable. Numeric integrators allow us to generate +# trajectories which represent samples of these random variables at a given +# time t. Probability flow is a very interesting characteristic to study when +# simulating SDE, as it gives us the possibility to visualiaze the probability +# distribution of the solution of the SDE at a given time. We will present a +# 1-dimensional and a 2-dimensional example. +# +# In the first example, we will analyse the evolution of the Ornstein Uhlenbeck +# process defined above where all the trajectories start on the same value +# :math:`X_0` (the initial distribution is a Dirac delta on :math:`X_0`). The +# solution of the OU process is theoretically known, so we can compare it with +# the empirical data we get from numeric SDE integrators. +# +# The `theoretical marginal probability density `_ of the process +# is given in the following function: + + +def theoretical_pdf( + t: float, + x: np.ndarray, + x_0: float = 0, +) -> np.ndarray: + """Theoretical marginal pdf of an Ornstein-Uhlenbeck process.""" + mean = x_0 * np.exp(-A * t) + mu * (1 - np.exp(-A * t)) + std = B * np.sqrt((1 - np.exp(-2 * A * t)) / (2 * A)) + return norm.pdf(x, mean, std) + +# %% +# In this case we will simulate a big number of trajectories, so that we get +# better precision. + + +X_0 = 0.0 +t_0 = 0.0 +t_n = 3.0 + +fd = make_sde_trajectories( + initial_condition=X_0, + n_samples=5000, + drift=ou_drift, + diffusion=B, + start=t_0, + stop=t_n, + random_state=np.random.RandomState(1), +) + +# %% +# We create an animation that compares the theoretical marginal density with +# a normalized histogram with the empirical data. + +x_min, x_max = min(X_0 - 1, mu - 2 * B), max(X_0 + 1, mu + 2 * B) +x_range = np.linspace(x_min, x_max, 200) +epsilon_t = 1.0e-10 # Avoids evaluating singular pdf at t_0 +times = np.linspace(t_0 + epsilon_t, t_n, 100) +fig, ax = plt.subplots(2, 1, figsize=(7, 10)) +rc('animation', html='jshtml') + +# Creation of the plot. +ax[0].set_xlim(t_0, t_n) +ax[0].set_ylim(x_min, x_max) +lines_0 = ax[0].plot(times[:1], fd.data_matrix[:30, :1, 0].T) +ax[0].set_title( + f"Ornstein Uhlenbeck evolution at time {round(times[0], 2)}", +) +ax[0].set_xlabel("t") +ax[0].set_ylabel("X(t)") + + +def update(frame: int) -> None: + """Creation of each frame of the animation.""" + for i, line in enumerate(lines_0): + line.set_data(times[:frame], fd.data_matrix[i, :frame, 0].T) + ax[0].set_title( + f"Ornstein Uhlenbeck evolution at time {round(times[frame], 2)}", + ) + + ax[1].clear() + ax[1].set_xlim(x_min, x_max) + ax[1].set_ylim(0, 4) + ax[1].set_xlabel("x") + ax[1].hist( + fd.data_matrix[:, frame], + bins=30, + density=True, + label="Empirical pdf", + ) + ax[1].plot( + x_range, + theoretical_pdf(times[frame], x_range), + linewidth=4, + label="Theoretical pdf", + ) + ax[1].legend() + ax[1].set_title("Distribution of X(t) = x | X(0) = 0") + + +anim = FuncAnimation(fig, update, frames=range(100)) +plt.close() +anim + + +# %% +# In the second example, we visualize the probability flow of a 2-dimensional +# Ornstein Uhlenbeck process. This process has the same parameters than the +# previous 1-dimensional one in each coordinate. 
In this case, instead of +# starting all trajectories from the same value or range of values, the initial +# condition is a random variable. Note that at each time :math:`t`, the +# solution of the SDE :math:`\mathbf{X}(t)` is a random variable. So, in the +# general case the initial condition of an SDE is a random variable. In order +# to simulate trajectories from an initial condition given my a random +# variable, first data will be sampled from the random variable and then used +# to calculate trajectories of the process. For this example, the distribution +# of the initial random variable is a 2d Gaussian mixture with three modes. +# +# In this code we define the parameters of the Gaussian mixture, which will +# have three modes. We also define a function that enables us to sample from +# the distribution. + +means = np.array([[-1, -1], [3, 2], [0, 2]]) +cov_matrices = np.array( + [[[0.4, 0], [0, 0.4]], + [[0.5, 0], [0, 0.5]], + [[0.2, 0], [0, 0.2]], + ], +) +probabilities = np.array([0.3, 0.6, 0.1]) + + +def rvs_gaussian_mixture( + size: int, + random_state: np.random.RandomState, +) -> np.ndarray: + """Generate samples of a gaussian mixture.""" + n_gaussians, dim = np.shape(means) + selected_gaussians = np.random.multinomial(size, probabilities) + samples = [multivariate_normal.rvs( + means[index], + cov_matrices[index], + random_state=random_state, + size=selected_gaussians[index], + ) for index in range(n_gaussians) + ] + + samples = np.concatenate(samples) + np.random.shuffle(samples) + return np.array(samples) + +# %% +# Once we have defined an initial distribution and a way to sample from it, +# we generate trajectories using the Euler-Maruyama function. + + +random_state = np.random.RandomState(1) +A = np.array([1, 1]) +mu = np.array([3, 3]) +B = 0.5 * np.eye(2) + +fd = make_sde_trajectories( + initial_condition=rvs_gaussian_mixture, + n_samples=500, + drift=ou_drift, + diffusion=B, + stop=3, + random_state=random_state, +) + +# %% +# We now create a 3d-plot in which we show the evolution of the marginal +# pdf of the samples. In order to calculate the empirical pdf, we use kernel +# density estimators. + +x_range = np.linspace(-4, 6, 100) +y_range = np.linspace(-4, 6, 100) +X, Y = np.meshgrid(x_range, y_range) + +coords = np.column_stack((X.ravel(), Y.ravel())) + +fig3d = plt.figure(figsize=(7, 8)) +ax = fig3d.add_subplot(2, 1, 1, projection='3d') +ax2 = fig3d.add_subplot(2, 1, 2) +fig3d.suptitle('Kernel Density Estimation') + + +def update3d(frame: int) -> None: + """Creation of each frame of the 3d animation.""" + data = fd.data_matrix[:, frame, :] + + kde = KernelDensity(bandwidth=0.5, kernel='gaussian') + kde.fit(data) + grid_points_3d = np.c_[X.ravel(), Y.ravel()] + Z = np.exp(kde.score_samples(grid_points_3d)) + Z = Z.reshape(X.shape) + + ax.clear() + ax.set_xlim(-4, 6) + ax.set_ylim(-4, 6) + ax.set_zlim(0, 0.35) + ax.plot_surface(X, Y, Z, cmap='cividis', lw=0.2, rstride=5, cstride=5) + ax.set_xlabel('X') + ax.set_ylabel('Y') + + ax2.clear() + ax2.set_xlim(-4, 6) + ax2.set_ylim(-4, 6) + ax2.contourf(X, Y, Z, levels=25, cmap='cividis') + + +ani3d = FuncAnimation(fig3d, update3d, frames=range(100)) +plt.close() +ani3d + +# %% +# Using Milstein's method to compute SDE solutions +# --------------------------------------------------- +# +# Apart from func:`~skfda.datasets.make_sde_trajectories`, scikit-fda also +# implements the Milstein scheme, a numerical SDE integrator of a higher order +# of convergence than the Euler-Maruyama scheme. 
When computing solution
+# trajectories of an SDE, the Milstein method adds a term which depends on the
+# spatial derivative of the diffusion term of the SDE (the derivative with
+# respect to :math:`\mathbf{X}`). In the Ornstein-Uhlenbeck process, since the
+# diffusion does not depend on the value of :math:`\mathbf{X}`, the
+# Euler-Maruyama and Milstein methods are equivalent. In this section we show
+# how to use the Milstein method for SDEs whose diffusion term does depend on
+# :math:`\mathbf{X}`.
+#
+# We will simulate a Geometric Brownian Motion (GBM). One of its notable
+# applications is the modelling of stock prices in financial markets, as it
+# forms the basis for models like the Black-Scholes option pricing model. The
+# SDE of a 1-dimensional GBM is given by the formula
+#
+# .. math::
+#
+#   dX(t) = \mu X(t) dt + \sigma X(t) dW(t),
+#
+# where :math:`\mu` and :math:`\sigma` are constants. To illustrate the
+# example, we will define some concrete values for the parameters:
+
+mu = 2
+sigma = 1
+
+
+def gbm_drift(t: float, x: np.ndarray) -> np.ndarray:
+    """Drift term of a Geometric Brownian Motion."""
+    return mu * x
+
+
+def gbm_diffusion(t: float, x: np.ndarray) -> np.ndarray:
+    """Diffusion term of a Geometric Brownian Motion."""
+    return sigma * x
+
+
+def gbm_diffusion_derivative(t: float, x: np.ndarray) -> np.ndarray:
+    """Spatial derivative of the diffusion term of a GBM."""
+    return sigma * np.ones_like(x)[:, :, np.newaxis]
+
+
+# %%
+# When defining the derivative of the diffusion function, it is important to
+# return an array with one additional dimension compared to the diffusion
+# function itself. This is because the derivative is taken with respect to
+# each coordinate of :math:`\mathbf{X}`, so an extra trailing axis is needed
+# to hold the partial derivatives along each coordinate. A small shape check
+# is shown below; after it, we simulate all trajectories from the same
+# initial value :math:`X_0 = 1`. 
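+
+# Optional shape check (not needed for the simulation itself): for a batch
+# of n scalar states, the diffusion returns an array of shape (n, 1), while
+# its derivative returns an array of shape (n, 1, 1).
+x_test = np.ones((4, 1))
+print("Diffusion shape:", gbm_diffusion(0.0, x_test).shape)
+print(
+    "Diffusion derivative shape:",
+    gbm_diffusion_derivative(0.0, x_test).shape,
+)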
+ +X0 = 1 +n_simulations = 500 +n_steps = 100 +n_l0_discretization_points = 5 +random_state = np.random.RandomState(1) + +fd = make_sde_trajectories( + initial_condition=X0, + n_samples=n_simulations, + n_grid_points=n_steps, + drift=gbm_drift, + diffusion=gbm_diffusion, + diffusion_derivative=gbm_diffusion_derivative, + method="milstein", + random_state=random_state, +) + +grid_points = fd.grid_points[0] +fd.plot(alpha=0.85) +std_gbm = np.std(fd.data_matrix, axis=0)[:, 0] +mean_gbm = np.mean(fd.data_matrix, axis=0)[:, 0] +plt.fill_between( + grid_points, + mean_gbm + 2 * std_gbm, + mean_gbm - 2 * std_gbm, + alpha=0.25, + color='gray', +) +plt.plot( + grid_points, + mean_gbm, + linewidth=2, + color='k', + label="empirical mean", +) +plt.plot( + grid_points, + np.exp(grid_points * mu), + linewidth=2, + label="theoretical mean", + color='b', + linestyle='dashed', +) +plt.xlabel("t") +plt.ylabel("X(t)") +plt.title("Geometric Brownian motion simulation") +plt.legend() +plt.show() diff --git a/skfda/datasets/__init__.py b/skfda/datasets/__init__.py index 01f53197e..a3194253b 100644 --- a/skfda/datasets/__init__.py +++ b/skfda/datasets/__init__.py @@ -23,13 +23,13 @@ "fetch_bone_density", ], "_samples_generators": [ - "euler_maruyama", "make_gaussian", "make_gaussian_process", "make_multimodal_landmarks", "make_multimodal_samples", "make_random_warping", "make_sinusoidal_process", + "make_sde_trajectories", ], }, ) @@ -52,11 +52,11 @@ fetch_weather as fetch_weather, ) from ._samples_generators import ( - euler_maruyama as euler_maruyama, make_gaussian as make_gaussian, make_gaussian_process as make_gaussian_process, make_multimodal_landmarks as make_multimodal_landmarks, make_multimodal_samples as make_multimodal_samples, make_random_warping as make_random_warping, make_sinusoidal_process as make_sinusoidal_process, + make_sde_trajectories as make_sde_trajectories, ) diff --git a/skfda/datasets/_samples_generators.py b/skfda/datasets/_samples_generators.py index 0ae0e6490..e7838a90f 100644 --- a/skfda/datasets/_samples_generators.py +++ b/skfda/datasets/_samples_generators.py @@ -1,7 +1,7 @@ from __future__ import annotations import itertools -from typing import Any, Callable, Sequence, Union +from typing import Any, Callable, Literal, Sequence, Union import numpy as np import scipy.integrate @@ -21,6 +21,7 @@ MeanLike = Union[float, NDArrayFloat, MeanCallable] SDETerm = Callable[[float, NDArrayFloat], NDArrayFloat] +EMTermCalculation = Callable[[float, NDArrayFloat, NDArrayFloat], NDArrayFloat] class InitialValueGenerator(Protocol): @@ -38,15 +39,147 @@ def __call__( """Interface of initial value generator.""" -def euler_maruyama( # noqa: WPS210 +def _sde_initial_condition_preprocessing( + initial_condition: ArrayLike | InitialValueGenerator, + random_state: np.random.RandomState, + n_samples: int | None = None, +) -> tuple[NDArrayFloat, int, int]: + + if n_samples is None: + if callable(initial_condition): + raise ValueError( + "Invalid initial conditions. 
If a function is given, the " + "n_samples argument must be included.", + ) + + initial_values = np.atleast_1d(initial_condition) + n_samples = len(initial_values) + else: + if callable(initial_condition): + initial_values = initial_condition( + size=n_samples, + random_state=random_state, + ) + else: + initial_condition = np.atleast_1d(initial_condition) + dim_codomain = len(initial_condition) + initial_values = ( + initial_condition + * np.ones((n_samples, dim_codomain)) + ) + + if initial_values.ndim == 1: + initial_values = initial_values[:, np.newaxis] + elif initial_values.ndim > 2: + raise ValueError( + "Invalid initial conditions. Each of the starting points " + "must be a flat array.", + ) + (n_samples, dim_codomain) = initial_values.shape + + return ( + initial_values, + n_samples, + dim_codomain, + ) + + +def _sde_drift_diffusion_preprocessing( + drift: SDETerm | ArrayLike | None, + diffusion: SDETerm | ArrayLike | None, + dim_codomain: int, + initial_values: NDArrayFloat, + start: float, +) -> tuple[int, SDETerm, SDETerm, EMTermCalculation, bool]: + + if drift is None: + drift = 0 + + if callable(drift): + formatted_drift = drift + else: + def constant_drift( # noqa: WPS430 -- We need internal functions + t: float, + x: NDArrayFloat, + ) -> NDArrayFloat: + return np.atleast_1d(drift) + + formatted_drift = constant_drift + + if diffusion is None: + diffusion = 1.0 + + if callable(diffusion): + formatted_diffusion = diffusion + else: + def constant_diffusion( # noqa: WPS430 -- We need internal functions + t: float, + x: NDArrayFloat, + ) -> NDArrayFloat: + return np.atleast_1d(diffusion) + + formatted_diffusion = constant_diffusion + + def vector_diffusion_times_noise( # noqa: WPS430 + t_n: float, + x_n: NDArrayFloat, + noise: NDArrayFloat, + ) -> NDArrayFloat: + return formatted_diffusion(t_n, x_n) * noise + + def matrix_diffusion_times_noise( # noqa: WPS430 + t_n: float, + x_n: NDArrayFloat, + noise: NDArrayFloat, + ) -> Any: + return np.einsum( + '...dj, ...j -> ...d', + formatted_diffusion(t_n, x_n), + noise, + ) + + first_value = initial_values[0] + diffusion_test_values = np.tile(first_value, (dim_codomain + 1, 1)) + diffusion_shape = np.atleast_1d( + formatted_diffusion(start, diffusion_test_values), + ).shape + + if len(diffusion_shape) == 3: + diffusion_matricial_term = True + elif len(diffusion_shape) == 2: + diffusion_matricial_term = diffusion_shape[0] != dim_codomain + 1 + else: + diffusion_matricial_term = False + + dim_noise = dim_codomain + + if diffusion_matricial_term: + diffusion_times_noise = matrix_diffusion_times_noise + dim_noise = diffusion_shape[-1] + else: + diffusion_times_noise = vector_diffusion_times_noise + + return ( + dim_noise, + formatted_drift, + formatted_diffusion, + diffusion_times_noise, + diffusion_matricial_term, + ) + + +def make_sde_trajectories( # noqa: WPS211 + *, initial_condition: ArrayLike | InitialValueGenerator, - n_grid_points: int = 100, drift: SDETerm | ArrayLike | None = None, diffusion: SDETerm | ArrayLike | None = None, + diffusion_derivative: SDETerm | None = None, + n_L0_discretization_points: int | None = None, + method: Literal["euler-maruyama", "milstein"] = "euler-maruyama", + n_grid_points: int = 100, n_samples: int | None = None, - start: float = 0.0, # noqa: WPS358 -- Distinguish float from integer + start: float = 0, stop: float = 1.0, - diffusion_matricial_term: bool = True, random_state: RandomStateLike = None, ) -> FDataGrid: r"""Numerical integration of an Itô SDE using the Euler-Maruyana scheme. 
@@ -70,6 +203,8 @@ def euler_maruyama( # noqa: WPS210 :math:`q` refers to the dimension of the variable :math:`\mathbf{X}` (dimension of the codomain) and :math:`m` to the dimension of the noise. + Two methods are implemented: Euler_Maruyama's method and Milstein's method. + Euler-Maruyama's method computes the approximated solution using the Markov chain @@ -82,36 +217,81 @@ def euler_maruyama( # noqa: WPS210 and the :math:`\mathbf{Z}_m` are independent, identically distributed :math:`m`-dimensional standard normal random variables. + Milstein's method computes the approximated solution using the + Markov chain + + .. math:: + + X^{(i)}_{n + 1} = X_n^{(i)} + F^{(i)}(X_i, t_i)\Delta t_i + + \sum_{j=1}^m G^{i,j}(t_n, X_n)\sqrt{\Delta t_n}Z_n^j + + \sum_{j_1, j_2 = 1}^m L^{j_1} G^{i, j_2} (t_n, X_n) + I_{(j_1, j_2)} [t_n, t_{n+1}], + + where :math:`X_n^{(i)}` is the approximated value of :math:`X^{(i)}(t_n)` + and the :math:`\mathbf{Z}_m` are independent, identically distributed + :math:`m`-dimensional standard normal random variables. :math:`L^j` stands + for the operator + + .. math:: + + L^j = \sum_{k=1}^q G^{k, j}(t_n, X_n)\frac{\partial}{\partial x^k} + + + and :math:`I_{(j_1, j_2)} [t_n, t_{n+1}]` is the double stochastic integral + + .. math:: + + I_{(j_1, j_2)} [t_n, t_{n+1}] = \int_{t_n}^{t_{n+1}} \int_{t}^{t_{n+1}} + dW_s^{j_1} dW_t^{j_2}. + + In order to compute :math:`I_{(j_1, j_2)} [t_n, t_{n+1}]`, we use + Milstein L=0 method, described by Banerjee + :footcite:p:`banerjee++_2020_numerical`. This method approximates the + value of the double Itô integral by simulating solutions of another SDE. + The number of discretization points used to simulate the SDE which + approximates the double Itô integral is given by the parameter + ``n_L0_discretization_points``. + + For unidimensional processes the value of the double Itô integral is closed + so it is not necessary to include the ``n_L0_discretization_points`` + parameter. However, if the process is multidimensional it is mandatory + include it. + Args: initial_condition: Initial condition of the SDE. It can have one of three formats: An starting initial value from which to - calculate *n_samples* trajectories. An array of initial values. + calculate ``n_samples`` trajectories. An array of initial values. For each starting point a trajectory will be calculated. A function that generates random numbers or vectors. It should - have two parameters called size and random_state and it should - return an array. + have two parameters called ``size`` and ``random_state`` and it + should return an array. + drift: Drift term (:math:`F(t,\mathbf{X})` in the equation). + diffusion: Diffusion term (:math:`G(t,\mathbf{X})` in the + equation). The diffusion term should depend on the variable + :math:`\mathbf{X}`. + diffusion_derivative: Derivate of the diffusion term + (:math:`G_\mathbf{X}(t, \mathbf{X})`). Only needed for Milstein's + method. The return of this function should have one extra dimension + than the diffusion term, to account for the derivatives for each of + the coordinates. + n_L0_discretization_points: number of discretization points to + approximate the double Itô integral :math:`I_{(j_1,j_2)}`. Only + needed in Milstein's method. Only use when the process is + multidimensional. + method: Integration SDE method used. It can be either euler-maruyama or + milstein. n_grid_points: The total number of points of evaluation. - drift: Drift coefficient (:math:`F(t,\mathbf{X})` in the equation). 
- diffusion: Diffusion coefficient (:math:`G(t,\mathbf{X})` in the - equation). n_samples: Number of trajectories integrated. start: Starting time of the trajectories. stop: Ending time of the trajectories. - diffusion_matricial_term: True if the diffusion coefficient is a - matrix. random_state: Random state. - Returns: :class:`FDataGrid` object comprising all the trajectories. - See also: - :func:`make_gaussian_process`: Simpler function for generating - Gaussian processes. - Examples: - Example of the use of euler_maruyama for an Ornstein-Uhlenbeck process - that has the equation: + Example of the use of Euler-Maruyama's method for an Ornstein-Uhlenbeck + process that has the equation: .. math: @@ -125,114 +305,362 @@ def euler_maruyama( # noqa: WPS210 ... return -A * (x - mu) >>> initial_condition = norm().rvs >>> - >>> trajectories = euler_maruyama( + >>> trajectories = make_sde_trajectories( ... initial_condition=initial_condition, ... n_samples=10, ... drift=ou_drift, ... diffusion=B, ... ) + Example of the use of Milstein's method for a 1-d Geometric Brownian + motion that has the equation: + + .. math: + + dX(t) = \mu X(t) dt + \sigma X(t) dW(t) + + >>> sigma = 1 + >>> mu = 2 + >>> def gbm_drift(t: float, x: np.ndarray) -> np.ndarray: + ... return mu * x + >>> def gbm_diffusion(t: float, x: np.ndarray) -> np.ndarray: + ... return sigma * x + >>> def gbm_diff_derivative(t: float, x: np.ndarray) -> np.ndarray: + ... return sigma * np.ones_like(x)[: :, np.newaxis] + >>> X_0 = 1 + >>> n_L0_discretization_points = 5, + >>> + >>> trajectories = make_sde_trajectories( + ... initial_condition=X_0, + ... drift=gbm_drift, + ... diffusion=gbm_diffusion, + ... diffusion_derivative=gbm_diff_derivative, + ... n_L0_discretization_points=n_L0_discretization_points, + ... method="milstein", + ... n_samples=10, + ... ) + + References: + .. footbibliography:: """ random_state = validate_random_state(random_state) - if n_samples is None: - if callable(initial_condition): + ( + initial_values, + n_samples, + dim_codomain, + ) = _sde_initial_condition_preprocessing( + initial_condition, + random_state, + n_samples, + ) + + ( # noqa: WPS236 + dim_noise, + formatted_drift, + formatted_diffusion, + diffusion_times_noise, + diffusion_matricial_term, + ) = _sde_drift_diffusion_preprocessing( + drift, + diffusion, + dim_codomain, + initial_values, + start, + ) + + times = np.linspace(start, stop, n_grid_points) + + if method == "euler-maruyama": + return _euler_maruyama( + initial_values, + n_samples, + n_grid_points, + dim_codomain, + dim_noise, + formatted_drift, + diffusion_times_noise, + times, + random_state, + ) + elif method == "milstein": + if diffusion_derivative is None: raise ValueError( - "Invalid initial conditions. 
If a function is given, the " - "n_samples argument must be included.", + "The diffusion derivative must be included for the Milstein" + "method.", ) - initial_values = np.atleast_1d(initial_condition) - n_samples = len(initial_values) - else: - if callable(initial_condition): - initial_values = initial_condition( - size=n_samples, - random_state=random_state, - ) - else: - initial_condition = np.atleast_1d(initial_condition) - dim_codomain = len(initial_condition) - initial_values = ( - initial_condition - * np.ones((n_samples, dim_codomain)) - ) + return _milstein( + initial_values, + n_samples, + n_grid_points, + dim_codomain, + dim_noise, + formatted_drift, + formatted_diffusion, + diffusion_derivative, + n_L0_discretization_points, + diffusion_times_noise, + diffusion_matricial_term, + times, + random_state, + ) - if initial_values.ndim == 1: - initial_values = initial_values[:, np.newaxis] - elif initial_values.ndim > 2: - raise ValueError( - "Invalid initial conditions. Each of the starting points " - "must be a flat array.", + raise ValueError("Method for computing SDEs not implemented") + + +def _euler_maruyama( + initial_values: NDArrayFloat, + n_samples: int, + n_grid_points: int, + dim_codomain: int, + dim_noise: int, + drift_function: SDETerm, + diffusion_times_noise: EMTermCalculation, + times: NDArrayFloat, + random_state: np.random.RandomState, +) -> FDataGrid: + r"""Numerical integration of an Itô SDE using the Euler-Maruyana scheme. + + An SDE can be expressed with the following formula + + .. math:: + + d\mathbf{X}(t) = \mathbf{F}(t, \mathbf{X}(t))dt + \mathbf{G}(t, + \mathbf{X}(t))d\mathbf{W}(t). + + In this equation, :math:`\mathbf{X} = (X^{(1)}, X^{(2)}, ... , X^{(n)}) + \in \mathbb{R}^q` is a vector that represents the state of the stochastic + process. The function :math:`\mathbf{F}(t, \mathbf{X}) = (F^{(1)}(t, + \mathbf{X}), ..., F^{(q)}(t, \mathbf{X}))` is called drift and refers + to the deterministic component of the equation. The function + :math:`\mathbf{G} (t, \mathbf{X}) = (G^{i, j}(t, \mathbf{X}))_{i=1, j=1} + ^{q, m}` is denoted as the diffusion term and refers to the stochastic + component of the evolution. :math:`\mathbf{W}(t)` refers to a Wiener + process (Standard Brownian motion) of dimension :math:`m`. Finally, + :math:`q` refers to the dimension of the variable :math:`\mathbf{X}` + (dimension of the codomain) and :math:`m` to the dimension of the noise. + + Euler-Maruyama's method computes the approximated solution using the + Markov chain + + .. math:: + + X_{n + 1}^{(i)} = X_n^{(i)} + F^{(i)}(t_n, \mathbf{X}_n)\Delta t_n + + \sum_{j=1}^m G^{i,j}(t_n, \mathbf{X}_n)\sqrt{\Delta t_n}\Delta Z_n^j, + + where :math:`X_n^{(i)}` is the approximated value of :math:`X^{(i)}(t_n)` + and the :math:`\mathbf{Z}_m` are independent, identically distributed + :math:`m`-dimensional standard normal random variables. + + Args: + initial_values: Array of initial values. For each starting point + a trajectory will be calculated. + n_samples: Number of trajectories generated. + n_grid_points: The total number of points of evaluation. + drift_function: Drift coefficient. + dim_codomain: dimension of the image of the stochatic process. + dim_noise: Added noise dimension. + diffusion_times_noise: Diffusion coefficient times the added noise. + times: distretization array for the numerical method. + random_state: Random state. + + Returns: + :class:`FDataGrid` object comprising all the trajectories. 
+ + """ + data_matrix = np.zeros((n_samples, n_grid_points, dim_codomain)) + delta_t = times[1:] - times[:-1] + noise = random_state.standard_normal( + size=(n_samples, n_grid_points - 1, dim_noise), + ) + data_matrix[:, 0] = initial_values + + for n in range(n_grid_points - 1): + t_n = times[n] + x_n = data_matrix[:, n] + + data_matrix[:, n + 1] = ( + x_n + + delta_t[n] * drift_function(t_n, x_n) + + diffusion_times_noise(t_n, x_n, noise[:, n]) + * np.sqrt(delta_t[n]) ) - (n_samples, dim_codomain) = initial_values.shape - if dim_codomain == 1: - diffusion_matricial_term = False + return FDataGrid( + grid_points=times, + data_matrix=data_matrix, + ) - if drift is None: - drift = 0.0 # noqa: WPS358 -- Distinguish float from integer - if callable(drift): - drift_function = drift - else: - def constant_drift( # noqa: WPS430 -- We need internal functions - t: float, - x: NDArrayFloat, - ) -> NDArrayFloat: - return np.atleast_1d(drift) +def _milstein( # noqa: WPS211 + initial_values: NDArrayFloat, + n_samples: int, + n_grid_points: int, + dim_codomain: int, + dim_noise: int, + formatted_drift: SDETerm, + formatted_diffusion: SDETerm, + diffusion_derivative: SDETerm, + n_L0_discretization_points: int | None, + diffusion_times_noise: EMTermCalculation, + diffusion_matricial_term: bool, + times: NDArrayFloat, + random_state: np.random.RandomState, +) -> FDataGrid: + r"""Numerical integration of an Itô SDE using the Euler-Maruyana scheme. - drift_function = constant_drift + An SDE can be expressed with the following formula - if diffusion is None: - if diffusion_matricial_term: - diffusion = np.eye(dim_codomain) - else: - diffusion = 1.0 + .. math:: + + d\mathbf{X}(t) = \mathbf{F}(t, \mathbf{X}(t))dt + \mathbf{G}(t, + \mathbf{X}(t))d\mathbf{W}(t). + + In this equation, :math:`\mathbf{X} = (X^{(1)}, X^{(2)}, ... , X^{(n)}) + \in \mathbb{R}^q` is a vector that represents the state of the stochastic + process. The function :math:`\mathbf{F}(t, \mathbf{X}) = (F^{(1)}(t, + \mathbf{X}), ..., F^{(q)}(t, \mathbf{X}))` is called drift and refers + to the deterministic component of the equation. The function + :math:`\mathbf{G} (t, \mathbf{X}) = (G^{i, j}(t, \mathbf{X}))_{i=1, j=1} + ^{q, m}` is denoted as the diffusion term and refers to the stochastic + component of the evolution. :math:`\mathbf{W}(t)` refers to a Wiener + process (Standard Brownian motion) of dimension :math:`m`. Finally, + :math:`q` refers to the dimension of the variable :math:`\mathbf{X}` + (dimension of the codomain) and :math:`m` to the dimension of the noise. + + Milstein's method computes the approximated solution using the + Markov chain + + .. math:: + + X^{(i)}_{n + 1} = X_n^{(i)} + F^{(i)}(X_i, t_i)\Delta t_i + + \sum_{j=1}^m G^{i,j}(t_n, X_n)\sqrt{\Delta t_n}Z_n^j + + \sum_{j_1, j_2 = 1}^m L^{j_1} G^{i, j_2} (t_n, X_n) + I_{(j_1, j_2)} [t_n, t_{n+1}], + + where :math:`X_n^{(i)}` is the approximated value of :math:`X^{(i)}(t_n)` + and the :math:`\mathbf{Z}_m` are independent, identically distributed + :math:`m`-dimensional standard normal random variables. :math:`L^j` stands + for the operator + + .. 
math:: + + L^j = \sum_{k=1}^q G^{k, j}(t_n, X_n)\frac{\partial}{\partial x^k} - if callable(diffusion): - diffusion_function = diffusion - else: - def constant_diffusion( # noqa: WPS430 -- We need internal functions - t: float, - x: NDArrayFloat, - ) -> NDArrayFloat: - return np.atleast_1d(diffusion) - diffusion_function = constant_diffusion + and :math:`I_{(j_1, j_2)} [t_n, t_{n+1}]` is the double stochastic integral - def vector_diffusion_times_noise( # noqa: WPS430 We need internal functons + .. math:: + + I_{(j_1, j_2)} [t_n, t_{n+1}] = \int_{t_n}^{t_{n+1}} \int_{t}^{t_{n+1}} + dW_s^{j_1} dW_t^{j_2}. + + In order to compute :math:`I_{(j_1, j_2)} [t_n, t_{n+1}]`, we use + Milstein L=0 method, described by Banerjee + :footcite:p:`banerjee++_2020_numerical`. This method approximates the + value of the double Itô integral by simulating solutions of another SDE. + The number of discretization points used to simulate the SDE which + approximates the double Itô integral is given by the parameter + ``n_L0_discretization_points``. + + For unidimensional processes the value of the double Itô integral is closed + so it is not necessary to include the ``n_L0_discretization_points`` + parameter. However, if the process is multidimensional it is mandatory + include it. + + Args: + initial_values: Array of initial values. For each starting point + a trajectory will be calculated. + n_samples: Number of trajectories generated. + n_grid_points: The total number of points of evaluation. + dim_codomain: dimension of the image of the stochatic process. + dim_noise: Added noise dimension. + formatted_drift: Drift coefficient. + formatted_diffusion: Diffusion coefficient. + diffusion_derivative: Derivative of formatted_diffusion. + n_L0_discretization_points: Discretization points for the L0 method. + diffusion_times_noise: Diffusion coefficient times the added noise. + diffusion_matricial_term: True if diffusion is a matrix (changes + the way of multypling). + times: distretization array for the numerical method. + random_state: Random state. + + Returns: + :class:`FDataGrid` object comprising all the trajectories. + + References: + .. footbibliography:: + """ + + def matrix_milstein_term( # noqa: WPS430 t_n: float, x_n: NDArrayFloat, - noise: NDArrayFloat, + n: int, ) -> NDArrayFloat: - return diffusion_function(t_n, x_n) * noise + L_j = np.einsum( + '...kj, ...ilk -> ...ijl', + formatted_diffusion(t_n, x_n), + diffusion_derivative(t_n, x_n), + ) - def matrix_diffusion_times_noise( # noqa: WPS430 We need internal functons + return np.einsum( + '...ijl, ...jl -> ...i', + L_j, + double_ito_integral[:, n, :, :], + ) + + def vector_milstein_term( # noqa: WPS430 t_n: float, x_n: NDArrayFloat, - noise: NDArrayFloat, - ) -> Any: + n: int, + ) -> NDArrayFloat: return np.einsum( - '...dj, ...j -> ...d', - diffusion_function(t_n, x_n), - noise, + '...j, ...ij, ...ji -> ...i', + formatted_diffusion(t_n, x_n), + diffusion_derivative(t_n, x_n), + double_ito_integral[:, n, :, :], ) - dim_noise = dim_codomain - if diffusion_matricial_term: - diffusion_times_noise = matrix_diffusion_times_noise - dim_noise = diffusion_function(start, initial_values).shape[-1] + milstein_term = matrix_milstein_term else: - diffusion_times_noise = vector_diffusion_times_noise + milstein_term = vector_milstein_term + + if initial_values.shape[1] == 1: + n_L0_discretization_points = 1 + elif n_L0_discretization_points is None: + raise ValueError( + "Invalid n_L0_discretization_points. 
When the process is " + "multidimensional it must have an integer value.", + ) data_matrix = np.zeros((n_samples, n_grid_points, dim_codomain)) - times = np.linspace(start, stop, n_grid_points) delta_t = times[1:] - times[:-1] noise = random_state.standard_normal( - size=(n_samples, n_grid_points - 1, dim_noise), + size=( + n_samples, + n_grid_points - 1, + dim_noise, + n_L0_discretization_points, + ), + ) * np.sqrt(delta_t[0] / n_L0_discretization_points) + wiener_process_differences = np.cumsum(noise, axis=3) + + # Computation of double ito integral using Milstein L=0 method + # The method simulates SDEs for each step to estimate this + # quantity. These simulations are equivalent to computing the + # dot product of two vectors. + double_ito_integral = np.einsum( + '...ik, ...jk -> ...ij', + wiener_process_differences - noise / 2, + noise, ) + double_ito_integral = ( + double_ito_integral + - delta_t[0] * np.eye(dim_noise) / 2 + ) + data_matrix[:, 0] = initial_values for n in range(n_grid_points - 1): @@ -241,9 +669,13 @@ def matrix_diffusion_times_noise( # noqa: WPS430 We need internal functons data_matrix[:, n + 1] = ( x_n - + delta_t[n] * drift_function(t_n, x_n) - + diffusion_times_noise(t_n, x_n, noise[:, n]) - * np.sqrt(delta_t[n]) + + delta_t[n] * formatted_drift(t_n, x_n) + + diffusion_times_noise( + t_n, + x_n, + wiener_process_differences[:, n, :, -1], + ) + + milstein_term(t_n, x_n, n) ) return FDataGrid( diff --git a/skfda/tests/test_euler_maruyama.py b/skfda/tests/test_make_sde_trajectories.py similarity index 59% rename from skfda/tests/test_euler_maruyama.py rename to skfda/tests/test_make_sde_trajectories.py index e6684add2..be867eacb 100644 --- a/skfda/tests/test_euler_maruyama.py +++ b/skfda/tests/test_make_sde_trajectories.py @@ -3,7 +3,7 @@ import numpy as np from scipy.stats import multivariate_normal, norm -from skfda.datasets import euler_maruyama +from skfda.datasets import make_sde_trajectories from skfda.typing._numpy import NDArrayFloat @@ -33,14 +33,14 @@ def test_one_initial_point() -> None: ], ]) - fd = euler_maruyama( - initial_float, + fd = make_sde_trajectories( + initial_condition=initial_float, n_samples=n_samples, n_grid_points=n_grid_points, random_state=random_state, ) - np.testing.assert_almost_equal( + np.testing.assert_almost_equal( # noqa: WPS204 -- clarity fd.data_matrix, expected_result, ) @@ -72,8 +72,8 @@ def test_one_initial_point_monodimensional() -> None: ], ]) - fd = euler_maruyama( - initial_float_in_list, + fd = make_sde_trajectories( + initial_condition=initial_float_in_list, n_samples=n_samples, n_grid_points=n_grid_points, random_state=random_state, @@ -110,8 +110,8 @@ def test_one_initial_point_multidimensional() -> None: ], ]) - fd = euler_maruyama( - initial_array, + fd = make_sde_trajectories( + initial_condition=initial_array, n_samples=n_samples, n_grid_points=n_grid_points, random_state=random_state, @@ -164,8 +164,8 @@ def standard_normal_generator( # noqa: WPS430 ], ]) - fd = euler_maruyama( - standard_normal_generator, + fd = make_sde_trajectories( + initial_condition=standard_normal_generator, n_samples=n_samples, n_grid_points=n_grid_points, random_state=random_state, @@ -202,8 +202,8 @@ def standard_normal_generator_2d( # noqa: WPS430 ], ]) - fd = euler_maruyama( - standard_normal_generator_2d, + fd = make_sde_trajectories( + initial_condition=standard_normal_generator_2d, n_samples=n_samples, n_grid_points=n_grid_points, random_state=random_state, @@ -248,8 +248,8 @@ def test_initial_distribution() -> None: ], ]) - fd = 
euler_maruyama( - normal_distribution, + fd = make_sde_trajectories( + initial_condition=normal_distribution, n_samples=n_samples, n_grid_points=n_grid_points, random_state=random_state, @@ -286,8 +286,8 @@ def test_initial_distribution() -> None: ], ]) - fd = euler_maruyama( - multivariate_normal_distribution, + fd = make_sde_trajectories( + initial_condition=multivariate_normal_distribution, n_samples=n_samples, n_grid_points=n_grid_points, random_state=random_state, @@ -322,8 +322,8 @@ def test_vector_initial_points() -> None: ], ]) - fd = euler_maruyama( - initial_float, + fd = make_sde_trajectories( + initial_condition=initial_float, n_grid_points=n_grid_points, random_state=random_state, ) @@ -352,8 +352,8 @@ def test_vector_initial_points() -> None: ], ]) - fd = euler_maruyama( - initial_array, + fd = make_sde_trajectories( + initial_condition=initial_array, n_grid_points=n_grid_points, random_state=random_state, ) @@ -382,8 +382,8 @@ def test_vector_initial_points() -> None: ], ]) - fd = euler_maruyama( - initial_array, + fd = make_sde_trajectories( + initial_condition=initial_array, n_grid_points=n_grid_points, random_state=random_state, ) @@ -422,8 +422,8 @@ def base_drift( # noqa: WPS430 ], ]) - fd = euler_maruyama( - initial_condition, + fd = make_sde_trajectories( + initial_condition=initial_condition, n_grid_points=n_grid_points, n_samples=n_samples, drift=base_drift, @@ -461,8 +461,8 @@ def vector_drift( # noqa: WPS430 ], ]) - fd = euler_maruyama( - initial_condition, + fd = make_sde_trajectories( + initial_condition=initial_condition, n_grid_points=n_grid_points, n_samples=n_samples, drift=vector_drift, @@ -502,13 +502,12 @@ def test_diffusion_cases() -> None: ]) expected_result[:, :, 1] *= 2 - fd = euler_maruyama( - initial_condition, + fd = make_sde_trajectories( + initial_condition=initial_condition, n_grid_points=n_grid_points, n_samples=n_samples, drift=0, diffusion=vector_diffusion, - diffusion_matricial_term=False, random_state=random_state, ) @@ -548,8 +547,8 @@ def test_diffusion_cases() -> None: axis=2, ) - fd = euler_maruyama( - initial_condition, + fd = make_sde_trajectories( + initial_condition=initial_condition, n_grid_points=n_grid_points, n_samples=n_samples, drift=0, @@ -574,8 +573,8 @@ def test_grid_points() -> None: np.linspace(start, stop, n_grid_points), ) - fd = euler_maruyama( - initial_condition, + fd = make_sde_trajectories( + initial_condition=initial_condition, n_grid_points=n_grid_points, n_samples=10, start=start, @@ -598,8 +597,8 @@ def test_initial_condition_negative_cases() -> None: n_samples = 3 with np.testing.assert_raises(ValueError): - euler_maruyama( - initial_condition, + make_sde_trajectories( + initial_condition=initial_condition, n_samples=n_samples, random_state=random_state, ) @@ -608,8 +607,8 @@ def test_initial_condition_negative_cases() -> None: initial_condition = np.zeros((1, 1, 1)) with np.testing.assert_raises(ValueError): - euler_maruyama( - initial_condition, + make_sde_trajectories( + initial_condition=initial_condition, random_state=random_state, ) @@ -617,16 +616,16 @@ def test_initial_condition_negative_cases() -> None: initial_generator = norm().rvs with np.testing.assert_raises(ValueError): - euler_maruyama( - initial_generator, + make_sde_trajectories( + initial_condition=initial_generator, random_state=random_state, ) # Case 4: n_samples not greater than 0 with np.testing.assert_raises(ValueError): - euler_maruyama( - 0, + make_sde_trajectories( + initial_condition=0, n_samples=-1, random_state=random_state, ) @@ 
-637,14 +636,13 @@ def test_diffusion_negative_cases() -> None: initial_condition = np.array([0, 0]) n_samples = 2 random_state = np.random.RandomState(1) - vector_diffusion = np.array([1, 2]) initial_condition = np.array([0, 0, 0]) with np.testing.assert_raises(ValueError): - euler_maruyama( - initial_condition, + make_sde_trajectories( + initial_condition=initial_condition, n_samples=n_samples, diffusion=vector_diffusion, random_state=random_state, @@ -674,8 +672,327 @@ def test_random_generator() -> None: ], ]) - fd = euler_maruyama( - normal_distribution, + fd = make_sde_trajectories( + initial_condition=normal_distribution, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +# Tests for Milstein's method + + +def gbm_drift( + t: float, + x: NDArrayFloat, + mu: float = 1, +) -> NDArrayFloat: + """Drift term of a Geometric Brownian motion.""" + return mu * x + + +def gbm_diffusion( + t: float, + x: NDArrayFloat, + sigma: float = 1, +) -> NDArrayFloat: + """Diffusion term of a Geometric Brownian motion.""" + return sigma * x + + +def gbm_diffusion_derivative( + t: float, + x: NDArrayFloat, + sigma: float = 1, +) -> NDArrayFloat: + """Spacial derivative of the diffusion term of a GBM.""" + dim_codomain = x.shape[1] + gbm_diffusion_derivative = np.zeros( + x.shape + (dim_codomain,), + ) + for i in np.arange(dim_codomain): + gbm_diffusion_derivative[:, i, i] = sigma + + return gbm_diffusion_derivative + + +def test_milstein_one_initial_point() -> None: + """Case 1 -> One initial point + n_samples > 0. + + 1.1: initial_condition = 1, n_samples = 2 + mu = 1, sigma = 1 + + """ + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + initial_float = 1 + n_L0_discretization_points = 1 + + expected_result = np.array([[ + [1], + [2.26698491], + [1.96298798], + [1.75841479], + [1.28790413], + ], + + [[1], + [1.65132011], + [1.05084347], + [2.49885523], + [2.04113003], + ], + ]) + + fd = make_sde_trajectories( + initial_condition=initial_float, + drift=gbm_drift, + diffusion=gbm_diffusion, + diffusion_derivative=gbm_diffusion_derivative, + n_L0_discretization_points=n_L0_discretization_points, + method="milstein", + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_milstein_one_point_monodimensional() -> None: + """Case 1.2 Starting point = float in an array format + n_samples. + + initial_condition = [0], n_samples = 2. + The result should be the same as 1.1 + """ + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + initial_float_in_list = [1] + n_L0_discretization_points = 1 + + expected_result = np.array([[ + [1], + [2.26698491], + [1.96298798], + [1.75841479], + [1.28790413], + ], + + [[1], + [1.65132011], + [1.05084347], + [2.49885523], + [2.04113003], + ], + ]) + + fd = make_sde_trajectories( + initial_condition=initial_float_in_list, + drift=gbm_drift, + diffusion=gbm_diffusion, + diffusion_derivative=gbm_diffusion_derivative, + n_L0_discretization_points=n_L0_discretization_points, + method="milstein", + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_milstein_one_point_multidimensional() -> None: + """Case 1.3 Starting point = array + n_samples. 
+ + initial_condition = np.array([1, 2]), n_samples = 2. + """ + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + initial_array = np.array([1, 2]) + n_L0_discretization_points = 2 + + expected_result = np.array([ + [[1, 2], + [1.54708778, 1.4382791], + [1.15436809, 2.20520438], + [1.32744823, 2.06388647], + [1.20322359, 2.34674103], + ], + + [[1, 2], + [0.82261148, 2.74079487], + [0.93836515, 4.78168655], + [1.13046062, 3.92459221], + [1.38153796, 3.19551206], + ], + ]) + + fd = make_sde_trajectories( + initial_condition=initial_array, + drift=gbm_drift, + diffusion=gbm_diffusion, + diffusion_derivative=gbm_diffusion_derivative, + n_L0_discretization_points=n_L0_discretization_points, + method="milstein", + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_milstein_matrix_diffusion() -> None: + """Case 2 Starting point = array + n_samples. + + initial_condition = np.array([1, 2]), n_samples = 2. + """ + n_samples = 2 + n_grid_points = 5 + dim_codomain = 2 + random_state = np.random.RandomState(1) + initial_array = np.array([1, 2]) + n_L0_discretization_points = 2 + + def gbm_matrix_diffusion( # noqa: WPS430 + t: float, + x: NDArrayFloat, + sigma: float = 1, + ) -> NDArrayFloat: + """Diffusion term of a Geometric Brownian motion.""" + diffusion = np.zeros((x.shape[0], dim_codomain, dim_codomain)) + for i in np.arange(dim_codomain): + diffusion[:, i, i] = sigma * x[:, i] + return diffusion + + def gbm_matrix_diffusion_derivative( # noqa: WPS430 + t: float, + x: NDArrayFloat, + sigma: float = 1, + ) -> NDArrayFloat: + """Diffusion term of a Geometric Brownian motion.""" + gbm_diffusion_derivative = np.zeros( + (n_samples, dim_codomain, dim_codomain, dim_codomain), + ) + for i in np.arange(dim_codomain): + gbm_diffusion_derivative[:, i, i, i] = sigma + + return gbm_diffusion_derivative + + expected_result = np.array([ + [[1, 2], + [1.54708778, 1.4382791], + [1.15436809, 2.20520438], + [1.32744823, 2.06388647], + [1.20322359, 2.34674103], + ], + + [[1, 2], + [0.82261148, 2.74079487], + [0.93836515, 4.78168655], + [1.13046062, 3.92459221], + [1.38153796, 3.19551206], + ], + ]) + + fd = make_sde_trajectories( + initial_condition=initial_array, + drift=gbm_drift, + diffusion=gbm_matrix_diffusion, + diffusion_derivative=gbm_matrix_diffusion_derivative, + n_L0_discretization_points=n_L0_discretization_points, + method="milstein", + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_milstein_grid_points() -> None: + """Test for checking that the grid_points are correct.""" + random_state = np.random.RandomState(1) + initial_condition = np.array([0, 1]) + start = 0 + stop = 10 + n_grid_points = 105 + n_L0_discretization_points = 1 + expected_grid_points = np.atleast_2d( + np.linspace(start, stop, n_grid_points), + ) + + fd = make_sde_trajectories( + initial_condition=initial_condition, + drift=gbm_drift, + diffusion=gbm_diffusion, + diffusion_derivative=gbm_diffusion_derivative, + n_L0_discretization_points=n_L0_discretization_points, + method="milstein", + n_grid_points=n_grid_points, + start=start, + stop=stop, + random_state=random_state, + ) + + np.testing.assert_array_equal( + fd.grid_points, + expected_grid_points, + ) + + +def test_milstein_random_generator() -> None: + """Test using Generator instead of RandomState.""" 
+ n_samples = 2 + n_grid_points = 5 + n_L0_discretization_points = 1 + random_state = np.random.default_rng(seed=1) + normal_distribution = norm().rvs + + expected_result = np.array([ + [[0.34558419], + [0.45059587], + [0.30897301], + [0.51911686], + [0.71279603], + ], + + [[0.82161814], + [0.73334614], + [1.06905099], + [1.41531695], + [1.81568250], + ], + ]) + + fd = make_sde_trajectories( + initial_condition=normal_distribution, + drift=gbm_drift, + diffusion=gbm_diffusion, + diffusion_derivative=gbm_diffusion_derivative, + n_L0_discretization_points=n_L0_discretization_points, + method="milstein", n_samples=n_samples, n_grid_points=n_grid_points, random_state=random_state, @@ -685,3 +1002,23 @@ def test_random_generator() -> None: fd.data_matrix, expected_result, ) + + +def test_milstein_n_l0_discretization() -> None: + """Test for checking related errors with n_L0_discretization_points.""" + initial_condition = np.array([1, 2]) + n_samples = 2 + random_state = np.random.RandomState(1) + + initial_condition = np.array([0, 0, 0]) + + with np.testing.assert_raises(ValueError): + make_sde_trajectories( + initial_condition=initial_condition, + drift=gbm_drift, + diffusion=gbm_diffusion, + diffusion_derivative=gbm_diffusion_derivative, + method="milstein", + n_samples=n_samples, + random_state=random_state, + )
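+
+
+# A minimal additional check (an editorial sketch, not part of the original
+# test suite): the Milstein method should return an FDataGrid whose data
+# matrix has shape (n_samples, n_grid_points, dim_codomain).
+def test_milstein_output_shape() -> None:
+    """Check the shape of the output of Milstein's method in 2 dimensions."""
+    n_samples = 3
+    n_grid_points = 7
+    initial_condition = np.array([1.0, 2.0])
+    random_state = np.random.RandomState(1)
+
+    fd = make_sde_trajectories(
+        initial_condition=initial_condition,
+        drift=gbm_drift,
+        diffusion=gbm_diffusion,
+        diffusion_derivative=gbm_diffusion_derivative,
+        n_L0_discretization_points=2,
+        method="milstein",
+        n_samples=n_samples,
+        n_grid_points=n_grid_points,
+        random_state=random_state,
+    )
+
+    np.testing.assert_equal(
+        fd.data_matrix.shape,
+        (n_samples, n_grid_points, 2),
+    )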