From 12b49957ece1db3f3295c251c93186ac0527d964 Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Tue, 19 Nov 2024 11:37:22 +0000 Subject: [PATCH] make all polling updater wait for graph update finish --- .../alert/GtfsRealtimeAlertsUpdater.java | 45 +++++++++---------- .../updater/trip/PollingTripUpdater.java | 5 ++- .../VehicleParkingAvailabilityUpdater.java | 5 ++- .../VehicleParkingUpdater.java | 5 ++- .../PollingVehiclePositionUpdater.java | 5 ++- .../vehicle_rental/VehicleRentalUpdater.java | 5 ++- 6 files changed, 36 insertions(+), 34 deletions(-) diff --git a/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java b/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java index b71dad6a656..0e7ab35cb13 100644 --- a/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java @@ -2,6 +2,7 @@ import com.google.transit.realtime.GtfsRealtime.FeedMessage; import java.net.URI; +import java.util.concurrent.ExecutionException; import org.opentripplanner.framework.io.OtpHttpClient; import org.opentripplanner.framework.io.OtpHttpClientFactory; import org.opentripplanner.routing.impl.TransitAlertServiceImpl; @@ -63,32 +64,28 @@ public String toString() { } @Override - protected void runPolling() { - try { - final FeedMessage feed = otpHttpClient.getAndMap( - URI.create(url), - this.headers.asMap(), - FeedMessage.PARSER::parseFrom - ); + protected void runPolling() throws InterruptedException, ExecutionException { + final FeedMessage feed = otpHttpClient.getAndMap( + URI.create(url), + this.headers.asMap(), + FeedMessage.PARSER::parseFrom + ); - long feedTimestamp = feed.getHeader().getTimestamp(); - if (feedTimestamp == lastTimestamp) { - LOG.debug("Ignoring feed with a timestamp that has not been updated from {}", url); - return; - } - if (feedTimestamp < lastTimestamp) { - LOG.info("Ignoring feed with older than previous timestamp from {}", url); - return; - } + long feedTimestamp = feed.getHeader().getTimestamp(); + if (feedTimestamp == lastTimestamp) { + LOG.debug("Ignoring feed with a timestamp that has not been updated from {}", url); + return; + } + if (feedTimestamp < lastTimestamp) { + LOG.info("Ignoring feed with older than previous timestamp from {}", url); + return; + } - // Handle update in graph writer runnable - saveResultOnGraph.execute(context -> - updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher()) - ); + // Handle update in graph writer runnable + saveResultOnGraph + .execute(context -> updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher())) + .get(); - lastTimestamp = feedTimestamp; - } catch (Exception e) { - LOG.error("Error reading gtfs-realtime feed from " + url, e); - } + lastTimestamp = feedTimestamp; } } diff --git a/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java b/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java index c725c8b1088..42f24031839 100644 --- a/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java @@ -2,6 +2,7 @@ import com.google.transit.realtime.GtfsRealtime.TripUpdate; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import org.opentripplanner.updater.spi.PollingGraphUpdater; import org.opentripplanner.updater.spi.UpdateResult; @@ -73,7 +74,7 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { * applies those updates to the graph. */ @Override - public void runPolling() { + public void runPolling() throws InterruptedException, ExecutionException { // Get update lists from update source List updates = updateSource.getUpdates(); var incrementality = updateSource.incrementalityOfLastUpdates(); @@ -89,7 +90,7 @@ public void runPolling() { feedId, recordMetrics ); - saveResultOnGraph.execute(runnable); + saveResultOnGraph.execute(runnable).get(); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java index b23f46522c3..bc5217f7534 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java @@ -2,6 +2,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import org.opentripplanner.routing.vehicle_parking.VehicleParking; @@ -49,12 +50,12 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { } @Override - protected void runPolling() { + protected void runPolling() throws InterruptedException, ExecutionException { if (source.update()) { var updates = source.getUpdates(); var graphWriterRunnable = new AvailabilityUpdater(updates); - saveResultOnGraph.execute(graphWriterRunnable); + saveResultOnGraph.execute(graphWriterRunnable).get(); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java index 830429151b4..60a7b306052 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import org.opentripplanner.routing.graph.Graph; @@ -69,7 +70,7 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { } @Override - protected void runPolling() { + protected void runPolling() throws InterruptedException, ExecutionException { LOG.debug("Updating vehicle parkings from {}", source); if (!source.update()) { LOG.debug("No updates"); @@ -81,7 +82,7 @@ protected void runPolling() { VehicleParkingGraphWriterRunnable graphWriterRunnable = new VehicleParkingGraphWriterRunnable( vehicleParkings ); - saveResultOnGraph.execute(graphWriterRunnable); + saveResultOnGraph.execute(graphWriterRunnable).get(); } private class VehicleParkingGraphWriterRunnable implements GraphWriterRunnable { diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java index 4c487ac997b..a4d36f1d1c3 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java @@ -3,6 +3,7 @@ import com.google.transit.realtime.GtfsRealtime.VehiclePosition; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; import org.opentripplanner.service.realtimevehicles.RealtimeVehicleRepository; import org.opentripplanner.service.realtimevehicles.model.RealtimeVehicle; import org.opentripplanner.standalone.config.routerconfig.updaters.VehiclePositionsUpdaterConfig; @@ -64,7 +65,7 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { * applies those updates to the graph. */ @Override - public void runPolling() { + public void runPolling() throws InterruptedException, ExecutionException { // Get update lists from update source List updates = vehiclePositionSource.getPositions(); @@ -77,7 +78,7 @@ public void runPolling() { fuzzyTripMatching, updates ); - saveResultOnGraph.execute(runnable); + saveResultOnGraph.execute(runnable).get(); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java index 24686edce6c..c8030e5492a 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; import org.opentripplanner.routing.linking.DisposableEdgeCollection; @@ -124,7 +125,7 @@ public String getConfigRef() { } @Override - protected void runPolling() { + protected void runPolling() throws InterruptedException, ExecutionException { LOG.debug("Updating vehicle rental stations from {}", nameForLogging); if (!source.update()) { LOG.debug("No updates from {}", nameForLogging); @@ -138,7 +139,7 @@ protected void runPolling() { stations, geofencingZones ); - saveResultOnGraph.execute(graphWriterRunnable); + saveResultOnGraph.execute(graphWriterRunnable).get(); } private class VehicleRentalGraphWriterRunnable implements GraphWriterRunnable {