diff --git a/src/main/java/com/google/devtools/build/lib/actions/ActionExecutionMetadata.java b/src/main/java/com/google/devtools/build/lib/actions/ActionExecutionMetadata.java index 0dd03238eb2a4d..eb9cc3d0e68897 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/ActionExecutionMetadata.java +++ b/src/main/java/com/google/devtools/build/lib/actions/ActionExecutionMetadata.java @@ -114,4 +114,14 @@ default String getProgressMessage(RepositoryMapping mainRepositoryMapping) { default boolean mayInsensitivelyPropagateInputs() { return false; } + + /** + * Returns true if the action may modify spawn outputs after the spawn has executed. + * + *

If this returns true, any kind of spawn output caching or reuse needs to happen + * synchronously directly after the spawn execution. + */ + default boolean mayModifySpawnOutputsAfterExecution() { + return false; + } } diff --git a/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java b/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java index 70e2259748ac52..752888254e4db8 100644 --- a/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java +++ b/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java @@ -309,6 +309,16 @@ private static ImmutableSet nonNullAsSet(Artifact... artifacts) { this.isExecutedOnWindows = isExecutedOnWindows; } + @Override + public boolean mayModifySpawnOutputsAfterExecution() { + // Test actions modify test spawn outputs after execution: + // - if there are multiple attempts (unavoidable); + // - in all cases due to appending any stray stderr output to the test log in + // StandaloneTestStrategy. + // TODO: Get rid of the second case and only return true if there are multiple attempts. + return true; + } + public boolean isExecutedOnWindows() { return isExecutedOnWindows; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java index fe7f47f83b6161..73d3ad14edb8e9 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java @@ -1333,6 +1333,14 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re public static final class LocalExecution { private final RemoteAction action; private final SettableFuture spawnResultFuture; + private final Phaser spawnResultConsumers = + new Phaser(1) { + @Override + protected boolean onAdvance(int phase, int registeredParties) { + // We only use a single phase. + return true; + } + }; private LocalExecution(RemoteAction action) { this.action = action; @@ -1356,6 +1364,37 @@ public static LocalExecution createIfDeduplicatable(RemoteAction action) { return new LocalExecution(action); } + /** + * Attempts to register a thread waiting for the {@link #spawnResultFuture} to become available + * and returns true if successful. + * + *

Every call to this method must be matched by a call to {@link #unregister()} via + * try-finally. + * + *

This always returns true for actions that do not modify their spawns' outputs after + * execution. + */ + public boolean registerForOutputReuse() { + // We only use a single phase. + return spawnResultConsumers.register() == 0; + } + + /** + * Unregisters a thread waiting for the {@link #spawnResultFuture}, either after successful + * reuse of the outputs or upon failure. + */ + public void unregister() { + spawnResultConsumers.arriveAndDeregister(); + } + + /** + * Waits for all potential consumers of the {@link #spawnResultFuture} to be done with their + * output reuse. + */ + public void awaitAllOutputReuse() { + spawnResultConsumers.arriveAndAwaitAdvance(); + } + /** * Signals to all potential consumers of the {@link #spawnResultFuture} that this execution has * been cancelled and that the result will not be available. @@ -1571,7 +1610,8 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult, Runnable SpawnResult.Status.SUCCESS.equals(spawnResult.status()) && spawnResult.exitCode() == 0, "shouldn't upload outputs of failed local action"); - if (remoteOptions.remoteCacheAsync) { + if (remoteOptions.remoteCacheAsync + && !action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) { Single.using( remoteCache::retain, remoteCache -> diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java index 3224f56ac23684..ae802b93203935 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java @@ -109,89 +109,110 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context) // results haven't been uploaded to the cache yet and deduplicate all of them against the // first one. LocalExecution previousExecution = null; - thisExecution = LocalExecution.createIfDeduplicatable(action); - if (shouldUploadLocalResults && thisExecution != null) { - previousExecution = inFlightExecutions.putIfAbsent(action.getActionKey(), thisExecution); - } - // Metadata will be available in context.current() until we detach. - // This is done via a thread-local variable. try { - RemoteActionResult result; - try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) { - result = remoteExecutionService.lookupCache(action); - } - // In case the remote cache returned a failed action (exit code != 0) we treat it as a - // cache miss - if (result != null && result.getExitCode() == 0) { - Stopwatch fetchTime = Stopwatch.createStarted(); - InMemoryOutput inMemoryOutput; - try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) { - inMemoryOutput = remoteExecutionService.downloadOutputs(action, result); - } - fetchTime.stop(); - totalTime.stop(); - spawnMetrics - .setFetchTimeInMs((int) fetchTime.elapsed().toMillis()) - .setTotalTimeInMs((int) totalTime.elapsed().toMillis()) - .setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis()); - SpawnResult spawnResult = - createSpawnResult( - digestUtil, + thisExecution = LocalExecution.createIfDeduplicatable(action); + if (shouldUploadLocalResults && thisExecution != null) { + LocalExecution previousOrThisExecution = + inFlightExecutions.merge( action.getActionKey(), - result.getExitCode(), - /* cacheHit= */ true, - result.cacheName(), - inMemoryOutput, - result.getExecutionMetadata().getExecutionStartTimestamp(), - result.getExecutionMetadata().getExecutionCompletedTimestamp(), - spawnMetrics.build(), - spawn.getMnemonic()); - return SpawnCache.success(spawnResult); + thisExecution, + (existingExecution, thisExecutionArg) -> { + if (existingExecution.registerForOutputReuse()) { + return existingExecution; + } else { + // The existing execution has completed and its results may have already + // been modified by its action, so we can't deduplicate against it. Instead, + // start a new in-flight execution. + return thisExecutionArg; + } + }); + previousExecution = + previousOrThisExecution == thisExecution ? null : previousOrThisExecution; } - } catch (CacheNotFoundException e) { - // Intentionally left blank - } catch (IOException e) { - if (BulkTransferException.allCausedByCacheNotFoundException(e)) { + try { + RemoteActionResult result; + try (SilentCloseable c = + prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) { + result = remoteExecutionService.lookupCache(action); + } + // In case the remote cache returned a failed action (exit code != 0) we treat it as a + // cache miss + if (result != null && result.getExitCode() == 0) { + Stopwatch fetchTime = Stopwatch.createStarted(); + InMemoryOutput inMemoryOutput; + try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) { + inMemoryOutput = remoteExecutionService.downloadOutputs(action, result); + } + fetchTime.stop(); + totalTime.stop(); + spawnMetrics + .setFetchTimeInMs((int) fetchTime.elapsed().toMillis()) + .setTotalTimeInMs((int) totalTime.elapsed().toMillis()) + .setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis()); + SpawnResult spawnResult = + createSpawnResult( + digestUtil, + action.getActionKey(), + result.getExitCode(), + /* cacheHit= */ true, + result.cacheName(), + inMemoryOutput, + result.getExecutionMetadata().getExecutionStartTimestamp(), + result.getExecutionMetadata().getExecutionCompletedTimestamp(), + spawnMetrics.build(), + spawn.getMnemonic()); + return SpawnCache.success(spawnResult); + } + } catch (CacheNotFoundException e) { // Intentionally left blank - } else { - String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures); - if (isNullOrEmpty(errorMessage)) { - errorMessage = e.getClass().getSimpleName(); + } catch (IOException e) { + if (BulkTransferException.allCausedByCacheNotFoundException(e)) { + // Intentionally left blank + } else { + String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures); + if (isNullOrEmpty(errorMessage)) { + errorMessage = e.getClass().getSimpleName(); + } + errorMessage = "Remote Cache: " + errorMessage; + remoteExecutionService.report(Event.warn(errorMessage)); } - errorMessage = "Remote Cache: " + errorMessage; - remoteExecutionService.report(Event.warn(errorMessage)); } - } - if (previousExecution != null) { - Stopwatch fetchTime = Stopwatch.createStarted(); - SpawnResult previousResult; - try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "reuse outputs")) { - previousResult = remoteExecutionService.waitForAndReuseOutputs(action, previousExecution); - } - if (previousResult != null) { - spawnMetrics - .setFetchTimeInMs((int) fetchTime.elapsed().toMillis()) - .setTotalTimeInMs((int) totalTime.elapsed().toMillis()) - .setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis()); - SpawnMetrics buildMetrics = spawnMetrics.build(); - return SpawnCache.success( - new SpawnResult.DelegateSpawnResult(previousResult) { - @Override - public String getRunnerName() { - return "deduplicated"; - } + if (previousExecution != null) { + Stopwatch fetchTime = Stopwatch.createStarted(); + SpawnResult previousResult; + try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "reuse outputs")) { + previousResult = + remoteExecutionService.waitForAndReuseOutputs(action, previousExecution); + } + if (previousResult != null) { + spawnMetrics + .setFetchTimeInMs((int) fetchTime.elapsed().toMillis()) + .setTotalTimeInMs((int) totalTime.elapsed().toMillis()) + .setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis()); + SpawnMetrics buildMetrics = spawnMetrics.build(); + return SpawnCache.success( + new SpawnResult.DelegateSpawnResult(previousResult) { + @Override + public String getRunnerName() { + return "deduplicated"; + } - @Override - public SpawnMetrics getMetrics() { - return buildMetrics; - } - }); + @Override + public SpawnMetrics getMetrics() { + return buildMetrics; + } + }); + } + // If we reach here, the previous execution was not successful (it encountered an + // exception or the spawn had an exit code != 0). Since it isn't possible to accurately + // recreate the failure without rerunning the action, we fall back to running the action + // locally. This means that we have introduced an unnecessary wait, but that can only + // happen in the case of a failing build with --keep_going. + } + } finally { + if (previousExecution != null) { + previousExecution.unregister(); } - // If we reach here, the previous execution was not successful (it encountered an exception - // or the spawn had an exit code != 0). Since it isn't possible to accurately recreate the - // failure without rerunning the action, we fall back to running the action locally. This - // means that we have introduced an unnecessary wait, but that can only happen in the case - // of a failing build with --keep_going. } } @@ -239,6 +260,17 @@ public void store(SpawnResult result) throws ExecException, InterruptedException // large. remoteExecutionService.uploadOutputs( action, result, () -> inFlightExecutions.remove(action.getActionKey())); + if (thisExecutionFinal != null + && action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) { + // In this case outputs have been uploaded synchronously and the callback above has run, + // so no new executions will be deduplicated against this one. We can safely await all + // existing executions finish the reuse. + // Note that while this call itself isn't interruptible, all operations it awaits are + // interruptible. + try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "await output reuse")) { + thisExecutionFinal.awaitAllOutputReuse(); + } + } } private void checkForConcurrentModifications() diff --git a/src/main/java/com/google/devtools/build/lib/rules/java/JavaCompileAction.java b/src/main/java/com/google/devtools/build/lib/rules/java/JavaCompileAction.java index 06068df7d8c76d..5db91c5f75888b 100644 --- a/src/main/java/com/google/devtools/build/lib/rules/java/JavaCompileAction.java +++ b/src/main/java/com/google/devtools/build/lib/rules/java/JavaCompileAction.java @@ -709,6 +709,14 @@ public NestedSet getPossibleInputsForTesting() { return null; } + @Override + public boolean mayModifySpawnOutputsAfterExecution() { + // Causes of spawn output modification after execution: + // - Fallback to the full classpath with --experimental_java_classpath=bazel. + // - In-place rewriting of .jdeps files with --experimental_output_paths=strip. + return true; + } + /** * Locally rewrites a .jdeps file to replace missing config prefixes. * diff --git a/src/main/java/com/google/devtools/build/lib/rules/java/JavaHeaderCompileAction.java b/src/main/java/com/google/devtools/build/lib/rules/java/JavaHeaderCompileAction.java index 4b1e299dbc80d1..ff7db176c273a1 100644 --- a/src/main/java/com/google/devtools/build/lib/rules/java/JavaHeaderCompileAction.java +++ b/src/main/java/com/google/devtools/build/lib/rules/java/JavaHeaderCompileAction.java @@ -157,6 +157,14 @@ protected void afterExecute( } } + @Override + public boolean mayModifySpawnOutputsAfterExecution() { + // Causes of spawn output modification after execution: + // - In-place rewriting of .jdeps files with --experimental_output_paths=strip. + // TODO: Use separate files as action and spawn output to avoid in-place modification. + return true; + } + public static Builder newBuilder(RuleContext ruleContext) { return new Builder(ruleContext); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java index 9f52f300db3614..2d14126fdcf678 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java @@ -44,10 +44,12 @@ import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.actions.ActionContext; +import com.google.devtools.build.lib.actions.ActionExecutionMetadata; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputHelper; import com.google.devtools.build.lib.actions.ArtifactExpander; import com.google.devtools.build.lib.actions.ArtifactPathResolver; +import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.ExecutionRequirements; import com.google.devtools.build.lib.actions.ForbiddenActionInputException; import com.google.devtools.build.lib.actions.InputMetadataProvider; @@ -93,7 +95,11 @@ import com.google.devtools.common.options.Options; import java.io.IOException; import java.time.Duration; +import java.util.Set; import java.util.SortedMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.junit.Before; import org.junit.Test; @@ -243,10 +249,16 @@ private static SimpleSpawn simpleSpawnWithExecutionInfo( } private static SimpleSpawn simplePathMappedSpawn(String configSegment) { + return simplePathMappedSpawn( + configSegment, new FakeOwner("Mnemonic", "Progress Message", "//dummy:label")); + } + + private static SimpleSpawn simplePathMappedSpawn( + String configSegment, ActionExecutionMetadata owner) { String inputPath = "bazel-bin/%s/bin/input"; String outputPath = "bazel-bin/%s/bin/output"; return new SimpleSpawn( - new FakeOwner("Mnemonic", "Progress Message", "//dummy:label"), + owner, ImmutableList.of("cp", inputPath.formatted("cfg"), outputPath.formatted("cfg")), ImmutableMap.of("VARIABLE", "value"), ImmutableMap.of(ExecutionRequirements.SUPPORTS_PATH_MAPPING, ""), @@ -755,6 +767,121 @@ public void pathMappedActionIsDeduplicated() throws Exception { assertThat(secondCacheHandle.willStore()).isFalse(); } + @Test + public void pathMappedActionIsDeduplicatedWithSpawnOutputModification() throws Exception { + // arrange + RemoteSpawnCache cache = createRemoteSpawnCache(); + + ActionExecutionMetadata firstExecutionOwner = + new FakeOwner("Mnemonic", "Progress Message", "//dummy:label") { + @Override + public boolean mayModifySpawnOutputsAfterExecution() { + return true; + } + }; + SimpleSpawn firstSpawn = simplePathMappedSpawn("k8-fastbuild", firstExecutionOwner); + FakeActionInputFileCache firstFakeFileCache = new FakeActionInputFileCache(execRoot); + firstFakeFileCache.createScratchInput(firstSpawn.getInputFiles().getSingleton(), "xyz"); + SpawnExecutionContext firstPolicy = + createSpawnExecutionContext(firstSpawn, execRoot, firstFakeFileCache, outErr); + + SimpleSpawn secondSpawn = simplePathMappedSpawn("k8-opt"); + FakeActionInputFileCache secondFakeFileCache = new FakeActionInputFileCache(execRoot); + secondFakeFileCache.createScratchInput(secondSpawn.getInputFiles().getSingleton(), "xyz"); + SpawnExecutionContext secondPolicy = + createSpawnExecutionContext(secondSpawn, execRoot, secondFakeFileCache, outErr); + + RemoteExecutionService remoteExecutionService = cache.getRemoteExecutionService(); + CountDownLatch enteredWaitForAndReuseOutputs = new CountDownLatch(1); + CountDownLatch completeWaitForAndReuseOutputs = new CountDownLatch(1); + CountDownLatch enteredUploadOutputs = new CountDownLatch(1); + Set spawnsThatWaitedForOutputReuse = ConcurrentHashMap.newKeySet(); + Mockito.doAnswer( + (Answer) + invocation -> { + spawnsThatWaitedForOutputReuse.add( + ((RemoteAction) invocation.getArgument(0)).getSpawn()); + enteredWaitForAndReuseOutputs.countDown(); + completeWaitForAndReuseOutputs.await(); + return (SpawnResult) invocation.callRealMethod(); + }) + .when(remoteExecutionService) + .waitForAndReuseOutputs(any(), any()); + // Simulate a very slow upload to the remote cache to ensure that the second spawn is + // deduplicated rather than a cache hit. This is a slight hack, but also avoids introducing + // more concurrency to this test. + Mockito.doAnswer( + (Answer) + invocation -> { + enteredUploadOutputs.countDown(); + return null; + }) + .when(remoteExecutionService) + .uploadOutputs(any(), any(), any()); + + // act + // Simulate the first spawn writing to the output, but delay its completion. + CacheHandle firstCacheHandle = cache.lookup(firstSpawn, firstPolicy); + FileSystemUtils.writeContent( + fs.getPath("/exec/root/bazel-bin/k8-fastbuild/bin/output"), UTF_8, "hello"); + + // Start the second spawn and wait for it to deduplicate against the first one. + AtomicReference secondCacheHandleRef = new AtomicReference<>(); + Thread lookupSecondSpawn = + new Thread( + () -> { + try { + secondCacheHandleRef.set(cache.lookup(secondSpawn, secondPolicy)); + } catch (InterruptedException + | IOException + | ExecException + | ForbiddenActionInputException e) { + throw new IllegalStateException(e); + } + }); + lookupSecondSpawn.start(); + enteredWaitForAndReuseOutputs.await(); + + // Complete the first spawn and immediately corrupt its outputs. + Thread completeFirstSpawn = + new Thread( + () -> { + try { + firstCacheHandle.store( + new SpawnResult.Builder() + .setExitCode(0) + .setStatus(Status.SUCCESS) + .setRunnerName("test") + .build()); + FileSystemUtils.writeContent( + fs.getPath("/exec/root/bazel-bin/k8-fastbuild/bin/output"), UTF_8, "corrupted"); + } catch (IOException | ExecException | InterruptedException e) { + throw new IllegalStateException(e); + } + }); + completeFirstSpawn.start(); + // Make it more likely to detect races by waiting for the first spawn to (fake) upload its + // outputs. + enteredUploadOutputs.await(); + + // Let the second spawn complete its output reuse. + completeWaitForAndReuseOutputs.countDown(); + lookupSecondSpawn.join(); + CacheHandle secondCacheHandle = secondCacheHandleRef.get(); + + completeFirstSpawn.join(); + + // assert + assertThat(spawnsThatWaitedForOutputReuse).containsExactly(secondSpawn); + assertThat(secondCacheHandle.hasResult()).isTrue(); + assertThat(secondCacheHandle.getResult().getRunnerName()).isEqualTo("deduplicated"); + assertThat( + FileSystemUtils.readContent( + fs.getPath("/exec/root/bazel-bin/k8-opt/bin/output"), UTF_8)) + .isEqualTo("hello"); + assertThat(secondCacheHandle.willStore()).isFalse(); + } + @Test public void deduplicatedActionWithNonZeroExitCodeIsACacheMiss() throws Exception { // arrange