diff --git a/src/diffpy/snmf/containers.py b/src/diffpy/snmf/containers.py deleted file mode 100644 index 1af960ad..00000000 --- a/src/diffpy/snmf/containers.py +++ /dev/null @@ -1,77 +0,0 @@ -import numdifftools -import numpy as np - - -class ComponentSignal: - """ - Attributes - ---------- - grid: 1d array of floats - The vector containing the grid points of the component. - iq: 1d array of floats - The intensity/g(r) values of the component. - weights: 1d array of floats - The vector containing the weight of the component signal for each signal. - stretching_factors: 1d array of floats - The vector containing the stretching factor for the component signal for each signal. - id: int - The number identifying the component. - """ - - def __init__(self, grid, number_of_signals, id_number, perturbation=1e-3): - self.grid = np.asarray(grid) - self.iq = np.random.rand(len(grid)) - self.weights = np.random.rand(number_of_signals) - self.stretching_factors = np.ones(number_of_signals) + np.random.randn(number_of_signals) * perturbation - self.id = int(id_number) - - def apply_stretch(self, m): - """Applies a stretching factor to a component - - Parameters - ---------- - m: int - The index specifying which stretching factor to apply - - Returns - ------- - tuple of 1d arrays - The tuple of vectors where one vector is the stretched component, one vector is the 1st derivative of the - stretching operation, and one vector is the second derivative of the stretching operation. - """ - normalized_grid = np.arange(len(self.grid)) - interpolate_intensity = lambda stretching_factor: np.interp( # noqa: E731 - normalized_grid / stretching_factor, normalized_grid, self.iq, left=0, right=0 - ) - derivative_func = numdifftools.Derivative(interpolate_intensity) - second_derivative_func = numdifftools.Derivative(derivative_func) - - stretched_component = interpolate_intensity(self.stretching_factors[m]) - stretched_component_gra = derivative_func(self.stretching_factors[m]) - stretched_component_hess = second_derivative_func(self.stretching_factors[m]) - - return ( - np.asarray(stretched_component), - np.asarray(stretched_component_gra), - np.asarray(stretched_component_hess), - ) - - def apply_weight(self, m, stretched_component=None): - """Applies as weight factor to a component signal. - - Parameters - ---------- - m: int - The index specifying with weight to apply - stretched_component: 1d array - The 1d array containing a stretched component. - - Returns - ------- - 1d array - The vector containing a component signal or stretched component signal with a weight factor applied. - """ - if stretched_component is None: - return self.iq * self.weights[m] - else: - return stretched_component * self.weights[m] diff --git a/src/diffpy/snmf/factorizers.py b/src/diffpy/snmf/factorizers.py deleted file mode 100644 index b4620fd4..00000000 --- a/src/diffpy/snmf/factorizers.py +++ /dev/null @@ -1,31 +0,0 @@ -import numpy as np -import scipy.optimize - - -def lsqnonneg(stretched_component_matrix, target_signal): - """Finds the weights of stretched component signals under one-sided constraint. - - Solves ``argmin_x || Ax - b ||_2`` for ``x>=0`` where A is the stretched_component_matrix and b is the - target_signal vector. Finds the weights of component signals given undecomposed signal data and stretched - components under a one-sided constraint on the weights. - - Parameters - ---------- - stretched_component_matrix: 2d array like - The component matrix where each column contains a stretched component signal. Has dimensions R x C where R is - the length of the signal and C is the number of components. Does not need to be nonnegative. Corresponds with - 'A' from the objective function. - - target_signal: 1d array like - The signal that is used as reference against which weight factors will be determined. Any column from the - matrix of the entire, unfactorized input data could be used. Has length R. Does not need to be nonnegative. - Corresponds with 'b' from the objective function. - - Returns - ------- - 1d array like - The vector containing component signal weights at a moment. Has length C. - """ - stretched_component_matrix = np.asarray(stretched_component_matrix) - target_signal = np.asarray(target_signal) - return scipy.optimize.nnls(stretched_component_matrix, target_signal)[0] diff --git a/src/diffpy/snmf/io.py b/src/diffpy/snmf/io.py deleted file mode 100644 index 12eb1b23..00000000 --- a/src/diffpy/snmf/io.py +++ /dev/null @@ -1,120 +0,0 @@ -from pathlib import Path - -import numpy as np -import scipy.sparse - -from diffpy.utils.parsers.loaddata import loadData - - -def initialize_variables(data_input, number_of_components, data_type, sparsity=1, smoothness=1e18): - """Determines the variables and initial values used in the SNMF algorithm. - - Parameters - ---------- - data_input: 2d array like - The observed or simulated PDF or XRD data provided by the user. Has dimensions R x N where R is the signa - length and N is the number of PDF/XRD signals. - - number_of_components: int - The number of component signals the user would like to decompose 'data_input' into. - - data_type: str - The type of data the user has passed into the program. Can assume the value of 'PDF' or 'XRD.' - - sparsity: float, optional - The regularization parameter that behaves as the coefficient of a "sparseness" regularization term that - enhances the ability to decompose signals in the case of sparse data e.g. X-ray Diffraction data. - A non-zero value indicates sparsity in the data; greater magnitudes indicate greater amounts of sparsity. - - smoothness: float, optional - The regularization parameter that behaves as the coefficient of a "smoothness" term that ensures that - component signal weightings change smoothly with time. Assumes a default value of 1e18. - - Returns - ------- - dictionary - The collection of the names and values of the constants used in the algorithm. Contains the number of - observed PDF/XRD patterns, the length of each pattern, the type of the data, the number of components - the user would like to decompose the data into, an initial guess for the component matrix, and initial - guess for the weight factor matrix, an initial guess for the stretching factor matrix, a parameter - controlling smoothness of the solution, a parameter controlling sparseness of the solution, the matrix - representing the smoothness term, and a matrix used to construct a hessian matrix. - - """ - signal_length = data_input.shape[0] - number_of_signals = data_input.shape[1] - - diagonals = [ - np.ones(number_of_signals - 2), - -2 * np.ones(number_of_signals - 2), - np.ones(number_of_signals - 2), - ] - smoothness_term = 0.25 * scipy.sparse.diags( - diagonals, [0, 1, 2], shape=(number_of_signals - 2, number_of_signals) - ) - - hessian_helper_matrix = scipy.sparse.block_diag([smoothness_term.T @ smoothness_term] * number_of_components) - sequence = ( - np.arange(number_of_signals * number_of_components) - .reshape(number_of_components, number_of_signals) - .T.flatten() - ) - hessian_helper_matrix = hessian_helper_matrix[sequence, :][:, sequence] - - return { - "signal_length": signal_length, - "number_of_signals": number_of_signals, - "number_of_components": number_of_components, - "data_type": data_type, - "smoothness": smoothness, - "sparsity": sparsity, - "smoothness_term": smoothness_term, - "hessian_helper_matrix": hessian_helper_matrix, - } - - -def load_input_signals(file_path=None): - """Processes a directory of a series of PDF/XRD patterns into a usable format. - - Constructs a 2d array out of a directory of PDF/XRD patterns containing each files dependent variable - column in a new column. Constructs a 1d array containing the grid values. - - Parameters - ---------- - file_path: str or Path object, optional - The path to the directory containing the input XRD/PDF data. If no path is specified, defaults to the - current working directory. Accepts a string or a pathlib.Path object. Input data not on the same grid - as the first file read will be ignored. - - Returns - ------- - tuple - The tuple whose first element is an R x M 2d array made of PDF/XRD patterns as each column; R is the - length of the signal and M is the number of patterns. The tuple contains a 1d array containing the values - of the grid points as its second element; Has length R. - - """ - - if file_path is None: - directory_path = Path.cwd() - else: - directory_path = Path(file_path) - - values_list = [] - grid_list = [] - current_grid = [] - for item in directory_path.iterdir(): - if item.is_file(): - data = loadData(item.resolve()) - if current_grid and current_grid != data[:, 0]: - print(f"{item.name} was ignored as it is not on a compatible grid.") - continue - else: - grid_list.append(data[:, 0]) - current_grid = grid_list[-1] - values_list.append(data[:, 1]) - - grid_array = np.column_stack(grid_list) - grid_vector = np.unique(grid_array, axis=1) - values_array = np.column_stack(values_list) - return grid_vector, values_array diff --git a/src/diffpy/snmf/main.py b/src/diffpy/snmf/main.py index e38b5ab9..7fcd0591 100644 --- a/src/diffpy/snmf/main.py +++ b/src/diffpy/snmf/main.py @@ -7,8 +7,8 @@ Y0 = np.loadtxt("input/W0.txt", dtype=float) N, M = MM.shape -my_model = SNMFOptimizer(MM=MM, Y0=Y0, X0=X0, A0=A0) +my_model = SNMFOptimizer(source_matrix=MM, init_weights=Y0, init_comps=X0, init_stretch=A0) print("Done") -np.savetxt("my_norm_X.txt", my_model.X, fmt="%.6g", delimiter=" ") -np.savetxt("my_norm_Y.txt", my_model.Y, fmt="%.6g", delimiter=" ") -np.savetxt("my_norm_A.txt", my_model.A, fmt="%.6g", delimiter=" ") +np.savetxt("my_norm_X.txt", my_model.comps, fmt="%.6g", delimiter=" ") +np.savetxt("my_norm_Y.txt", my_model.weights, fmt="%.6g", delimiter=" ") +np.savetxt("my_norm_A.txt", my_model.stretch, fmt="%.6g", delimiter=" ") diff --git a/src/diffpy/snmf/optimizers.py b/src/diffpy/snmf/optimizers.py deleted file mode 100644 index d1a8e51f..00000000 --- a/src/diffpy/snmf/optimizers.py +++ /dev/null @@ -1,54 +0,0 @@ -import numpy as np -from scipy.optimize import minimize - - -def get_weights(stretched_component_gram_matrix, linear_coefficient, lower_bound, upper_bound): - """Finds the weights of stretched component signals under a two-sided constraint - - Solves min J(y) = (linear_coefficient)' * y + (1/2) * y' * (quadratic coefficient) * y where - lower_bound <= y <= upper_bound and stretched_component_gram_matrix is symmetric positive definite. - Finds the weightings of stretched component signals under a two-sided constraint. - - Parameters - ---------- - stretched_component_gram_matrix: 2d array like - The Gram matrix constructed from the stretched component matrix. It is a square positive definite matrix. - It has dimensions C x C where C is the number of component signals. Must be symmetric positive definite. - - linear_coefficient: 1d array like - The vector containing the product of the stretched component matrix and the transpose of the observed - data matrix. Has length C. - - lower_bound: 1d array like - The lower bound on the values of the output weights. Has the same dimensions of the function output. Each - element in 'lower_bound' determines the minimum value the corresponding element in the function output may - take. - - upper_bound: 1d array like - The upper bound on the values of the output weights. Has the same dimensions of the function output. Each - element in 'upper_bound' determines the maximum value the corresponding element in the function output may - take. - - Returns - ------- - 1d array like - The vector containing the weightings of the components needed to reconstruct a given input signal from the - input set. Has length C - - """ - - stretched_component_gram_matrix = np.asarray(stretched_component_gram_matrix) - linear_coefficient = np.asarray(linear_coefficient) - upper_bound = np.asarray(upper_bound) - lower_bound = np.asarray(lower_bound) - - # Set dynamic bounds based on the size of the linear coefficient - bounds = [(lower_bound, upper_bound) for _ in range(len(linear_coefficient))] - initial_guess = np.zeros_like(linear_coefficient) - - # Find optimal weights of linear coefficients - def obj_func(y): - return linear_coefficient.T @ y + 0.5 * y.T @ stretched_component_gram_matrix @ y - - result = minimize(obj_func, initial_guess, method="L-BFGS-B", bounds=bounds) - return result.x diff --git a/src/diffpy/snmf/polynomials.py b/src/diffpy/snmf/polynomials.py deleted file mode 100644 index c54ff215..00000000 --- a/src/diffpy/snmf/polynomials.py +++ /dev/null @@ -1,34 +0,0 @@ -import numpy as np - - -def compute_root(linear_coefficient, constant_term): - """ - Returns the largest real root of x^3+(linear_coefficient) * x + constant_term. If there are no real roots - return 0. - - Parameters - ---------- - linear_coefficient: ndarray like of floats - The matrix coefficient of the linear term - constant_term: 0d array like, 1d array like of floats or scalar - The constant scalar term of the problem - - Returns - ------- - ndarray of floats - The largest real root of x^3+(linear_coefficient) * x + constant_term if roots are real, else - return 0 array - - """ - linear_coefficient = np.asarray(linear_coefficient) - constant_term = np.asarray(constant_term) - solution = np.empty_like(linear_coefficient, dtype=np.float64) - - for index, value in np.ndenumerate(linear_coefficient): - inputs = [1, 0, value, constant_term] - roots = np.roots(inputs) - if ((constant_term / 2) ** 2 + (value / 3) ** 3) < 0: # Discriminant of depressed cubic equation - solution[index] = max(np.real(roots)) - else: - solution[index] = 0 - return solution diff --git a/src/diffpy/snmf/snmf_class.py b/src/diffpy/snmf/snmf_class.py index 950da7cd..4dfe94f4 100644 --- a/src/diffpy/snmf/snmf_class.py +++ b/src/diffpy/snmf/snmf_class.py @@ -8,7 +8,7 @@ class SNMFOptimizer: Instantiating the SNMFOptimizer class runs all the analysis immediately. The results matrices can then be accessed as instance attributes - of the class (X, Y, and A). + of the class (comps, weights, and stretch). For more information on sNMF, please reference: Gu, R., Rakita, Y., Lan, L. et al. Stretched non-negative matrix factorization. @@ -16,16 +16,16 @@ class SNMFOptimizer: Attributes ---------- - MM : ndarray + source_matrix : ndarray The original, unmodified data to be decomposed and later, compared against. Shape is (length_of_signal, number_of_conditions). - Y : ndarray + stretch : ndarray The best guess (or while running, the current guess) for the stretching factor matrix. - X : ndarray + comps : ndarray The best guess (or while running, the current guess) for the matrix of component intensities. - A : ndarray + weights : ndarray The best guess (or while running, the current guess) for the matrix of component weights. rho : float @@ -37,14 +37,14 @@ class SNMFOptimizer: as XRD, but due to instability, should be used only after first selecting the best value for rho. Suggested adjustment is by powers of 2. max_iter : int - The maximum number of times to update each of A, X, and Y before stopping + The maximum number of times to update each of stretch, comps, and weights before stopping the optimization. tol : float The convergence threshold. This is the minimum fractional improvement in the objective function to allow without terminating the optimization. Note that a minimum of 20 updates are run before this parameter is checked. n_components : int - The number of components to extract from MM. Must be provided when and only when + The number of components to extract from source_matrix. Must be provided when and only when Y0 is not provided. random_state : int The seed for the initial guesses at the matrices (A, X, and Y) created by @@ -53,7 +53,7 @@ class SNMFOptimizer: The total number of times that any of (A, X, and Y) have had their values changed. If not terminated by other means, this value is used to stop when reaching max_iter. objective_function: float - The value corresponding to the minimization of the difference between the MM and the + The value corresponding to the minimization of the difference between the source_matrix and the products of A, X, and Y. For full details see the sNMF paper. Smaller corresponds to better agreement and is desirable. objective_difference : float @@ -63,10 +63,10 @@ class SNMFOptimizer: def __init__( self, - MM, - Y0=None, - X0=None, - A0=None, + source_matrix, + init_weights=None, + init_comps=None, + init_stretch=None, rho=1e12, eta=610, max_iter=500, @@ -78,16 +78,16 @@ def __init__( Parameters ---------- - MM : ndarray + source_matrix : ndarray The data to be decomposed. Shape is (length_of_signal, number_of_conditions). - Y0 : ndarray + init_weights : ndarray The initial guesses for the component weights at each stretching condition. Shape is (number_of_components, number_of_conditions) Must provide exactly one of this or n_components. - X0 : ndarray + init_comps : ndarray The initial guesses for the intensities of each component per row/sample/angle. Shape is (length_of_signal, number_of_components). - A0 : ndarray + init_stretch : ndarray The initial guesses for the stretching factor for each component, at each condition. Shape is (number_of_components, number_of_conditions). rho : float @@ -106,66 +106,72 @@ def __init__( objective function to allow without terminating the optimization. Note that a minimum of 20 updates are run before this parameter is checked. n_components : int - The number of components to extract from MM. Must be provided when and only when + The number of components to extract from source_matrix. Must be provided when and only when Y0 is not provided. random_state : int The seed for the initial guesses at the matrices (A, X, and Y) created by the decomposition. """ - self.MM = MM + self.source_matrix = source_matrix self.rho = rho self.eta = eta # Capture matrix dimensions - self._N, self._M = MM.shape + self._signal_len, self._num_conditions = source_matrix.shape self.num_updates = 0 self._rng = np.random.default_rng(random_state) # Enforce exclusive specification of n_components or Y0 - if (n_components is None) == (Y0 is not None): - raise ValueError("Must provide exactly one of Y0 or n_components, but not both.") - - # Initialize Y0 and determine number of components - if Y0 is None: - self._K = n_components - self.Y = self._rng.beta(a=2.5, b=1.5, size=(self._K, self._M)) + if (n_components is None and init_weights is None) or ( + n_components is not None and init_weights is not None + ): + raise ValueError("Must provide exactly one of init_weights or n_components, but not both.") + + # Initialize weights and determine number of components + if init_weights is None: + self._n_components = n_components + self.weights = self._rng.beta(a=2.5, b=1.5, size=(self._n_components, self._num_conditions)) else: - self._K = Y0.shape[0] - self.Y = Y0 + self._n_components = init_weights.shape[0] + self.weights = init_weights - # Initialize A if not provided - if self.A is None: - self.A = np.ones((self._K, self._M)) + self._rng.normal(0, 1e-3, size=(self._K, self._M)) + # Initialize stretching matrix if not provided + if init_stretch is None: + self.stretch = np.ones((self._n_components, self._num_conditions)) + self._rng.normal( + 0, 1e-3, size=(self._n_components, self._num_conditions) + ) else: - self.A = A0 + self.stretch = init_stretch - # Initialize X0 if not provided - if self.X is None: - self.X = self._rng.random((self._N, self._K)) + # Initialize component matrix if not provided + if init_comps is None: + self.comps = self._rng.random((self._signal_len, self._n_components)) else: - self.X = X0 + self.comps = init_comps - # Enforce non-negativity - self.X = np.maximum(0, self.X) - self.Y = np.maximum(0, self.Y) + # Enforce non-negativity in our initial guesses + self.comps = np.maximum(0, self.comps) + self.weights = np.maximum(0, self.weights) # Second-order spline: Tridiagonal (-2 on diagonal, 1 on sub/superdiagonals) - self.P = 0.25 * diags([1, -2, 1], offsets=[0, 1, 2], shape=(self._M - 2, self._M)) - self.PP = self.P.T @ self.P + self.spline_smooth_operator = 0.25 * diags( + [1, -2, 1], offsets=[0, 1, 2], shape=(self._num_conditions - 2, self._num_conditions) + ) + self.spline_smooth_penalty = self.spline_smooth_operator.T @ self.spline_smooth_operator # Set up residual matrix, objective function, and history - self.R = self.get_residual_matrix() + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() self.objective_difference = None self._objective_history = [self.objective_function] # Set up tracking variables for updateX() - self._preX = None - self._GraX = np.zeros_like(self.X) # Gradient of X (zeros for now) - self._preGraX = np.zeros_like(self.X) # Previous gradient of X (zeros for now) + self._prev_comps = None + self.grad_comps = np.zeros_like(self.comps) # Gradient of X (zeros for now) + self._prev_grad_comps = np.zeros_like(self.comps) # Previous gradient of X (zeros for now) - regularization_term = 0.5 * rho * np.linalg.norm(self.P @ self.A.T, "fro") ** 2 - sparsity_term = eta * np.sum(np.sqrt(self.X)) # Square root penalty + regularization_term = 0.5 * rho * np.linalg.norm(self.spline_smooth_operator @ self.stretch.T, "fro") ** 2 + sparsity_term = eta * np.sum(np.sqrt(self.comps)) # Square root penalty print( f"Start, Objective function: {self.objective_function:.5e}" f", Obj - reg/sparse: {self.objective_function - regularization_term - sparsity_term:.5e}" @@ -175,8 +181,10 @@ def __init__( for iter in range(max_iter): self.optimize_loop() # Print diagnostics - regularization_term = 0.5 * rho * np.linalg.norm(self.P @ self.A.T, "fro") ** 2 - sparsity_term = eta * np.sum(np.sqrt(self.X)) # Square root penalty + regularization_term = ( + 0.5 * rho * np.linalg.norm(self.spline_smooth_operator @ self.stretch.T, "fro") ** 2 + ) + sparsity_term = eta * np.sum(np.sqrt(self.comps)) # Square root penalty print( f"Num_updates: {self.num_updates}, " f"Obj fun: {self.objective_function:.5e}, " @@ -184,29 +192,30 @@ def __init__( f"Iter: {iter}" ) - # Convergence check: Stop if diffun is small and at least 20 iterations have passed + # Convergence check: decide when to terminate for small/no improvement print(self.objective_difference, " < ", self.objective_function * tol) if self.objective_difference < self.objective_function * tol and iter >= 20: break # Normalize our results - Y_row_max = np.max(self.Y, axis=1, keepdims=True) - self.Y = self.Y / Y_row_max - A_row_max = np.max(self.A, axis=1, keepdims=True) - self.A = self.A / A_row_max - # loop to normalize X - # effectively just re-running class with non-normalized X, normalized Y/A as inputs, then only update X - # reset difference trackers and initialize - self._preX = None - self._GraX = np.zeros_like(self.X) # Gradient of X (zeros for now) - self._preGraX = np.zeros_like(self.X) # Previous gradient of X (zeros for now) - self.R = self.get_residual_matrix() + weights_row_max = np.max(self.weights, axis=1, keepdims=True) + stretch_row_max = np.max(self.stretch, axis=1, keepdims=True) + self.weights = self.weights / weights_row_max + self.stretch = self.stretch / stretch_row_max + + # loop to normalize components + # effectively just re-running class with non-normalized comps, normalized wts/stretch as inputs, + # then only update comps + self._prev_comps = None + self.grad_comps = np.zeros_like(self.comps) + self._prev_grad_comps = np.zeros_like(self.comps) + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() self.objective_difference = None self._objective_history = [self.objective_function] for norm_iter in range(100): - self.updateX() - self.R = self.get_residual_matrix() + self.update_comps() + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() print(f"Objective function after normX: {self.objective_function:.5e}") self._objective_history.append(self.objective_function) @@ -218,148 +227,154 @@ def __init__( print("Finished optimization.") def optimize_loop(self): - self._preGraX = self._GraX.copy() - self.updateX() + # Update components first + self._prev_grad_comps = self.grad_comps.copy() + self.update_comps() self.num_updates += 1 - self.R = self.get_residual_matrix() + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() print(f"Objective function after updateX: {self.objective_function:.5e}") self._objective_history.append(self.objective_function) if self.objective_difference is None: self.objective_difference = self._objective_history[-1] - self.objective_function - # Now we update Y - self.updateY2() + # Now we update weights + self.update_weights() self.num_updates += 1 - self.R = self.get_residual_matrix() + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() print(f"Objective function after updateY2: {self.objective_function:.5e}") self._objective_history.append(self.objective_function) - self.updateA2() - + # Now we update stretch + self.update_stretch() self.num_updates += 1 - self.R = self.get_residual_matrix() + self.residuals = self.get_residual_matrix() self.objective_function = self.get_objective_function() - print(f"Objective function after updateA2: {self.objective_function:.5e}") + print(f"Objective function after update_stretch: {self.objective_function:.5e}") self._objective_history.append(self.objective_function) self.objective_difference = self._objective_history[-2] - self._objective_history[-1] def apply_interpolation(self, a, x, return_derivatives=False): """ Applies an interpolation-based transformation to `x` based on scaling `a`. - Also can compute first (`Tx`) and second (`Hx`) derivatives. + Also can compute first (`d_intr_x`) and second (`dd_intr_x`) derivatives. """ - N = len(x) + x_len = len(x) # Ensure `a` is an array and reshape for broadcasting a = np.atleast_1d(np.asarray(a)) # Ensures a is at least 1D # Compute fractional indices, broadcasting over `a` - ii = np.arange(N)[:, None] / a # Shape (N, M) + ii = np.arange(x_len)[:, None] / a # Shape (N, M) II = np.floor(ii).astype(int) # Integer part (still (N, M)) - valid_mask = II < (N - 1) # Ensure indices are within bounds + valid_mask = II < (x_len - 1) # Ensure indices are within bounds # Apply valid_mask to keep correct indices - idx_int = np.where(valid_mask, II, N - 2) # Prevent out-of-bounds indexing (previously "I") + idx_int = np.where(valid_mask, II, x_len - 2) # Prevent out-of-bounds indexing (previously "I") idx_frac = np.where(valid_mask, ii, II) # Keep aligned (previously "i") # Ensure x is a 1D array x = np.asarray(x).ravel() - # Compute Ax (linear interpolation) - Ax = x[idx_int] * (1 - idx_frac + idx_int) + x[np.minimum(idx_int + 1, N - 1)] * (idx_frac - idx_int) + # Compute interpolated_x (linear interpolation) + interpolated_x = x[idx_int] * (1 - idx_frac + idx_int) + x[np.minimum(idx_int + 1, x_len - 1)] * ( + idx_frac - idx_int + ) # Fill the tail with the last valid value - Ax_tail = np.full((N - len(idx_int), Ax.shape[1]), Ax[-1, :]) - Ax = np.vstack([Ax, Ax_tail]) + intr_x_tail = np.full((x_len - len(idx_int), interpolated_x.shape[1]), interpolated_x[-1, :]) + interpolated_x = np.vstack([interpolated_x, intr_x_tail]) if return_derivatives: - # Compute first derivative (Tx) + # Compute first derivative (d_intr_x) di = -idx_frac / a - Tx = x[idx_int] * (-di) + x[np.minimum(idx_int + 1, N - 1)] * di - Tx = np.vstack([Tx, np.zeros((N - len(idx_int), Tx.shape[1]))]) + d_intr_x = x[idx_int] * (-di) + x[np.minimum(idx_int + 1, x_len - 1)] * di + d_intr_x = np.vstack([d_intr_x, np.zeros((x_len - len(idx_int), d_intr_x.shape[1]))]) - # Compute second derivative (Hx) + # Compute second derivative (dd_intr_x) ddi = -di / a + idx_frac * a**-2 - Hx = x[idx_int] * (-ddi) + x[np.minimum(idx_int + 1, N - 1)] * ddi - Hx = np.vstack([Hx, np.zeros((N - len(idx_int), Hx.shape[1]))]) + dd_intr_x = x[idx_int] * (-ddi) + x[np.minimum(idx_int + 1, x_len - 1)] * ddi + dd_intr_x = np.vstack([dd_intr_x, np.zeros((x_len - len(idx_int), dd_intr_x.shape[1]))]) else: # Make placeholders - Tx = np.empty(Ax.shape) - Hx = np.empty(Ax.shape) + d_intr_x = np.empty(interpolated_x.shape) + dd_intr_x = np.empty(interpolated_x.shape) - return Ax, Tx, Hx + return interpolated_x, d_intr_x, dd_intr_x - def get_residual_matrix(self, X=None, Y=None, A=None): - # Initialize residual matrix as negative of MM + def get_residual_matrix(self, comps=None, weights=None, stretch=None): + # Initialize residual matrix as negative of source_matrix # In MATLAB this is getR - if X is None: - X = self.X - if Y is None: - Y = self.Y - if A is None: - A = self.A - R = -self.MM.copy() - # Compute transformed X for all (k, m) pairs - for k in range(Y.shape[0]): # K - Ax, _, _ = self.apply_interpolation(A[k, :], X[:, k]) # Only calculate Ax - R += Y[k, :] * Ax # Element-wise scaling and sum - return R - - def get_objective_function(self, R=None, A=None): - if R is None: - R = self.R - if A is None: - A = self.A - residual_term = 0.5 * np.linalg.norm(R, "fro") ** 2 - regularization_term = 0.5 * self.rho * np.linalg.norm(self.P @ A.T, "fro") ** 2 - sparsity_term = self.eta * np.sum(np.sqrt(self.X)) # Square root penalty + if comps is None: + comps = self.comps + if weights is None: + weights = self.weights + if stretch is None: + stretch = self.stretch + residuals = -self.source_matrix.copy() + # Compute transformed components for all (k, m) pairs + for k in range(weights.shape[0]): + stretched_comps, _, _ = self.apply_interpolation(stretch[k, :], comps[:, k]) # Only calculate Ax + residuals += weights[k, :] * stretched_comps # Element-wise scaling and sum + return residuals + + def get_objective_function(self, residuals=None, stretch=None): + if residuals is None: + residuals = self.residuals + if stretch is None: + stretch = self.stretch + residual_term = 0.5 * np.linalg.norm(residuals, "fro") ** 2 + regularization_term = 0.5 * self.rho * np.linalg.norm(self.spline_smooth_operator @ stretch.T, "fro") ** 2 + sparsity_term = self.eta * np.sum(np.sqrt(self.comps)) # Square root penalty # Final objective function value function = residual_term + regularization_term + sparsity_term return function - def apply_interpolation_matrix(self, X=None, Y=None, A=None, return_derivatives=False): + def apply_interpolation_matrix(self, comps=None, weights=None, stretch=None, return_derivatives=False): """ - Applies an interpolation-based transformation to the matrix `X` using `A`, - weighted by `Y`. Optionally computes first (`Tx`) and second (`Hx`) derivatives. + Applies an interpolation-based transformation to the matrix `comps` using `stretch`, + weighted by `weights`. Optionally computes first (`d_str_cmps`) and second (`dd_str_comps`) derivatives. Equivalent to getAfun_matrix in MATLAB. """ - if X is None: - X = self.X - if Y is None: - Y = self.Y - if A is None: - A = self.A - - N, M = self.MM.shape - K = Y.shape[0] + if comps is None: + comps = self.comps + if weights is None: + weights = self.weights + if stretch is None: + stretch = self.stretch # Compute scaled indices (MATLAB: AA = repmat(reshape(A',1,M*K).^-1, N,1)) - A_flat = A.reshape(1, M * K) ** -1 - AA = np.tile(A_flat, (N, 1)) + stretch_flat = stretch.reshape(1, self._num_conditions * self._n_components) ** -1 + stretch_tiled = np.tile(stretch_flat, (self._signal_len, 1)) - # Compute `ii` (MATLAB: ii = repmat((0:N-1)',1,K*M).*AA) - ii = np.tile(np.arange(N)[:, None], (1, M * K)) * AA + # Compute `ii` (MATLAB: ii = repmat((0:N-1)',1,K*M).*tiled_stretch) + ii = ( + np.tile(np.arange(self._signal_len)[:, None], (1, self._num_conditions * self._n_components)) + * stretch_tiled + ) # Weighting matrix (MATLAB: YY = repmat(reshape(Y',1,M*K), N,1)) - Y_flat = Y.reshape(1, M * K) - YY = np.tile(Y_flat, (N, 1)) + weights_flat = weights.reshape(1, self._num_conditions * self._n_components) + weights_tiled = np.tile(weights_flat, (self._signal_len, 1)) # Bias for indexing into reshaped X (MATLAB: bias = kron((0:K-1)*(N+1),ones(N,M))) # TODO break this up or describe what it does better - bias = np.kron(np.arange(K) * (N + 1), np.ones((N, M), dtype=int)).reshape(N, K * M) + bias = np.kron( + np.arange(self._n_components) * (self._signal_len + 1), + np.ones((self._signal_len, self._num_conditions), dtype=int), + ).reshape(self._signal_len, self._n_components * self._num_conditions) # Handle boundary conditions for interpolation (MATLAB: X1=[X;X(end,:)]) - X1 = np.vstack([X, X[-1, :]]) # Duplicate last row (like MATLAB) + X1 = np.vstack([comps, comps[-1, :]]) # Duplicate last row (like MATLAB) # Compute floor indices (MATLAB: II = floor(ii); II1=min(II+1,N+1); II2=min(II1+1,N+1)) II = np.floor(ii).astype(int) - II1 = np.minimum(II + 1, N) - II2 = np.minimum(II1 + 1, N) + II1 = np.minimum(II + 1, self._signal_len) + II2 = np.minimum(II1 + 1, self._signal_len) # Compute fractional part (MATLAB: iI = ii - II) iI = ii - II @@ -371,55 +386,57 @@ def apply_interpolation_matrix(self, X=None, Y=None, A=None, return_derivatives= # Extract values (MATLAB: XI1 = reshape(X1(II1_), N, K*M); XI2 = reshape(X1(II2_), N, K*M)) # Note: this "-1" corrects an off-by-one error that may have originated in an earlier line XI1 = X1.flatten(order="F")[(II1_ - 1).ravel()].reshape( - N, K * M + self._signal_len, self._n_components * self._num_conditions ) # order = F uses FORTRAN, column major order - XI2 = X1.flatten(order="F")[(II2_ - 1).ravel()].reshape(N, K * M) + XI2 = X1.flatten(order="F")[(II2_ - 1).ravel()].reshape( + self._signal_len, self._n_components * self._num_conditions + ) - # Interpolation (MATLAB: Ax2=XI1.*(1-iI)+XI2.*(iI); Ax=Ax2.*YY) + # Interpolation (MATLAB: Ax2=XI1.*(1-iI)+XI2.*(iI); stretched_comps=Ax2.*YY) Ax2 = XI1 * (1 - iI) + XI2 * iI - Ax = Ax2 * YY # Apply weighting + stretched_comps = Ax2 * weights_tiled # Apply weighting if return_derivatives: - # Compute first derivative (MATLAB: Tx2=XI1.*(-di)+XI2.*di; Tx=Tx2.*YY) - di = -ii * AA - Tx2 = XI1 * (-di) + XI2 * di - Tx = Tx2 * YY - - # Compute second derivative (MATLAB: Hx2=XI1.*(-ddi)+XI2.*ddi; Hx=Hx2.*YY) - ddi = -di * AA * 2 - Hx2 = XI1 * (-ddi) + XI2 * ddi - Hx = Hx2 * YY + # Compute first derivative (MATLAB: Tx2=XI1.*(-di)+XI2.*di; d_str_cmps=Tx2.*YY) + di = -ii * stretch_tiled + d_x2 = XI1 * (-di) + XI2 * di + d_str_cmps = d_x2 * weights_tiled + + # Compute second derivative (MATLAB: Hx2=XI1.*(-ddi)+XI2.*ddi; dd_str_comps=Hx2.*YY) + ddi = -di * stretch_tiled * 2 + dd_x2 = XI1 * (-ddi) + XI2 * ddi + dd_str_cmps = dd_x2 * weights_tiled else: - shape = Ax.shape - Tx = np.empty(shape) - Hx = np.empty(shape) + shape = stretched_comps.shape + d_str_cmps = np.empty(shape) + dd_str_cmps = np.empty(shape) - return Ax, Tx, Hx + return stretched_comps, d_str_cmps, dd_str_cmps - def apply_transformation_matrix(self, A=None, Y=None, R=None): + def apply_transformation_matrix(self, stretch=None, weights=None, residuals=None): """ - Computes the transformation matrix `AT` for residual `R`, - using scaling matrix `A` and weight coefficients `Y`. + Computes the transformation matrix `stretch_transformed` for `residuals`, + using scaling matrix `stretch` and coefficients `weights`. """ - if A is None: - A = self.A - if Y is None: - Y = self.Y - if R is None: - R = self.R + if stretch is None: + stretch = self.stretch + if weights is None: + weights = self.weights + if residuals is None: + residuals = self.residuals - N, M = self.MM.shape - K = Y.shape[0] + N, M = self.source_matrix.shape + K = weights.shape[0] # Compute scaling matrix (MATLAB: AA = repmat(reshape(A,1,M*K).^-1,Nindex,1)) - AA = np.tile(A.reshape(1, M * K, order="F") ** -1, (N, 1)) + AA = np.tile(stretch.reshape(1, M * K, order="F") ** -1, (N, 1)) # Compute indices (MATLAB: ii = repmat((index-1)',1,K*M).*AA) ii = np.arange(N)[:, None] * AA # Shape (N, M*K), replacing `index` # Weighting coefficients (MATLAB: YY = repmat(reshape(Y,1,M*K),Nindex,1)) - YY = np.tile(Y.reshape(1, M * K, order="F"), (N, 1)) + YY = np.tile(weights.reshape(1, M * K, order="F"), (N, 1)) # Compute floor indices (MATLAB: II = floor(ii); II1 = min(II+1,N+1); II2 = min(II1+1,N+1)) II = np.floor(ii).astype(int) @@ -437,15 +454,15 @@ def apply_transformation_matrix(self, A=None, Y=None, R=None): repm = np.tile(np.arange(K), (N, M)) # Compute transformations (MATLAB: kro = kron(R(index,:), ones(1, K))) - kro = np.kron(R, np.ones((1, K))) + kron = np.kron(residuals, np.ones((1, K))) # (MATLAB: kroiI = kro .* (iI); iIYY = (iI-1) .* YY) - kroiI = kro * iI + kron_iI = kron * iI iIYY = (iI - 1) * YY # Construct sparse matrices (MATLAB: sparse(II1_,repm,kro.*-iIYY,(N+1),K)) - x2 = coo_matrix(((-kro * iIYY).flatten(), (II1_.flatten() - 1, repm.flatten())), shape=(N + 1, K)).tocsc() - x3 = coo_matrix(((kroiI * YY).flatten(), (II2_.flatten() - 1, repm.flatten())), shape=(N + 1, K)).tocsc() + x2 = coo_matrix(((-kron * iIYY).flatten(), (II1_.flatten() - 1, repm.flatten())), shape=(N + 1, K)).tocsc() + x3 = coo_matrix(((kron_iI * YY).flatten(), (II2_.flatten() - 1, repm.flatten())), shape=(N + 1, K)).tocsc() # Combine the last row into previous, then remove the last row x2[N - 1, :] += x2[N, :] @@ -453,9 +470,9 @@ def apply_transformation_matrix(self, A=None, Y=None, R=None): x2 = x2[:-1, :] x3 = x3[:-1, :] - AT = x2 + x3 + stretch_transformed = x2 + x3 - return AT + return stretch_transformed def solve_quadratic_program(self, T, m, alg="trust-constr"): """ @@ -472,15 +489,15 @@ def solve_quadratic_program(self, T, m, alg="trust-constr"): - T: (N, K) ndarray Matrix computed from getAfun(A(k, m), X[:, k]). - m: int - Index of the current column in MM. + Index of the current column in source_matrix. Returns: - y: (K,) ndarray Optimal solution for y, clipped to ensure non-negativity. """ - MM_col = self.MM[:, m] + source_matrix_col = self.source_matrix[:, m] Q = T.T @ T - d = -T.T @ MM_col + d = -T.T @ source_matrix_col K = Q.shape[0] reg_factor = 1e-8 * np.linalg.norm(Q, ord="fro") Q += np.eye(K) * reg_factor @@ -508,46 +525,52 @@ def hess(y): return np.maximum(result.x, 0) - def updateX(self): + def update_comps(self): """ - Updates `X` using gradient-based optimization with adaptive step size L. + Updates `comps` using gradient-based optimization with adaptive step size L. """ - # Compute `AX` using the interpolation function - AX, _, _ = self.apply_interpolation_matrix() # Skip the other two outputs + # Compute `stretched_comps` using the interpolation function + stretched_comps, _, _ = self.apply_interpolation_matrix() # Skip the other two outputs (derivatives) # Compute RA and RR - intermediate_RA = AX.flatten(order="F").reshape((self._N * self._M, self._K), order="F") - RA = intermediate_RA.sum(axis=1).reshape((self._N, self._M), order="F") - RR = RA - self.MM + intermediate_RA = stretched_comps.flatten(order="F").reshape( + (self._signal_len * self._num_conditions, self._n_components), order="F" + ) + RA = intermediate_RA.sum(axis=1).reshape((self._signal_len, self._num_conditions), order="F") + RR = RA - self.source_matrix # Compute gradient `GraX` - self._GraX = self.apply_transformation_matrix( - R=RR + self.grad_comps = self.apply_transformation_matrix( + residuals=RR ).toarray() # toarray equivalent of full, make non-sparse # Compute initial step size `L0` - L0 = np.linalg.eigvalsh(self.Y.T @ self.Y).max() * np.max([self.A.max(), 1 / self.A.min()]) + L0 = np.linalg.eigvalsh(self.weights.T @ self.weights).max() * np.max( + [self.stretch.max(), 1 / self.stretch.min()] + ) # Compute adaptive step size `L` - if self._preX is None: + if self._prev_comps is None: L = L0 else: - num = np.sum((self._GraX - self._preGraX) * (self.X - self._preX)) # Element-wise multiplication - denom = np.linalg.norm(self.X - self._preX, "fro") ** 2 # Frobenius norm squared + num = np.sum( + (self.grad_comps - self._prev_grad_comps) * (self.comps - self._prev_comps) + ) # Elem-wise multiply + denom = np.linalg.norm(self.comps - self._prev_comps, "fro") ** 2 # Frobenius norm squared L = num / denom if denom > 0 else L0 if L <= 0: L = L0 - # Store our old X before updating because it is used in step selection - self._preX = self.X.copy() + # Store our old component matrix before updating because it is used in step selection + self._prev_comps = self.comps.copy() - while True: # iterate updating X - x_step = self._preX - self._GraX / L + while True: # iterate updating components + comps_step = self._prev_comps - self.grad_comps / L # Solve x^3 + p*x + q = 0 for the largest real root - self.X = np.square(cubic_largest_real_root(-x_step, self.eta / (2 * L))) + self.comps = np.square(cubic_largest_real_root(-comps_step, self.eta / (2 * L))) # Mask values that should be set to zero - mask = self.X**2 * L / 2 - L * self.X * x_step + self.eta * np.sqrt(self.X) < 0 - self.X = mask * self.X + mask = self.comps**2 * L / 2 - L * self.comps * comps_step + self.eta * np.sqrt(self.comps) < 0 + self.comps = mask * self.comps objective_improvement = self._objective_history[-1] - self.get_objective_function( - R=self.get_residual_matrix() + residuals=self.get_residual_matrix() ) # Check if objective function improves @@ -558,21 +581,17 @@ def updateX(self): if np.isinf(L): break - def updateY2(self): + def update_weights(self): """ - Updates Y using matrix operations, solving a quadratic program via `solve_mkr_box`. + Updates weights using matrix operations, solving a quadratic program via to do so. """ - K = self._K - N = self._N - M = self._M - - for m in range(M): - T = np.zeros((N, K)) # Initialize T as an (N, K) zero matrix + for m in range(self._num_conditions): + T = np.zeros((self._signal_len, self._n_components)) # Initialize T as an (N, K) zero matrix # Populate T using apply_interpolation - for k in range(K): - T[:, k] = self.apply_interpolation(self.A[k, m], self.X[:, k], return_derivatives=True)[ + for k in range(self._n_components): + T[:, k] = self.apply_interpolation(self.stretch[k, m], self.comps[:, k], return_derivatives=True)[ 0 ].squeeze() @@ -580,69 +599,72 @@ def updateY2(self): y = self.solve_quadratic_program(T=T, m=m) # Update Y - self.Y[:, m] = y + self.weights[:, m] = y - def regularize_function(self, A=None): + def regularize_function(self, stretch=None): """ Computes the regularization function, gradient, and Hessian for optimization. Returns: - fun: Objective function value (scalar) - gra: Gradient (same shape as A) """ - if A is None: - A = self.A - - K = self._K - M = self._M - N = self._N + if stretch is None: + stretch = self.stretch # Compute interpolated matrices - AX, TX, HX = self.apply_interpolation_matrix(A=A, return_derivatives=True) + stretched_comps, d_str_cmps, dd_str_cmps = self.apply_interpolation_matrix( + stretch=stretch, return_derivatives=True + ) # Compute residual - intermediate_RA = AX.flatten(order="F").reshape((N * M, K), order="F") - RA = intermediate_RA.sum(axis=1).reshape((N, M), order="F") - RA = RA - self.MM + intermediate_RA = stretched_comps.flatten(order="F").reshape( + (self._signal_len * self._num_conditions, self._n_components), order="F" + ) + RA = intermediate_RA.sum(axis=1).reshape((self._signal_len, self._num_conditions), order="F") + RA = RA - self.source_matrix # Compute objective function - fun = self.get_objective_function(RA, A) + reg_func = self.get_objective_function(RA, stretch) # Compute gradient - tiled_derivative = np.sum(TX * np.tile(RA, (1, K)), axis=0) - der_reshaped = np.asarray(tiled_derivative).reshape((M, K), order="F") - gra = der_reshaped.T + self.rho * A @ self.P.T @ self.P + tiled_derivative = np.sum(d_str_cmps * np.tile(RA, (1, self._n_components)), axis=0) + der_reshaped = np.asarray(tiled_derivative).reshape((self._num_conditions, self._n_components), order="F") + func_grad = ( + der_reshaped.T + self.rho * stretch @ self.spline_smooth_operator.T @ self.spline_smooth_operator + ) - return fun, gra + return reg_func, func_grad - def updateA2(self): + def update_stretch(self): """ - Updates matrix A using constrained optimization (equivalent to fmincon in MATLAB). + Updates stretching matrix using constrained optimization (equivalent to fmincon in MATLAB). """ - # Flatten A for compatibility with the optimizer (since SciPy expects 1D input) - A_initial = self.A.flatten() + # Flatten stretch for compatibility with the optimizer (since SciPy expects 1D input) + stretch_init_vec = self.stretch.flatten() # Define the optimization function - def objective(A_vec): - A_matrix = A_vec.reshape(self.A.shape) # Reshape back to matrix form - fun, gra = self.regularize_function(A_matrix) - gra = gra.flatten() - return fun, gra + def objective(stretch_vec): + stretch_matrix = stretch_vec.reshape(self.stretch.shape) # Reshape back to matrix form + func, grad = self.regularize_function(stretch_matrix) + grad = grad.flatten() + return func, grad # Optimization constraints: lower bound 0.1, no upper bound - bounds = [(0.1, None)] * A_initial.size # Equivalent to 0.1 * ones(K, M) + bounds = [(0.1, None)] * stretch_init_vec.size # Equivalent to 0.1 * ones(K, M) # Solve optimization problem (equivalent to fmincon) result = minimize( - fun=lambda A_vec: objective(A_vec)[0], # Objective function - x0=A_initial, # Initial guess + fun=lambda stretch_vec: objective(stretch_vec)[0], # Objective function + x0=stretch_init_vec, # Initial guess method="trust-constr", # Equivalent to 'trust-region-reflective' - jac=lambda A_vec: objective(A_vec)[1], # Gradient - bounds=bounds, # Lower bounds on A + jac=lambda stretch_vec: objective(stretch_vec)[1], # Gradient + bounds=bounds, # Lower bounds on stretch + # TODO: A Hessian can be incorporated for better convergence. ) - # Update A with the optimized values - self.A = result.x.reshape(self.A.shape) + # Update stretch with the optimized values + self.stretch = result.x.reshape(self.stretch.shape) def cubic_largest_real_root(p, q): diff --git a/src/diffpy/snmf/stretchednmfapp.py b/src/diffpy/snmf/stretchednmfapp.py deleted file mode 100644 index f577b887..00000000 --- a/src/diffpy/snmf/stretchednmfapp.py +++ /dev/null @@ -1,64 +0,0 @@ -import argparse -from pathlib import Path - -from diffpy.snmf.io import initialize_variables, load_input_signals -from diffpy.snmf.subroutines import initialize_components, lift_data - -ALLOWED_DATA_TYPES = ["powder_diffraction", "pd", "pair_distribution_function", "pdf"] - - -def create_parser(): - parser = argparse.ArgumentParser( - prog="stretched_nmf", description="Stretched Nonnegative Matrix Factorization" - ) - parser.add_argument( - "-i", - "--input-directory", - type=str, - default=None, - help="Directory containing experimental data. Defaults to current working directory.", - ) - parser.add_argument( - "-o", - "--output-directory", - type=str, - help="The directory where the results will be written. Defaults to '/snmf_results'.", - ) - parser.add_argument( - "-t", - "--data-type", - type=str, - default=None, - choices=ALLOWED_DATA_TYPES, - help="The type of the experimental data.", - ) - parser.add_argument( - "-l", - "--lift-factor", - type=float, - default=1, - help="The lifting factor. Data will be lifted by lifted_data = data + abs(min(data) * lift). Default 1.", - ) - parser.add_argument( - "number-of-components", - type=int, - help="The number of component signals for the NMF decomposition. Must be an integer greater than 0", - ) - parser.add_argument("-v", "--version", action="version", help="Print the software version number") - args = parser.parse_args() - return args - - -def main(): - args = create_parser() - if args.input_directory is None: - args.input_directory = Path.cwd() - grid, input_data = load_input_signals(args.input_directory) - lifted_input_data = lift_data(input_data, args.lift_factor) - variables = initialize_variables(lifted_input_data, args.number_of_components, args.data_type) - components = initialize_components(variables["number_of_components"], variables["number_of_signals"], grid) - return components - - -if __name__ == "__main__": - main() diff --git a/src/diffpy/snmf/subroutines.py b/src/diffpy/snmf/subroutines.py deleted file mode 100644 index ae7f7ef7..00000000 --- a/src/diffpy/snmf/subroutines.py +++ /dev/null @@ -1,494 +0,0 @@ -import numdifftools -import numpy as np - -from diffpy.snmf.containers import ComponentSignal -from diffpy.snmf.factorizers import lsqnonneg -from diffpy.snmf.optimizers import get_weights - - -def initialize_components(number_of_components, number_of_signals, grid_vector): - """Initializes ComponentSignals for each of the components in the decomposition. - - Parameters - ---------- - number_of_components: int - The number of component signals in the NMF decomposition - number_of_signals: int - grid_vector: 1d array - The grid of the user provided signals. - - Returns - ------- - tuple of ComponentSignal objects - The tuple containing `number_of_components` of initialized ComponentSignal objects. - """ - if number_of_components <= 0: - raise ValueError(f"Number of components = {number_of_components}. Number_of_components must be >= 1.") - components = list() - for component in range(number_of_components): - component = ComponentSignal(grid_vector, number_of_signals, component) - components.append(component) - return tuple(components) - - -def lift_data(data_input, lift=1): - """Lifts values of data_input. - - Adds 'lift' * the minimum value in data_input to data_input element-wise. - - Parameters - ---------- - data_input: 2d array like - The matrix containing a series of signals to be decomposed. Has dimensions N x M where N is the length - of each signal and M is the number of signals. - - lift: float - The factor representing how much to lift 'data_input'. - - Returns - ------- - 2d array like - The matrix that contains data_input - (min(data_input) * lift). - """ - data_input = np.asarray(data_input) - return data_input + np.abs(np.min(data_input) * lift) - - -def construct_stretching_matrix(components, number_of_components, number_of_signals): - """Constructs the stretching factor matrix. - - Parameters - ---------- - components: tuple of ComponentSignal objects - The tuple containing the component signals in ComponentSignal objects. - number_of_signals: int - The number of signals in the data provided by the user. - - Returns - ------- - 2d array - The matrix containing the stretching factors for the component signals for each of the signals in the - raw data. Has dimensions `component_signal` x `number_of_signals` - """ - if (len(components)) == 0: - raise ValueError(f"Number of components = {number_of_components}. Number_of_components must be >= 1.") - number_of_components = len(components) - - if number_of_signals <= 0: - raise ValueError(f"Number of signals = {number_of_signals}. Number_of_signals must be >= 1.") - - stretching_factor_matrix = np.zeros((number_of_components, number_of_signals)) - for i, component in enumerate(components): - stretching_factor_matrix[i] = component.stretching_factors - return stretching_factor_matrix - - -def construct_component_matrix(components): - """Constructs the component matrix. - - Parameters - ---------- - components: tuple of ComponentSignal objects - The tuple containing the component signals in ComponentSignal objects. - - Returns - ------- - 2d array - The matrix containing the component signal values. Has dimensions `signal_length` x `number_of_components`. - """ - signal_length = len(components[0].iq) - number_of_components = len(components) - if signal_length == 0: - raise ValueError(f"Signal length = {signal_length}. Signal length must be >= 1") - if number_of_components == 0: - raise ValueError(f"Number of components = {number_of_components}. Number_of_components must be >= 1") - - component_matrix = np.zeros((number_of_components, signal_length)) - for i, component in enumerate(components): - component_matrix[i] = component.iq - return component_matrix - - -def construct_weight_matrix(components): - """Constructs the weights matrix. - - Constructs a ΔΆ x M matrix where K is the number of components and M is the - number of signals. Each element is the stretching factor for a specific - weights for a specific signal from the data input. - - Parameters - ---------- - components: tuple of ComponentSignal objects - The tuple containing the component signals. - - Returns - ------- - 2d array like - The 2d array containing the weightings for each component for each signal. - """ - number_of_components = len(components) - number_of_signals = len(components[0].weights) - if number_of_components == 0: - raise ValueError(f"Number of components = {number_of_components}. Number of components must be >= 1") - if number_of_signals == 0: - raise ValueError(f"Number of signals = {number_of_signals}. Number_of_signals must be >= 1.") - weights_matrix = np.zeros((number_of_components, number_of_signals)) - for i, component in enumerate(components): - weights_matrix[i] = component.weights - return weights_matrix - - -def update_weights(components, data_input, method=None): - """Updates the weights matrix. - - Updates the weights matrix and the weights vector for each ComponentSignal object. - - Parameters - ---------- - components: tuple of ComponentSignal objects - The tuple containing the component signals. - method: str - The string specifying which method should be used to find a new weight matrix: non-negative least squares - or a quadratic program. - data_input: 2d array - The 2d array containing the user-provided signals. - - Returns - ------- - 2d array - The 2d array containing the weight factors for each component for each signal from `data_input`. - Has dimensions K x M where K is the number of components and M is the number of signals in `data_input.` - """ - data_input = np.asarray(data_input) - weight_matrix = construct_weight_matrix(components) - number_of_signals = len(components[0].weights) - number_of_components = len(components) - signal_length = len(components[0].grid) - for signal in range(number_of_signals): - stretched_components = np.zeros((signal_length, number_of_components)) - for i, component in enumerate(components): - stretched_components[:, i] = component.apply_stretch(signal)[0] - if method == "align": - weights = lsqnonneg(stretched_components, data_input[:, signal]) - else: - weights = get_weights( - stretched_components.T @ stretched_components, - -stretched_components.T @ data_input[:, signal], - 0, - 1, - ) - weight_matrix[:, signal] = weights - return weight_matrix - - -def reconstruct_signal(components, signal_idx): - """Reconstructs a specific signal from its weighted and stretched components. - - Calculates the linear combination of stretched components where each term is the stretched component multiplied - by its weight factor. - - Parameters - ---------- - components: tuple of ComponentSignal objects - The tuple containing the ComponentSignal objects - signal_idx: int - The index of the specific signal in the input data to be reconstructed - - Returns - ------- - 1d array like - The reconstruction of a signal from calculated weights, stretching factors, and iq values. - """ - signal_length = len(components[0].grid) - reconstruction = np.zeros(signal_length) - for component in components: - stretched = component.apply_stretch(signal_idx)[0] - stretched_and_weighted = component.apply_weight(signal_idx, stretched) - reconstruction += stretched_and_weighted - return reconstruction - - -def initialize_arrays(number_of_components, number_of_moments, signal_length): - """Generates the initial guesses for the weight, stretching, and component matrices. - - Calculates the initial guesses for the component matrix, stretching factor matrix, and weight matrix. The - initial guess for the component matrix is a random (signal_length) x (number_of_components) matrix where - each element is between 0 and 1. The initial stretching factor matrix is a random - (number_of_components) x (number_of_moments) matrix where each element is number slightly perturbed from 1. - The initial weight matrix guess is a random (number_of_components) x (number_of_moments) matrix where - each element is between 0 and 1. - - Parameters - ---------- - number_of_components: int - The number of component signals to obtain from the stretched nmf decomposition. - - number_of_moments: int - The number of signals in the user provided dataset where each signal is at a different moment. - - signal_length: int - The length of each signal in the user provided dataset. - - Returns - ------- - tuple of 2d arrays of floats - The tuple containing three elements: the initial component matrix guess, the initial stretching factor matrix - guess, and the initial weight factor matrix guess in that order. - """ - component_matrix_guess = np.random.rand(signal_length, number_of_components) - weight_matrix_guess = np.random.rand(number_of_components, number_of_moments) - stretching_matrix_guess = ( - np.ones(number_of_components, number_of_moments) - + np.random.randn(number_of_components, number_of_moments) * 1e-3 - ) - return component_matrix_guess, weight_matrix_guess, stretching_matrix_guess - - -def objective_function( - residual_matrix, stretching_factor_matrix, smoothness, smoothness_term, component_matrix, sparsity -): - """Defines the objective function of the algorithm and returns its value. - - Calculates the value of '(||residual_matrix||_F) ** 2 + smoothness * (||smoothness_term * - stretching_factor_matrix.T||)**2 + sparsity * sum(component_matrix ** .5)' and returns its value. - - Parameters - ---------- - residual_matrix: 2d array like - The matrix where each column is the difference between an experimental PDF/XRD pattern and a calculated - PDF/XRD pattern at each grid point. Has dimensions R x M where R is the length of each pattern and M is - the amount of patterns. - - stretching_factor_matrix: 2d array like - The matrix containing the stretching factors of the calculated component signal. Has dimensions K x M where - K is the amount of components and M is the number of experimental PDF/XRD patterns. - - smoothness: float - The coefficient of the smoothness term which determines the intensity of the smoothness term and its - behavior. It is not very sensitive and is usually adjusted by multiplying it by ten. - - smoothness_term: 2d array like - The regularization term that ensures that smooth changes in the component stretching signals are favored. - Has dimensions (M-2) x M where M is the amount of experimentally obtained PDF/XRD patterns, the moment - amount. - - component_matrix: 2d array like - The matrix containing the calculated component signals of the experimental PDF/XRD patterns. Has dimensions - R x K where R is the signal length and K is the number of component signals. - - sparsity: float - The parameter determining the intensity of the sparsity regularization term which enables the algorithm to - exploit the sparse nature of XRD data. It is usually adjusted by doubling. - - Returns - ------- - float - The value of the objective function. - """ - residual_matrix = np.asarray(residual_matrix) - stretching_factor_matrix = np.asarray(stretching_factor_matrix) - component_matrix = np.asarray(component_matrix) - return ( - 0.5 * np.linalg.norm(residual_matrix, "fro") ** 2 - + 0.5 * smoothness * np.linalg.norm(smoothness_term @ stretching_factor_matrix.T, "fro") ** 2 - + sparsity * np.sum(np.sqrt(component_matrix)) - ) - - -def get_stretched_component(stretching_factor, component, signal_length): - """Applies a stretching factor to a component signal. - - Computes a stretched signal and reinterpolates it onto the original grid of points. Uses a normalized grid - of evenly spaced integers counting from 0 to signal_length (exclusive) to approximate values in between grid - nodes. Once this grid is stretched, values at grid nodes past the unstretched signal's domain are set to zero. - Returns the approximate values of x(r/a) from x(r) where x is a component signal. - - Parameters - ---------- - stretching_factor: float - The stretching factor of a component signal at a particular moment. - component: 1d array like - The calculated component signal without stretching or weighting. Has length N, the length of the signal. - signal_length: int - The length of the component signal. - - Returns - ------- - tuple of 1d array of floats - The calculated component signal with stretching factors applied. Has length N, the length of the unstretched - component signal. Also returns the gradient and hessian of the stretching transformation. - """ - component = np.asarray(component) - normalized_grid = np.arange(signal_length) - - def stretched_component_func(stretching_factor): - return np.interp(normalized_grid / stretching_factor, normalized_grid, component, left=0, right=0) - - derivative_func = numdifftools.Derivative(stretched_component_func) - second_derivative_func = numdifftools.Derivative(derivative_func) - - stretched_component = stretched_component_func(stretching_factor) - stretched_component_gra = derivative_func(stretching_factor) - stretched_component_hess = second_derivative_func(stretching_factor) - - return ( - np.asarray(stretched_component), - np.asarray(stretched_component_gra), - np.asarray(stretched_component_hess), - ) - - -def update_weights_matrix( - component_amount, - signal_length, - stretching_factor_matrix, - component_matrix, - data_input, - moment_amount, - weights_matrix, - method, -): - """Update the weight factors matrix. - - Parameters - ---------- - component_amount: int - The number of component signals the user would like to determine from the experimental data. - - signal_length: int - The length of the experimental signal patterns - - stretching_factor_matrix: 2d array like - The matrx containing the stretching factors of the calculated component signals. Has dimensions K x M - where K is the number of component signals and M is the number of XRD/PDF patterns. - - component_matrix: 2d array like - The matrix containing the unstretched calculated component signals. Has dimensions N x K where N is the - length of the signals and K is the number of component signals. - - data_input: 2d array like - The experimental series of PDF/XRD patterns. Has dimensions N x M where N is the length of the PDF/XRD - signals and M is the number of PDF/XRD patterns. - - moment_amount: int - The number of PDF/XRD patterns from the experimental data. - - weights_matrix: 2d array like - The matrix containing the weights of the stretched component signals. Has dimensions K x M where K is - the number of component signals and M is the number of XRD/PDF patterns. - - method: str - The string specifying the method for obtaining individual weights. - - Returns - ------- - 2d array like - The matrix containing the new weight factors of the stretched component signals. - """ - stretching_factor_matrix = np.asarray(stretching_factor_matrix) - component_matrix = np.asarray(component_matrix) - data_input = np.asarray(data_input) - weights_matrix = np.asarray(weights_matrix) - weight = np.zeros(component_amount) - for i in range(moment_amount): - stretched_components = np.zeros((signal_length, component_amount)) - for n in range(component_amount): - stretched_components[:, n] = get_stretched_component( - stretching_factor_matrix[n, i], component_matrix[:, n], signal_length - )[0] - if method == "align": - weight = lsqnonneg(stretched_components[0:signal_length, :], data_input[0:signal_length, i]) - else: - weight = get_weights( - stretched_components[0:signal_length, :].T @ stretched_components[0:signal_length, :], - -1 * stretched_components[0:signal_length, :].T @ data_input[0:signal_length, i], - 0, - 1, - ) - weights_matrix[:, i] = weight - return weights_matrix - - -def get_residual_matrix( - component_matrix, weights_matrix, stretching_matrix, data_input, moment_amount, component_amount, signal_length -): - """Obtains the residual matrix between the experimental data and calculated data. - - Calculates the difference between the experimental data and the reconstructed experimental data created from - the calculated components, weights, and stretching factors. For each experimental pattern, the stretched and - weighted components making up that pattern are subtracted. - - Parameters - ---------- - component_matrix: 2d array like - The matrix containing the calculated component signals. Has dimensions N x K where N is the length of the - signal and K is the number of calculated component signals. - - weights_matrix: 2d array like - The matrix containing the calculated weights of the stretched component signals. Has dimensions K x M where - K is the number of components and M is the number of moments or experimental PDF/XRD patterns. - - stretching_matrix: 2d array like - The matrix containing the calculated stretching factors of the calculated component signals. Has dimensions - K x M where K is the number of components and M is the number of moments or experimental PDF/XRD patterns. - - data_input: 2d array like - The matrix containing the experimental PDF/XRD data. Has dimensions N x M where N is the length of the - signals and M is the number of signal patterns. - - moment_amount: int - The number of patterns in the experimental data. Represents the number of moments in time in the data series - - component_amount: int - The number of component signals the user would like to obtain from the experimental data. - - signal_length: int - The length of the signals in the experimental data. - - - Returns - ------- - 2d array like - The matrix containing the residual between the experimental data and reconstructed data from calculated - values. Has dimensions N x M where N is the signal length and M is the number of moments. Each column - contains the difference between an experimental signal and a reconstruction of that signal from the - calculated weights, components, and stretching factors. - """ - component_matrix = np.asarray(component_matrix) - weights_matrix = np.asarray(weights_matrix) - stretching_matrix = np.asarray(stretching_matrix) - data_input = np.asarray(data_input) - residual_matrx = -1 * data_input - for m in range(moment_amount): - residual = residual_matrx[:, m] - for k in range(component_amount): - residual = ( - residual - + weights_matrix[k, m] - * get_stretched_component(stretching_matrix[k, m], component_matrix[:, k], signal_length)[0] - ) - residual_matrx[:, m] = residual - return residual_matrx - - -def reconstruct_data(components): - """Reconstructs the `input_data` matrix. - - Reconstructs the `input_data` matrix from calculated component signals, weights, and stretching factors. - - Parameters - ---------- - components: tuple of ComponentSignal objects - The tuple containing the component signals. - - Returns - ------- - 2d array - The 2d array containing the reconstruction of input_data. - """ - signal_length = len(components[0].iq) - number_of_signals = len(components[0].weights) - data_reconstruction = np.zeros((signal_length, number_of_signals)) - for signal in range(number_of_signals): - data_reconstruction[:, signal] = reconstruct_signal(components, signal) - return data_reconstruction