Skip to content

Commit

Permalink
Force synchronous upload and reuse of possibly modified spawn outputs
Browse files Browse the repository at this point in the history
When an action may modify a spawn's outputs after execution, the upload of outputs to the cache and reuse for deduplicated actions need to happen synchronously directly after spawn execution to avoid a race.

This commit implements this for cache uploads by marking all actions with this property and simply disabling async upload for all spawns executed by such actions.

For output reuse, all executions deduplicated against the first one register atomically upon deduplication and cause the cache upload to wait for all of them to complete reuse.

Fixes #22501
Fixes #23288
Work towards #21578
Closes #23307 (no longer needed)

Closes #23382.

PiperOrigin-RevId: 668101364
Change-Id: Ice75dbe14a7dd46e02ecb096d2b2a30940216356
  • Loading branch information
fmeum authored and copybara-github committed Aug 27, 2024
1 parent f316312 commit bc275b7
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,14 @@ default String getProgressMessage(RepositoryMapping mainRepositoryMapping) {
default boolean mayInsensitivelyPropagateInputs() {
return false;
}

/**
* Returns true if the action may modify spawn outputs after the spawn has executed.
*
* <p>If this returns true, any kind of spawn output caching or reuse needs to happen
* synchronously directly after the spawn execution.
*/
default boolean mayModifySpawnOutputsAfterExecution() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,16 @@ private static ImmutableSet<Artifact> nonNullAsSet(Artifact... artifacts) {
this.isExecutedOnWindows = isExecutedOnWindows;
}

@Override
public boolean mayModifySpawnOutputsAfterExecution() {
// Test actions modify test spawn outputs after execution:
// - if there are multiple attempts (unavoidable);
// - in all cases due to appending any stray stderr output to the test log in
// StandaloneTestStrategy.
// TODO: Get rid of the second case and only return true if there are multiple attempts.
return true;
}

public boolean isExecutedOnWindows() {
return isExecutedOnWindows;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,14 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re
public static final class LocalExecution {
private final RemoteAction action;
private final SettableFuture<SpawnResult> spawnResultFuture;
private final Phaser spawnResultConsumers =
new Phaser(1) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
// We only use a single phase.
return true;
}
};

private LocalExecution(RemoteAction action) {
this.action = action;
Expand All @@ -1356,6 +1364,37 @@ public static LocalExecution createIfDeduplicatable(RemoteAction action) {
return new LocalExecution(action);
}

/**
* Attempts to register a thread waiting for the {@link #spawnResultFuture} to become available
* and returns true if successful.
*
* <p>Every call to this method must be matched by a call to {@link #unregister()} via
* try-finally.
*
* <p>This always returns true for actions that do not modify their spawns' outputs after
* execution.
*/
public boolean registerForOutputReuse() {
// We only use a single phase.
return spawnResultConsumers.register() == 0;
}

/**
* Unregisters a thread waiting for the {@link #spawnResultFuture}, either after successful
* reuse of the outputs or upon failure.
*/
public void unregister() {
spawnResultConsumers.arriveAndDeregister();
}

/**
* Waits for all potential consumers of the {@link #spawnResultFuture} to be done with their
* output reuse.
*/
public void awaitAllOutputReuse() {
spawnResultConsumers.arriveAndAwaitAdvance();
}

/**
* Signals to all potential consumers of the {@link #spawnResultFuture} that this execution has
* been cancelled and that the result will not be available.
Expand Down Expand Up @@ -1571,7 +1610,8 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult, Runnable
SpawnResult.Status.SUCCESS.equals(spawnResult.status()) && spawnResult.exitCode() == 0,
"shouldn't upload outputs of failed local action");

if (remoteOptions.remoteCacheAsync) {
if (remoteOptions.remoteCacheAsync
&& !action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) {
Single.using(
remoteCache::retain,
remoteCache ->
Expand Down
182 changes: 107 additions & 75 deletions src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,89 +109,110 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
// results haven't been uploaded to the cache yet and deduplicate all of them against the
// first one.
LocalExecution previousExecution = null;
thisExecution = LocalExecution.createIfDeduplicatable(action);
if (shouldUploadLocalResults && thisExecution != null) {
previousExecution = inFlightExecutions.putIfAbsent(action.getActionKey(), thisExecution);
}
// Metadata will be available in context.current() until we detach.
// This is done via a thread-local variable.
try {
RemoteActionResult result;
try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
result = remoteExecutionService.lookupCache(action);
}
// In case the remote cache returned a failed action (exit code != 0) we treat it as a
// cache miss
if (result != null && result.getExitCode() == 0) {
Stopwatch fetchTime = Stopwatch.createStarted();
InMemoryOutput inMemoryOutput;
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) {
inMemoryOutput = remoteExecutionService.downloadOutputs(action, result);
}
fetchTime.stop();
totalTime.stop();
spawnMetrics
.setFetchTimeInMs((int) fetchTime.elapsed().toMillis())
.setTotalTimeInMs((int) totalTime.elapsed().toMillis())
.setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis());
SpawnResult spawnResult =
createSpawnResult(
digestUtil,
thisExecution = LocalExecution.createIfDeduplicatable(action);
if (shouldUploadLocalResults && thisExecution != null) {
LocalExecution previousOrThisExecution =
inFlightExecutions.merge(
action.getActionKey(),
result.getExitCode(),
/* cacheHit= */ true,
result.cacheName(),
inMemoryOutput,
result.getExecutionMetadata().getExecutionStartTimestamp(),
result.getExecutionMetadata().getExecutionCompletedTimestamp(),
spawnMetrics.build(),
spawn.getMnemonic());
return SpawnCache.success(spawnResult);
thisExecution,
(existingExecution, thisExecutionArg) -> {
if (existingExecution.registerForOutputReuse()) {
return existingExecution;
} else {
// The existing execution has completed and its results may have already
// been modified by its action, so we can't deduplicate against it. Instead,
// start a new in-flight execution.
return thisExecutionArg;
}
});
previousExecution =
previousOrThisExecution == thisExecution ? null : previousOrThisExecution;
}
} catch (CacheNotFoundException e) {
// Intentionally left blank
} catch (IOException e) {
if (BulkTransferException.allCausedByCacheNotFoundException(e)) {
try {
RemoteActionResult result;
try (SilentCloseable c =
prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
result = remoteExecutionService.lookupCache(action);
}
// In case the remote cache returned a failed action (exit code != 0) we treat it as a
// cache miss
if (result != null && result.getExitCode() == 0) {
Stopwatch fetchTime = Stopwatch.createStarted();
InMemoryOutput inMemoryOutput;
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) {
inMemoryOutput = remoteExecutionService.downloadOutputs(action, result);
}
fetchTime.stop();
totalTime.stop();
spawnMetrics
.setFetchTimeInMs((int) fetchTime.elapsed().toMillis())
.setTotalTimeInMs((int) totalTime.elapsed().toMillis())
.setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis());
SpawnResult spawnResult =
createSpawnResult(
digestUtil,
action.getActionKey(),
result.getExitCode(),
/* cacheHit= */ true,
result.cacheName(),
inMemoryOutput,
result.getExecutionMetadata().getExecutionStartTimestamp(),
result.getExecutionMetadata().getExecutionCompletedTimestamp(),
spawnMetrics.build(),
spawn.getMnemonic());
return SpawnCache.success(spawnResult);
}
} catch (CacheNotFoundException e) {
// Intentionally left blank
} else {
String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures);
if (isNullOrEmpty(errorMessage)) {
errorMessage = e.getClass().getSimpleName();
} catch (IOException e) {
if (BulkTransferException.allCausedByCacheNotFoundException(e)) {
// Intentionally left blank
} else {
String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures);
if (isNullOrEmpty(errorMessage)) {
errorMessage = e.getClass().getSimpleName();
}
errorMessage = "Remote Cache: " + errorMessage;
remoteExecutionService.report(Event.warn(errorMessage));
}
errorMessage = "Remote Cache: " + errorMessage;
remoteExecutionService.report(Event.warn(errorMessage));
}
}
if (previousExecution != null) {
Stopwatch fetchTime = Stopwatch.createStarted();
SpawnResult previousResult;
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "reuse outputs")) {
previousResult = remoteExecutionService.waitForAndReuseOutputs(action, previousExecution);
}
if (previousResult != null) {
spawnMetrics
.setFetchTimeInMs((int) fetchTime.elapsed().toMillis())
.setTotalTimeInMs((int) totalTime.elapsed().toMillis())
.setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis());
SpawnMetrics buildMetrics = spawnMetrics.build();
return SpawnCache.success(
new SpawnResult.DelegateSpawnResult(previousResult) {
@Override
public String getRunnerName() {
return "deduplicated";
}
if (previousExecution != null) {
Stopwatch fetchTime = Stopwatch.createStarted();
SpawnResult previousResult;
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "reuse outputs")) {
previousResult =
remoteExecutionService.waitForAndReuseOutputs(action, previousExecution);
}
if (previousResult != null) {
spawnMetrics
.setFetchTimeInMs((int) fetchTime.elapsed().toMillis())
.setTotalTimeInMs((int) totalTime.elapsed().toMillis())
.setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis());
SpawnMetrics buildMetrics = spawnMetrics.build();
return SpawnCache.success(
new SpawnResult.DelegateSpawnResult(previousResult) {
@Override
public String getRunnerName() {
return "deduplicated";
}

@Override
public SpawnMetrics getMetrics() {
return buildMetrics;
}
});
@Override
public SpawnMetrics getMetrics() {
return buildMetrics;
}
});
}
// If we reach here, the previous execution was not successful (it encountered an
// exception or the spawn had an exit code != 0). Since it isn't possible to accurately
// recreate the failure without rerunning the action, we fall back to running the action
// locally. This means that we have introduced an unnecessary wait, but that can only
// happen in the case of a failing build with --keep_going.
}
} finally {
if (previousExecution != null) {
previousExecution.unregister();
}
// If we reach here, the previous execution was not successful (it encountered an exception
// or the spawn had an exit code != 0). Since it isn't possible to accurately recreate the
// failure without rerunning the action, we fall back to running the action locally. This
// means that we have introduced an unnecessary wait, but that can only happen in the case
// of a failing build with --keep_going.
}
}

Expand Down Expand Up @@ -239,6 +260,17 @@ public void store(SpawnResult result) throws ExecException, InterruptedException
// large.
remoteExecutionService.uploadOutputs(
action, result, () -> inFlightExecutions.remove(action.getActionKey()));
if (thisExecutionFinal != null
&& action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) {
// In this case outputs have been uploaded synchronously and the callback above has run,
// so no new executions will be deduplicated against this one. We can safely await all
// existing executions finish the reuse.
// Note that while this call itself isn't interruptible, all operations it awaits are
// interruptible.
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "await output reuse")) {
thisExecutionFinal.awaitAllOutputReuse();
}
}
}

private void checkForConcurrentModifications()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,14 @@ public NestedSet<Artifact> getPossibleInputsForTesting() {
return null;
}

@Override
public boolean mayModifySpawnOutputsAfterExecution() {
// Causes of spawn output modification after execution:
// - Fallback to the full classpath with --experimental_java_classpath=bazel.
// - In-place rewriting of .jdeps files with --experimental_output_paths=strip.
return true;
}

/**
* Locally rewrites a .jdeps file to replace missing config prefixes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ protected void afterExecute(
}
}

@Override
public boolean mayModifySpawnOutputsAfterExecution() {
// Causes of spawn output modification after execution:
// - In-place rewriting of .jdeps files with --experimental_output_paths=strip.
// TODO: Use separate files as action and spawn output to avoid in-place modification.
return true;
}

public static Builder newBuilder(RuleContext ruleContext) {
return new Builder(ruleContext);
}
Expand Down
Loading

0 comments on commit bc275b7

Please sign in to comment.