From 5aa8c6b38e24fb5de846d02218ea2657c2caca89 Mon Sep 17 00:00:00 2001 From: Jake-Moss Date: Fri, 15 Dec 2023 12:10:33 +1000 Subject: [PATCH] Revert changes to HyperpathGenerating and tests, use arrays instead --- aequilibrae/paths/optimal_strategies.py | 31 +++- aequilibrae/paths/public_transport.pyx | 209 +++++++++++++--------- tests/aequilibrae/paths/test_hyperpath.py | 73 ++++---- 3 files changed, 184 insertions(+), 129 deletions(-) diff --git a/aequilibrae/paths/optimal_strategies.py b/aequilibrae/paths/optimal_strategies.py index 2a3fcb514..aad41e89f 100644 --- a/aequilibrae/paths/optimal_strategies.py +++ b/aequilibrae/paths/optimal_strategies.py @@ -1,4 +1,6 @@ import logging +from scipy import sparse +import numpy as np from aequilibrae.paths.public_transport import HyperpathGenerating @@ -8,23 +10,42 @@ def __init__(self, assig_spec): self.__logger = assig_spec.logger self.__classes = {} self.__results = {} + self.__demand_cols = {} for cls in self.__assig_spec.classes: cls.results.prepare(cls.graph, cls.matrix) self.__results[cls._id] = cls.results self.__classes[cls._id] = HyperpathGenerating( - cls.graph, - cls.matrix.matrix[cls._id], # FIXME: this is not the correct way to index the matrices - assignment_config=assig_spec._config, - threads=cls.results.cores, + cls.graph.graph, + head="a_node", + tail="b_node", + trav_time=assig_spec._config["Time field"], + freq=assig_spec._config["Frequency field"], ) + demand = sparse.coo_matrix(cls.matrix.matrix[cls.matrix_core], dtype=np.float64) + + # Since the aeq matrix indexes based on centroids, and the transit graph can make the distinction between origins and destinations, + # We need to translate the index of the cols into the destination node_ids for the assignment + if len(cls.graph.od_node_mapping.columns) == 2: + o_vert_ids = cls.graph.od_node_mapping.iloc[demand.row]["node_id"].values.astype(np.uint32) + d_vert_ids = cls.graph.od_node_mapping.iloc[demand.col]["node_id"].values.astype(np.uint32) + 
else: + o_vert_ids = cls.graph.od_node_mapping.iloc[demand.row]["o_node_id"].values.astype(np.uint32) + d_vert_ids = cls.graph.od_node_mapping.iloc[demand.col]["d_node_id"].values.astype(np.uint32) + + self.__demand_cols[cls._id] = { + "origin_column": o_vert_ids, + "destination_column": d_vert_ids, + "demand_column": demand.data, + } + def execute(self): for cls_id, hyperpath in self.__classes.items(): self.__logger.info(f"Executing S&F assignment for {cls_id}") - hyperpath.execute() + hyperpath.assign(**self.__demand_cols[cls_id], threads=self.__assig_spec.cores) self.__results[cls_id].link_loads = hyperpath._edges["volume"].values # def run(self, origin=None, destination=None, volume=None): diff --git a/aequilibrae/paths/public_transport.pyx b/aequilibrae/paths/public_transport.pyx index 734a3939e..9805a193a 100644 --- a/aequilibrae/paths/public_transport.pyx +++ b/aequilibrae/paths/public_transport.pyx @@ -10,7 +10,6 @@ import numpy as np import multiprocessing cimport numpy as cnp cimport openmp -from scipy import sparse from aequilibrae.project.database_connection import database_connection from aequilibrae.context import get_active_project @@ -20,58 +19,28 @@ include 'hyperpath.pyx' class HyperpathGenerating: - tail = "a_node" - head = "b_node" - origin_column="orig_vert_idx", - destination_column="dest_vert_idx", - demand_column="demand", - - def __init__( - self, - graph, - matrix, - assignment_config: dict, - check_edges: bool = False, - check_demand: bool = False, - threads: int = 0 - ): - """A class for hyperpath generation. - - :Arguments: - **graph** (:obj:`TransitGraph`): TransitGraph object + """A class for hyperpath generation. - **matrix** (:obj:`np.typing.ArrayLike`): Slice of AequilibraE matrx object for the demand. Required 2d dimensional. + :Arguments: + **edges** (:obj:`pandas.DataFrame`): The edges of the graph. - **assignment_conffig** (:obj:`dict[str, str]`): Dictionary containing the `Time field` and `Frequency field` columns names. 
+ **tail** (:obj:`str`): The column name for the tail of the edge (optional, default is "tail"). - **check_edges** (:obj:`bool`): If True, check the validity of the edges (optional, default is False). + **head** (:obj:`str`): The column name for the head of the edge (optional, default is "head"). - **check_demand** (:obj:`bool`): If True, check the validity of the demand data (optional, default is False). + **trav_time** (:obj:`str`): The column name for the travel time of the edge (optional, default is "trav_time"). - **threads** (:obj:`int`): The number of threads to use for computation (optional, default is 0, using all available threads). - """ + **freq** (:obj:`str`): The column name for the frequency of the edge (optional, default is "freq"). - edges = graph.graph - trav_time = assignment_config["Time field"] - freq = assignment_config["Frequency field"] - self._assignment_config = assignment_config - self.__od_node_mapping = graph.od_node_mapping + **check_edges** (:obj:`bool`): If True, check the validity of the edges (optional, default is False). + """ - # This is a sparse representation of the AequilibraE Matrix object, the index is from od to od, *NOT* origin to destination. 
We'll use the od_node_mapping to convert index to node_id before the assignment - self.__demand = sparse.coo_matrix(matrix, dtype=np.float64) - - if check_demand: - self._check_demand(self.__demand) - - if check_edges: - self._check_edges(graph, trav_time, freq) - - self.threads = threads + def __init__(self, edges, tail="tail", head="head", trav_time="trav_time", freq="freq", check_edges=False): # load the edges - if self.check_edges: - self._check_edges(edges, self.tail, self.head, trav_time, freq) - self._edges = edges[["link_id", self.tail, self.head, trav_time, freq]].copy(deep=True) + if check_edges: + self._check_edges(edges, tail, head, trav_time, freq) + self._edges = edges[[tail, head, trav_time, freq]].copy(deep=True) self.edge_count = len(self._edges) # remove inf values if any, and values close to zero @@ -92,18 +61,18 @@ class HyperpathGenerating: self._edges[data_col] = self._edges.index # convert to CSC format - self.vertex_count = self._edges[[self.tail, self.head]].max().max() + 1 - rs_indptr, _, rs_data = convert_graph_to_csc_uint32(self._edges, self.tail, self.head, data_col, self.vertex_count) + self.vertex_count = self._edges[[tail, head]].max().max() + 1 + rs_indptr, _, rs_data = convert_graph_to_csc_uint32(self._edges, tail, head, data_col, self.vertex_count) self._indptr = rs_indptr.astype(np.uint32) self._edge_idx = rs_data.astype(np.uint32) # edge attributes self._trav_time = self._edges[trav_time].values.astype(DATATYPE_PY) self._freq = self._edges[freq].values.astype(DATATYPE_PY) - self._tail = self._edges[self.tail].values.astype(np.uint32) - self._head = self._edges[self.head].values.astype(np.uint32) + self._tail = self._edges[tail].values.astype(np.uint32) + self._head = self._edges[head].values.astype(np.uint32) - def run(self, origin, destination, volume): + def run(self, origin, destination, volume, return_inf=False): # column storing the resulting edge volumes self._edges["volume"] = 0.0 self.u_i_vec = None @@ -119,6 +88,7 @@ 
class HyperpathGenerating: self._check_vertex_idx(item) self._check_volume(volume[i]) self._check_vertex_idx(destination) + assert isinstance(return_inf, bool) o_vert_ids = np.array(origin, dtype=np.uint32) d_vert_ids = np.array([destination], dtype=np.uint32) @@ -158,34 +128,66 @@ class HyperpathGenerating: assert isinstance(v, float) assert v >= 0.0 - def _check_edges(self, graph, trav_time, freq): - if "TransitGraph" not in [t.__name__ for t in type(graph).__mro__] : - raise TypeError("graph should be a TransitGraph") + def _check_edges(self, edges, tail, head, trav_time, freq): + if type(edges) != pd.core.frame.DataFrame: + raise TypeError("edges should be a pandas DataFrame") + + for col in [tail, head, trav_time, freq]: + if col not in edges: + raise KeyError(f"edge column '{col}' not found in graph edges dataframe") + + if edges[[tail, head, trav_time, freq]].isna().any().any(): + raise ValueError( + " ".join( + [ + f"edges[[{tail}, {head}, {trav_time}, {freq}]] ", + "should not have any missing value", + ] + ) + ) + + for col in [tail, head]: + if not pd.api.types.is_integer_dtype(edges[col].dtype): + raise TypeError(f"column '{col}' should be of integer type") for col in [trav_time, freq]: - if not pd.api.types.is_numeric_dtype(graph[col].dtype): + if not pd.api.types.is_numeric_dtype(edges[col].dtype): raise TypeError(f"column '{col}' should be of numeric type") - if any(graph.graph[col].isna()): - raise ValueError(f"column '{col}' should not have any missing values") - - if graph[col].min() < 0.0: + if edges[col].min() < 0.0: raise ValueError(f"column '{col}' should be nonnegative") + def assign( + self, + origin_column, + destination_column, + demand_column, + check_demand=False, + threads=None + ): + """ + Assigns demand to the edges of the graph. - def execute(self, threads=0): - """Assigns demand to the edges of the graph. + Assumes the ``*_column`` arguments are provided as numpy arrays that form a COO sparse matrix. 
:Arguments: + **origin_column** (:obj:`np.ndarray`): The column for the origin vertices (required). + + **destination_column** (:obj:`np.ndarray`): The column for the destination vertices (required). + + **demand_column** (:obj:`np.ndarray`): The column for the demand values (required). + + **check_demand** (:obj:`bool`): If True, check the validity of the demand data (optional, default is False). + **threads** (:obj:`int`):The number of threads to use for computation (optional, default is 0, using all available threads). """ # check the input demand paramater - if not threads: - threads = self.threads - # if self.check_demand: - # self._check_demand(self.demand, self.origin_column, self.destination_column, self.demand_column) - # self.demand = self.demand[self.demand[self.demand_column] > 0] + if check_demand: + self._check_demand(origin_column, destination_column, demand_column) + + if threads is None: + threads = 0 # Default to all threads # initialize the column storing the resulting edge volumes self._edges["volume"] = 0.0 @@ -193,16 +195,8 @@ class HyperpathGenerating: # travel time is computed but not saved into an array in the following self.u_i_vec = None - if len(self.__od_node_mapping.columns) == 2: - o_vert_ids = self.__od_node_mapping.iloc[self.__demand.row]["node_id"].values.astype(np.uint32) - d_vert_ids = self.__od_node_mapping.iloc[self.__demand.col]["node_id"].values.astype(np.uint32) - else: - o_vert_ids = self.__od_node_mapping.iloc[self.__demand.row]["o_node_id"].values.astype(np.uint32) - d_vert_ids = self.__od_node_mapping.iloc[self.__demand.col]["d_node_id"].values.astype(np.uint32) - demand_vls = self.__demand.data - # get the list of all destinations - destination_vertex_indices = np.unique(d_vert_ids) + destination_vertex_indices = np.unique(destination_column) compute_SF_in_parallel( self._indptr[:], @@ -211,10 +205,10 @@ class HyperpathGenerating: self._freq[:], 
self._tail[:], self._head[:], - d_vert_ids[:], + destination_column[:], destination_vertex_indices[:], - o_vert_ids[:], - demand_vls[:], + origin_column[:], + demand_column[:], self._edges["volume"].values, False, self.vertex_count, @@ -222,9 +216,62 @@ class HyperpathGenerating: (multiprocessing.cpu_count() if threads < 1 else threads) ) - def _check_demand(self, demand): - if demand.min() < 0.0: - raise ValueError(f"column demand should be nonnegative") + def _check_demand(self, origin_column, destination_column, demand_column): + for col, col_name in zip([origin_column, destination_column, demand_column], ["origin", "destination", "demand"]): + if not isinstance(col, (np.ndarray, np.generic)): + raise TypeError(f"{col_name} should be a numpy array") + + if np.any(np.isnan(col)): + raise ValueError(f"{col_name} should not have any missing value") + + for col, col_name in zip([origin_column, destination_column], ["origin", "destination"]): + if not col.dtype == np.uint32: + raise TypeError(f"column '{col_name}' should be of np.uint32") + + if not demand_column.dtype == np.float64: + raise TypeError(f"demand column should be of np.float64 type") + + if demand_column.min() < 0.0: + raise ValueError(f"demand column should be nonnegative") + + def info(self) -> dict: + info = { + "Algorithm": "Spiess, Heinz & Florian, Michael Hyperpath generation", + "Computer name": socket.gethostname(), + "Procedure ID": self.procedure_id, + } + + return info + + def save_results(self, table_name: str, keep_zero_flows=True, project=None) -> None: + """Saves the assignment results to results_database.sqlite + + Method fails if table exists + + :Arguments: + **table_name** (:obj:`str`): Name of the table to hold this assignment result + **keep_zero_flows** (:obj:`bool`): Whether we should keep records for zero flows. Defaults to True + **project** (:obj:`Project`, Optional): Project we want to save the results to. 
Defaults to the active project + """ + + df = self._edges + if not keep_zero_flows: + df = df[df.volume > 0] + + if not project: + project = project or get_active_project() + conn = sqlite3.connect(os.path.join(project.project_base_path, "results_database.sqlite")) + df.to_sql(table_name, conn) + conn.close() + + conn = database_connection("transit", project.project_base_path) + report = {"setup": self.info()} + data = [table_name, "hyperpath assignment", self.procedure_id, str(report), self.procedure_date, self.description] + conn.execute( + """Insert into results(table_name, procedure, procedure_id, procedure_report, timestamp, + description) Values(?,?,?,?,?,?)""", + data, + ) + conn.commit() + conn.close() - def results(self): - return self._edges[["link_id", "a_node", "b_node", "trav_time", "freq", "volume"]].copy(deep=True) diff --git a/tests/aequilibrae/paths/test_hyperpath.py b/tests/aequilibrae/paths/test_hyperpath.py index 124bedb50..d3e13b68c 100644 --- a/tests/aequilibrae/paths/test_hyperpath.py +++ b/tests/aequilibrae/paths/test_hyperpath.py @@ -10,11 +10,7 @@ import numpy as np import pandas as pd from unittest import TestCase -import sqlite3 -import pathlib -from tempfile import gettempdir from aequilibrae.paths.public_transport import HyperpathGenerating -from aequilibrae.transit.transit_graph_builder import TransitGraphBuilder def create_vertices(n): @@ -42,8 +38,8 @@ def create_edges(n, seed): k += 1 edges = pd.DataFrame() - edges["a_node"] = head - edges["b_node"] = tail + edges["tail"] = tail + edges["head"] = head rng = np.random.default_rng(seed=seed) edges["trav_time"] = rng.uniform(0.0, 1.0, m) @@ -316,8 +312,8 @@ def create_SF_network(dwell_time=1.0e-6, board_alight_ratio=0.5): edges = pd.DataFrame( data={ - "a_node": head, - "b_node": tail, + "tail": tail, + "head": head, "trav_time": trav_time, "freq": freq, "volume_ref": vol, @@ -326,9 +322,7 @@ def create_SF_network(dwell_time=1.0e-6, board_alight_ratio=0.5): # waiting time is in 
average half of the period edges["freq"] *= 2.0 - # demand = pd.DataFrame({"origin_vertex_id": [0], "destination_vertex_id": [12], "demand": [1.0]}) - demand = np.zeros((len(head), len(head))) - demand[0, 12] = 1.0 + demand = pd.DataFrame({"origin_vertex_id": [0], "destination_vertex_id": [12], "demand": [1.0]}) return edges, demand @@ -354,9 +348,13 @@ def _setUp(self, network="bell", n=10, alpha=10.0, seed=124) -> None: else: self.edges["freq"] = self.edges.freq_base / alpha - self.demand = np.zeros((len(self.vertices), len(self.vertices))) - for x, y in zip(self.vertices.index[:10], np.flip(self.vertices.index[-10:])): - self.demand[x, y] = 1.0 + self.demand = pd.DataFrame( + { + "orig_vert_idx": self.vertices.index[:10], + "dest_vert_idx": np.flip(self.vertices.index[-10:]), + "demand": np.full(10, 1), + } + ) elif network == "SF": self.edges, self.demand = create_SF_network(dwell_time=0.0) @@ -364,28 +362,6 @@ def _setUp(self, network="bell", n=10, alpha=10.0, seed=124) -> None: else: raise KeyError(f'Unknown network type "{network}"') - self.edges["link_id"] = self.edges.index + 1 - self.edges["a_node"] += 1 - self.edges["b_node"] += 1 - self.edges["direction"] = 1 - self.nodes = pd.DataFrame( - { - "node_id": np.unique(np.union1d(self.edges["a_node"].values, self.edges["b_node"].values)), - } - ) - self.nodes["node_type"] = "od" - self.nodes["taz_id"] = self.nodes["node_id"] + self.nodes["node_id"].max() - - self.con = sqlite3.connect(pathlib.Path(gettempdir()) / "test_dummy.sqlite") - - self.graph = TransitGraphBuilder(self.con, period_id=None, start=0, end=0, blocking_centroid_flows=False) - self.graph.edges = self.edges - self.graph.vertices = self.nodes - self.graph.create_od_node_mapping() - - self.transit_graph = self.graph.to_transit_graph() - self.config = {"Time field": "trav_time", "Frequency field": "freq"} - def tearDown(self) -> None: try: del self.vertices, self.edges, self.demand @@ -397,11 +373,18 @@ def tearDown(self) -> None: def 
test_bell_assign_parallel_agreement(self) -> None: self._setUp(network="bell") - hp = HyperpathGenerating(self.transit_graph, self.demand, self.config) + hp = HyperpathGenerating(self.edges) + # breakpoint() results = [] for threads in [1, 2, 4]: - hp.execute(threads=threads) + hp.assign( + self.demand["orig_vert_idx"].values.astype(np.uint32), + self.demand["dest_vert_idx"].values.astype(np.uint32), + self.demand["demand"].values.astype(np.float64), + check_demand=True, + threads=threads, + ) results.append(hp._edges.copy(deep=True)) for result in results[1:]: @@ -410,8 +393,8 @@ def test_bell_assign_parallel_agreement(self) -> None: def test_SF_run_01(self): self._setUp(network="SF") - hp = HyperpathGenerating(self.transit_graph, self.demand, self.config) - hp.run(origin=0 + 1, destination=12 + 1, volume=1.0) + hp = HyperpathGenerating(self.edges) + hp.run(origin=0, destination=12, volume=1.0) np.testing.assert_allclose(self.edges["volume_ref"].values, hp._edges["volume"].values, rtol=1e-05, atol=1e-08) @@ -441,9 +424,13 @@ def test_SF_run_01(self): def test_SF_assign_01(self): self._setUp(network="SF") - hp = HyperpathGenerating(self.transit_graph, self.demand, self.config, check_demand=True) + hp = HyperpathGenerating(self.edges) - hp.execute() + hp.assign( + self.demand["origin_vertex_id"].values.astype(np.uint32), + self.demand["destination_vertex_id"].values.astype(np.uint32), + self.demand["demand"].values.astype(np.float64), + check_demand=True, + ) - breakpoint() np.testing.assert_allclose(self.edges["volume_ref"].values, hp._edges["volume"].values, rtol=1e-05, atol=1e-08)