Make all polling updaters wait for graph update finish #6262

Open · wants to merge 4 commits into dev-2.x
Conversation

@miklcct (Contributor) commented Nov 19, 2024

Summary

This makes all polling updaters wait for the graph update to finish before returning.

Issue

Fixes #6252

Unit tests

None. This is a performance issue.

Documentation

None.

codecov bot commented Nov 19, 2024

Codecov Report

Attention: Patch coverage is 10.00000% with 18 lines in your changes missing coverage. Please review.

Project coverage is 69.79%. Comparing base (684b841) to head (c74c9e5).
Report is 1 commit behind head on dev-2.x.

Files with missing lines                               Patch %   Lines
...anner/updater/alert/GtfsRealtimeAlertsUpdater.java    0.00%   15 Missing ⚠️
...entripplanner/updater/trip/PollingTripUpdater.java    0.00%   1 Missing ⚠️
...ehicle_position/PollingVehiclePositionUpdater.java    0.00%   1 Missing ⚠️
...r/updater/vehicle_rental/VehicleRentalUpdater.java    0.00%   1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             dev-2.x    #6262      +/-   ##
=============================================
- Coverage      69.79%   69.79%   -0.01%     
+ Complexity     17786    17783       -3     
=============================================
  Files           2017     2017              
  Lines          76042    76040       -2     
  Branches        7781     7781              
=============================================
- Hits           53073    53071       -2     
+ Misses         20264    20263       -1     
- Partials        2705     2706       +1     


@miklcct miklcct marked this pull request as ready for review November 19, 2024 14:30
@miklcct miklcct requested a review from a team as a code owner November 19, 2024 14:30
@leonardehrenfried leonardehrenfried changed the title from "make all polling updater wait for graph update finish" to "Make all polling updaters wait for graph update finish" Nov 21, 2024
@t2gran t2gran added this to the 2.7 (next release) milestone Nov 22, 2024
@t2gran t2gran requested a review from vpaturet November 22, 2024 12:11
// Handle update in graph writer runnable
saveResultOnGraph
.execute(context -> updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher()))
.get();
@t2gran (Member) commented Nov 22, 2024

I think we should have a timeout here. What happens if a task takes too much time or does not return?

Contributor Author

The task is to write the update into the graph. I am not sure what will happen if the task to update the graph is forcibly terminated.

Member

If we don't want to use the asynchronicity of the Future, why do we even need a Future at all? I think in such a case the method could just return void.

In any case, we need to discuss this in the dev meeting.

Member

After further discussion I finally realized what's going on here. In fact @leonardehrenfried, this is using the Future to delay completion of the polling graph updater until the subtask it enqueues on a different ExecutorService completes. I will comment further on the main discussion thread.

@optionsome optionsome marked this pull request as draft November 26, 2024 10:11
@abyrd (Member) commented Nov 26, 2024

This was shifted to a draft pending profiling and code audit to determine why exactly applying tens of thousands of add updates is so slow (as reported by @miklcct).

@miklcct (Contributor Author) commented Nov 27, 2024

Unfortunately, there is not much I can do about it:

[image]

The same slowness should appear whenever you have tens of thousands of trip updates, because most of the running time is spent updating the timetables of trip patterns: the trip times need to be sorted when building a timetable, and the TimetableSnapshot only supports applying updates trip by trip. In other words, if hundreds of trips on the same pattern have real-time updates, the timetable is rebuilt hundreds of times. The deduplicator also takes a lot of time.

[image: profiling flamegraph]
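
To make the cost pattern described above concrete, here is a deliberately simplified sketch from the editor, with hypothetical names rather than OTP's actual TimetableSnapshot classes: each individual trip update copies and re-sorts the whole timetable of its pattern, so N updated trips on one pattern mean N copy-and-sort passes instead of a single batched rebuild.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical names only; an illustration of why applying updates trip by trip
// is expensive for large full-dataset feeds.
public class PerTripRebuildSketch {

  record TripTimes(String tripId, int firstDepartureSeconds) {}

  static class Timetable {
    private final List<TripTimes> trips;

    Timetable(List<TripTimes> trips) {
      this.trips = List.copyOf(trips);
    }

    /** Applying one trip update copies the list and re-sorts it: O(n log n) per update. */
    Timetable withUpdatedTrip(TripTimes updated) {
      List<TripTimes> copy = new ArrayList<>(trips);
      copy.removeIf(t -> t.tripId().equals(updated.tripId()));
      copy.add(updated);
      copy.sort(Comparator.comparingInt(TripTimes::firstDepartureSeconds));
      return new Timetable(copy);
    }
  }

  public static void main(String[] args) {
    Timetable timetable = new Timetable(List.of());
    // 300 updated trips on the same pattern trigger 300 copy-and-sort passes,
    // even though a single batched rebuild would have been enough.
    for (int i = 0; i < 300; i++) {
      timetable = timetable.withUpdatedTrip(new TripTimes("trip-" + i, 3600 + 120 * i));
    }
  }
}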

The other problem which caused the original bug was that the server I used was slow and heavily loaded. It runs not only OTP but also the GTFS-RT generator that OTP consumes locally, plus a few CI agents building various components such as the app. The server was a virtual machine sharing a physical host with other virtual machines that run the production OTP servers.

With this fix, it is safe to run all polling updaters with a very small interval, e.g. PT1S, i.e. to poll as frequently as the server can handle. I think this fix is absolutely needed; otherwise the queue of graph updates from all the updaters will eventually consume all the CPU time and exhaust the memory, until OTP freezes and needs to be restarted. Only if users report lagging real-time information after this fix do we need to solve the underlying performance problems.

@miklcct miklcct marked this pull request as ready for review November 27, 2024 16:09
@t2gran (Member) commented Nov 28, 2024

You could try to see if this change helps (Deduplicator#IntArray):

/** A wrapper for a primitive int array. This is insane but necessary in Java. */
  private static final class IntArray implements Serializable {
    private final int[] array;
    private final int hash;

    public IntArray(int[] array) {
      this.array = array;
      this.hash = Arrays.hashCode(array);
    }

    @Override
    public int hashCode() {
      return hash;
    }

    @Override
    public boolean equals(Object other) {
      if (other instanceof IntArray that) {
        return hash == that.hash && Arrays.equals(array, that.array);
      }
      return false;
    }
  }

I am not too optimistic, but it is worth trying.
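
For context, a general Java note from the editor rather than something stated in the thread: the wrapper exists because int[] inherits identity-based hashCode()/equals() from Object, so content-equal arrays never match as map keys. A minimal sketch of the difference, with hypothetical names (the real IntArray additionally caches the hash for speed):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ArrayKeyDemo {

  /** Simplified stand-in for Deduplicator#IntArray: keys a map by array content. */
  record IntArrayKey(int[] array) {
    @Override
    public boolean equals(Object other) {
      return other instanceof IntArrayKey that && Arrays.equals(array, that.array);
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(array);
    }
  }

  public static void main(String[] args) {
    Map<int[], String> byIdentity = new HashMap<>();
    byIdentity.put(new int[] { 1, 2, 3 }, "first");
    // Identity semantics: a content-equal array does not find the entry.
    System.out.println(byIdentity.get(new int[] { 1, 2, 3 })); // null

    Map<IntArrayKey, String> byContent = new HashMap<>();
    byContent.put(new IntArrayKey(new int[] { 1, 2, 3 }), "first");
    // Content semantics via the wrapper: the lookup succeeds.
    System.out.println(byContent.get(new IntArrayKey(new int[] { 1, 2, 3 }))); // first
  }
}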

@t2gran (Member) commented Nov 28, 2024

You say you get many trip updates for the same pattern; can you provide some numbers?

  • Number of patterns updated
  • Number of trips updated
  • Number of stops updated

The trips should be sorted, so another thing to investigate is:

  • Number of patterns already sorted correctly before the sort operation is done. It looks like we do a merge sort here, which should be efficient for already sorted collections, but using insertion sort would probably be better. For example:
// Insertion sort: cheap when the input is already (nearly) sorted.
static void sort(int[] a) {
  for (int i = 1; i < a.length; ++i) {
    var temp = a[i];
    int j = i - 1;
    if (temp < a[j]) {
      do {
        a[j + 1] = a[j];
        --j;
      } while (j >= 0 && temp < a[j]);
      a[j + 1] = temp;
    }
  }
}

Is it the case that you get a lot of updates for trips that are not running at the moment?

@miklcct (Contributor Author) commented Nov 28, 2024

Our upstream system provides live times up to 24 hours into the future. It is common for trips to be pre-cancelled, or for delays to be posted in advance, when there are known disruptions on the network.

The number of trips updated is about 30k.

The number of stops is probably about 20k (a rough approximation only: there are about 4k stations in the whole network, so 4 to 5 platforms each on average is a good guess).

The number of patterns is a bit hard to guess. On one extreme, a service using a different platform in a station belongs to a different pattern, so most intercity services are in small patterns. At the other extreme, services such as the Elizabeth line are effectively metro services using exactly the same platforms every few minutes, so hundreds of trips are in the same pattern, and all of them have live times for every single stop.

@t2gran (Member) commented Nov 29, 2024

I think you can ignore my code suggestions above. Looking at the flamegraph again, I see that the time spent on deduplication and sorting is minimal; the problem is that it is done for each update.

To move forward on this I suggest the following:

  • You should probably look into why this takes so long; I think there is a serious problem. Maybe you are applying the same updates every time?
  • As a temporary workaround you can create an OTPFeature (WaitOnPollingGraphUpdatersHack) and use this feature flag to enable the Future.get() call.

@optionsome (Member)

    @Override
    public boolean equals(Object other) {
      if (other instanceof IntArray that) {
        return hash == that.hash && Arrays.equals(array, that.array);
      }
      return false;
    }
  }

Isn't this supposed to be return hash == that.hash || Arrays.equals(array, that.array);?

@miklcct (Contributor Author) commented Nov 29, 2024

    @Override
    public boolean equals(Object other) {
      if (other instanceof IntArray that) {
        return hash == that.hash && Arrays.equals(array, that.array);
      }
      return false;
    }
  }

Isn't this supposed to be return hash == that.hash || Arrays.equals(array, that.array);?

The hash being equal doesn't guarantee that the contents are equal, so I think the code is correct.

If the hashes aren't equal it returns false directly, with no need to check the contents. If they are equal, the contents must still be checked.

@miklcct (Contributor Author) commented Nov 29, 2024

I think you can ignore my code suggestions above. Looking at the flamegraph again, I see that the time spent on deduplication and sorting is minimal; the problem is that it is done for each update.

To move forward on this I suggest the following:

  • You should probably look into why this takes so long; I think there is a serious problem. Maybe you are applying the same updates every time?
  • As a temporary workaround you can create an OTPFeature (WaitOnPollingGraphUpdatersHack) and use this feature flag to enable the Future.get() call.

Yes, I am applying the same updates every time. It is the nature of polling updaters with FULL_DATASET GTFS-RT that you always need to start from the scheduled timetable and apply everything.

@slvlirnoff (Member)

Do you know the percentage of the updates that are modified/replacement trips? (I guess these are used to handle platform changes/variations from the planned timetable.)

@miklcct (Contributor Author) commented Nov 29, 2024

Do you know the percentage of the updates that are modified/replacement trips? (I guess these are used to handle platform changes/variations from the planned timetable.)

The vast majority of them.

@abyrd (Member) commented Dec 3, 2024

We discussed this PR at length, during and after today's developer meeting. The problem this PR is solving and the mechanism by which it solves the problem were not obvious to everyone, and we agreed it needs some documentation in the PR comments, and then in the code as both short line comments and Javadoc at the head of GraphUpdaterManager.

The context is as follows: GraphUpdaterManager has one ExecutorService with N threads on which the polling graph updaters run in parallel (GraphUpdaterManager#pollingUpdaterPool), and another ExecutorService with a single thread that applies batches of changes to the graph sequentially (GraphUpdaterManager#scheduler).

The central change here at
https://github.com/opentripplanner/OpenTripPlanner/pull/6262/files#diff-21abb8fddc0268062a63e69fc6fbe68d8dc012bd8481eeb2b7c6cf1f8a76efcdR85-R87
is in a code block within a polling graph updater which is run periodically on the multi-threaded pollingUpdaterPool. It fetches an entire GTFS-RT TripUpdates feed, then enqueues a secondary task on the single-threaded scheduler ExecutorService, which serves to "serialize" (in the database sense) all updates to the transit data.

Without this PR, the code block in question immediately returns after it enqueues the task on the single-threaded executor. The timing mechanism that provides the recurring schedule of the polling graph updater has the characteristic that it waits at least a specified number of seconds after the task completes before firing the next instance of the task. But this delay is measured from the time the fetch/parse/enqueue task completes, not from the time that the secondary task it enqueues in the single-threaded scheduler completes.

If that secondary task takes longer to complete than the inter-polling duration, either because the secondary task is large/slow or because it's enqueued behind some other slow task, the polling executor will enqueue another task in the single-threaded executor while the first one has not yet completed. Over time this leads to a pile-up of large update tasks, with the one currently executing falling farther and farther behind the most recent data.

With this PR, the code block cited above is modified to call get() on the Future returned by saveResultOnGraph.execute(). This blocks, delaying completion of the polling graph updater until the subtask it enqueues on the single-threaded ExecutorService completes. The polling interval is then measured from just after the secondary single-threaded task completes, ensuring no task pile-up from this particular polling updater. Slow execution of its updater task effectively applies backpressure on the polling process, slowing it down.

Without this PR we are just ignoring the fact that GraphUpdaterManager.execute() returns a Future; with this PR we are blocking the polling graph updater thread as long as it takes for its subtask to finish.
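
A minimal standalone sketch of this mechanism may help. It uses plain java.util.concurrent and hypothetical names rather than OTP's actual classes: the polling task runs on a scheduled pool with a fixed delay, enqueues its write onto a single-threaded writer executor, and (with this PR's change) blocks on the returned Future so that the delay is measured from when the write finishes.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PollingBackpressureSketch {

  public static void main(String[] args) {
    // Multi-threaded pool on which the polling updaters run (like pollingUpdaterPool).
    ScheduledExecutorService pollingPool = Executors.newScheduledThreadPool(4);
    // Single-threaded executor that serializes all writes to the graph.
    ExecutorService graphWriter = Executors.newSingleThreadExecutor();

    Runnable pollingTask = () -> {
      byte[] feed = fetchAndParseFeed();
      Future<?> write = graphWriter.submit(() -> applyToGraph(feed));
      try {
        // The change in this PR: block until the write has been applied, so the
        // fixed delay below starts counting only after the graph update finishes.
        write.get();
      } catch (Exception e) {
        e.printStackTrace();
      }
    };

    // Fixed *delay* semantics: the next poll starts N seconds after the previous
    // run returns. Without the get() above, "returns" means "enqueued"; with it,
    // "applied", which is what prevents tasks piling up on the writer queue.
    pollingPool.scheduleWithFixedDelay(pollingTask, 0, 1, TimeUnit.SECONDS);
  }

  private static byte[] fetchAndParseFeed() {
    return new byte[0]; // placeholder for fetching the full GTFS-RT feed
  }

  private static void applyToGraph(byte[] feed) {
    // placeholder for the (possibly slow) graph write
  }
}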

Potential down sides of this include blocking entire threads for many seconds at a time while waiting for the sub-tasks they enqueue. This does not appear to be a problem as the size of the multi-threaded ExecutorService thread pool is unlimited, with the parameters at its creation specifying only a minimum size, not a maximum. Even in cases with many polling updaters (Entur uses ~50) the number of idle threads should be within reason and have no serious performance impact. And in practice we only expect a few of them to block for perceptible lengths of time.

Another potential concern is that this approach will apply backpressure on any one polling updater, but several updaters producing slow sub-tasks could still cause a pile-up in the single-threaded queue. But on further inspection, apparently this is not the case. By virtue of all polling updaters waiting on completion of tasks that are all in the same single-threaded queue, the poll intervals of all the polling updaters are effectively increased in response to crowding conditions in the single-threaded queue.

One common reaction to these queue pile-ups is that it's a configuration issue, and one should simply increase the polling interval to leave enough time for these large secondary tasks to complete before the next polling operation. The problem with this idea is that the necessary time to process each task can be highly variable depending on operational conditions, of both transit and the server itself. A snowstorm could greatly increase the size of the realtime data file. Other realtime updaters could encounter larger batches of updates even when this one doesn't, delaying its tasks in the single-threaded queue. Heavier routing load on the server might slow down application of realtime updates, etc. We want the polling to slow down dynamically in response to varying levels of contention for the single-threaded graph update executor.

In this PR, the same change (calling get() on the Future) is made to every kind of polling graph updater. It is debatable whether this is necessary for all updaters and/or whether consistency is a good thing, independent of whether a particular updater has a tendency to produce large/slow tasks. An important factor to consider is which polling updaters can produce very large batch updates (taking longer than one second, and sometimes upwards of 15 seconds to apply) and under what circumstances. Another thing to consider is whether there are any downsides to uniformly applying the blocking approach, even when polling updaters finish quickly. Our tentative conclusion in today's meetings was that uniformly applying the blocking approach in this PR has only benefits and will create nice uniform backpressure across the whole system.

Other details are:

  • Interactions with streaming updaters (should they also block, how would they transmit backpressure upstream)
  • Interaction with periodic snapshot tasks, which are enqueued at the head (LIFO not FIFO) of the single-threaded executor.

@abyrd (Member) commented Dec 3, 2024

@t2gran has suggested that the GraphUpdaterManager.execute() should not expose the Future as a return value, but instead expose a blocking function that encapsulates the Future, and somehow applies it conditionally depending on the type of updater, the phase of configuration/execution or other factors. This makes sense to me - the reason I originally misunderstood this PR is that I never realized execute() exposed the Future as a return value.
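
One possible shape for that encapsulation, sketched here with hypothetical names and not as a concrete proposal for OTP's API, would be a method that submits the task and blocks internally, guarded by the feature flag, so callers never see the Future:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical names throughout; only a sketch of the encapsulation idea.
class GraphWriterManagerSketch {
  private final ExecutorService singleThreadedWriter = Executors.newSingleThreadExecutor();
  private final boolean waitForGraphUpdateFinish; // would come from an OTPFeature flag

  GraphWriterManagerSketch(boolean waitForGraphUpdateFinish) {
    this.waitForGraphUpdateFinish = waitForGraphUpdateFinish;
  }

  /**
   * Enqueue a write task and, if the feature is enabled, block the calling
   * (polling updater) thread until the write has been applied. The Future is
   * never exposed to callers.
   */
  void executeBlocking(Runnable writeTask) throws InterruptedException, ExecutionException {
    Future<?> future = singleThreadedWriter.submit(writeTask);
    if (waitForGraphUpdateFinish) {
      future.get();
    }
  }
}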

On the other points I left unfinished above:
This whole discussion is about polling updaters. There's a sense they're the ones responsible for generating large monolithic updates (GTFS FULL_DATASET polling). @t2gran and I agreed that there's no inherent reason polling couldn't efficiently produce incremental update sets, with the client including the last monotonically increasing version number / timestamp from the previous server response it processed to provide a cutoff point, starting at which the server would send all newer messages in a batch. So we don't want to make any assumptions about polling => large and slow. On the other hand, I think streaming messages can be assumed to be much lighter, much less temporally dense, much more readily interspersed with other single-threaded update tasks. If there are no downsides to blocking execute() of polling tasks (that are typically larger blocks of messages rather than single ones), we might be tempted to apply the same blocking logic to streaming updaters. But it's not clear it would provide any benefit at all in those cases.
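
As an aside, a hedged sketch of that incremental-polling idea (hypothetical types, not a real GTFS-RT API): the client keeps the highest timestamp it has processed and asks the server only for messages newer than that cutoff.

import java.util.List;

public class IncrementalPollSketch {

  record Message(long timestamp, String payload) {}

  interface FeedServer {
    /** Returns all messages with a timestamp strictly greater than the cutoff. */
    List<Message> messagesSince(long cutoffTimestamp);
  }

  private long lastSeenTimestamp = 0; // monotonically increasing cutoff

  List<Message> poll(FeedServer server) {
    List<Message> batch = server.messagesSince(lastSeenTimestamp);
    // Advance the cutoff to the newest message we have now processed.
    for (Message m : batch) {
      lastSeenTimestamp = Math.max(lastSeenTimestamp, m.timestamp());
    }
    return batch;
  }
}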

@vpaturet pointed out today that the periodic scheduling of snapshot tasks on the single graph updater thread "cuts in line" and lands at the head of the queue rather than its tail. It must wait for any other task that's already running on that single thread, but will be the very next task to run, making the update visible to end users. The delay until it's next triggered is measured from when it finishes, so if a snapshot task is stuck waiting for a large update block to finish, multiple snapshot tasks will not pile up in the queue. They also experience backpressure from congestion on this single thread.

Note that the snapshot tasks and the polling graph updaters are both using this kind of periodic scheduling, but they're running on different executors. The snapshots are periodically scheduled directly on the single-threaded executor; the polling updaters are periodically scheduled on the multi-threaded GraphUpdaterManager#pollingUpdaterPool. So there's no concern about the snapshot tasks and polling updaters trying to cut in line in front of one another. It's the secondary tasks enqueued by the polling updaters that the snapshot tasks are cutting in front of.

@abyrd abyrd self-assigned this Dec 3, 2024
@abyrd (Member) commented Dec 3, 2024

And finally, the underlying question still remains: even with 30k updates, why does it take 15 seconds? There's probably something that can be optimized away there, and switching to streaming updates would probably completely eliminate the problem.

@miklcct (Contributor Author) commented Dec 3, 2024

And finally, the underlying question still remains: even with 30k updates, why does it take 15 seconds? There's probably something that can be optimized away there, and switching to streaming updates would probably completely eliminate the problem.

We will need to do that if we further expand our live-time coverage to every single train, metro, bus, tram, ferry, etc. in the whole country, but this fix can definitely help a medium-sized deployment without setting up another piece of dedicated infrastructure to convert polling GTFS-RT to streaming.

@habrahamsson-skanetrafiken (Contributor)

@t2gran will come with a suggestion of how to move forward and @abyrd will write some documentation.

@t2gran (Member) left a comment

If you add the feature toggle this should be OK. The WaitOnPullingUpdatersBeforeResheduling feature should be on (true) by default.

Feel free to find a better name and to add appropriate documentation for the feature.

Comment on lines +85 to +87
saveResultOnGraph
.execute(context -> updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher()))
.get();
Member

This is a good change, but we need to be able to turn it off in case it has some side effects we have missed. This is critical code, and it is possible that we have not understood all the implications. So it would be nice to introduce an OTPFeature to be able to turn the synchronisation off. This needs to be done for all calls to Future#get() introduced in this PR.

Suggested change:

- saveResultOnGraph
-   .execute(context -> updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher()))
-   .get();
+ var f = saveResultOnGraph.execute(context ->
+   updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher())
+ );
+ if (OTPFeature.WaitOnPullingUpdatersBeforeResheduling.isOn()) {
+   f.get();
+ }

Contributor Author

I will add a method to the updater and a unit test to do that, such that the behaviour is testable.

@vpaturet (Contributor)

A slightly different approach would be to save the future and test it at the beginning of the next polling round.
By doing so, the thread would block only in the case where the task takes longer than the polling interval, thus reducing the risk of side-effects in the "normal" case where there is sufficient time between polling rounds.
As @t2gran mentioned, this logic should be encapsulated.

private volatile Future<?> previousTask;

// ...

protected void runPolling() throws InterruptedException, ExecutionException {

  if (previousTask != null && !previousTask.isDone()) {
    LOG.warn("Delaying polling until the previous task is complete");
    long startBlockingWait = System.currentTimeMillis();
    previousTask.get();
    LOG.info(
      "Resuming polling after waiting an additional {}s",
      (System.currentTimeMillis() - startBlockingWait) / 1000
    );
  }

  // ...

  previousTask =
    saveResultOnGraph.execute(context -> updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher()));
}

Successfully merging this pull request may close these issues:

  • Real time update is lagging behind when the server is under load
8 participants