Skip to content

Commit

Permalink
Cleans GTFS importer
Browse files Browse the repository at this point in the history
  • Loading branch information
pveigadecamargo committed May 28, 2024
1 parent 2e94264 commit 04bb956
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 132 deletions.
13 changes: 8 additions & 5 deletions aequilibrae/transit/lib_gtfs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib.util as iutil
from contextlib import closing
from copy import deepcopy

import geopandas as gpd
import pandas as pd
import pyproj
from pyproj import Transformer
Expand Down Expand Up @@ -36,7 +36,6 @@ def __init__(self, network, agency_identifier, file_path, day="", description=""

self.__network = network
self.project = get_active_project(False)
self.geo_links = self.project.network.links.data
self.archive_dir = None # type: str
self.day = day
self.logger = logger
Expand Down Expand Up @@ -68,7 +67,12 @@ def __init__(self, network, agency_identifier, file_path, day="", description=""
self.select_stops = {}
self.select_patterns = {}
self.select_links = {}
self.__mt = ""

links = self.project.network.links.data
self.geo_links = gpd.GeoDataFrame(links, geometry=links.geometry, crs="EPSG:4326")
# Approximately 40 meter buffer
buff_geo = self.geo_links.to_crs(3857).buffer(40).geometry
self.geo_links_buffer = gpd.GeoDataFrame(links, geometry=buff_geo.to_crs(4326), crs="EPSG:4326")

def set_capacities(self, capacities: dict):
"""Sets default capacities for modes/vehicles.
Expand Down Expand Up @@ -196,7 +200,6 @@ def execute_import(self):
return

self.logger.info(f" Importing feed for agency {self.gtfs_data.agency.agency} on {self.day}")
self.__mt = f"Importing {self.gtfs_data.agency.agency} to supply"

self.save_to_disk()

Expand Down Expand Up @@ -409,7 +412,7 @@ def builds_link_graphs_with_broken_stops(self):
]
if not route_types:
return
mm = MMGraph(self, self.__mt)
mm = MMGraph(self)
for mode_id in route_types:
mode = mode_correspondence[mode_id]
graph = mm.build_graph_with_broken_stops(mode_id)
Expand Down
23 changes: 1 addition & 22 deletions aequilibrae/transit/map_matching_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,18 @@
from aequilibrae.transit.transit_elements import mode_correspondence
from ..utils import WorkerThread

spec = iutil.find_spec("PyQt5")
pyqt = spec is not None
if pyqt:
from PyQt5.QtCore import pyqtSignal

GRAPH_VERSION = 1
CONNECTOR_SPEED = 1


class MMGraph(WorkerThread):
"""Build specialized map-matching graphs. Not designed to be used by the final user"""

if pyqt:
signal = pyqtSignal(object)

def __init__(self, lib_gtfs, mtmm):
def __init__(self, lib_gtfs):
WorkerThread.__init__(self, None)
self.project = lib_gtfs.project
self.stops = lib_gtfs.gtfs_data.stops
self.lib_gtfs = lib_gtfs
self.__mtmm = mtmm
self._idx = None
self.max_link_id = -1
self.max_node_id = -1
Expand Down Expand Up @@ -127,24 +118,16 @@ def __build_graph_from_scratch(self):
self.max_link_id = self.df.link_id.max() + 1
self.max_node_id = self.df[["a_node", "b_node"]].max().max() + 1
# Build initial index
if pyqt:
self.signal.emit(["start", "secondary", self.df.shape[0], f"Indexing links - {self.__mode}", self.__mtmm])
self._idx = GeoIndex()
for counter, (_, record) in enumerate(self.df.iterrows()):
if pyqt:
self.signal.emit(["update", "secondary", counter + 1, f"Indexing links - {self.__mode}", self.__mtmm])
self._idx.insert(feature_id=record.link_id, geometry=record.geo)
# We will progressively break links at stops' projection
# But only on the right side of the link (no boarding at the opposing link's side)
centroids = []
self.node_corresp = []
if pyqt:
self.signal.emit(["start", "secondary", len(self.stops), f"Breaking links - {self.__mode}", self.__mtmm])
self.df = self.df.assign(direction=1, free_flow_time=np.inf, wrong_side=0, closest=1, to_remove=0)
self.__all_links = {rec.link_id: rec for _, rec in self.df.iterrows()}
for counter, (stop_id, stop) in enumerate(self.stops.items()):
if pyqt:
self.signal.emit(["update", "secondary", counter + 1, f"Breaking links - {self.__mode}", self.__mtmm])
stop.___map_matching_id__[self.mode_id] = self.max_node_id
self.node_corresp.append([stop_id, self.max_node_id])
centroids.append(stop.___map_matching_id__[self.mode_id])
Expand Down Expand Up @@ -305,7 +288,3 @@ def __graph_from_broken_net(self, centroids, net):
g.set_skimming(["distance"])
g.set_blocked_centroid_flows(True)
return g

def finished(self):
if pyqt:
self.signal.emit(["finished_building_mm_graph_procedure"])
129 changes: 24 additions & 105 deletions aequilibrae/transit/transit_elements/pattern.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
from sqlite3 import Connection
from typing import List, Tuple, Optional

import geopandas as gpd
import numpy as np
import pandas as pd
import shapely.wkb
from shapely.geometry import LineString, Polygon
from shapely.geometry import LineString
from shapely.ops import substring

from aequilibrae.log import logger
from aequilibrae.paths import PathResults
from aequilibrae.transit.functions.get_srid import get_srid
from aequilibrae.utils.geo_index import GeoIndex
from .basic_element import BasicPTElement
from .link import Link
from .mode_correspondence import mode_correspondence
Expand Down Expand Up @@ -43,6 +43,7 @@ def __init__(self, route_id, gtfs_feed) -> None:
self.total_capacity = None
self.__srid = get_srid()
self.__geolinks = gtfs_feed.geo_links
self.__geolinks_buffer = gtfs_feed.geo_links_buffer
self.__logger = logger

self.__feed = gtfs_feed
Expand All @@ -66,7 +67,6 @@ def __init__(self, route_id, gtfs_feed) -> None:
self.__net_nodes_from_stops = []
self.__mm_fail_position = -1
self.__map_matched = False
self.__match_quality = None
self.shape_length = -1

def save_to_database(self, conn: Connection, commit=True) -> None:
Expand Down Expand Up @@ -97,18 +97,16 @@ def save_to_database(self, conn: Connection, commit=True) -> None:
if self.pattern_mapping and self.shape:
sqlgeo = """insert into pattern_mapping (pattern_id, seq, link, dir, geometry)
values (?, ?, ?, ?, GeomFromWKB(?, ?));"""
sql = """insert into pattern_mapping (pattern_id, seq, link, dir)
values (?, ?, ?, ?);"""
sql = "insert into pattern_mapping (pattern_id, seq, link, dir) values (?, ?, ?, ?);"

for record in self.pattern_mapping:
if record[-1] is None:
conn.execute(sql, record[:-1])
else:
geo = shapely.wkb.loads(record[-1])
if isinstance(geo, LineString):
conn.execute(sqlgeo, record + [self.__srid])
else:
conn.execute(sql, record[:-1])
if "wkb" in self.pattern_mapping.columns:
data = self.pattern_mapping[["pattern_id", "seq", "link_id", "dir", "wkb", "srid"]].to_records()
conn.executemany(sqlgeo, data)
else:
data = self.pattern_mapping[["pattern_id", "seq", "link_id", "dir"]].to_records()
conn.executemany(sql, data)
if commit:
conn.commit()

def best_shape(self) -> LineString:
"""Gets the best version of shape available for this pattern"""
Expand Down Expand Up @@ -158,20 +156,11 @@ def map_match(self):
logger.info(f"Map-matched pattern {self.pattern_id}")

def __graph_discount(self, connected_stops):
self.__geolinks = self.__geolinks[self.__geolinks.modes.str.contains(mode_correspondence[self.route_type])]
link_idx = GeoIndex()
for _, row in self.__geolinks.iterrows():
link_idx.insert(feature_id=row.link_id, geometry=row.geometry)
links = set()
for stop in connected_stops:
links.update(list(link_idx.nearest(stop.geo, 3)))
buffer = self.best_shape().buffer(40) # type: Polygon

return [
lnk
for lnk in links
if buffer.contains(self.__geolinks.loc[self.__geolinks.link_id == lnk].geometry.values[0])
]
gdf = gpd.GeoDataFrame(geometry=gpd.GeoSeries([stop.geo for stop in connected_stops], crs=self.__geolinks.crs))
gdf = self.__geolinks_buffer.sjoin(gdf, how="inner", predicate="intersects")

gdf = gdf[gdf.modes.str.contains(mode_correspondence[self.route_type])]
return gdf.link_id.to_list()

def __map_matching_complete_path_building(self):
mode_ = mode_correspondence[self.route_type]
Expand All @@ -194,7 +183,7 @@ def __map_matching_complete_path_building(self):
for i, stop in enumerate(candidate_stops):
node_o = stop.___map_matching_id__[self.route_type]
logger.debug(f"Computing paths between {node_o} and {node0}")
res.compute_path(node_o, int(node0), early_exit=True)
res.compute_path(node_o, int(node0), early_exit=False)
# Get skims, as proxy for connectivity, for all stops other than the origin
other_nodes = stop_node_idxs[:i] + stop_node_idxs[i + 1 :]
dest_skim = res.skims[other_nodes, 0]
Expand Down Expand Up @@ -346,81 +335,11 @@ def get_error(self, what_to_get="culprit") -> Optional[tuple]:

def __build_pattern_mapping(self):
# We find what is the position along routes that we have for each stop and make sure they are always growing
self.pattern_mapping = []
segments = [LineString([pt1, pt2]) for pt1, pt2 in zip(self.shape.coords, self.shape.coords[1:])]
seg_len = [seg.length for seg in segments]
distances = []

min_idx_so_far = 0
for i, stop in enumerate(self.stops):
d = [round(stop.geo.distance(line), 1) for line in segments]
idx = d[min_idx_so_far:].index(min(d[min_idx_so_far:])) + min_idx_so_far
idx = idx if i > 0 else 0
projection = segments[idx].project(stop.geo) + sum(seg_len[:idx])
distances.append(projection)
min_idx_so_far = idx

# the list needs to be monotonically increasing
if not all(x <= y for x, y in zip(distances, distances[1:])):
for i, (x, y) in enumerate(zip(distances, distances[1:])):
distances[i + 1] = distances[i] if y < x else distances[i + 1]

logger.warning(
f"""Pattern {self.pattern_id} has a map-matching pattern resulting in backwards movement.
It was fixed, but it should be checked."""
)

distances[-1] = distances[-1] if self.shape.length > distances[-1] else self.shape.length - 0.00001

# We make sure we don't have projections going beyond the length we will accumulate
# This is only required because of numerical precision issues
tot_broken_length = sum(
[self.__geolinks.loc[self.__geolinks.link_id == abs(x)].geometry.values[0].length for x in self.full_path]
self.pattern_mapping = pd.DataFrame(
{"seq": np.arange(len(self.full_path)), "link_id": np.abs(self.full_path), "dir": self.fpath_dir}
)
distances = [min(x, tot_broken_length) for x in distances]
pid = self.pattern_id
stop_pos = 0
cum_dist = 0
index = 0
for link_pos, link_id in enumerate([abs(x) for x in self.full_path]):
direc = self.fpath_dir[link_pos]
link_geo = self.__geolinks.loc[self.__geolinks.link_id == link_id].geometry.values[0]
link_geo = link_geo if direc == 0 else LineString(link_geo.coords[::-1])
if distances[stop_pos] > cum_dist + link_geo.length:
# If we have no stops falling right in this link
dt = [pid, index, link_id, direc, link_geo.wkb]
self.pattern_mapping.append(dt)
index += 1
cum_dist += link_geo.length
continue

start_point = cum_dist
end_point = cum_dist + link_geo.length
while cum_dist + link_geo.length >= distances[stop_pos]:
milepost = distances[stop_pos]
wkb = None if stop_pos == 0 else substring(self.shape, start_point, milepost).wkb
stop = self.stops[stop_pos]
dt = [pid, index, link_id, direc, wkb]
self.pattern_mapping.append(dt)
index += 1

stop_pos += 1
if stop_pos >= len(distances):
break
start_point = milepost

cum_dist += link_geo.length

if start_point != end_point:
wkb = substring(self.shape, start_point, end_point).wkb
self.pattern_mapping.append([pid, index, link_id, direc, wkb])
index += 1

if stop_pos >= len(distances):
break
self.__compute_match_quality()
self.pattern_mapping = self.pattern_mapping.assign(pattern_id=self.pattern_id, srid=4326)
links_with_geo = self.__geolinks.assign(wkb=self.__geolinks.geometry.to_wkb())
links_with_geo = links_with_geo[["link_id", "wkb"]]

def __compute_match_quality(self):
dispersion = [(self.shape.distance(stop.geo) * math.pi * 6371000 / 180) for stop in self.stops]
dispersion = sum([x * x for x in dispersion]) / len(self.stops)
self.__match_quality = dispersion
self.pattern_mapping = self.pattern_mapping.merge(links_with_geo, on="link_id", how="left")

0 comments on commit 04bb956

Please sign in to comment.