Skip to content

Commit

Permalink
Revert changes to HyperpathGenerating and tests, use arrays instead
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Moss committed Dec 15, 2023
1 parent 7e53841 commit 5aa8c6b
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 129 deletions.
31 changes: 26 additions & 5 deletions aequilibrae/paths/optimal_strategies.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
from scipy import sparse
import numpy as np
from aequilibrae.paths.public_transport import HyperpathGenerating


Expand All @@ -8,23 +10,42 @@ def __init__(self, assig_spec):
self.__logger = assig_spec.logger
self.__classes = {}
self.__results = {}
self.__demand_cols = {}

for cls in self.__assig_spec.classes:
cls.results.prepare(cls.graph, cls.matrix)

self.__results[cls._id] = cls.results
self.__classes[cls._id] = HyperpathGenerating(
cls.graph,
cls.matrix.matrix[cls._id], # FIXME: this is not the correct way to index the matrices
assignment_config=assig_spec._config,
threads=cls.results.cores,
cls.graph.graph,
head="a_node",
tail="b_node",
trav_time=assig_spec._config["Time field"],
freq=assig_spec._config["Frequency field"],
)

demand = sparse.coo_matrix(cls.matrix.matrix[cls.matrix_core], dtype=np.float64)

# Since the aeq matrix is indexed by centroids, and the transit graph can make the distinction between origins and destinations,
# we need to translate the row and column indices into the origin and destination node_ids for the assignment
if len(cls.graph.od_node_mapping.columns) == 2:
o_vert_ids = cls.graph.od_node_mapping.iloc[demand.row]["node_id"].values.astype(np.uint32)
d_vert_ids = cls.graph.od_node_mapping.iloc[demand.col]["node_id"].values.astype(np.uint32)
else:
o_vert_ids = cls.graph.od_node_mapping.iloc[demand.row]["o_node_id"].values.astype(np.uint32)
d_vert_ids = cls.graph.od_node_mapping.iloc[demand.col]["d_node_id"].values.astype(np.uint32)

self.__demand_cols[cls._id] = {
"origin_column": o_vert_ids,
"destination_column": d_vert_ids,
"demand_column": demand.data,
}

def execute(self):
for cls_id, hyperpath in self.__classes.items():
self.__logger.info(f"Executing S&F assignment for {cls_id}")

hyperpath.execute()
hyperpath.assign(**self.__demand_cols[cls_id], threads=self.__assig_spec.cores)
self.__results[cls_id].link_loads = hyperpath._edges["volume"].values

# def run(self, origin=None, destination=None, volume=None):
Expand Down
209 changes: 128 additions & 81 deletions aequilibrae/paths/public_transport.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import numpy as np
import multiprocessing
cimport numpy as cnp
cimport openmp
from scipy import sparse

from aequilibrae.project.database_connection import database_connection
from aequilibrae.context import get_active_project
Expand All @@ -20,58 +19,28 @@ include 'hyperpath.pyx'


class HyperpathGenerating:
tail = "a_node"
head = "b_node"
origin_column="orig_vert_idx",
destination_column="dest_vert_idx",
demand_column="demand",

def __init__(
self,
graph,
matrix,
assignment_config: dict,
check_edges: bool = False,
check_demand: bool = False,
threads: int = 0
):
"""A class for hyperpath generation.
:Arguments:
**graph** (:obj:`TransitGraph`): TransitGraph object
"""A class for hyperpath generation.
**matrix** (:obj:`np.typing.ArrayLike`): Slice of the AequilibraE matrix object for the demand. Must be 2-dimensional.
:Arguments:
**edges** (:obj:`pandas.DataFrame`): The edges of the graph.
**assignment_config** (:obj:`dict[str, str]`): Dictionary containing the `Time field` and `Frequency field` column names.
**tail** (:obj:`str`): The column name for the tail of the edge (optional, default is "tail").
**check_edges** (:obj:`bool`): If True, check the validity of the edges (optional, default is False).
**head** (:obj:`str`): The column name for the head of the edge (optional, default is "head").
**check_demand** (:obj:`bool`): If True, check the validity of the demand data (optional, default is False).
**trav_time** (:obj:`str`): The column name for the travel time of the edge (optional, default is "trav_time").
**threads** (:obj:`int`): The number of threads to use for computation (optional, default is 0, using all available threads).
"""
**freq** (:obj:`str`): The column name for the frequency of the edge (optional, default is "freq").
edges = graph.graph
trav_time = assignment_config["Time field"]
freq = assignment_config["Frequency field"]
self._assignment_config = assignment_config
self.__od_node_mapping = graph.od_node_mapping
**check_edges** (:obj:`bool`): If True, check the validity of the edges (optional, default is False).
"""

# This is a sparse representation of the AequilibraE Matrix object, the index is from od to od, *NOT* origin to destination. We'll use the od_node_mapping to convert index to node_id before the assignment
self.__demand = sparse.coo_matrix(matrix, dtype=np.float64)

if check_demand:
self._check_demand(self.__demand)

if check_edges:
self._check_edges(graph, trav_time, freq)

self.threads = threads

def __init__(self, edges, tail="tail", head="head", trav_time="trav_time", freq="freq", check_edges=False):
# load the edges
if self.check_edges:
self._check_edges(edges, self.tail, self.head, trav_time, freq)
self._edges = edges[["link_id", self.tail, self.head, trav_time, freq]].copy(deep=True)
if check_edges:
self._check_edges(edges, tail, head, trav_time, freq)
self._edges = edges[[tail, head, trav_time, freq]].copy(deep=True)
self.edge_count = len(self._edges)

# remove inf values if any, and values close to zero
Expand All @@ -92,18 +61,18 @@ class HyperpathGenerating:
self._edges[data_col] = self._edges.index

# convert to CSC format
self.vertex_count = self._edges[[self.tail, self.head]].max().max() + 1
rs_indptr, _, rs_data = convert_graph_to_csc_uint32(self._edges, self.tail, self.head, data_col, self.vertex_count)
self.vertex_count = self._edges[[tail, head]].max().max() + 1
rs_indptr, _, rs_data = convert_graph_to_csc_uint32(self._edges, tail, head, data_col, self.vertex_count)
self._indptr = rs_indptr.astype(np.uint32)
self._edge_idx = rs_data.astype(np.uint32)

# edge attributes
self._trav_time = self._edges[trav_time].values.astype(DATATYPE_PY)
self._freq = self._edges[freq].values.astype(DATATYPE_PY)
self._tail = self._edges[self.tail].values.astype(np.uint32)
self._head = self._edges[self.head].values.astype(np.uint32)
self._tail = self._edges[tail].values.astype(np.uint32)
self._head = self._edges[head].values.astype(np.uint32)

def run(self, origin, destination, volume):
def run(self, origin, destination, volume, return_inf=False):
# column storing the resulting edge volumes
self._edges["volume"] = 0.0
self.u_i_vec = None
Expand All @@ -119,6 +88,7 @@ class HyperpathGenerating:
self._check_vertex_idx(item)
self._check_volume(volume[i])
self._check_vertex_idx(destination)
assert isinstance(return_inf, bool)

o_vert_ids = np.array(origin, dtype=np.uint32)
d_vert_ids = np.array([destination], dtype=np.uint32)
Expand Down Expand Up @@ -158,51 +128,75 @@ class HyperpathGenerating:
assert isinstance(v, float)
assert v >= 0.0

def _check_edges(self, graph, trav_time, freq):
if "TransitGraph" not in [t.__name__ for t in type(graph).__mro__] :
raise TypeError("graph should be a TransitGraph")
def _check_edges(self, edges, tail, head, trav_time, freq):
if type(edges) != pd.core.frame.DataFrame:
raise TypeError("edges should be a pandas DataFrame")

for col in [tail, head, trav_time, freq]:
if col not in edges:
raise KeyError(f"edge column '{col}' not found in graph edges dataframe")

if edges[[tail, head, trav_time, freq]].isna().any().any():
raise ValueError(
" ".join(
[
f"edges[[{tail}, {head}, {trav_time}, {freq}]] ",
"should not have any missing value",
]
)
)

for col in [tail, head]:
if not pd.api.types.is_integer_dtype(edges[col].dtype):
raise TypeError(f"column '{col}' should be of integer type")

for col in [trav_time, freq]:
if not pd.api.types.is_numeric_dtype(graph[col].dtype):
if not pd.api.types.is_numeric_dtype(edges[col].dtype):
raise TypeError(f"column '{col}' should be of numeric type")

if any(graph.graph[col].isna()):
raise ValueError(f"column '{col}' should not have any missing values")

if graph[col].min() < 0.0:
if edges[col].min() < 0.0:
raise ValueError(f"column '{col}' should be nonnegative")

def assign(
self,
origin_column,
destination_column,
demand_column,
check_demand=False,
threads=None
):
"""
Assigns demand to the edges of the graph.
def execute(self, threads=0):
"""Assigns demand to the edges of the graph.
Assumes the ``*_column`` arguments are provided as numpy arrays that form a COO sparse matrix.
:Arguments:
**origin_column** (:obj:`np.ndarray`): The column for the origin vertices (optional, default is "orig_vert_idx").
**destination_column** (:obj:`np.ndarray`): The column for the destination vertices (optional, default is "dest_vert_idx").
**demand_column** (:obj:`np.ndarray`): The column for the demand values (optional, default is "demand").
**check_demand** (:obj:`bool`): If True, check the validity of the demand data (optional, default is False).
**threads** (:obj:`int`):The number of threads to use for computation (optional, default is 0, using all available threads).
"""

# check the input demand parameter
if not threads:
threads = self.threads
# if self.check_demand:
# self._check_demand(self.demand, self.origin_column, self.destination_column, self.demand_column)
# self.demand = self.demand[self.demand[self.demand_column] > 0]
if check_demand:
self._check_demand(origin_column, destination_column, demand_column)

if threads is None:
threads = 0 # Default to all threads

# initialize the column storing the resulting edge volumes
self._edges["volume"] = 0.0

# travel time is computed but not saved into an array in the following
self.u_i_vec = None

if len(self.__od_node_mapping.columns) == 2:
o_vert_ids = self.__od_node_mapping.iloc[self.__demand.row]["node_id"].values.astype(np.uint32)
d_vert_ids = self.__od_node_mapping.iloc[self.__demand.col]["node_id"].values.astype(np.uint32)
else:
o_vert_ids = self.__od_node_mapping.iloc[self.__demand.row]["o_node_id"].values.astype(np.uint32)
d_vert_ids = self.__od_node_mapping.iloc[self.__demand.col]["d_node_id"].values.astype(np.uint32)
demand_vls = self.__demand.data

# get the list of all destinations
destination_vertex_indices = np.unique(d_vert_ids)
destination_vertex_indices = np.unique(destination_column)

compute_SF_in_parallel(
self._indptr[:],
Expand All @@ -211,20 +205,73 @@ class HyperpathGenerating:
self._freq[:],
self._tail[:],
self._head[:],
d_vert_ids[:],
destination_column[:],
destination_vertex_indices[:],
o_vert_ids[:],
demand_vls[:],
origin_column[:],
demand_column[:],
self._edges["volume"].values,
False,
self.vertex_count,
self._edges["volume"].shape[0],
(multiprocessing.cpu_count() if threads < 1 else threads)
)

def _check_demand(self, demand):
if demand.min() < 0.0:
raise ValueError(f"column demand should be nonnegative")
def _check_demand(self, origin_column, destination_column, demand_column):
    """Validate the three arrays that describe the demand in COO form.

    The arrays are read in lockstep: entry ``i`` of each array describes one
    origin/destination pair and the demand between them.

    :Arguments:
        **origin_column** (:obj:`np.ndarray`): Origin vertex ids, dtype ``np.uint32``.

        **destination_column** (:obj:`np.ndarray`): Destination vertex ids, dtype ``np.uint32``.

        **demand_column** (:obj:`np.ndarray`): Demand values, dtype ``np.float64``, non-negative.

    :Raises:
        ``TypeError`` if an argument is not a numpy array or has the wrong dtype.

        ``ValueError`` if an argument contains NaN, the arrays differ in size,
        or any demand value is negative.
    """
    columns = {
        "origin": origin_column,
        "destination": destination_column,
        "demand": demand_column,
    }

    for col_name, col in columns.items():
        if not isinstance(col, (np.ndarray, np.generic)):
            raise TypeError(f"{col_name} should be a numpy array")

        if np.any(np.isnan(col)):
            raise ValueError(f"{col_name} should not have any missing value")

    for col_name in ("origin", "destination"):
        if columns[col_name].dtype != np.uint32:
            raise TypeError(f"column '{col_name}' should be of np.uint32")

    if demand_column.dtype != np.float64:
        raise TypeError("demand column should be of np.float64 type")

    # The arrays are indexed together, so a size mismatch would pair demand
    # values with the wrong (or non-existent) origin/destination entries.
    if not (origin_column.size == destination_column.size == demand_column.size):
        raise ValueError("origin, destination and demand should have the same length")

    # ``min()`` of a zero-size array raises; an empty demand is vacuously non-negative.
    if demand_column.size > 0 and demand_column.min() < 0.0:
        raise ValueError("demand column should be nonnegative")

def info(self) -> dict:
    """Return a small summary of this procedure.

    The dictionary holds the algorithm name, the host name reported by
    ``socket.gethostname()`` and this object's ``procedure_id``.
    """
    hostname = socket.gethostname()
    return {
        "Algorithm": "Spiess, Heinz & Florian, Michael Hyperpath generation",
        "Computer name": hostname,
        "Procedure ID": self.procedure_id,
    }

def save_results(self, table_name: str, keep_zero_flows=True, project=None) -> None:
    """Saves the assignment results to results_database.sqlite

    Method fails if table exists

    The edge table is written to ``results_database.sqlite`` in the project
    folder, then a record describing the run is inserted into the ``results``
    table of the transit database.

    :Arguments:
        **table_name** (:obj:`str`): Name of the table to hold this assignment result

        **keep_zero_flows** (:obj:`bool`): Whether we should keep records for zero flows. Defaults to True

        **project** (:obj:`Project`, Optional): Project we want to save the results to. Defaults to the active project
    """
    df = self._edges
    if not keep_zero_flows:
        df = df[df.volume > 0]

    project = project or get_active_project()

    # Close the connection even when the write fails (e.g. the table already
    # exists), so the database file is not left open.
    conn = sqlite3.connect(os.path.join(project.project_base_path, "results_database.sqlite"))
    try:
        df.to_sql(table_name, conn)
    finally:
        conn.close()

    # Build the record before opening the second connection so a failure
    # here cannot leak it.
    report = {"setup": self.info()}
    data = [table_name, "hyperpath assignment", self.procedure_id, str(report), self.procedure_date, self.description]

    conn = database_connection("transit", project.project_base_path)
    try:
        conn.execute(
            """Insert into results(table_name, procedure, procedure_id, procedure_report, timestamp,
                                            description) Values(?,?,?,?,?,?)""",
            data,
        )
        conn.commit()
    finally:
        conn.close()
def results(self):
    """Return an independent copy of the edge table restricted to the result columns.

    The copy is deep, so changes made by the caller do not touch ``self._edges``.
    """
    result_columns = ["link_id", "a_node", "b_node", "trav_time", "freq", "volume"]
    subset = self._edges[result_columns]
    return subset.copy(deep=True)
Loading

0 comments on commit 5aa8c6b

Please sign in to comment.