From 5d68e1f2e3ba942d700c021440522bfe00d932ad Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Tue, 27 Aug 2024 12:50:35 -0700 Subject: [PATCH] Force synchronous upload and reuse of possibly modified spawn outputs When an action may modify a spawn's outputs after execution, the upload of outputs to the cache and reuse for deduplicated actions need to happen synchronously directly after spawn execution to avoid a race. This commit implements this for cache uploads by marking all actions with this property and simply disabling async upload for all spawns executed by such actions. For output reuse, all executions deduplicated against the first one register atomically upon deduplication and cause the cache upload to wait for all of them to complete reuse. Fixes #22501 Fixes #23288 Work towards #21578 Closes #23307 (no longer needed) Closes #23382. PiperOrigin-RevId: 668101364 Change-Id: Ice75dbe14a7dd46e02ecb096d2b2a30940216356 --- .../lib/actions/ActionExecutionMetadata.java | 10 + .../lib/analysis/test/TestRunnerAction.java | 10 + .../lib/remote/RemoteExecutionService.java | 42 +++- .../build/lib/remote/RemoteSpawnCache.java | 182 ++++++++++-------- .../lib/rules/java/JavaCompileAction.java | 8 + .../rules/java/JavaHeaderCompileAction.java | 8 + .../lib/remote/RemoteSpawnCacheTest.java | 129 ++++++++++++- 7 files changed, 312 insertions(+), 77 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/actions/ActionExecutionMetadata.java b/src/main/java/com/google/devtools/build/lib/actions/ActionExecutionMetadata.java index 95c02ee92b8421..5eab4dbd702aba 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/ActionExecutionMetadata.java +++ b/src/main/java/com/google/devtools/build/lib/actions/ActionExecutionMetadata.java @@ -119,4 +119,14 @@ default String getProgressMessage(RepositoryMapping mainRepositoryMapping) { default boolean mayInsensitivelyPropagateInputs() { return false; } + + /** + * Returns true if the action may modify 
spawn outputs after the spawn has executed. + * + * <p>
If this returns true, any kind of spawn output caching or reuse needs to happen + * synchronously directly after the spawn execution. + */ + default boolean mayModifySpawnOutputsAfterExecution() { + return false; + } } diff --git a/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java b/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java index c431b15ed63d24..2c618e7ab48c94 100644 --- a/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java +++ b/src/main/java/com/google/devtools/build/lib/analysis/test/TestRunnerAction.java @@ -309,6 +309,16 @@ private static ImmutableSet nonNullAsSet(Artifact... artifacts) { this.isExecutedOnWindows = isExecutedOnWindows; } + @Override + public boolean mayModifySpawnOutputsAfterExecution() { + // Test actions modify test spawn outputs after execution: + // - if there are multiple attempts (unavoidable); + // - in all cases due to appending any stray stderr output to the test log in + // StandaloneTestStrategy. + // TODO: Get rid of the second case and only return true if there are multiple attempts. 
+ return true; + } + public boolean isExecutedOnWindows() { return isExecutedOnWindows; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java index 6e6e29a4bd88e4..576783085f4378 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java @@ -1406,6 +1406,14 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re public static final class LocalExecution { private final RemoteAction action; private final SettableFuture spawnResultFuture; + private final Phaser spawnResultConsumers = + new Phaser(1) { + @Override + protected boolean onAdvance(int phase, int registeredParties) { + // We only use a single phase. + return true; + } + }; private LocalExecution(RemoteAction action) { this.action = action; @@ -1429,6 +1437,37 @@ public static LocalExecution createIfDeduplicatable(RemoteAction action) { return new LocalExecution(action); } + /** + * Attempts to register a thread waiting for the {@link #spawnResultFuture} to become available + * and returns true if successful. + * + *

<p>Every call to this method must be matched by a call to {@link #unregister()} via + * try-finally. + * + * <p>
This always returns true for actions that do not modify their spawns' outputs after + * execution. + */ + public boolean registerForOutputReuse() { + // We only use a single phase. + return spawnResultConsumers.register() == 0; + } + + /** + * Unregisters a thread waiting for the {@link #spawnResultFuture}, either after successful + * reuse of the outputs or upon failure. + */ + public void unregister() { + spawnResultConsumers.arriveAndDeregister(); + } + + /** + * Waits for all potential consumers of the {@link #spawnResultFuture} to be done with their + * output reuse. + */ + public void awaitAllOutputReuse() { + spawnResultConsumers.arriveAndAwaitAdvance(); + } + /** * Signals to all potential consumers of the {@link #spawnResultFuture} that this execution has * been cancelled and that the result will not be available. @@ -1644,7 +1683,8 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult, Runnable SpawnResult.Status.SUCCESS.equals(spawnResult.status()) && spawnResult.exitCode() == 0, "shouldn't upload outputs of failed local action"); - if (remoteOptions.remoteCacheAsync) { + if (remoteOptions.remoteCacheAsync + && !action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) { Single.using( remoteCache::retain, remoteCache -> diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java index 3224f56ac23684..ae802b93203935 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java @@ -109,89 +109,110 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context) // results haven't been uploaded to the cache yet and deduplicate all of them against the // first one. 
LocalExecution previousExecution = null; - thisExecution = LocalExecution.createIfDeduplicatable(action); - if (shouldUploadLocalResults && thisExecution != null) { - previousExecution = inFlightExecutions.putIfAbsent(action.getActionKey(), thisExecution); - } - // Metadata will be available in context.current() until we detach. - // This is done via a thread-local variable. try { - RemoteActionResult result; - try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) { - result = remoteExecutionService.lookupCache(action); - } - // In case the remote cache returned a failed action (exit code != 0) we treat it as a - // cache miss - if (result != null && result.getExitCode() == 0) { - Stopwatch fetchTime = Stopwatch.createStarted(); - InMemoryOutput inMemoryOutput; - try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) { - inMemoryOutput = remoteExecutionService.downloadOutputs(action, result); - } - fetchTime.stop(); - totalTime.stop(); - spawnMetrics - .setFetchTimeInMs((int) fetchTime.elapsed().toMillis()) - .setTotalTimeInMs((int) totalTime.elapsed().toMillis()) - .setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis()); - SpawnResult spawnResult = - createSpawnResult( - digestUtil, + thisExecution = LocalExecution.createIfDeduplicatable(action); + if (shouldUploadLocalResults && thisExecution != null) { + LocalExecution previousOrThisExecution = + inFlightExecutions.merge( action.getActionKey(), - result.getExitCode(), - /* cacheHit= */ true, - result.cacheName(), - inMemoryOutput, - result.getExecutionMetadata().getExecutionStartTimestamp(), - result.getExecutionMetadata().getExecutionCompletedTimestamp(), - spawnMetrics.build(), - spawn.getMnemonic()); - return SpawnCache.success(spawnResult); + thisExecution, + (existingExecution, thisExecutionArg) -> { + if (existingExecution.registerForOutputReuse()) { + return existingExecution; + } else { + // The existing execution has completed 
and its results may have already + // been modified by its action, so we can't deduplicate against it. Instead, + // start a new in-flight execution. + return thisExecutionArg; + } + }); + previousExecution = + previousOrThisExecution == thisExecution ? null : previousOrThisExecution; } - } catch (CacheNotFoundException e) { - // Intentionally left blank - } catch (IOException e) { - if (BulkTransferException.allCausedByCacheNotFoundException(e)) { + try { + RemoteActionResult result; + try (SilentCloseable c = + prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) { + result = remoteExecutionService.lookupCache(action); + } + // In case the remote cache returned a failed action (exit code != 0) we treat it as a + // cache miss + if (result != null && result.getExitCode() == 0) { + Stopwatch fetchTime = Stopwatch.createStarted(); + InMemoryOutput inMemoryOutput; + try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) { + inMemoryOutput = remoteExecutionService.downloadOutputs(action, result); + } + fetchTime.stop(); + totalTime.stop(); + spawnMetrics + .setFetchTimeInMs((int) fetchTime.elapsed().toMillis()) + .setTotalTimeInMs((int) totalTime.elapsed().toMillis()) + .setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis()); + SpawnResult spawnResult = + createSpawnResult( + digestUtil, + action.getActionKey(), + result.getExitCode(), + /* cacheHit= */ true, + result.cacheName(), + inMemoryOutput, + result.getExecutionMetadata().getExecutionStartTimestamp(), + result.getExecutionMetadata().getExecutionCompletedTimestamp(), + spawnMetrics.build(), + spawn.getMnemonic()); + return SpawnCache.success(spawnResult); + } + } catch (CacheNotFoundException e) { // Intentionally left blank - } else { - String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures); - if (isNullOrEmpty(errorMessage)) { - errorMessage = e.getClass().getSimpleName(); + } catch (IOException e) { + if 
(BulkTransferException.allCausedByCacheNotFoundException(e)) { + // Intentionally left blank + } else { + String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures); + if (isNullOrEmpty(errorMessage)) { + errorMessage = e.getClass().getSimpleName(); + } + errorMessage = "Remote Cache: " + errorMessage; + remoteExecutionService.report(Event.warn(errorMessage)); } - errorMessage = "Remote Cache: " + errorMessage; - remoteExecutionService.report(Event.warn(errorMessage)); } - } - if (previousExecution != null) { - Stopwatch fetchTime = Stopwatch.createStarted(); - SpawnResult previousResult; - try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "reuse outputs")) { - previousResult = remoteExecutionService.waitForAndReuseOutputs(action, previousExecution); - } - if (previousResult != null) { - spawnMetrics - .setFetchTimeInMs((int) fetchTime.elapsed().toMillis()) - .setTotalTimeInMs((int) totalTime.elapsed().toMillis()) - .setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis()); - SpawnMetrics buildMetrics = spawnMetrics.build(); - return SpawnCache.success( - new SpawnResult.DelegateSpawnResult(previousResult) { - @Override - public String getRunnerName() { - return "deduplicated"; - } + if (previousExecution != null) { + Stopwatch fetchTime = Stopwatch.createStarted(); + SpawnResult previousResult; + try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "reuse outputs")) { + previousResult = + remoteExecutionService.waitForAndReuseOutputs(action, previousExecution); + } + if (previousResult != null) { + spawnMetrics + .setFetchTimeInMs((int) fetchTime.elapsed().toMillis()) + .setTotalTimeInMs((int) totalTime.elapsed().toMillis()) + .setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis()); + SpawnMetrics buildMetrics = spawnMetrics.build(); + return SpawnCache.success( + new SpawnResult.DelegateSpawnResult(previousResult) { + @Override + public String getRunnerName() { + return "deduplicated"; + } - @Override - 
public SpawnMetrics getMetrics() { - return buildMetrics; - } - }); + @Override + public SpawnMetrics getMetrics() { + return buildMetrics; + } + }); + } + // If we reach here, the previous execution was not successful (it encountered an + // exception or the spawn had an exit code != 0). Since it isn't possible to accurately + // recreate the failure without rerunning the action, we fall back to running the action + // locally. This means that we have introduced an unnecessary wait, but that can only + // happen in the case of a failing build with --keep_going. + } + } finally { + if (previousExecution != null) { + previousExecution.unregister(); } - // If we reach here, the previous execution was not successful (it encountered an exception - // or the spawn had an exit code != 0). Since it isn't possible to accurately recreate the - // failure without rerunning the action, we fall back to running the action locally. This - // means that we have introduced an unnecessary wait, but that can only happen in the case - // of a failing build with --keep_going. } } @@ -239,6 +260,17 @@ public void store(SpawnResult result) throws ExecException, InterruptedException // large. remoteExecutionService.uploadOutputs( action, result, () -> inFlightExecutions.remove(action.getActionKey())); + if (thisExecutionFinal != null + && action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) { + // In this case outputs have been uploaded synchronously and the callback above has run, + // so no new executions will be deduplicated against this one. We can safely await all + // existing executions finish the reuse. + // Note that while this call itself isn't interruptible, all operations it awaits are + // interruptible. 
+ try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "await output reuse")) { + thisExecutionFinal.awaitAllOutputReuse(); + } + } } private void checkForConcurrentModifications() diff --git a/src/main/java/com/google/devtools/build/lib/rules/java/JavaCompileAction.java b/src/main/java/com/google/devtools/build/lib/rules/java/JavaCompileAction.java index 9d2784c2d2eaee..37af94b5321ac8 100644 --- a/src/main/java/com/google/devtools/build/lib/rules/java/JavaCompileAction.java +++ b/src/main/java/com/google/devtools/build/lib/rules/java/JavaCompileAction.java @@ -714,6 +714,14 @@ public NestedSet getPossibleInputsForTesting() { return null; } + @Override + public boolean mayModifySpawnOutputsAfterExecution() { + // Causes of spawn output modification after execution: + // - Fallback to the full classpath with --experimental_java_classpath=bazel. + // - In-place rewriting of .jdeps files with --experimental_output_paths=strip. + return true; + } + /** * Locally rewrites a .jdeps file to replace missing config prefixes. * diff --git a/src/main/java/com/google/devtools/build/lib/rules/java/JavaHeaderCompileAction.java b/src/main/java/com/google/devtools/build/lib/rules/java/JavaHeaderCompileAction.java index 0866cfba49fa88..728aedf7edb9b3 100644 --- a/src/main/java/com/google/devtools/build/lib/rules/java/JavaHeaderCompileAction.java +++ b/src/main/java/com/google/devtools/build/lib/rules/java/JavaHeaderCompileAction.java @@ -161,6 +161,14 @@ protected void afterExecute( } } + @Override + public boolean mayModifySpawnOutputsAfterExecution() { + // Causes of spawn output modification after execution: + // - In-place rewriting of .jdeps files with --experimental_output_paths=strip. + // TODO: Use separate files as action and spawn output to avoid in-place modification. 
+ return true; + } + public static Builder newBuilder(RuleContext ruleContext) { return new Builder(ruleContext); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java index 6b1dd7cc153da9..8ab1a5637e9242 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java @@ -44,10 +44,12 @@ import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.actions.ActionContext; +import com.google.devtools.build.lib.actions.ActionExecutionMetadata; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputHelper; import com.google.devtools.build.lib.actions.Artifact.ArtifactExpander; import com.google.devtools.build.lib.actions.ArtifactPathResolver; +import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.ExecutionRequirements; import com.google.devtools.build.lib.actions.ForbiddenActionInputException; import com.google.devtools.build.lib.actions.InputMetadataProvider; @@ -93,7 +95,11 @@ import com.google.devtools.common.options.Options; import java.io.IOException; import java.time.Duration; +import java.util.Set; import java.util.SortedMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.junit.Before; import org.junit.Test; @@ -243,10 +249,16 @@ private static SimpleSpawn simpleSpawnWithExecutionInfo( } private static SimpleSpawn simplePathMappedSpawn(String configSegment) { + return simplePathMappedSpawn( + configSegment, new FakeOwner("Mnemonic", "Progress Message", "//dummy:label")); + } + + private static SimpleSpawn 
simplePathMappedSpawn( + String configSegment, ActionExecutionMetadata owner) { String inputPath = "bazel-bin/%s/bin/input"; String outputPath = "bazel-bin/%s/bin/output"; return new SimpleSpawn( - new FakeOwner("Mnemonic", "Progress Message", "//dummy:label"), + owner, ImmutableList.of("cp", inputPath.formatted("cfg"), outputPath.formatted("cfg")), ImmutableMap.of("VARIABLE", "value"), ImmutableMap.of(ExecutionRequirements.SUPPORTS_PATH_MAPPING, ""), @@ -757,6 +769,121 @@ public void pathMappedActionIsDeduplicated() throws Exception { assertThat(secondCacheHandle.willStore()).isFalse(); } + @Test + public void pathMappedActionIsDeduplicatedWithSpawnOutputModification() throws Exception { + // arrange + RemoteSpawnCache cache = createRemoteSpawnCache(); + + ActionExecutionMetadata firstExecutionOwner = + new FakeOwner("Mnemonic", "Progress Message", "//dummy:label") { + @Override + public boolean mayModifySpawnOutputsAfterExecution() { + return true; + } + }; + SimpleSpawn firstSpawn = simplePathMappedSpawn("k8-fastbuild", firstExecutionOwner); + FakeActionInputFileCache firstFakeFileCache = new FakeActionInputFileCache(execRoot); + firstFakeFileCache.createScratchInput(firstSpawn.getInputFiles().getSingleton(), "xyz"); + SpawnExecutionContext firstPolicy = + createSpawnExecutionContext(firstSpawn, execRoot, firstFakeFileCache, outErr); + + SimpleSpawn secondSpawn = simplePathMappedSpawn("k8-opt"); + FakeActionInputFileCache secondFakeFileCache = new FakeActionInputFileCache(execRoot); + secondFakeFileCache.createScratchInput(secondSpawn.getInputFiles().getSingleton(), "xyz"); + SpawnExecutionContext secondPolicy = + createSpawnExecutionContext(secondSpawn, execRoot, secondFakeFileCache, outErr); + + RemoteExecutionService remoteExecutionService = cache.getRemoteExecutionService(); + CountDownLatch enteredWaitForAndReuseOutputs = new CountDownLatch(1); + CountDownLatch completeWaitForAndReuseOutputs = new CountDownLatch(1); + CountDownLatch enteredUploadOutputs = 
new CountDownLatch(1); + Set spawnsThatWaitedForOutputReuse = ConcurrentHashMap.newKeySet(); + Mockito.doAnswer( + (Answer) + invocation -> { + spawnsThatWaitedForOutputReuse.add( + ((RemoteAction) invocation.getArgument(0)).getSpawn()); + enteredWaitForAndReuseOutputs.countDown(); + completeWaitForAndReuseOutputs.await(); + return (SpawnResult) invocation.callRealMethod(); + }) + .when(remoteExecutionService) + .waitForAndReuseOutputs(any(), any()); + // Simulate a very slow upload to the remote cache to ensure that the second spawn is + // deduplicated rather than a cache hit. This is a slight hack, but also avoids introducing + // more concurrency to this test. + Mockito.doAnswer( + (Answer) + invocation -> { + enteredUploadOutputs.countDown(); + return null; + }) + .when(remoteExecutionService) + .uploadOutputs(any(), any(), any()); + + // act + // Simulate the first spawn writing to the output, but delay its completion. + CacheHandle firstCacheHandle = cache.lookup(firstSpawn, firstPolicy); + FileSystemUtils.writeContent( + fs.getPath("/exec/root/bazel-bin/k8-fastbuild/bin/output"), UTF_8, "hello"); + + // Start the second spawn and wait for it to deduplicate against the first one. + AtomicReference secondCacheHandleRef = new AtomicReference<>(); + Thread lookupSecondSpawn = + new Thread( + () -> { + try { + secondCacheHandleRef.set(cache.lookup(secondSpawn, secondPolicy)); + } catch (InterruptedException + | IOException + | ExecException + | ForbiddenActionInputException e) { + throw new IllegalStateException(e); + } + }); + lookupSecondSpawn.start(); + enteredWaitForAndReuseOutputs.await(); + + // Complete the first spawn and immediately corrupt its outputs. 
+ Thread completeFirstSpawn = + new Thread( + () -> { + try { + firstCacheHandle.store( + new SpawnResult.Builder() + .setExitCode(0) + .setStatus(Status.SUCCESS) + .setRunnerName("test") + .build()); + FileSystemUtils.writeContent( + fs.getPath("/exec/root/bazel-bin/k8-fastbuild/bin/output"), UTF_8, "corrupted"); + } catch (IOException | ExecException | InterruptedException e) { + throw new IllegalStateException(e); + } + }); + completeFirstSpawn.start(); + // Make it more likely to detect races by waiting for the first spawn to (fake) upload its + // outputs. + enteredUploadOutputs.await(); + + // Let the second spawn complete its output reuse. + completeWaitForAndReuseOutputs.countDown(); + lookupSecondSpawn.join(); + CacheHandle secondCacheHandle = secondCacheHandleRef.get(); + + completeFirstSpawn.join(); + + // assert + assertThat(spawnsThatWaitedForOutputReuse).containsExactly(secondSpawn); + assertThat(secondCacheHandle.hasResult()).isTrue(); + assertThat(secondCacheHandle.getResult().getRunnerName()).isEqualTo("deduplicated"); + assertThat( + FileSystemUtils.readContent( + fs.getPath("/exec/root/bazel-bin/k8-opt/bin/output"), UTF_8)) + .isEqualTo("hello"); + assertThat(secondCacheHandle.willStore()).isFalse(); + } + @Test public void deduplicatedActionWithNonZeroExitCodeIsACacheMiss() throws Exception { // arrange