diff --git a/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java b/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java index a5be5ef4185..0e7ab35cb13 100644 --- a/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java @@ -2,6 +2,7 @@ import com.google.transit.realtime.GtfsRealtime.FeedMessage; import java.net.URI; +import java.util.concurrent.ExecutionException; import org.opentripplanner.framework.io.OtpHttpClient; import org.opentripplanner.framework.io.OtpHttpClientFactory; import org.opentripplanner.routing.impl.TransitAlertServiceImpl; @@ -63,32 +64,28 @@ public String toString() { } @Override - protected void runPolling() { - try { - final FeedMessage feed = otpHttpClient.getAndMap( - URI.create(url), - this.headers.asMap(), - FeedMessage.PARSER::parseFrom - ); + protected void runPolling() throws InterruptedException, ExecutionException { + final FeedMessage feed = otpHttpClient.getAndMap( + URI.create(url), + this.headers.asMap(), + FeedMessage.PARSER::parseFrom + ); - long feedTimestamp = feed.getHeader().getTimestamp(); - if (feedTimestamp == lastTimestamp) { - LOG.debug("Ignoring feed with a timestamp that has not been updated from {}", url); - return; - } - if (feedTimestamp < lastTimestamp) { - LOG.info("Ignoring feed with older than previous timestamp from {}", url); - return; - } + long feedTimestamp = feed.getHeader().getTimestamp(); + if (feedTimestamp == lastTimestamp) { + LOG.debug("Ignoring feed with a timestamp that has not been updated from {}", url); + return; + } + if (feedTimestamp < lastTimestamp) { + LOG.info("Ignoring feed with older than previous timestamp from {}", url); + return; + } - // Handle update in graph writer runnable - saveResultOnGraph.execute(context -> - updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher()) - ); + // Handle update in graph writer runnable + saveResultOnGraph + .execute(context -> updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher())) + .get(); - lastTimestamp = feedTimestamp; - } catch (Exception e) { - LOG.error("Failed to process GTFS-RT Alerts feed from {}", url, e); - } + lastTimestamp = feedTimestamp; } } diff --git a/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java b/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java index c725c8b1088..42f24031839 100644 --- a/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java @@ -2,6 +2,7 @@ import com.google.transit.realtime.GtfsRealtime.TripUpdate; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import org.opentripplanner.updater.spi.PollingGraphUpdater; import org.opentripplanner.updater.spi.UpdateResult; @@ -73,7 +74,7 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { * applies those updates to the graph. */ @Override - public void runPolling() { + public void runPolling() throws InterruptedException, ExecutionException { // Get update lists from update source List updates = updateSource.getUpdates(); var incrementality = updateSource.incrementalityOfLastUpdates(); @@ -89,7 +90,7 @@ public void runPolling() { feedId, recordMetrics ); - saveResultOnGraph.execute(runnable); + saveResultOnGraph.execute(runnable).get(); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java index e548d5d75be..64f8ca81763 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java @@ -2,6 +2,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import org.opentripplanner.service.vehicleparking.VehicleParkingRepository; @@ -49,12 +50,12 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { } @Override - protected void runPolling() { + protected void runPolling() throws InterruptedException, ExecutionException { if (source.update()) { var updates = source.getUpdates(); var graphWriterRunnable = new AvailabilityUpdater(updates); - saveResultOnGraph.execute(graphWriterRunnable); + saveResultOnGraph.execute(graphWriterRunnable).get(); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java index 8e4cf8a862b..4116486ee2d 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import org.opentripplanner.routing.graph.Graph; @@ -69,7 +70,7 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { } @Override - protected void runPolling() { + protected void runPolling() throws InterruptedException, ExecutionException { LOG.debug("Updating vehicle parkings from {}", source); if (!source.update()) { LOG.debug("No updates"); @@ -81,7 +82,7 @@ protected void runPolling() { VehicleParkingGraphWriterRunnable graphWriterRunnable = new VehicleParkingGraphWriterRunnable( vehicleParkings ); - saveResultOnGraph.execute(graphWriterRunnable); + saveResultOnGraph.execute(graphWriterRunnable).get(); } private class VehicleParkingGraphWriterRunnable implements GraphWriterRunnable { diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java index 4c487ac997b..a4d36f1d1c3 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java @@ -3,6 +3,7 @@ import com.google.transit.realtime.GtfsRealtime.VehiclePosition; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; import org.opentripplanner.service.realtimevehicles.RealtimeVehicleRepository; import org.opentripplanner.service.realtimevehicles.model.RealtimeVehicle; import org.opentripplanner.standalone.config.routerconfig.updaters.VehiclePositionsUpdaterConfig; @@ -64,7 +65,7 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { * applies those updates to the graph. */ @Override - public void runPolling() { + public void runPolling() throws InterruptedException, ExecutionException { // Get update lists from update source List updates = vehiclePositionSource.getPositions(); @@ -77,7 +78,7 @@ public void runPolling() { fuzzyTripMatching, updates ); - saveResultOnGraph.execute(runnable); + saveResultOnGraph.execute(runnable).get(); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java index 24686edce6c..c8030e5492a 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; import org.opentripplanner.routing.linking.DisposableEdgeCollection; @@ -124,7 +125,7 @@ public String getConfigRef() { } @Override - protected void runPolling() { + protected void runPolling() throws InterruptedException, ExecutionException { LOG.debug("Updating vehicle rental stations from {}", nameForLogging); if (!source.update()) { LOG.debug("No updates from {}", nameForLogging); @@ -138,7 +139,7 @@ protected void runPolling() { stations, geofencingZones ); - saveResultOnGraph.execute(graphWriterRunnable); + saveResultOnGraph.execute(graphWriterRunnable).get(); } private class VehicleRentalGraphWriterRunnable implements GraphWriterRunnable {