Add optional immediate disk saving
Jake-Moss committed Feb 12, 2024
1 parent 85fdb12 commit eecaaa3
Showing 4 changed files with 254 additions and 116 deletions.
16 changes: 16 additions & 0 deletions aequilibrae/paths/route_choice.pxd
@@ -6,6 +6,12 @@ from libcpp.unordered_map cimport unordered_map
from libcpp.utility cimport pair
from libcpp cimport bool

cimport numpy as np # Numpy *must* be cimport'd BEFORE pyarrow.lib, there's nothing quite like Cython.
cimport pyarrow as pa
cimport pyarrow.lib as libpa
cimport pyarrow._dataset_parquet as pq
from libcpp.memory cimport shared_ptr

# std::linear_congruential_engine is not available in the Cython libcpp.random shim. We'll import it ourselves
# from libcpp.random cimport minstd_rand
from libc.stdint cimport uint_fast32_t, uint_fast64_t
@@ -143,3 +149,13 @@ cdef class RouteChoiceSet:
long long [:] _thread_reached_first,
unsigned int seed
) noexcept nogil

@staticmethod
cdef shared_ptr[libpa.CTable] make_table_from_results(vector[pair[long long, long long]] &ods, vector[RouteSet_t *] &route_sets)


cdef class Checkpoint:
cdef:
public object where
public object schema
public object partition_cols
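# Note (annotation, not part of the commit): `public` attributes on a cdef
# class are readable and writable from Python; the behaviour of Checkpoint
# itself is implemented in route_choice.pyx below.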
272 changes: 205 additions & 67 deletions aequilibrae/paths/route_choice.pyx
@@ -57,6 +57,7 @@ from aequilibrae import Graph
from libc.math cimport INFINITY
from libc.string cimport memcpy
from libc.limits cimport UINT_MAX
from libc.stdlib cimport abort
from libcpp.vector cimport vector
from libcpp.unordered_set cimport unordered_set
from libcpp.unordered_map cimport unordered_map
@@ -65,7 +66,20 @@ from cython.operator cimport dereference as deref
from cython.parallel cimport parallel, prange, threadid

import numpy as np
import pyarrow as pa
from typing import List, Tuple, Optional
import itertools
import pathlib
import logging

cimport numpy as np # Numpy *must* be cimport'd BEFORE pyarrow.lib, there's nothing quite like Cython.
cimport pyarrow as pa
cimport pyarrow.lib as libpa
import pyarrow.dataset
import pyarrow.parquet as pq
from libcpp.memory cimport shared_ptr

from libc.stdio cimport fprintf, printf, stderr

# It would really be nice if these were modules. The 'include' syntax is long deprecated and adds a lot to compilation times
include 'basic_path_finding.pyx'
Expand All @@ -77,6 +91,14 @@ cdef class RouteChoiceSet:
Balmer, and Axhausen, 'Route Choice Sets for Very High-Resolution Data'
"""

route_set_dtype = pa.list_(pa.uint32())

schema = pa.schema([
pa.field("origin id", pa.uint32(), nullable=False),
pa.field("destination id", pa.uint32(), nullable=False),
pa.field("route set", route_set_dtype, nullable=False),
])
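# For illustration (hypothetical values, not part of the commit): the resulting
# table holds one row per route, so an OD pair with two routes occupies two rows:
#
#   origin id | destination id | route set
#          10 |             21 | [4, 7, 2]
#          10 |             21 | [4, 9, 3, 2]
#
# where "route set" is a variable-length list of compact link IDs.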

def __cinit__(self):
"""C level init. For C memory allocation and initialisation. Called exactly once per object."""
pass
@@ -132,7 +154,16 @@ cdef class RouteChoiceSet:
@cython.wraparound(False)
@cython.embedsignature(True)
@cython.initializedcheck(False)
def batched(self, ods: List[Tuple[int, int]], max_routes: int = 0, max_depth: int = 0, seed: int = 0, cores: int = 1, a_star: bool = True):
def batched(
self,
ods: List[Tuple[int, int]],
max_routes: int = 0,
max_depth: int = 0,
seed: int = 0,
cores: int = 1,
a_star: bool = True,
where: Optional[str] = None
):
"""
Compute a route set for a list of OD pairs.
@@ -146,104 +177,119 @@
**max_depth** (:obj:`int`): Maximum depth BFS can explore. Must be non-negative. Default of ``0`` for unlimited.
**max_depth** (:obj:`int`): Maximum depth BFS can explore. Must be non-negative. Default of ``0`` for unlimited.
**seed** (:obj:`int`): Seed used for rng. Must be non-negative. Default of ``0``.
**cores** (:obj:`int`): Number of cores to use when parallelising over OD pairs. Must be non-negative. Default of ``1``.
**where** (:obj:`str`): Optional file path to save results to immediately. If provided, the method returns ``None``.
:Returns:
**route sets** (:obj:`dict[tuple[int, int], list[tuple[int, ...]]]`): Returns a list of unique tuples of compact link IDs for
each OD pair provided (as keys). Represents paths from
``origin`` to ``destination``.
each OD pair provided (as keys). Represents paths from ``origin`` to ``destination``. ``None`` if ``where`` was provided.
"""
cdef:
vector[pair[long long, long long]] c_ods = ods

# A* (and Dijkstra's) require memory views, so we must allocate here and take slices. Python can handle this memory
double [:, :] cost_matrix = np.empty((cores, self.cost_view.shape[0]), dtype=float)
long long [:, :] predecessors_matrix = np.empty((cores, self.num_nodes + 1), dtype=np.int64)
long long [:, :] conn_matrix = np.empty((cores, self.num_nodes + 1), dtype=np.int64)
long long [:, :] b_nodes_matrix = np.broadcast_to(self.b_nodes_view, (cores, self.b_nodes_view.shape[0])).copy()

# This matrix is never read from, it exists to allow using the Dijkstra's method without changing the
# interface.
long long [:, :] _reached_first_matrix

vector[RouteSet_t *] *results = new vector[RouteSet_t *](len(ods))
long long origin_index, dest_index, i
unsigned int c_max_routes = max_routes
unsigned int c_max_depth = max_depth
unsigned int c_seed = seed
unsigned int c_cores = cores
long long o, d

if max_routes == 0 and max_depth == 0:
raise ValueError("Either `max_routes` or `max_depth` must be > 0")

if max_routes < 0 or max_depth < 0 or cores < 0:
raise ValueError("`max_routes`, `max_depth`, and `cores` must be non-negative")

cdef:
unsigned int c_max_routes = max_routes
unsigned int c_max_depth = max_depth
unsigned int c_seed = seed
unsigned int c_cores = cores
long long o, d

for o, d in ods:
if self.nodes_to_indices_view[o] == -1:
raise ValueError(f"Origin {o} is not present within the compact graph")
if self.nodes_to_indices_view[d] == -1:
raise ValueError(f"Destination {d} is not present within the compact graph")

cdef:
vector[pair[long long, long long]] c_ods

# A* (and Dijkstra's) require memory views, so we must allocate here and take slices. Python can handle this memory
double [:, :] cost_matrix = np.empty((cores, self.cost_view.shape[0]), dtype=float)
long long [:, :] predecessors_matrix = np.empty((cores, self.num_nodes + 1), dtype=np.int64)
long long [:, :] conn_matrix = np.empty((cores, self.num_nodes + 1), dtype=np.int64)
long long [:, :] b_nodes_matrix = np.broadcast_to(self.b_nodes_view, (cores, self.b_nodes_view.shape[0])).copy()

# This matrix is never read from, it exists to allow using the Dijkstra's method without changing the
# interface.
long long [:, :] _reached_first_matrix

vector[RouteSet_t *] *results

self.a_star = a_star

if self.a_star:
_reached_first_matrix = np.zeros((cores, 1), dtype=np.int64) # Dummy array to allow slicing
else:
_reached_first_matrix = np.zeros((cores, self.num_nodes), dtype=np.int64)

with nogil, parallel(num_threads=c_cores):
for i in prange(c_ods.size()):
origin_index = self.nodes_to_indices_view[c_ods[i].first]
dest_index = self.nodes_to_indices_view[c_ods[i].second]

if self.block_flows_through_centroids:
blocking_centroid_flows(
0, # Always blocking
if where is not None:
checkpoint = Checkpoint(where, self.schema, partition_cols=["origin id"])
batches = list(Checkpoint.batches(ods))
results = new vector[RouteSet_t *](<size_t>max(len(batch) for batch in batches))
else:
batches = [ods]
results = new vector[RouteSet_t *](len(ods))

for batch in batches:
results.resize(len(batch)) # We've allocated enough space for the largest batch; resize down when the current batch is smaller
c_ods = batch # Convert the batch to a C++ vector; not strictly the most efficient approach, but nicer
with nogil, parallel(num_threads=c_cores):
for i in prange(c_ods.size()):
origin_index = self.nodes_to_indices_view[c_ods[i].first]
dest_index = self.nodes_to_indices_view[c_ods[i].second]

if self.block_flows_through_centroids:
blocking_centroid_flows(
0, # Always blocking
origin_index,
self.zones,
self.graph_fs_view,
b_nodes_matrix[threadid()],
self.b_nodes_view,
)

deref(results)[i] = RouteChoiceSet.generate_route_set(
self,
origin_index,
self.zones,
self.graph_fs_view,
dest_index,
c_max_routes,
c_max_depth,
cost_matrix[threadid()],
predecessors_matrix[threadid()],
conn_matrix[threadid()],
b_nodes_matrix[threadid()],
self.b_nodes_view,
_reached_first_matrix[threadid()],
c_seed,
)

deref(results)[i] = RouteChoiceSet.generate_route_set(
self,
origin_index,
dest_index,
c_max_routes,
c_max_depth,
cost_matrix[threadid()],
predecessors_matrix[threadid()],
conn_matrix[threadid()],
b_nodes_matrix[threadid()],
_reached_first_matrix[threadid()],
c_seed,
)

if self.block_flows_through_centroids:
blocking_centroid_flows(
1, # Always unblocking
origin_index,
self.zones,
self.graph_fs_view,
b_nodes_matrix[threadid()],
self.b_nodes_view,
)
if self.block_flows_through_centroids:
blocking_centroid_flows(
1, # Always unblocking
origin_index,
self.zones,
self.graph_fs_view,
b_nodes_matrix[threadid()],
self.b_nodes_view,
)

table = libpa.pyarrow_wrap_table(RouteChoiceSet.make_table_from_results(c_ods, deref(results)))

# Once the table is made, all results have been copied into pyarrow structures, so we can free our internal ones
for result in deref(results):
for route in deref(result):
del route

if where is None: # There was only one batch anyway
return table

# Build results into Python objects using Cython's auto-conversion from vector[long long] to list, then to tuple (for set operations)
res = []
for result in deref(results):
links = []
for route in deref(result):
links.append(tuple(deref(route)))
del route
res.append(links)
checkpoint.write(table)

del results
return dict(zip(ods, res))
del table

return

@cython.initializedcheck(False)
cdef void path_find(
@@ -385,3 +431,95 @@ cdef class RouteChoiceSet:
del banned

return route_set

@staticmethod
cdef shared_ptr[libpa.CTable] make_table_from_results(vector[pair[long long, long long]] &ods, vector[RouteSet_t *] &route_sets):
cdef:
shared_ptr[libpa.CArray] paths
shared_ptr[libpa.CArray] offsets
libpa.CMemoryPool *pool = libpa.c_get_memory_pool()
libpa.CUInt32Builder *path_builder = new libpa.CUInt32Builder(pool)
libpa.CInt32Builder *offset_builder = new libpa.CInt32Builder(pool) # Must be Int32 *not* UInt32
libpa.CUInt32Builder *o_col = new libpa.CUInt32Builder(pool)
libpa.CUInt32Builder *d_col = new libpa.CUInt32Builder(pool)
vector[shared_ptr[libpa.CArray]] columns
shared_ptr[libpa.CDataType] route_set_dtype = libpa.pyarrow_unwrap_data_type(RouteChoiceSet.route_set_dtype)

libpa.CResult[shared_ptr[libpa.CArray]] route_set_results

int offset = 0

columns.emplace_back(shared_ptr[libpa.CArray]()) # Origins
columns.emplace_back(shared_ptr[libpa.CArray]()) # Destination
columns.emplace_back(shared_ptr[libpa.CArray]()) # Route set

for i in range(ods.size()):
route_set = route_sets[i]

# Instead of constructing a "list of lists" style object to store the route sets, we construct one big array of link IDs
# with a corresponding offsets array that indicates where each new row (path) starts.
for route in deref(route_set):
o_col.Append(ods[i].first)
d_col.Append(ods[i].second)
offset_builder.Append(offset)
# TODO: Pyarrow's Cython API is incredibly lacking; it's merely functional and doesn't include all the nice things the C++ API has.
# One such thing it's missing is AppendValues, which can append whole ranges at once in a much smarter fashion.
# We'll have to reimport/extern the classes we use if we want to avoid the per-element appends below.
for link in deref(route):
path_builder.Append(link)

offset += route.size()

path_builder.Finish(&paths)
offset_builder.Append(offset) # Mark the end of the array in offsets
offset_builder.Finish(&offsets)

route_set_results = libpa.CListArray.FromArraysAndType(route_set_dtype, deref(offsets.get()), deref(paths.get()), pool, shared_ptr[libpa.CBuffer]())
if not route_set_results.ok():
fprintf(stderr, "The Routes Sets Aren't Alright: %s\n", route_set_results.status().ToString().c_str())
abort()

o_col.Finish(&columns[0])
d_col.Finish(&columns[1])
columns[2] = deref(route_set_results)

cdef shared_ptr[libpa.CTable] table = libpa.CTable.MakeFromArrays(libpa.pyarrow_unwrap_schema(RouteChoiceSet.schema), columns)

del path_builder
del offset_builder
del o_col
del d_col
return table
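
For readers unfamiliar with Arrow's list layout, here is a minimal pure-Python sketch (not part of the commit; the link IDs and OD pairs are made up) of the same offsets-plus-values construction that make_table_from_results performs with the C++ builders:

import pyarrow as pa

# Two routes, [4, 7, 2] and [4, 9, 3, 2], flattened into one values array.
values = pa.array([4, 7, 2, 4, 9, 3, 2], type=pa.uint32())
# Offsets mark where each route starts, plus one final entry marking the end.
# Offsets must be int32, mirroring the CInt32Builder comment above.
offsets = pa.array([0, 3, 7], type=pa.int32())
route_set = pa.ListArray.from_arrays(offsets, values)  # a list_(uint32()) array

table = pa.table({
    "origin id": pa.array([10, 10], type=pa.uint32()),
    "destination id": pa.array([21, 21], type=pa.uint32()),
    "route set": route_set,
})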


@cython.embedsignature(True)
cdef class Checkpoint:
"""
A small wrapper class to write a dataset partition by partition
"""

def __init__(self, where, schema, partition_cols = None):
"""Python level init, may be called multiple times, for things that can't be done in __cinit__."""
self.where = pathlib.Path(where)
self.schema = schema
self.partition_cols = partition_cols

def write(self, table):
logger = logging.getLogger("aequilibrae")
pq.write_to_dataset(
table,
self.where,
partitioning=self.partition_cols,
partitioning_flavor="hive",
schema=self.schema,
use_threads=True,
existing_data_behavior="overwrite_or_ignore",
file_visitor=lambda written_file: logger.info(f"Wrote partition dataset at {written_file.path}")
)

def read_dataset(self):
return pa.dataset.dataset(self.where, format="parquet", partitioning=pa.dataset.HivePartitioning(self.schema))

@staticmethod
def batches(ods: List[Tuple[int, int]]):
return (list(g) for k, g in itertools.groupby(sorted(ods), key=lambda x: x[0]))
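
A usage sketch of the new interface (illustrative only; the module path follows the file location above, while the graph construction and OD pairs are assumptions not shown in this diff):

from aequilibrae.paths.route_choice import RouteChoiceSet, Checkpoint

rc = RouteChoiceSet(graph)  # `graph`: an assembled aequilibrae Graph (assumed)
ods = [(10, 21), (10, 33)]  # hypothetical OD node IDs

# In-memory: returns a pyarrow Table with columns (origin id, destination id, route set).
table = rc.batched(ods, max_routes=5, cores=4)

# On-disk: writes a hive-partitioned parquet dataset under `where`, processing
# one origin-grouped batch at a time, and returns None.
rc.batched(ods, max_routes=5, cores=4, where="route_sets")

# Read the saved dataset back.
dataset = Checkpoint("route_sets", RouteChoiceSet.schema, partition_cols=["origin id"]).read_dataset()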
7 changes: 7 additions & 0 deletions setup.py
@@ -12,10 +12,14 @@
exec(f.read())

include_dirs = [np.get_include()]
libraries = []
library_dirs = []
if iutil.find_spec("pyarrow") is not None:
import pyarrow as pa

include_dirs.append(pa.get_include())
libraries.extend(pa.get_libraries())
library_dirs.extend(pa.get_library_dirs())
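# The extra linker settings are needed because route_choice.pyx now cimports
# pyarrow.lib and calls the Arrow C++ API directly, so the extension must link
# against the Arrow shared libraries reported by pyarrow.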

is_win = "WINDOWS" in platform.platform().upper()
is_mac = any(e in platform.platform().upper() for e in ["MACOS", "DARWIN"])
@@ -62,8 +66,11 @@
extra_link_args=link_args,
define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")],
include_dirs=include_dirs,
libraries=libraries,
library_dirs=library_dirs,
language="c++",
)

ext_mod_graph_building = Extension(
"aequilibrae.paths.graph_building",
[join("aequilibrae", "paths", "graph_building.pyx")],