diff --git a/aequilibrae/paths/odme.py b/aequilibrae/paths/odme.py index f2c6afaa9..133f60d7d 100644 --- a/aequilibrae/paths/odme.py +++ b/aequilibrae/paths/odme.py @@ -1,6 +1,7 @@ """ ODME Infrastructure (User Interaction Class): """ + # TODO - 3 todo's remaining in code, see below from typing import Tuple @@ -25,6 +26,7 @@ if has_omx: import openmatrix as omx + class ODME(object): """Origin-Destination Matrix Estimation class. @@ -59,7 +61,7 @@ class ODME(object): >>> import pandas as pd >>> counts = pd.read_csv("/tmp/test_data.csv") - # We can now run the ODME procedure, see Use examples page for a more + # We can now run the ODME procedure, see Use examples page for a more # comprehensive overview of the options available when initialising. >>> odme = ODME(assignment, counts) >>> odme.execute() # See Use examples for optional arguments @@ -81,19 +83,20 @@ class ODME(object): # Statistics on the procedure tracking each link (count volumes) >>> link_stats = results.get_link_statistics() """ + # Input count volume columns (assigned volumes will be added during execution) COUNT_VOLUME_COLS = ["class", "link_id", "direction", "obs_volume"] - GMEAN_LIMIT = 0.01 # FACTOR LIMITING VARIABLE - FOR TESTING PURPOSES - DEFUNCT! + GMEAN_LIMIT = 0.01 # FACTOR LIMITING VARIABLE - FOR TESTING PURPOSES - DEFUNCT! ALL_ALGORITHMS = ["gmean", "spiess", "reg_spiess"] - DEFAULT_STOP_CRIT = {"max_outer": 50, "max_inner": 50, - "convergence_crit": 10**-4, "inner_convergence": 10**-4} + DEFAULT_STOP_CRIT = {"max_outer": 50, "max_inner": 50, "convergence_crit": 10**-4, "inner_convergence": 10**-4} - def __init__(self, + def __init__( + self, assignment: TrafficAssignment, count_volumes: pd.DataFrame, stop_crit=None, algorithm: str = "spiess", - alpha: float = None + alpha: float = None, ) -> None: """ Parameters: @@ -104,11 +107,11 @@ def __init__(self, need to have preset select links (these will be overwritten). count_volumes: a dataframe detailing the links, the class they are associated with, the direction and their observed volume. - stop_crit: the maximum number of iterations and the convergence criterion + stop_crit: the maximum number of iterations and the convergence criterion (see ODME.DEFAULT_STOP_CRIT for formatting). algorithm: specification for which gradient-descent based algorithm to use (see ODME.ALL_ALGORITHMS for options). - alpha: used as a hyper-parameter for regularised spiess (see technical document for + alpha: used as a hyper-parameter for regularised spiess (see technical document for details). 
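For orientation, a minimal sketch of constructing the estimator with explicit stopping criteria; the keys mirror ODME.DEFAULT_STOP_CRIT above, and `assignment` and `counts` are assumed to be prepared as in the class docstring:

```python
# Hedged sketch - keys mirror ODME.DEFAULT_STOP_CRIT; the values are illustrative only.
stop_crit = {
    "max_outer": 50,           # maximum outer (assignment) iterations - positive integer
    "max_inner": 50,           # maximum inner (scaling) iterations - positive integer
    "convergence_crit": 1e-4,  # outer convergence threshold - non-negative
    "inner_convergence": 1e-4, # inner convergence threshold - non-negative
}
# "reg_spiess" additionally requires alpha in [0, 1]; beta is derived internally as 1 - alpha.
odme = ODME(assignment, counts, stop_crit=stop_crit, algorithm="reg_spiess", alpha=0.5)
odme.execute()
```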
NOTE - certain functionality is only implemented for single class ODME - see docstrings for @@ -152,7 +155,7 @@ def __init__(self, # Component of objective function from flows/regularisation: self.flow_obj, self.reg_obj = None, None # Initially inf to ensure inner iterations begin - self.convergence_change = float('inf') + self.convergence_change = float("inf") # Stopping criterion if not stop_crit: @@ -164,24 +167,20 @@ def __init__(self, # Hyper-parameters for regularisation: if algorithm in ["reg_spiess"]: - if alpha is None or alpha > 1 or alpha < 0: # THIS CHECK SHOULD PROBABLY BE MORE ROBUST + if alpha is None or alpha > 1 or alpha < 0:  # THIS CHECK SHOULD PROBABLY BE MORE ROBUST raise ValueError("Hyper-parameter alpha should be between 0 and 1") self.alpha = alpha self.beta = 1 - alpha # Results/Statistics: self.results = ODMEResults(self) - + # Procedure Information: self.procedure_date = "" self.procedure_id = "" # Utilities: - def __check_inputs(self, - counts: pd.DataFrame, - stop_crit: dict, - alpha: float, - algorithm: str) -> None: + def __check_inputs(self, counts: pd.DataFrame, stop_crit: dict, alpha: float, algorithm: str) -> None: """ Ensures all user input is of correct format/value. NOTE - we do not check if the assignment is given properly, @@ -192,9 +191,11 @@ def __check_inputs(self, if not isinstance(algorithm, str): raise ValueError("Algorithm must be input as a string") elif algorithm not in self.ALL_ALGORITHMS: - raise ValueError(f"'{algorithm}' is not a valid algorithm.\n" + - "Currently implemented algorithms include:\n" + - '\n'.join(self.ALL_ALGORITHMS)) + raise ValueError( + f"'{algorithm}' is not a valid algorithm.\n" + + "Currently implemented algorithms include:\n" + + "\n".join(self.ALL_ALGORITHMS) + ) # Check stopping criteria if given stop_error = False @@ -211,19 +212,21 @@ def __check_inputs(self, stop_error = True elif stop_crit[key] < 1: stop_error = True - else: if not isinstance(stop_crit[key], (float, int)): stop_error = True elif stop_crit[key] < 0: stop_error = True if stop_error: - raise ValueError("Stopping criterion must be given as a dictionary as follows," + - "(key -> type of value):" + - "max_outer -> positive integer" + - "max_inner -> positive integer" + - "convergence_crit -> non-negative integer/float" + - "inner_convergence -> non-negative integer/float") + raise ValueError( + "Stopping criterion must be given as a dictionary as follows " + + "(key -> type of value):\n" + + "max_outer -> positive integer\n" + + "max_inner -> positive integer\n" + + "convergence_crit -> non-negative integer/float\n" + + "inner_convergence -> non-negative integer/float" + ) # Check count volumes counts_error = False @@ -241,8 +244,7 @@ def __check_inputs(self, if not counts_error: observed = counts["obs_volume"] - if not (pd.api.types.is_float_dtype(observed) or - pd.api.types.is_integer_dtype(observed)): + if not (pd.api.types.is_float_dtype(observed) or pd.api.types.is_integer_dtype(observed)): counts_error = True elif not np.all(observed >= 0): counts_error = True @@ -252,17 +254,19 @@ def __check_inputs(self, counts_error = True if counts_error: - raise ValueError("Count volumes must be a non-empty pandas dataframe with columns:\n" + - '\n'.join(self.COUNT_VOLUME_COLS) + - "\n and all observed volumes must be non-negative floats or integers, and" + - "only a single count volume should be given for a" + - "particular class, link_id and direction") + raise ValueError( + "Count volumes must be a non-empty pandas dataframe with columns:\n" +
"\n".join(self.COUNT_VOLUME_COLS) + + "\n and all observed volumes must be non-negative floats or integers, and" + + "only a single count volume should be given for a" + + "particular class, link_id and direction" + ) # Check alpha value if given if alpha is not None: if not isinstance(alpha, (float, int)): raise ValueError("Input alpha should be a float or integer (0 to 1)") - elif alpha > 1 or alpha < 0: + elif alpha > 1 or alpha < 0: raise ValueError("Input alpha should be between 0 and 1") def __duplicate_matrices(self): @@ -272,7 +276,7 @@ def __duplicate_matrices(self): # Loop through TrafficClasses - create new and replace, then set classes new_classes = [] for usr_cls in self.classes: - mat = usr_cls.matrix.copy(cores = usr_cls.matrix.view_names, memory_only=True) + mat = usr_cls.matrix.copy(cores=usr_cls.matrix.view_names, memory_only=True) mat.computational_view() new_cls = TrafficClass(usr_cls._id, usr_cls.graph, mat) @@ -287,9 +291,9 @@ def __duplicate_matrices(self): def estimate_alpha(self, alpha: float) -> float: """ - Estimates a starting hyper-paramater for regularised + Estimates a starting hyper-paramater for regularised spiess given a number between 0-1. - + NOTE - currently only implemented for single class """ demand_sum = np.sum(self.demands[0]) @@ -313,10 +317,8 @@ def __set_select_links(self) -> None: for user_class in self.classes: user_class.set_select_links( { - self.get_sl_key(row): - [(row['link_id'], row['direction'])] - for _, row in c_v[c_v['class'] == user_class._id - ].iterrows() + self.get_sl_key(row): [(row["link_id"], row["direction"])] + for _, row in c_v[c_v["class"] == user_class._id].iterrows() } ) @@ -350,10 +352,10 @@ def __init_objective_func(self) -> None: Current objective functions have 2 parts which are summed: 1. The p-norm raised to the power p of the error vector for observed flows. - 2. The p-norm raised to the power p of the error matrix (treated as a n^2 vector) + 2. The p-norm raised to the power p of the error matrix (treated as a n^2 vector) for the demand matrix. - - NOTE - currently (1.) must always be present, but (2.) (the regularisation term) + + NOTE - currently (1.) must always be present, but (2.) (the regularisation term) need not be present. 
""" p_1 = self._norms[0] @@ -366,10 +368,9 @@ def __reg_obj_func(self) -> None: # NOTE - pce not yet included for multi-class """ obs_vals = self.count_volumes["obs_volume"].to_numpy() - assign_vals = self.count_volumes['assign_volume'].to_numpy() - self.flow_obj = self.alpha * np.sum(np.abs(obs_vals - assign_vals)**p_1) / p_1 - self.reg_obj = self.beta * np.sum( - np.abs(self.original_demands[0] - self.demands[0])**p_2) / p_2 + assign_vals = self.count_volumes["assign_volume"].to_numpy() + self.flow_obj = self.alpha * np.sum(np.abs(obs_vals - assign_vals) ** p_1) / p_1 + self.reg_obj = self.beta * np.sum(np.abs(self.original_demands[0] - self.demands[0]) ** p_2) / p_2 self.__set_convergence_values(self.flow_obj + self.reg_obj) def __obj_func(self) -> None: @@ -379,8 +380,8 @@ def __obj_func(self) -> None: # NOTE - pce not yet included for multi-class """ obs_vals = self.count_volumes["obs_volume"].to_numpy() - assign_vals = self.count_volumes['assign_volume'].to_numpy() - self.flow_obj = np.sum(np.abs(obs_vals - assign_vals)**p_1) / p_1 + assign_vals = self.count_volumes["assign_volume"].to_numpy() + self.flow_obj = np.sum(np.abs(obs_vals - assign_vals) ** p_1) / p_1 self.__set_convergence_values(self.flow_obj) if p_2: @@ -412,17 +413,19 @@ def save_to_project(self, name: str, file_name: str, project=None) -> None: self.__save_as_omx(file_path) elif ".aem" in file_name: self.__save_as_aem(file_path) - else: # unsupported file-type + else: # unsupported file-type raise ValueError("Only supporting .omx and .aem") record = mats.new_record(name, file_name) record.procedure_id = self.procedure_id record.timestamp = self.procedure_date record.procedure = "Origin-Destination Matrix Estimation" - record.report = json.dumps({ - "iterations": self.results.get_iteration_statistics().to_dict(), - "by_link": self.results.get_link_statistics().to_dict() - }) + record.report = json.dumps( + { + "iterations": self.results.get_iteration_statistics().to_dict(), + "by_link": self.results.get_link_statistics().to_dict(), + } + ) record.save() def __save_as_omx(self, file_path: str) -> None: @@ -483,8 +486,8 @@ def get_demands(self) -> list[np.ndarray]: # ODME Execution: def execute(self, verbose=False, print_rate=1) -> None: - """ - Run ODME algorithm until either the maximum iterations has been reached, + """ + Run ODME algorithm until either the maximum iterations has been reached, or the convergence criterion has been met. Parameters: @@ -510,7 +513,7 @@ def execute(self, verbose=False, print_rate=1) -> None: # Inner iterations: # Ensure at least 1 inner iteration is run per outer loop - self.convergence_change = float('inf') + self.convergence_change = float("inf") inner = 0 while inner < self.max_inner and self.convergence_change > self.inner_convergence_crit: inner += 1 @@ -530,15 +533,15 @@ def execute(self, verbose=False, print_rate=1) -> None: # function is sufficient - we may want to replace the matrix.matrices value # and call matrix.computational_view() (with appropriate arguments) instead. def __perform_assignment(self) -> None: - """ + """ Uses current demand matrix to perform an assignment, then save - the assigned flows and select link matrices. Also recalculates the + the assigned flows and select link matrices. Also recalculates the objective function following an assignment. This function will only be called at the start of an outer iteration & during the final convergence test. 
""" - # Change the demand matrices within the TrafficClass's to the current + # Change the demand matrices within the TrafficClass's to the current # demand matrices that have been calculated from the previous outer iteration. for aeq_matrix, demand in zip(self.aequilibrae_matrices, self.demands): aeq_matrix.matrix_view = demand @@ -550,9 +553,7 @@ def __perform_assignment(self) -> None: for assignclass, demand in zip(self.classes, self.demands): sl_matrices = assignclass.results.select_link_od.matrix for link in sl_matrices: - self._sl_matrices[link] = np.nan_to_num( - sl_matrices[link] / demand - ) + self._sl_matrices[link] = np.nan_to_num(sl_matrices[link] / demand) # Extract and store array of assigned volumes to the select links self.__extract_volumes() @@ -564,7 +565,7 @@ def __extract_volumes(self) -> None: """ Extracts and stores assigned volumes (corresponding for those for which we have observations - ie count volumes). - + NOTE - this does not take into account pce, ie this is the number of vehicles, not 'flow'. """ @@ -581,18 +582,14 @@ def extract_volume(row) -> None: Extracts volume corresponding to particular link (from row) and return it. For inner iterations need to calculate this via __calculate_volumes """ - return assign_df.loc[assign_df['link_id'] == row['link_id'], - col[row['class']][row['direction']]].values[0] + return assign_df.loc[assign_df["link_id"] == row["link_id"], col[row["class"]][row["direction"]]].values[0] # Extract a flow for each count volume: - self.count_volumes['assign_volume'] = self.count_volumes.apply( - extract_volume, - axis=1 - ) + self.count_volumes["assign_volume"] = self.count_volumes.apply(extract_volume, axis=1) def __execute_inner_iter(self) -> None: """ - Runs an inner iteration of the ODME algorithm. + Runs an inner iteration of the ODME algorithm. This assumes the SL matrices stay constant and modifies the current demand matrices. """ # Element-wise multiplication of demand matrices by scaling factors @@ -608,7 +605,7 @@ def __execute_inner_iter(self) -> None: def __get_scaling_factors(self) -> list[np.ndarray]: """ Returns scaling matrices for each user class - depending on algorithm chosen. - + NOTE - we expect any algorithm to return a list of factor matrices in order of the stored user classes. """ @@ -625,15 +622,15 @@ def __calculate_volumes(self) -> None: # Calculate a single flow: def __calculate_volume(self, row: pd.Series) -> float: """ - Given a single row of the count volumes dataframe, - calculates the appropriate corresponding assigned + Given a single row of the count volumes dataframe, + calculates the appropriate corresponding assigned volume. 
""" sl_matrix = self._sl_matrices[self.get_sl_key(row)] - demand_matrix = self.demands[self.names_to_indices[row['class']]] + demand_matrix = self.demands[self.names_to_indices[row["class"]]] return np.sum(sl_matrix * demand_matrix) # Calculate flows for all rows: - self.count_volumes['assign_volume'] = self.count_volumes.apply( - lambda row: __calculate_volume(self, row), - axis=1) + self.count_volumes["assign_volume"] = self.count_volumes.apply( + lambda row: __calculate_volume(self, row), axis=1 + ) diff --git a/aequilibrae/paths/odme_/__init__.py b/aequilibrae/paths/odme_/__init__.py index 3b4c3e013..2841cfb08 100644 --- a/aequilibrae/paths/odme_/__init__.py +++ b/aequilibrae/paths/odme_/__init__.py @@ -5,6 +5,7 @@ This is copied from paths.results.__init__.py """ + __author__ = "Mittun Sudhahar ($Author: Mittun Sudhahar $)" __version__ = "1.0" __revision__ = "$Revision: 1 $" diff --git a/aequilibrae/paths/odme_/results.py b/aequilibrae/paths/odme_/results.py index cd54fca27..b82aa67c0 100644 --- a/aequilibrae/paths/odme_/results.py +++ b/aequilibrae/paths/odme_/results.py @@ -10,33 +10,55 @@ # ADD FUNCTIONALITY TO SPLIT DATA INTO A: DEPENDENT ON ITERATION, B: DEPENDENT ON COUNT VOLUME # ADD A WAY TO SAVE B TO CSV's class ODMEResults(object): - """ Results and statistics of an ODME procedure. - - See Use examples or docstring of ODME for how to use. + """Results and statistics of an ODME procedure. + + See Use examples or docstring of ODME for how to use. User interaction methods include: get_cumulative_factors() get_iteration_statistics() get_link_statistics() """ + # Columns for various dataframes: # This one get written to the procedure_report - ITERATION_COLS = ["class", "Outer Loop #", "Inner Loop #", "Total Iteration #", - "Total Run Time (s)", "Loop Time (s)", "Convergence", "Inner Convergence", - "Flow Objective", "Reg Objective", - 'mean_factor', 'median_factor', 'std_deviation_factor', - 'variance_factor', 'min_factor', 'max_factor'] + ITERATION_COLS = [ + "class", + "Outer Loop #", + "Inner Loop #", + "Total Iteration #", + "Total Run Time (s)", + "Loop Time (s)", + "Convergence", + "Inner Convergence", + "Flow Objective", + "Reg Objective", + "mean_factor", + "median_factor", + "std_deviation_factor", + "variance_factor", + "min_factor", + "max_factor", + ] # This only for debugging - LINK_COLS = ["class", "link_id", "direction", - "Outer Loop #", "Inner Loop #", "Total Iteration #", - "obs_volume", "assign_volume", "Assigned - Observed"] + LINK_COLS = [ + "class", + "link_id", + "direction", + "Outer Loop #", + "Inner Loop #", + "Total Iteration #", + "obs_volume", + "assign_volume", + "Assigned - Observed", + ] # For logging different iterations: INNER, OUTER, FINAL_LOG = 0, 1, 2 - def __init__(self, odme: 'ODME') -> None: + def __init__(self, odme: "ODME") -> None: """ Initialises necessary fields from odme object in order to generate statistics and results. @@ -69,18 +91,11 @@ def get_cumulative_factors(self) -> pd.DataFrame: Return the cumulative factors (ratio of final to initial matrix) in a pandas dataframe. 
""" cumulative_factors = [] - for initial, final, name in zip( - self.odme.original_demands, - self.odme.demands, - self.odme.class_names - ): + for initial, final, name in zip(self.odme.original_demands, self.odme.demands, self.odme.class_names): # Get cumulative factors for this demand matrix and store them: factors = np.nan_to_num(final / initial, nan=1) cumulative_factors.append( - pd.DataFrame({ - "class": [name for _ in range(final.size)], - "Factors": factors.ravel() - }) + pd.DataFrame({"class": [name for _ in range(final.size)], "Factors": factors.ravel()}) ) return pd.concat(cumulative_factors, ignore_index=True) @@ -120,9 +135,7 @@ def log_iter(self, iter_type: int) -> None: elif iter_type == self.FINAL_LOG: self.__prepare_final() else: - raise ValueError( - f"\'{iter_type}\' is not a valid type of iteration!" - ) + raise ValueError(f"'{iter_type}' is not a valid type of iteration!") self.__log_stats() @@ -146,7 +159,7 @@ def __update_iteration_stats(self) -> None: # Create Data: for cls_name, factor_stats in zip(self.odme.class_names, self.current_factors): data = dict() - + data["class"] = [cls_name] data["Outer Loop #"] = [self.outer] data["Inner Loop #"] = [self.inner] @@ -156,7 +169,7 @@ def __update_iteration_stats(self) -> None: data["Convergence"] = [self.odme.last_convergence] data["Inner Convergence"] = [self.odme.convergence_change] data["Flow Objective"] = [self.odme.flow_obj] - data["Reg Objective"] = [self.odme.reg_obj] # Only relevant for reg_spiess + data["Reg Objective"] = [self.odme.reg_obj] # Only relevant for reg_spiess data["mean_factor"] = factor_stats["mean_factor"] data["median_factor"] = factor_stats["median_factor"] data["std_deviation_factor"] = factor_stats["std_deviation_factor"] @@ -181,13 +194,12 @@ def __update_link_stats(self) -> None: Appends the newest set of link statistics. """ data = self.odme.count_volumes.copy(deep=True) - data[ "Outer Loop #"] = [self.outer for _ in range(len(data))] + data["Outer Loop #"] = [self.outer for _ in range(len(data))] data["Inner Loop #"] = [self.inner for _ in range(len(data))] data["Total Iteration #"] = [self.total_iter for _ in range(len(data))] data["Assigned - Observed"] = ( - self.odme.count_volumes['assign_volume'].to_numpy() - - self.odme.count_volumes["obs_volume"].to_numpy() - ) + self.odme.count_volumes["assign_volume"].to_numpy() - self.odme.count_volumes["obs_volume"].to_numpy() + ) self.link_stats.append(data) def record_factor_stats(self, factors: list[np.ndarray]) -> None: @@ -198,20 +210,22 @@ def record_factor_stats(self, factors: list[np.ndarray]) -> None: # Create statistics on all new factors: self.current_factors = [] for factor in factors: - self.current_factors.append({ - 'mean_factor' : np.mean(factor), - 'median_factor': np.median(factor), - 'std_deviation_factor' : np.std(factor), - 'variance_factor' : np.var(factor), - 'min_factor' : np.min(factor), - 'max_factor' : np.max(factor) - }) + self.current_factors.append( + { + "mean_factor": np.mean(factor), + "median_factor": np.median(factor), + "std_deviation_factor": np.std(factor), + "variance_factor": np.var(factor), + "min_factor": np.min(factor), + "max_factor": np.max(factor), + } + ) # Extra Utilities: def init_timer(self) -> None: """ Initialises the internal times (for statistics purposes). - + Should be run when the ODME procedure begins execution. 
""" self.time = time.time() @@ -246,11 +260,13 @@ def __reset_current_factors(self) -> None: """ self.current_factors = [] for _ in self.odme.classes: - self.current_factors.append({ - 'mean_factor' : None, - 'median_factor': None, - 'std_deviation_factor' : None, - 'variance_factor' : None, - 'min_factor' : None, - 'max_factor' : None - }) + self.current_factors.append( + { + "mean_factor": None, + "median_factor": None, + "std_deviation_factor": None, + "variance_factor": None, + "min_factor": None, + "max_factor": None, + } + ) diff --git a/aequilibrae/paths/odme_/scaling_factors.py b/aequilibrae/paths/odme_/scaling_factors.py index 1ec38e37a..6ae5662c2 100644 --- a/aequilibrae/paths/odme_/scaling_factors.py +++ b/aequilibrae/paths/odme_/scaling_factors.py @@ -6,22 +6,24 @@ import numpy as np import scipy.stats as spstats + class ScalingFactors(object): - """ ODME Algorithms (Scaling Factor Generation) - + """ODME Algorithms (Scaling Factor Generation) + Class should not need to be used by users, only developers. - To add a new algorithm simply add it to the ALL_ALGORITHMS list here and + To add a new algorithm simply add it to the ALL_ALGORITHMS list here and in the ODME class, and then update __set_algorithm and ensure your method - output a list of factor matrices for each input demand matrix as per speciciations. + output a list of factor matrices for each input demand matrix as per speciciations. """ + ALL_ALGORITHMS = ["gmean", "spiess", "reg_spiess"] - def __init__(self, odme: 'ODME', algorithm: str) -> None: + def __init__(self, odme: "ODME", algorithm: str) -> None: """ Initialises necessary fields from odme object in order to generate - a set of scaling matrices for the current iteration of the odme + a set of scaling matrices for the current iteration of the odme procedure. - + Parameters: odme: the ODME object containing all fields pertaining to the odme procedure algorithm: the algorithm to use to generate scaling factors. @@ -34,10 +36,9 @@ def __init__(self, odme: 'ODME', algorithm: str) -> None: self._c_v = odme.count_volumes self.class_names = odme.class_names self._class_counts = { - name : self._c_v[self._c_v['class'] == name].reset_index(drop=True) - for name in self.class_names - } - + name: self._c_v[self._c_v["class"] == name].reset_index(drop=True) for name in self.class_names + } + # Extra Data for Convenience self.names_to_indices = odme.names_to_indices @@ -66,11 +67,9 @@ def __set_algorithm(self) -> None: elif self.algo_name == "reg_spiess": self._algorithm = self.__reg_spiess - else: # Should never be called - should be dealt with in ODME class + else: # Should never be called - should be dealt with in ODME class raise ValueError( - f"Invalid algorithm name: {self.algo_name}" - "Valid algorithms are: " - '\n'.join(self.ALL_ALGORITHMS) + f"Invalid algorithm name: {self.algo_name}" "Valid algorithms are: " "\n".join(self.ALL_ALGORITHMS) ) def generate(self) -> list[np.ndarray]: @@ -82,7 +81,7 @@ def generate(self) -> list[np.ndarray]: # gmean (Geometric Mean): def __geometric_mean(self) -> list[np.ndarray]: """ - Calculates scaling factor based on geometric mean of ratio between + Calculates scaling factor based on geometric mean of ratio between proportionally (via SL matrix) assigned flow & observed flows. MULTI-CLASS UNDER DEVELOPMENT! 
(REQUIRES TESTING) @@ -103,7 +102,7 @@ def __geometric_mean(self) -> list[np.ndarray]: scaling_factors = [] # Steps 1 & 2: for demand, name in zip(self.demand_matrices, self.class_names): - observed = self._c_v[self._c_v['class'] == name] + observed = self._c_v[self._c_v["class"] == name] # If there are no observations leave matrix unchanged if len(observed) == 0: @@ -113,12 +112,12 @@ def __geometric_mean(self) -> list[np.ndarray]: factors = np.empty((len(observed), *(demand.shape))) for j, row in self._c_v.iterrows(): # Create factor matrix: - if row["obs_volume"] != 0 and row['assign_volume'] != 0: + if row["obs_volume"] != 0 and row["assign_volume"] != 0: # Modulate factor by select link dependency: - link_factor = (row['obs_volume'] / row['assign_volume']) - 1 + link_factor = (row["obs_volume"] / row["assign_volume"]) - 1 sl_matrix = self._sl_matrices[self.odme.get_sl_key(row)] - factor_matrix = (sl_matrix * link_factor) + factor_matrix = sl_matrix * link_factor # Apply factor limiting: # factor_matrix = np.clip(factor_matrix, -self.GMEAN_LIMIT, self.GMEAN_LIMIT) @@ -129,7 +128,7 @@ def __geometric_mean(self) -> list[np.ndarray]: # If assigned or observed value is 0 we cannot do anything right now else: factor_matrix = np.ones(demand.shape) - + # Add factor matrix factors[j, :, :] = factor_matrix @@ -155,12 +154,9 @@ def __spiess(self) -> list[np.ndarray]: step_sizes = self.__get_step_sizes_spiess(gradient_matrices) # Get scaling factors: - scaling_factors = [ - 1 - (step * gradient) - for step, gradient in zip(step_sizes,gradient_matrices) - ] + scaling_factors = [1 - (step * gradient) for step, gradient in zip(step_sizes, gradient_matrices)] return scaling_factors - + def __get_derivative_matrices_spiess(self) -> list[np.ndarray]: """ Returns derivative matrix (see (Spiess, 1990) and technical documentation) @@ -169,12 +165,12 @@ def __get_derivative_matrices_spiess(self) -> list[np.ndarray]: # without storing too many things in memory. derivatives = [] # Create a derivative matrix for each user class: - for demand, user_class in zip(self.demand_matrices , self.class_names): + for demand, user_class in zip(self.demand_matrices, self.class_names): observed = self._class_counts[user_class] factors = np.empty((len(observed), *(demand.shape))) for j, row in observed.iterrows(): sl_matrix = self._sl_matrices[self.odme.get_sl_key(row)] - factors[j, :, :] = sl_matrix * (row['assign_volume'] - row['obs_volume']) + factors[j, :, :] = sl_matrix * (row["assign_volume"] - row["obs_volume"]) # Add derivative matrix to list of derivatives: derivatives.append(np.sum(factors, axis=0)) @@ -186,7 +182,7 @@ def __get_step_sizes_spiess(self, gradients: list[np.ndarray]) -> list[float]: Returns estimate of optimal step size (see (Spiess, 1990) and technical documentation) Parameters: - gradients: The previously calculated gradient matrices - required for calculating + gradients: The previously calculated gradient matrices - required for calculating derivative of link flows with respect to step size. 
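For reference, the minimising step length computed below is the least-squares optimum before bounding; a sketch consistent with the regularised formula later in this file (setting alpha = 1 recovers this plain Spiess case):

```python
# flow_derivatives: the v_a' terms; errors: observed minus assigned volume per counted link.
min_lambda = np.sum(flow_derivatives * errors) / np.sum(np.square(flow_derivatives))
```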
""" # Note, we could reduce the number of bounds we need to calculate @@ -198,10 +194,7 @@ def __get_step_sizes_spiess(self, gradients: list[np.ndarray]) -> list[float]: lambdas = [] for bounds, user_class, gradient in zip(all_bounds, self.class_names, gradients): # Calculating link flow derivatives: - flow_derivatives = self.__get_flow_derivatives_spiess( - user_class, - gradient - ) + flow_derivatives = self.__get_flow_derivatives_spiess(user_class, gradient) # Calculate minimising step length: errors = self.__get_flow_errors(user_class) @@ -216,10 +209,7 @@ def __get_step_sizes_spiess(self, gradients: list[np.ndarray]) -> list[float]: return lambdas - def __get_flow_derivatives_spiess(self, - user_class: str, - gradient: np.ndarray - ) -> np.ndarray: + def __get_flow_derivatives_spiess(self, user_class: str, gradient: np.ndarray) -> np.ndarray: """ Returns an array of flow derivatives (v_a' in technical documentation) for the particular class. @@ -246,7 +236,7 @@ def __get_flow_errors(self, user_class: str) -> np.ndarray: volume given for that class. """ data = self._class_counts[user_class] - return data['obs_volume'].to_numpy() - data['assign_volume'].to_numpy() + return data["obs_volume"].to_numpy() - data["assign_volume"].to_numpy() def __enforce_bounds(self, value: float, upper: float, lower: float) -> float: """ @@ -262,21 +252,19 @@ def __enforce_bounds(self, value: float, upper: float, lower: float) -> float: lower: the lower bound """ if value > upper: - return upper # Upper Bound Violated + return upper # Upper Bound Violated elif value < lower: - return lower # Lower Bound Violated + return lower # Lower Bound Violated else: - return value # Bounds Not Violated + return value # Bounds Not Violated - def __get_step_size_limits_spiess(self, - gradients: list[np.ndarray] - ) -> list[Tuple[float, float]]: + def __get_step_size_limits_spiess(self, gradients: list[np.ndarray]) -> list[Tuple[float, float]]: """ Returns bounds for step size in order of upper bound, then lower bound (see (Spiess, 1990) and technical documentation) for each gradient matrix. Parameters: - gradient: The currently calculating gradient matrix - required for calculating + gradient: The currently calculating gradient matrix - required for calculating derivative of link flows with respect to step size. 
""" bounds = [] @@ -287,16 +275,16 @@ def __get_step_size_limits_spiess(self, if np.any(upper_mask): upper_lim = 1 / np.min(gradient[upper_mask]) else: - upper_lim = float('inf') + upper_lim = float("inf") # Lower bound: lower_mask = np.logical_and(demand > 0, gradient < 0) if np.any(lower_mask): lower_lim = 1 / np.max(gradient[lower_mask]) else: - lower_lim = float('-inf') + lower_lim = float("-inf") - bounds.append((upper_lim, lower_lim)) # Tuple[float, float] + bounds.append((upper_lim, lower_lim)) # Tuple[float, float] return bounds @@ -321,10 +309,7 @@ def __reg_spiess(self) -> list[np.ndarray]: step_sizes = self.__get_step_sizes_reg_spiess(gradient_matrices) # Get scaling factors: - scaling_factors = [ - 1 - (step * gradient) - for step, gradient in zip(step_sizes, gradient_matrices) - ] + scaling_factors = [1 - (step * gradient) for step, gradient in zip(step_sizes, gradient_matrices)] return scaling_factors @@ -338,22 +323,19 @@ def __get_derivative_matrices_reg_spiess(self) -> list[np.ndarray]: """ spiess_grads = self.__get_derivative_matrices_spiess() g_hats = self.original_demands - reg_grads = [ - demand - g_hat - for demand, g_hat in zip(self.demand_matrices, g_hats) - ] + reg_grads = [demand - g_hat for demand, g_hat in zip(self.demand_matrices, g_hats)] return [ (self._alpha * spiess) + (self._beta * regularisation) for regularisation, spiess in zip(reg_grads, spiess_grads) - ] + ] def __get_step_sizes_reg_spiess(self, gradients: list[np.ndarray]) -> list[float]: """ Returns estimate of optimal step size (see technical documentation) Parameters: - gradients: The previously calculated gradient matrices - required for calculating + gradients: The previously calculated gradient matrices - required for calculating derivative of link flows with respect to step size and finding 'eta' term (see same paper - basically rate of change of objective w.r.t. change in demand across iteration application of 'f'). @@ -362,7 +344,7 @@ def __get_step_sizes_reg_spiess(self, gradients: list[np.ndarray]) -> list[float the technical documentation has not been updated to check for certain how this should be implemented for such cases. Single class also required more testing """ - # NOTE - as per technical doc, bounds do not change from reg_spiess to spiess for + # NOTE - as per technical doc, bounds do not change from reg_spiess to spiess for # single class. 
all_bounds = self.__get_step_size_limits_spiess(gradients) @@ -370,10 +352,7 @@ def __get_step_sizes_reg_spiess(self, gradients: list[np.ndarray]) -> list[float lambdas = [] for gradient, user_class, bounds in zip(gradients, self.class_names, all_bounds): # Calculating flow components for step size: - flow_derivatives = self.__get_flow_derivatives_spiess( - user_class, - gradient - ) + flow_derivatives = self.__get_flow_derivatives_spiess(user_class, gradient) flow_errors = self.__get_flow_errors(user_class) # Calculate demand components of step size @@ -382,14 +361,11 @@ def __get_step_sizes_reg_spiess(self, gradients: list[np.ndarray]) -> list[float # Calculate minimising step length: MAY WANT TO MAKE THIS A SEPARATE FUNCTION min_lambda = ( - ( - (self._alpha * np.sum(flow_derivatives * flow_errors)) + - (self._beta * np.sum(demand_errors * demand_derivative)) - ) / - ( - (self._alpha * np.sum(np.square(flow_derivatives))) + - (self._beta * np.sum(np.square(demand_derivative))) - ) + (self._alpha * np.sum(flow_derivatives * flow_errors)) + + (self._beta * np.sum(demand_errors * demand_derivative)) + ) / ( + (self._alpha * np.sum(np.square(flow_derivatives))) + + (self._beta * np.sum(np.square(demand_derivative))) ) # If all flow derivatives are 0 we should not perturb matrix (i.e, step-size = 0) @@ -401,7 +377,7 @@ def __get_step_sizes_reg_spiess(self, gradients: list[np.ndarray]) -> list[float return lambdas - def __get_demand_errors(self, user_class:str) -> np.ndarray: + def __get_demand_errors(self, user_class: str) -> np.ndarray: """ Returns array of errors between current and initial demand matrices of the form (initial - current,...) @@ -409,9 +385,7 @@ def __get_demand_errors(self, user_class:str) -> np.ndarray: index = self.names_to_indices[user_class] return self.original_demands[index] - self.demand_matrices[index] - def __get_demand_derivative(self, - user_class:str, - gradient: np.ndarray) -> np.ndarray: + def __get_demand_derivative(self, user_class: str, gradient: np.ndarray) -> np.ndarray: """ Returns array of 'eta' terms (see technical documentation) for a given class. 
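Taken together, one inner iteration of the (regularised) Spiess procedure condenses to the following sketch; the free-standing names are hypothetical, not the class's private API, and the inputs are per-class lists of numpy arrays:

```python
def spiess_inner_iteration(demands, gradients, step_sizes):
    """One demand update: scaling factors are 1 - lambda * gradient, applied element-wise."""
    factors = [1 - (lam * grad) for lam, grad in zip(step_sizes, gradients)]  # per class
    return [demand * factor for demand, factor in zip(demands, factors)]      # new demands
```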
diff --git a/aequilibrae/paths/traffic_assignment.py b/aequilibrae/paths/traffic_assignment.py index c96218d2e..a2ec20bd0 100644 --- a/aequilibrae/paths/traffic_assignment.py +++ b/aequilibrae/paths/traffic_assignment.py @@ -1005,5 +1005,5 @@ def set_frequency_field(self, frequency_field: str) -> None: self._config["Frequency field"] = frequency_field def _prepare_execute(self) -> None: - """ Does nothing, included for base class compatibility """ + """Does nothing, included for base class compatibility""" pass diff --git a/aequilibrae/project/data/matrix_record.py b/aequilibrae/project/data/matrix_record.py index a3bc9233f..b5a8197d2 100644 --- a/aequilibrae/project/data/matrix_record.py +++ b/aequilibrae/project/data/matrix_record.py @@ -48,12 +48,10 @@ def update_cores(self): """Updates this matrix record with the matrix core count in disk""" self.__dict__["cores"] = self.__get_cores() - @property def report(self): """Retrieves the underlying report and decodes from JSON""" - return json.loads(self.__dict__['report']) - + return json.loads(self.__dict__["report"]) def get_data(self) -> AequilibraeMatrix: """Returns the actual matrix for further computation @@ -74,10 +72,10 @@ def __setattr__(self, instance, value) -> None: raise ValueError("Another matrix with this name already exists") elif instance == "file_name": raise ValueError("There is another matrix record for this file") - + if instance == "report": self.__dict__[instance] = json.dumps(value) - else: + else: self.__dict__[instance] = value if instance in ["file_name", "cores"]: @@ -90,4 +88,3 @@ def __get_cores(self) -> int: mat.close() del mat return cores - diff --git a/tests/aequilibrae/paths/test_odme.py b/tests/aequilibrae/paths/test_odme.py index 061443acb..74188bdea 100644 --- a/tests/aequilibrae/paths/test_odme.py +++ b/tests/aequilibrae/paths/test_odme.py @@ -15,6 +15,7 @@ # NOTE - we cannot test using bfw/cfw until Issue #493 is resolved. + class TestODMESingleClass(TestCase): """ Basic unit tests for ODME single class execution @@ -25,8 +26,7 @@ def setUp(self) -> None: os.environ["PATH"] = os.path.join(gettempdir(), "temp_data") + ";" + os.environ["PATH"] proj_path = os.path.join(gettempdir(), "test_odme_files" + uuid.uuid4().hex) os.mkdir(proj_path) - zipfile.ZipFile(join(dirname(siouxfalls_project), - "sioux_falls_single_class.zip")).extractall(proj_path) + zipfile.ZipFile(join(dirname(siouxfalls_project), "sioux_falls_single_class.zip")).extractall(proj_path) # Initialise project: self.project = Project() @@ -73,10 +73,7 @@ def test_basic_1_1(self) -> None: """ # Set synthetic demand matrix & count volumes self.matrix.matrices = np.zeros(self.dims) - count_volumes = pd.DataFrame( - data=[["car", 1, 1, 0]], - columns=ODME.COUNT_VOLUME_COLS - ) + count_volumes = pd.DataFrame(data=[["car", 1, 1, 0]], columns=ODME.COUNT_VOLUME_COLS) # Run ODME algorithm. 
odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm) @@ -84,9 +81,9 @@ def test_basic_1_1(self) -> None: odme.execute() # Check result: np.testing.assert_allclose( - np.zeros(self.dims), - odme.get_demands()[0], - err_msg="0 demand matrix with single count volume of 0 does not return 0 matrix", + np.zeros(self.dims), + odme.get_demands()[0], + err_msg="0 demand matrix with single count volume of 0 does not return 0 matrix", ) def test_basic_1_2(self) -> None: @@ -109,7 +106,7 @@ def test_basic_1_2(self) -> None: ["car", 18, 1, 100], ["car", 6, 1, 2], ["car", 65, 1, 85], - ["car", 23, 1, 0] + ["car", 23, 1, 0], ] count_volumes = pd.DataFrame(data=data, columns=ODME.COUNT_VOLUME_COLS) @@ -118,8 +115,7 @@ def test_basic_1_2(self) -> None: odme.execute() # Check result: - err_msg = ("Demand matrix with many 0 entries, has non-zero demand " + - "following ODME at one of those entries") + err_msg = "Demand matrix with many 0 entries has non-zero demand " + "following ODME at one of those entries" for orig, dest in zeroes: np.testing.assert_array_equal( odme.get_demands()[0][self.index[orig], self.index[dest], 0], @@ -129,10 +125,10 @@ def test_basic_1_2(self) -> None: def test_basic_1_3(self) -> None: """ - Given count volumes which are identical to the assigned volumes of an + Given count volumes which are identical to the assigned volumes of an initial demand matrix - ODME should not change this demand matrix (since we are looking for a local solution and this already provides one). - + Also checks that the shape of the resulting matrix matches the initial demand matrix. @@ -143,14 +139,12 @@ def test_basic_1_3(self) -> None: # Extract assigned flow on various links self.assignment.execute() assign_df = self.assignment.results().reset_index(drop=False).fillna(0) - links = [1,2,4,5,6,8,11,12,14,19,23,26,32,38,49,52,64,71,72] - flows = [assign_df.loc[assign_df["link_id"] == link, "matrix_ab"].values[0] - for link in links] + links = [1, 2, 4, 5, 6, 8, 11, 12, 14, 19, 23, 26, 32, 38, 49, 52, 64, 71, 72] + flows = [assign_df.loc[assign_df["link_id"] == link, "matrix_ab"].values[0] for link in links] # Perform ODME with unchanged count volumes count_volumes = pd.DataFrame( - data=[["car", link, 1, flows[i]] for i, link in enumerate(links)], - columns=ODME.COUNT_VOLUME_COLS + data=[["car", link, 1, flows[i]] for i, link in enumerate(links)], columns=ODME.COUNT_VOLUME_COLS ) odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm) odme.execute() @@ -159,8 +153,10 @@ def test_basic_1_3(self) -> None: np.testing.assert_allclose( init_demand[:, :, np.newaxis], odme.get_demands()[0], - err_msg=("Demand matrix changed when given many links with observed " + - "volume equal to initial assigned volumes") + err_msg=( + "Demand matrix changed when given many links with observed " + + "volume equal to initial assigned volumes" + ), ) # 2) Input Validity @@ -175,31 +171,22 @@ def test_basic_2_1(self) -> None: """ # No count volumes: with self.assertRaises(ValueError): - ODME(self.assignment, - pd.DataFrame(data=[], columns=ODME.COUNT_VOLUME_COLS), - algorithm=self.algorithm) + ODME(self.assignment, pd.DataFrame(data=[], columns=ODME.COUNT_VOLUME_COLS), algorithm=self.algorithm) # Negative count volumes: links = [1, 3, 10, 30, 36, 41, 49, 57, 62, 66, 69, 70] - count_volumes = pd.DataFrame( - data=[["car", link, 1, -link] for link in links], - columns=ODME.COUNT_VOLUME_COLS - ) + count_volumes = pd.DataFrame(data=[["car", link, 1, -link] for link in links], columns=ODME.COUNT_VOLUME_COLS) with
self.assertRaises(ValueError): ODME(self.assignment, count_volumes, algorithm=self.algorithm) # Duplicate count volumes: - count_volumes = pd.DataFrame( - data=[["car", 1, 1, i] for i in range(5)], - columns=ODME.COUNT_VOLUME_COLS - ) + count_volumes = pd.DataFrame(data=[["car", 1, 1, i] for i in range(5)], columns=ODME.COUNT_VOLUME_COLS) with self.assertRaises(ValueError): ODME(self.assignment, count_volumes, algorithm=self.algorithm) # Non-float/integer count volumes: count_volumes = pd.DataFrame( - data=[["car", 1, 1, '7'], ["car", 10, 1, [1]], ["car", 15, 1, (1, 2)]], - columns=ODME.COUNT_VOLUME_COLS + data=[["car", 1, 1, "7"], ["car", 10, 1, [1]], ["car", 15, 1, (1, 2)]], columns=ODME.COUNT_VOLUME_COLS ) with self.assertRaises(ValueError): ODME(self.assignment, count_volumes, algorithm=self.algorithm) @@ -209,34 +196,24 @@ def test_basic_2_2(self) -> None: Check ValueError is raised if invalid stopping criteria are given or stopping criteria are given with missing criteria. """ - count_volumes = pd.DataFrame( - data=[["car", 1, 1, 1]], - columns=ODME.COUNT_VOLUME_COLS - ) + count_volumes = pd.DataFrame(data=[["car", 1, 1, 1]], columns=ODME.COUNT_VOLUME_COLS) # Check invalid (0) max iterations - stop_crit = {"max_outer": 0, - "max_inner": 0, - "convergence_crit": 10**-4, - "inner_convergence": 10**-4 - } + stop_crit = {"max_outer": 0, "max_inner": 0, "convergence_crit": 10**-4, "inner_convergence": 10**-4} with self.assertRaises(ValueError): ODME(self.assignment, count_volumes, stop_crit=stop_crit, algorithm=self.algorithm) # Check invalid (negative) convergence - stop_crit = {"max_outer": 10, - "max_inner": 10, - "convergence_crit": -10**-4, - "inner_convergence": -10**-4 - } + stop_crit = {"max_outer": 10, "max_inner": 10, "convergence_crit": -(10**-4), "inner_convergence": -(10**-4)} with self.assertRaises(ValueError): ODME(self.assignment, count_volumes, stop_crit=stop_crit, algorithm=self.algorithm) # Check missing criteria - stop_crit = {"max_outer": 10, + stop_crit = { + "max_outer": 10, "max_inner": 10, "convergence_crit": 10**-4, - } + } with self.assertRaises(ValueError): ODME(self.assignment, count_volumes, stop_crit=stop_crit, algorithm=self.algorithm) @@ -263,10 +240,7 @@ def test_basic_3_1(self) -> None: old_flow = assign_df.loc[assign_df["link_id"] == 38, "matrix_ab"].values[0] # Perform ODME with doubled link flow on link 38 - count_volumes = pd.DataFrame( - data=[["car", 38, 1, 2 * old_flow]], - columns=ODME.COUNT_VOLUME_COLS - ) + count_volumes = pd.DataFrame(data=[["car", 38, 1, 2 * old_flow]], columns=ODME.COUNT_VOLUME_COLS) odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm) odme.execute() @@ -277,7 +251,7 @@ def test_basic_3_1(self) -> None: new_flow = assign_df.loc[assign_df["link_id"] == 38, "matrix_ab"].values[0] # Assert link flow is doubled: - self.assertAlmostEqual(new_flow, 2 * old_flow) + self.assertAlmostEqual(new_flow, 2 * old_flow) # Assert only appropriate O-D's have increased non-zero demand od_13_12 = new_demand[self.index[13], self.index[12]] @@ -301,10 +275,7 @@ def test_basic_3_2(self) -> None: self.matrix.matrices = demand # Perform ODME with competing link flows on 5 & 35 - count_volumes = pd.DataFrame( - data=[["car", 5, 1, 100], ["car", 35, 1, 50]], - columns=ODME.COUNT_VOLUME_COLS - ) + count_volumes = pd.DataFrame(data=[["car", 5, 1, 100], ["car", 35, 1, 50]], columns=ODME.COUNT_VOLUME_COLS) odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm) odme.execute() @@ -316,17 +287,14 @@ def test_basic_3_2(self) -> 
None: flow_35 = assign_df.loc[assign_df["link_id"] == 35, "matrix_ab"].values[0] # Assert link flows are equal: - self.assertAlmostEqual(flow_5, flow_35, - msg=f"Expected balanced flows but are: {flow_5} and {flow_35}") + self.assertAlmostEqual(flow_5, flow_35, msg=f"Expected balanced flows but are: {flow_5} and {flow_35}") # Assert link flows are balanced halfway between each other: - self.assertTrue(flow_5 > 50 and flow_5 < 100, - msg="Expected flows to be between 50 & 100") + self.assertTrue(flow_5 > 50 and flow_5 < 100, msg="Expected flows to be between 50 & 100") # Assert only appropriate O-D's have had demand changed od_13_1 = new_demand[self.index[13], self.index[1]] - self.assertAlmostEqual(np.sum(new_demand), od_13_1, - msg="Unexpected OD pair has non-zero demand") + self.assertAlmostEqual(np.sum(new_demand), od_13_1, msg="Unexpected OD pair has non-zero demand") # Saving Test def test_save(self) -> None: @@ -337,10 +305,7 @@ def test_save(self) -> None: self.matrix.matrices = np.zeros(self.dims) # Placeholder count volumes: - counts = pd.DataFrame( - data=[["car", 1, 1, 0]], - columns=ODME.COUNT_VOLUME_COLS - ) + counts = pd.DataFrame(data=[["car", 1, 1, 0]], columns=ODME.COUNT_VOLUME_COLS) # Create ODME object then save to appropriate location odme = ODME(self.assignment, counts) @@ -354,11 +319,12 @@ def test_save(self) -> None: # Check saved matrix is the same as the initial matrix: np.testing.assert_allclose( - np.zeros(self.dims), - matrix.matrix_view[:, :, np.newaxis], # Need new axis as this is single class - err_msg="Saved 0 demand matrix but did not get 0 demand matrix when extracted!", + np.zeros(self.dims), + matrix.matrix_view[:, :, np.newaxis], # Need new axis as this is single class + err_msg="Saved 0 demand matrix but did not get 0 demand matrix when extracted!", ) + class TestODMEMultiClass(TestCase): """ Basic unit tests for ODME multiple class execution @@ -424,8 +390,7 @@ def setUp(self) -> None: self.matrix_view_names = [matrix.view_names[0] for matrix in self.matrices] self.matrix_dims = [matrix.matrices.shape for matrix in self.matrices] self.matrix_view_dims = [matrix.matrix_view.shape + (1,) for matrix in self.matrices] - self.class_to_matrix_idx = [ - matrix.names.index(matrix.view_names[0]) for matrix in self.matrices] + self.class_to_matrix_idx = [matrix.names.index(matrix.view_names[0]) for matrix in self.matrices] self.indexes = [self.car_index, self.truck_index, self.moto_index] # Currently testing algorithm: @@ -448,8 +413,7 @@ def test_all_zeros(self) -> None: matrix.matrices = np.zeros(dims) count_volumes = pd.DataFrame( - data=[[user_class, 1, 1, 0] for user_class in self.class_ids], - columns=ODME.COUNT_VOLUME_COLS + data=[[user_class, 1, 1, 0] for user_class in self.class_ids], columns=ODME.COUNT_VOLUME_COLS ) # Run ODME algorithm. @@ -458,16 +422,9 @@ def test_all_zeros(self) -> None: demands = odme.get_demands() # Check for each class that the matrix is still 0's. - for demand, dims, matrix, mname in zip( - demands, - self.matrix_view_dims, - self.matrices, - self.class_ids - ): + for demand, dims, matrix, mname in zip(demands, self.matrix_view_dims, self.matrices, self.class_ids): np.testing.assert_allclose( - demand, - np.zeros(dims), - err_msg=f"The {mname} matrix was changed from 0 when initially a 0 matrix!" + demand, np.zeros(dims), err_msg=f"The {mname} matrix was changed from 0 when initially a 0 matrix!" 
) # Input Validity @@ -478,10 +435,7 @@ def test_mc_inputs(self) -> None: """ # Duplicate count volumes: data = [[cls_id, 10, 1, i] for i in range(3) for cls_id in self.class_ids] - count_volumes = pd.DataFrame( - data=data, - columns=ODME.COUNT_VOLUME_COLS - ) + count_volumes = pd.DataFrame(data=data, columns=ODME.COUNT_VOLUME_COLS) with self.assertRaises(ValueError): ODME(self.assignment, count_volumes, algorithm=self.algorithm) @@ -499,22 +453,23 @@ def test_simple_mc(self) -> None: # Set synthetic demand matrices ods = [10, 20, 50] for dims, matrix, index, o_d, idx in zip( - self.matrix_dims, - self.matrices, - self.indexes, - ods, - self.class_to_matrix_idx - ): + self.matrix_dims, self.matrices, self.indexes, ods, self.class_to_matrix_idx + ): matrix.matrices = np.zeros(dims) matrix.matrices[index[13], index[1], idx] = o_d # Perform ODME with competing link flows on 5 & 35 flows = [[100, 50], [30, 10], [20, 60]] count_volumes = pd.DataFrame( - data=[["car", 5, 1, flows[0][0]], ["car", 35, 1, flows[0][1]], - ["truck", 5, 1, flows[1][0]], ["truck", 35, 1, flows[1][1]], - ["motorcycle", 5, 1, flows[2][0]], ["motorcycle", 35, 1, flows[2][1]]], - columns=ODME.COUNT_VOLUME_COLS + data=[ + ["car", 5, 1, flows[0][0]], + ["car", 35, 1, flows[0][1]], + ["truck", 5, 1, flows[1][0]], + ["truck", 35, 1, flows[1][1]], + ["motorcycle", 5, 1, flows[2][0]], + ["motorcycle", 35, 1, flows[2][1]], + ], + columns=ODME.COUNT_VOLUME_COLS, ) odme = ODME(self.assignment, count_volumes, algorithm=self.algorithm) odme.execute() @@ -528,17 +483,16 @@ def test_simple_mc(self) -> None: for index, demand, idx in zip(self.indexes, demands, self.class_to_matrix_idx): # Assert only appropriate O-D's have had demand changed od_13_1 = demand[index[13], index[1], 0] - self.assertAlmostEqual(np.sum(demand), od_13_1, - msg="Unexpected OD pair has non-zero demand") + self.assertAlmostEqual(np.sum(demand), od_13_1, msg="Unexpected OD pair has non-zero demand") for flow, name in zip(flows, self.matrix_view_names): flow_5 = assign_df.loc[assign_df["link_id"] == 5, f"{name}_ab"].values[0] flow_35 = assign_df.loc[assign_df["link_id"] == 35, f"{name}_ab"].values[0] # Assert link flows are equal: - self.assertAlmostEqual(flow_5, flow_35, - msg=f"Expected balanced flows but are: {flow_5} and {flow_35}") + self.assertAlmostEqual(flow_5, flow_35, msg=f"Expected balanced flows but are: {flow_5} and {flow_35}") # Assert link flows are balanced halfway between each other: - self.assertTrue(flow_5 > min(flow) and flow_5 < max(flow), - msg=f"Expected flows to be between {min(flow)} & {max(flow)}") + self.assertTrue( + flow_5 > min(flow) and flow_5 < max(flow), msg=f"Expected flows to be between {min(flow)} & {max(flow)}" + ) diff --git a/tests/aequilibrae/project/test_matrices.py b/tests/aequilibrae/project/test_matrices.py index b843408bb..5f1edbe88 100644 --- a/tests/aequilibrae/project/test_matrices.py +++ b/tests/aequilibrae/project/test_matrices.py @@ -20,7 +20,6 @@ def setUp(self) -> None: self.project = Project() self.project.open(proj_dir) - self.matrices = self.project.matrices self.matrices.reload() self.curr = self.project.conn.cursor() @@ -57,7 +56,6 @@ def test_report(self): self.matrices.reload() self.assertEqual(self.matrices.get_record("omx2").report, {"a": 7}, "Can store a dictionary") - def test_clear_database(self): self.__mat_count(3, "The test data started wrong")
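Since MatrixRecord.report now round-trips through JSON (see matrix_record.py above), the statistics stored by ODME.save_to_project can be recovered as plain dictionaries. A hedged usage sketch, with a hypothetical record name:

```python
# Assumes an ODME run was saved via odme.save_to_project("odme_demand", "odme_demand.omx").
record = project.matrices.get_record("odme_demand")  # the name used when saving (hypothetical)
report = record.report                  # decoded from JSON by the report property
iteration_stats = report["iterations"]  # per-iteration statistics (dict of columns)
link_stats = report["by_link"]          # per-count-volume statistics (dict of columns)
```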