Skip to content

Commit

Permalink
LinkTravelTimeContainer: Change internal representation (#813)
Browse files Browse the repository at this point in the history
* Use mutable collection and change format to be `Map[Id[Link], Array[Double]]`

* Whenever is needed use `TravelTimeCalculatorConfigGroup.getMaxTime()` to get max time for iteration:
- Warm start
- PhyssimCalcLinkStats
- BeamCalcLinkStats
- LinkTravelTimeContainer

* Merge remote-tracking branch 'origin/master' into art/reduce-memory-consumption-for-link-traveltime-container-4ci

* Fix after merge

* Added `VolumesAnalyzerFixed` which respects `maxTime`

* Update BeamCalcLinkStatsSpec.scala

* Merge remote-tracking branch 'origin/master' into art/reduce-memory-consumption-for-link-traveltime-container-4ci

* Fixed failing test

* Dummy commit
  • Loading branch information
REASY authored and colinsheppard committed Oct 25, 2018
1 parent 3381c5b commit 672c46d
Showing 15 changed files with 157 additions and 87 deletions.
13 changes: 7 additions & 6 deletions src/main/java/beam/analysis/physsim/PhyssimCalcLinkStats.java
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@

import beam.sim.config.BeamConfig;
import beam.utils.BeamCalcLinkStats;
import beam.utils.VolumesAnalyzerFixed;
import org.jfree.chart.*;
import org.jfree.chart.plot.CategoryPlot;
import org.jfree.chart.plot.PlotOrientation;
@@ -11,6 +12,7 @@
import org.matsim.api.core.v01.network.Link;
import org.matsim.api.core.v01.network.Network;
import org.matsim.core.api.experimental.events.EventsManager;
import org.matsim.core.config.groups.TravelTimeCalculatorConfigGroup;
import org.matsim.core.controler.OutputDirectoryHierarchy;
import org.matsim.core.router.util.TravelTime;
import org.matsim.core.trafficmonitoring.TravelTimeCalculator;
@@ -59,7 +61,8 @@ public class PhyssimCalcLinkStats {
private BeamCalcLinkStats linkStats;
private VolumesAnalyzer volumes;

public PhyssimCalcLinkStats(Network network, OutputDirectoryHierarchy controlerIO, BeamConfig beamConfig) {
public PhyssimCalcLinkStats(Network network, OutputDirectoryHierarchy controlerIO, BeamConfig beamConfig,
TravelTimeCalculatorConfigGroup ttcConfigGroup) {
this.network = network;
this.controllerIO = controlerIO;
this.beamConfig = beamConfig;
@@ -74,7 +77,7 @@ public PhyssimCalcLinkStats(Network network, OutputDirectoryHierarchy controlerI
noOfBins = _noOfTimeBins.intValue() + 1;
}

linkStats = new BeamCalcLinkStats(network);
linkStats = new BeamCalcLinkStats(network, ttcConfigGroup);
}

public void notifyIterationEnds(int iteration, TravelTimeCalculator travelTimeCalculator) {
@@ -266,12 +269,10 @@ private Color getRandomColor() {
return new Color(r, g, b);
}

public void notifyIterationStarts(EventsManager eventsManager) {

public void notifyIterationStarts(EventsManager eventsManager, TravelTimeCalculatorConfigGroup travelTimeCalculatorConfigGroup) {
this.linkStats.reset();
volumes = new VolumesAnalyzer(3600, 24 * 3600 - 1, network);
volumes = new VolumesAnalyzerFixed(3600, travelTimeCalculatorConfigGroup.getMaxTime() - 1, network);
eventsManager.addHandler(volumes);

this.relativeSpeedFrequenciesPerBin.clear();
}

Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
import beam.analysis.via.EventWriterXML_viaCompatible;
import beam.calibration.impl.example.CountsObjectiveFunction;
import beam.router.BeamRouter;
import beam.sim.BeamServices;
import beam.sim.common.GeoUtils;
import beam.sim.config.BeamConfig;
import beam.sim.metrics.MetricsSupport;
@@ -80,26 +81,25 @@ public AgentSimToPhysSimPlanConverter(EventsManager eventsManager,
TransportNetwork transportNetwork,
OutputDirectoryHierarchy controlerIO,
Scenario scenario,
GeoUtils geoUtils,
ActorRef router,
BeamConfig beamConfig) {
BeamServices beamServices) {

eventsManager.addHandler(this);
this.controlerIO = controlerIO;
this.router = router;
this.beamConfig = beamConfig;
this.router = beamServices.beamRouter();
this.beamConfig = beamServices.beamConfig();
this.rand.setSeed(beamConfig.matsim().modules().global().randomSeed());
agentSimScenario = scenario;
agentSimPhysSimInterfaceDebuggerEnabled = beamConfig.beam().physsim().jdeqsim().agentSimPhysSimInterfaceDebugger().enabled();

if (agentSimPhysSimInterfaceDebuggerEnabled) {
log.warn("AgentSimPhysSimInterfaceDebugger is enabled");
agentSimPhysSimInterfaceDebugger = new AgentSimPhysSimInterfaceDebugger(geoUtils, transportNetwork);
agentSimPhysSimInterfaceDebugger = new AgentSimPhysSimInterfaceDebugger(beamServices.geo(), transportNetwork);
}

preparePhysSimForNewIteration();

linkStatsGraph = new PhyssimCalcLinkStats(agentSimScenario.getNetwork(), controlerIO, beamConfig);
linkStatsGraph = new PhyssimCalcLinkStats(agentSimScenario.getNetwork(), controlerIO, beamServices.beamConfig(),
beamServices.travelTimeCalculatorConfigGroup());
linkSpeedStatsGraph = new PhyssimCalcLinkSpeedStats(agentSimScenario.getNetwork(), controlerIO, beamConfig);
linkSpeedDistributionStatsGraph = new PhyssimCalcLinkSpeedDistributionStats(agentSimScenario.getNetwork(), controlerIO, beamConfig);
}
@@ -140,7 +140,7 @@ private void setupActorsAndRunPhysSim(int iterationNumber) {
config.setSimulationEndTime(beamConfig.matsim().modules().qsim().endTime());
JDEQSimulation jdeqSimulation = new JDEQSimulation(config, jdeqSimScenario, jdeqsimEvents);

linkStatsGraph.notifyIterationStarts(jdeqsimEvents);
linkStatsGraph.notifyIterationStarts(jdeqsimEvents, agentSimScenario.getConfig().travelTimeCalculator());

log.info("JDEQSim Start");
startSegment("jdeqsim-execution", "jdeqsim");
6 changes: 4 additions & 2 deletions src/main/java/beam/utils/BeamCalcLinkStats.java
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import org.matsim.api.core.v01.Id;
import org.matsim.api.core.v01.network.Link;
import org.matsim.api.core.v01.network.Network;
import org.matsim.core.config.groups.TravelTimeCalculatorConfigGroup;
import org.matsim.core.router.util.TravelTime;
import org.matsim.core.utils.io.IOUtils;
import org.slf4j.Logger;
@@ -35,6 +36,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class BeamCalcLinkStats {

@@ -50,10 +52,10 @@ public class BeamCalcLinkStats {
private int count = 0;

@Inject
public BeamCalcLinkStats(final Network network) {
public BeamCalcLinkStats(final Network network, final TravelTimeCalculatorConfigGroup ttConfigGroup) {
this.network = network;
this.linkData = new ConcurrentHashMap<>();
this.nofHours = 24;
this.nofHours = (int)TimeUnit.SECONDS.toHours(ttConfigGroup.getMaxTime());
reset();
}

52 changes: 52 additions & 0 deletions src/main/java/beam/utils/VolumesAnalyzerFixed.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package beam.utils;

import org.apache.log4j.Logger;
import org.matsim.analysis.VolumesAnalyzer;
import org.matsim.api.core.v01.Id;
import org.matsim.api.core.v01.network.Link;
import org.matsim.api.core.v01.network.Network;

import java.util.concurrent.TimeUnit;

public class VolumesAnalyzerFixed extends VolumesAnalyzer {
private final static Logger log = Logger.getLogger(VolumesAnalyzerFixed.class);

private final int timeBinSize;
private final int maxTime;
private final int maxSlotIndex;
private final int maxHour;

public VolumesAnalyzerFixed(int timeBinSize, int maxTime, Network network) {
super(timeBinSize, maxTime, network);
this.timeBinSize = timeBinSize;
this.maxTime = maxTime;
this.maxSlotIndex = (this.maxTime / this.timeBinSize) + 1;
this.maxHour = (int)TimeUnit.SECONDS.toHours(this.maxTime + 1);
}
@Override
public double[] getVolumesPerHourForLink(final Id<Link> linkId) {
if (3600.0 % this.timeBinSize != 0) log.error("Volumes per hour and per link probably not correct!");

double[] volumes = new double[this.maxHour];

int[] volumesForLink = this.getVolumesForLink(linkId);
if (volumesForLink == null) return volumes;

int slotsPerHour = (int)(3600.0 / this.timeBinSize);
for (int hour = 0; hour < this.maxHour; hour++) {
double time = hour * 3600.0;
for (int i = 0; i < slotsPerHour; i++) {
volumes[hour] += volumesForLink[this.getTimeSlotIndex(time)];
time += this.timeBinSize;
}
}
return volumes;
}

private int getTimeSlotIndex(final double time) {
if (time > this.maxTime) {
return this.maxSlotIndex;
}
return ((int)time / this.timeBinSize);
}
}
75 changes: 38 additions & 37 deletions src/main/scala/beam/router/LinkTravelTimeContainer.scala
Original file line number Diff line number Diff line change
@@ -10,64 +10,65 @@ import org.matsim.api.core.v01.population.Person
import org.matsim.core.router.util.TravelTime
import org.matsim.vehicles.Vehicle

class LinkTravelTimeContainer(fileName: String, timeBinSizeInSeconds: Int) extends TravelTime with LazyLogging {
import scala.collection.mutable

private var linkTravelTimeMap: Map[Id[Link], Map[Int, Double]] = Map()
class LinkTravelTimeContainer(fileName: String, timeBinSizeInSeconds: Int, maxHour: Int) extends TravelTime with LazyLogging {
val linkTravelTimeMap: scala.collection.Map[Id[Link], Array[Double]] = loadLinkStats()

private def loadLinkStats(): Unit = {
def loadLinkStats(): scala.collection.Map[Id[Link], Array[Double]] = {
val start = System.currentTimeMillis()
val linkTravelTimeMap: mutable.HashMap[Id[Link], Array[Double]] = mutable.HashMap()
logger.debug(s"Stats fileName -> $fileName is being loaded")

val gzipStream = new GZIPInputStream(new FileInputStream(fileName))
val bufferedReader = new BufferedReader(new InputStreamReader(gzipStream))
// Source.fromInputStream(gzipStream).getLines()
// .map(_.split(",")).filter(_ (7).equalsIgnoreCase("avg")).foreach(linkStats => {
//
// })

var line: String = null

while ({
line = bufferedReader.readLine
line != null
}) {
val linkStats = line.split(",")

if (linkStats.length == 10 && "avg".equalsIgnoreCase(linkStats(7))) {
val linkId = Id.createLinkId(linkStats(0))
val hour = linkStats(3).toDouble.toInt
val travelTime = linkStats(9).toDouble

val travelTimes = linkTravelTimeMap.get(linkId) match {
case Some(linkTravelTime) =>
linkTravelTime + (hour -> travelTime)
case None =>
Map(hour -> travelTime)
try {
var line: String = null
while ({
line = bufferedReader.readLine
line != null
}) {
val linkStats = line.split(",")
if (linkStats.length == 10 && "avg".equalsIgnoreCase(linkStats(7))) {
val linkId = Id.createLinkId(linkStats(0))
val hour = linkStats(3).toDouble.toInt
val travelTime = linkStats(9).toDouble
linkTravelTimeMap.get(linkId) match {
case Some(travelTimePerHourArr) =>
travelTimePerHourArr.update(hour, travelTime)
case None =>
val travelTimePerHourArr = Array.ofDim[Double](maxHour)
travelTimePerHourArr.update(hour, travelTime)
linkTravelTimeMap.put(linkId, travelTimePerHourArr)
}
}
linkTravelTimeMap += (linkId -> travelTimes)
}
}
finally {
bufferedReader.close()
gzipStream.close()
}
val end = System.currentTimeMillis()
logger.info("LinkTravelTimeMap is initialized in {} ms", end - start)

logger.debug("LinkTravelTimeMap is initialized")
linkTravelTimeMap
}

def getLinkTravelTime(link: Link, time: Double, person: Person, vehicle: Vehicle): Double = {
linkTravelTimeMap.get(link.getId) match {
case Some(linkTravelTime) =>
linkTravelTime.get(getSlot(time)) match {
case Some(travelTime) =>
travelTime
case None =>
link.getFreespeed
case Some(traveTimePerHour) =>
val idx = getSlot(time)
if (idx < traveTimePerHour.size) traveTimePerHour(idx)
else {
logger.warn("Got {} as index for traveTimePerHour with max size {}. Something might be wrong!", idx, maxHour)
link.getFreespeed
}
case None =>
link.getFreespeed
}
}

private def getSlot(time: Double): Int = {

Math.round(Math.floor(time / timeBinSizeInSeconds)).toInt
}

loadLinkStats()
}
3 changes: 3 additions & 0 deletions src/main/scala/beam/router/r5/R5RoutingWorker.scala
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@ import com.typesafe.config.Config
import org.matsim.api.core.v01.network.Network
import org.matsim.api.core.v01.population.Person
import org.matsim.api.core.v01.{Coord, Id}
import org.matsim.core.config.groups.TravelTimeCalculatorConfigGroup
import org.matsim.core.controler.ControlerI
import org.matsim.core.router.util.TravelTime
import org.matsim.core.scenario.{MutableScenario, ScenarioUtils}
@@ -123,6 +124,8 @@ class R5RoutingWorker(workerParams: WorkerParameters) extends Actor with ActorLo
override def matsimServices: org.matsim.core.controler.MatsimServices = ???

override def rideHailIterationHistoryActor: akka.actor.ActorRef = ???

override val travelTimeCalculatorConfigGroup: TravelTimeCalculatorConfigGroup = ???
}

val initializer = new TransitInitializer(beamServices, transportNetwork, scenario.getTransitVehicles)
4 changes: 3 additions & 1 deletion src/main/scala/beam/sim/BeamHelper.scala
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package beam.sim
import java.io.FileOutputStream
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.Properties
import java.util.concurrent.TimeUnit

import beam.agentsim.agents.ridehail.RideHailSurgePricingManager
import beam.agentsim.events.handling.BeamEventsHandling
@@ -357,7 +358,8 @@ trait BeamHelper extends LazyLogging {
val networkCoordinator = new NetworkCoordinator(beamConfig)
networkCoordinator.loadNetwork()

val beamWarmStart = BeamWarmStart(beamConfig)
val maxHour = TimeUnit.SECONDS.toHours(new TravelTimeCalculatorConfigGroup().getMaxTime).toInt
val beamWarmStart = BeamWarmStart(beamConfig, maxHour)
beamWarmStart.warmStartPopulation(matsimConfig)

val scenario = ScenarioUtils.loadScenario(matsimConfig).asInstanceOf[MutableScenario]
11 changes: 5 additions & 6 deletions src/main/scala/beam/sim/BeamMobsim.scala
Original file line number Diff line number Diff line change
@@ -12,11 +12,7 @@ import akka.pattern.ask
import akka.util.Timeout
import beam.agentsim.agents.BeamAgent.Finish
import beam.agentsim.agents.modalbehaviors.DrivesVehicle.BeamVehicleStateUpdate
import beam.agentsim.agents.ridehail.RideHailManager.{
BufferedRideHailRequestsTimeout,
NotifyIterationEnds,
RideHailAllocationManagerTimeout
}
import beam.agentsim.agents.ridehail.RideHailManager.{BufferedRideHailRequestsTimeout, NotifyIterationEnds, RideHailAllocationManagerTimeout}
import beam.agentsim.agents.ridehail.{RideHailAgent, RideHailManager, RideHailSurgePricingManager}
import beam.agentsim.agents.vehicles.EnergyEconomyAttributes.Powertrain
import beam.agentsim.agents.vehicles._
@@ -70,6 +66,8 @@ class BeamMobsim @Inject()(

val rideHailHouseholds: mutable.Set[Id[Household]] = mutable.Set()

val MaxHour: Int = 24

var debugActorWithTimerActorRef: ActorRef = _
var debugActorWithTimerCancellable: Cancellable = _
/*
@@ -358,7 +356,8 @@ class BeamMobsim @Inject()(
Await.result(beamServices.beamRouter ? InitTransit(scheduler, parkingManager), timeout.duration)

if (beamServices.iterationNumber == 0) {
val warmStart = BeamWarmStart(beamServices.beamConfig)
val maxHour = TimeUnit.SECONDS.toHours(beamServices.travelTimeCalculatorConfigGroup.getMaxTime).toInt
val warmStart = BeamWarmStart(beamServices.beamConfig, maxHour)
warmStart.warmStartTravelTime(beamServices.beamRouter)
}

6 changes: 5 additions & 1 deletion src/main/scala/beam/sim/BeamServices.scala
Original file line number Diff line number Diff line change
@@ -20,14 +20,14 @@ import beam.utils.{DateUtils, FileUtils}
import com.google.inject.{ImplementedBy, Inject, Injector}
import org.matsim.api.core.v01.population.Person
import org.matsim.api.core.v01.{Coord, Id}
import org.matsim.core.config.groups.TravelTimeCalculatorConfigGroup
import org.matsim.core.controler._
import org.matsim.core.utils.collections.QuadTree
import org.matsim.households.Household
import org.slf4j.LoggerFactory
import org.supercsv.io.CsvMapReader
import org.supercsv.prefs.CsvPreference

import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.FiniteDuration

@@ -39,6 +39,8 @@ trait BeamServices extends ActorInject {
val controler: ControlerI
val beamConfig: BeamConfig

val travelTimeCalculatorConfigGroup: TravelTimeCalculatorConfigGroup

val geo: GeoUtils
var modeChoiceCalculatorFactory: ModeChoiceCalculatorFactory
val dates: DateUtils
@@ -65,6 +67,8 @@ class BeamServicesImpl @Inject()(val injector: Injector) extends BeamServices {

val geo: GeoUtils = injector.getInstance(classOf[GeoUtils])

val travelTimeCalculatorConfigGroup: TravelTimeCalculatorConfigGroup = injector.getInstance(classOf[TravelTimeCalculatorConfigGroup])

val dates: DateUtils = DateUtils(
ZonedDateTime.parse(beamConfig.beam.routing.baseDate).toLocalDateTime,
ZonedDateTime.parse(beamConfig.beam.routing.baseDate)
4 changes: 1 addition & 3 deletions src/main/scala/beam/sim/BeamSim.scala
Original file line number Diff line number Diff line change
@@ -112,9 +112,7 @@ class BeamSim @Inject()(
transportNetwork,
event.getServices.getControlerIO,
scenario,
beamServices.geo,
beamServices.beamRouter,
beamServices.beamConfig
beamServices
)
}

Loading

0 comments on commit 672c46d

Please sign in to comment.