Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes pyqt dependency and adds some progress bars #520

Merged
merged 13 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 17 additions & 24 deletions aequilibrae/paths/all_or_nothing.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,30 @@
import importlib.util as iutil
import threading
from multiprocessing.dummy import Pool as ThreadPool

import numpy as np

from .multi_threaded_aon import MultiThreadedAoN
from ..utils import WorkerThread
from aequilibrae.matrix import AequilibraeMatrix
from aequilibrae import global_logger
from aequilibrae.matrix import AequilibraeMatrix
from .multi_threaded_aon import MultiThreadedAoN

try:
from aequilibrae.paths.AoN import one_to_all, assign_link_loads
except ImportError as ie:
global_logger.warning(f"Could not import procedures from the binary. {ie.args}")

spec = iutil.find_spec("PyQt5")
pyqt = spec is not None
if pyqt:
from PyQt5.QtCore import pyqtSignal as SIGNAL
from aequilibrae.utils.signal import SIGNAL

if False:
from .results import AssignmentResults
from .graph import Graph


class allOrNothing(WorkerThread):
if pyqt:
assignment = SIGNAL(object)

def __init__(self, matrix, graph, results):
class allOrNothing:
def __init__(self, class_name, matrix, graph, results):
# type: (AequilibraeMatrix, Graph, AssignmentResults)->None
self.assignment: SIGNAL = None

WorkerThread.__init__(self, None)

self.class_name = class_name
self.matrix = matrix
self.graph = graph
self.results = results
Expand All @@ -49,16 +42,18 @@ def __init__(self, matrix, graph, results):
elif not np.array_equal(matrix.index, graph.centroids):
raise ValueError("Matrix and graph do not have compatible sets of centroids.")

def _build_signal(self):
if self.assignment is None:
self.assignment = SIGNAL(object)
self.assignment.emit(["start", self.matrix.zones, self.class_name])

def doWork(self):
self.execute()

def execute(self):
self._build_signal()
self.report = []
self.cumulative = 0

if pyqt:
self.assignment.emit(["zones finalized", 0])

self.aux_res.prepare(self.graph, self.results)
self.matrix.matrix_view = self.matrix.matrix_view.reshape(
(self.graph.num_zones, self.graph.num_zones, self.results.classes["number"])
Expand All @@ -75,13 +70,12 @@ def execute(self):
pool.apply_async(self.func_assig_thread, args=(orig, all_threads))
pool.close()
pool.join()
self.assignment.emit(["update", self.matrix.index.shape[0], self.class_name])
# TODO: Multi-thread this sum
self.results.compact_link_loads = np.sum(self.aux_res.temp_link_loads, axis=0)
assign_link_loads(
self.results.link_loads, self.results.compact_link_loads, self.results.crosswalk, self.results.cores
)
if pyqt:
self.assignment.emit(["finished_threaded_procedure", None])

def func_assig_thread(self, origin, all_threads):
thread_id = threading.get_ident()
Expand All @@ -94,6 +88,5 @@ def func_assig_thread(self, origin, all_threads):
self.cumulative += 1
if x != origin:
self.report.append(x)
if pyqt:
self.assignment.emit(["zones finalized", self.cumulative])
self.assignment.emit(["text AoN", f"{self.cumulative:,}/{self.matrix.zones:,}"])
if self.cumulative % 10 == 0:
self.assignment.emit(["update", self.cumulative, self.class_name])
93 changes: 34 additions & 59 deletions aequilibrae/paths/linear_approximation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import importlib.util as iutil
import logging
import os
from functools import partial
Expand All @@ -7,49 +6,29 @@
from typing import List, Dict

import numpy as np
from aequilibrae.paths.AoN import copy_two_dimensions, copy_three_dimensions
from aequilibrae.paths.AoN import linear_combination, linear_combination_skims, aggregate_link_costs
from aequilibrae.paths.AoN import sum_a_times_b_minus_c, linear_combination_1d
from aequilibrae.paths.AoN import triple_linear_combination, triple_linear_combination_skims
from scipy.optimize import root_scalar

from aequilibrae import global_logger
from aequilibrae.paths.all_or_nothing import allOrNothing
from aequilibrae.paths.results import AssignmentResults
from aequilibrae.paths.traffic_class import TrafficClass
from ..utils import WorkerThread

try:
from aequilibrae.paths.AoN import linear_combination, linear_combination_skims, aggregate_link_costs
from aequilibrae.paths.AoN import triple_linear_combination, triple_linear_combination_skims
from aequilibrae.paths.AoN import copy_one_dimension, copy_two_dimensions, copy_three_dimensions
from aequilibrae.paths.AoN import sum_a_times_b_minus_c, linear_combination_1d
except ImportError as ie:
global_logger.warning(f"Could not import procedures from the binary. {ie.args}")

import scipy

if int(scipy.__version__.split(".")[1]) >= 3:
from scipy.optimize import root_scalar

recent_scipy = True
else:
from scipy.optimize import root as root_scalar

recent_scipy = False
global_logger.warning("Using older version of Scipy. For better performance, use Scipy >= 1.4")

if False:
from aequilibrae.paths.traffic_assignment import TrafficAssignment

spec = iutil.find_spec("PyQt5")
pyqt = spec is not None
if pyqt:
from PyQt5.QtCore import pyqtSignal as SIGNAL
from aequilibrae.utils.signal import SIGNAL
from aequilibrae.utils.python_signal import PythonSignal


class LinearApproximation(WorkerThread):
if pyqt:
equilibration = SIGNAL(object)
assignment = SIGNAL(object)

class LinearApproximation:
def __init__(self, assig_spec, algorithm, project=None) -> None:
WorkerThread.__init__(self, None)
self.equilibration = SIGNAL(object)
self.assignment = SIGNAL(object)
if isinstance(self.assignment, PythonSignal):
self.assignment.pos = 1

self.logger = project.logger if project else logging.getLogger("aequilibrae")

Expand Down Expand Up @@ -470,7 +449,7 @@ def execute(self): # noqa: C901
# Prepares the fixed cost to be used
if c.fixed_cost_field:
# divide fixed cost by volume-dependent prefactor (vot) such that we don't have to do it for
# each occurence in the objective funtion. TODO: Need to think about cost skims here, we do
# each occurrence in the objective function. TODO: Need to think about cost skims here, we do
# not want this there I think
v = c.graph.graph[c.fixed_cost_field].values[:]
c.fixed_cost[c.graph.graph.__supernet_id__] = v * c.fc_multiplier / c.vot
Expand All @@ -480,28 +459,32 @@ def execute(self): # noqa: C901
# Just need to create some arrays for cost
c.graph.set_graph(self.time_field)

self.aons[c._id] = allOrNothing(c.matrix, c.graph, c._aon_results)
self.aons[c._id] = allOrNothing(c._id, c.matrix, c.graph, c._aon_results)

self.equilibration.emit(["start", self.max_iter, "Equilibrium Assignment"])
self.logger.info(f"{self.algorithm} Assignment STATS")
self.logger.info("Iteration, RelativeGap, stepsize")
for self.iter in range(1, self.max_iter + 1): # noqa: B020
self.iteration_issue = []
if pyqt:
self.equilibration.emit(["rgap", self.rgap])
self.equilibration.emit(["iterations", self.iter])
self.equilibration.emit(["key_value", "rgap", self.rgap])
self.equilibration.emit(["key_value", "iterations", self.iter])

aon_flows = []

self.__maybe_create_path_file_directories()

for c in self.traffic_classes: # type: TrafficClass
if self.assignment.pbar is None:
pedrocamargo marked this conversation as resolved.
Show resolved Hide resolved
self.assignment.emit(["start", c.matrix.zones, "All-or-Nothing"])
# cost = c.fixed_cost / c.vot + self.congested_time # now only once
cost = c.fixed_cost + self.congested_time
aggregate_link_costs(cost, c.graph.compact_cost, c.results.crosswalk)

aon = self.aons[c._id] # This is a new object every iteration, with new aux_res
if pyqt:
aon.assignment.connect(self.signal_handler)
self.assignment.emit(["refresh"])
self.assignment.emit(["reset"])
aon.assignment = self.assignment

aon.execute()
c._aon_results.link_loads *= c.pce
c._aon_results.total_flows()
Expand Down Expand Up @@ -585,7 +568,7 @@ def execute(self): # noqa: C901
# Check convergence
# This needs to be done with the current costs, and not the future ones
converged = self.check_convergence() if self.iter > 1 else False

self.equilibration.emit(["update", self.iter, f"Equilibrium Assignment: RGap - {self.rgap:.3E}"])
self.vdf.apply_vdf(
self.congested_time,
self.fw_total_flow,
Expand All @@ -604,6 +587,8 @@ def execute(self): # noqa: C901
self.convergence_report["rgap"].append(self.rgap)
self.convergence_report["warnings"].append("; ".join(self.iteration_issue))
self.convergence_report["alpha"].append(self.stepsize)
self.equilibration.emit(["key_value", "rgap", self.rgap])
self.equilibration.emit(["key_value", "iterations", self.iter])

if self.algorithm in ["cfw", "bfw"]:
self.convergence_report["beta0"].append(self.betas[0])
Expand Down Expand Up @@ -633,10 +618,9 @@ def execute(self): # noqa: C901
if (self.rgap > self.rgap_target) and (self.algorithm != "all-or-nothing"):
self.logger.error(f"Desired RGap of {self.rgap_target} was NOT reached")
self.logger.info(f"{self.algorithm} Assignment finished. {self.iter} iterations and {self.rgap} final gap")
if pyqt:
self.equilibration.emit(["rgap", self.rgap])
self.equilibration.emit(["iterations", self.iter])
self.equilibration.emit(["finished_threaded_procedure"])
self.equilibration.emit(["update", self.max_iter, f"Equilibrium Assignment: RGap - {self.rgap:.3E}"])
self.assignment.emit(["finished"])
self.equilibration.emit(["finished"])

def __derivative_of_objective_stepsize_dependent(self, stepsize, const_term):
"""The stepsize-dependent part of the derivative of the objective function. If fixed costs are defined,
Expand Down Expand Up @@ -676,18 +660,10 @@ def calculate_stepsize(self):
x_tol = max(min(1e-6, self.rgap * 1e-5), 1e-12)

try:
if recent_scipy:
min_res = root_scalar(derivative_of_objective, bracket=[0, 1], xtol=x_tol)
self.stepsize = min_res.root
if not min_res.converged:
self.logger.warning("Descent direction stepsize finder has not converged")
else:
min_res = root_scalar(derivative_of_objective, 1 / self.iter, xtol=x_tol)
if not min_res.success:
self.logger.warning("Descent direction stepsize finder has not converged")
self.stepsize = min_res.x[0]
if self.stepsize <= 0.0 or self.stepsize >= 1.0:
raise ValueError("wrong root")
min_res = root_scalar(derivative_of_objective, bracket=[0, 1], xtol=x_tol)
self.stepsize = min_res.root
if not min_res.converged:
self.logger.warning("Descent direction stepsize finder has not converged")

self.conjugate_failed = False

Expand Down Expand Up @@ -732,5 +708,4 @@ def check_convergence(self):
return False

def signal_handler(self, val):
if pyqt:
self.assignment.emit(val)
self.assignment.emit(val)
29 changes: 10 additions & 19 deletions aequilibrae/paths/network_skimming.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import importlib.util as iutil
import multiprocessing as mp
import sys
import threading
Expand All @@ -10,22 +9,18 @@
from aequilibrae.context import get_active_project
from aequilibrae.paths.multi_threaded_skimming import MultiThreadedNetworkSkimming
from aequilibrae.paths.results.skim_results import SkimResults
from aequilibrae.utils import WorkerThread

try:
from aequilibrae.paths.AoN import skimming_single_origin
except ImportError as ie:
global_logger.warning(f"Could not import procedures from the binary. {ie.args}")

spec = iutil.find_spec("PyQt5")
pyqt = spec is not None
if pyqt:
from PyQt5.QtCore import pyqtSignal
from aequilibrae.utils.signal import SIGNAL

sys.dont_write_bytecode = True


class NetworkSkimming(WorkerThread):
class NetworkSkimming:
"""

.. code-block:: python
Expand Down Expand Up @@ -61,11 +56,9 @@ class NetworkSkimming(WorkerThread):
>>> project.close()
"""

if pyqt:
skimming = pyqtSignal(object)
skimming = SIGNAL(object)

def __init__(self, graph, origins=None, project=None):
WorkerThread.__init__(self, None)
self.project = project
self.origins = origins
self.graph = graph
Expand All @@ -82,8 +75,7 @@ def doWork(self):

def execute(self):
"""Runs the skimming process as specified in the graph"""
if pyqt:
self.skimming.emit(["zones finalized", 0])
self.skimming.emit(["zones finalized", 0])
self.results.cores = self.cores
self.results.prepare(self.graph)
self.aux_res = MultiThreadedNetworkSkimming()
Expand All @@ -105,9 +97,8 @@ def execute(self):
self.procedure_id = uuid4().hex
self.procedure_date = str(datetime.today())

if pyqt:
self.skimming.emit(["text skimming", "Saving Outputs"])
self.skimming.emit(["finished_threaded_procedure", None])
self.skimming.emit(["text skimming", "Saving Outputs"])
self.skimming.emit(["finished_threaded_procedure", None])

def set_cores(self, cores: int) -> None:
"""
Expand Down Expand Up @@ -168,7 +159,7 @@ def __func_skim_thread(self, origin, all_threads):
self.cumulative += 1
if x != origin:
self.report.append(x)
if pyqt:
self.skimming.emit(["zones finalized", self.cumulative])
txt = str(self.cumulative) + " / " + str(self.matrix.zones)
self.skimming.emit(["text skimming", txt])

self.skimming.emit(["zones finalized", self.cumulative])
txt = str(self.cumulative) + " / " + str(self.matrix.zones)
self.skimming.emit(["text skimming", txt])
12 changes: 5 additions & 7 deletions aequilibrae/project/network/gmns_builder.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import csv
import math
import re
from copy import deepcopy
import csv
import string
from collections import defaultdict
from copy import deepcopy

import numpy as np
import pandas as pd
import string
import shapely.wkb
import shapely.wkt
from shapely.geometry import LineString, Point
from pyproj import Transformer
from ...utils import WorkerThread
from shapely.geometry import LineString, Point

from aequilibrae import logger
from aequilibrae.parameters import Parameters
Expand Down Expand Up @@ -48,11 +47,10 @@ def resolve_recusive_dict(base_dict):
return dict(resolved)


class GMNSBuilder(WorkerThread):
class GMNSBuilder:
def __init__(
self, net, link_path: str, node_path: str, uses_path: str = None, geom_path: str = None, srid: int = 4326
) -> None:
WorkerThread.__init__(self, None)
self.p = Parameters()
self.links = net.links
self.nodes = net.nodes
Expand Down
7 changes: 3 additions & 4 deletions aequilibrae/project/network/gmns_exporter.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import pandas as pd
from os.path import join
from ...utils import WorkerThread

import pandas as pd

from aequilibrae.parameters import Parameters
from aequilibrae.utils.db_utils import commit_and_close


class GMNSExporter(WorkerThread):
class GMNSExporter:
def __init__(self, net, path) -> None:
WorkerThread.__init__(self, None)
self.p = Parameters()
self.links_df = net.links.data
self.nodes_df = net.nodes.data
Expand Down
Loading