diff --git a/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java index 428da883fba69d..003115278b3fd0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java @@ -146,10 +146,12 @@ ExecuteResponse start() throws IOException, InterruptedException { // retrying when received a unauthenticated error, and propagate to refreshIfUnauthenticated // which will then call retrier again. It will reset the retry time counter so we could // retry more than --remote_retry times which is not expected. - response = - retrier.execute( - () -> Utils.refreshIfUnauthenticated(this::execute, callCredentialsProvider), - executeBackoff); + if (lastOperation == null) { + response = + retrier.execute( + () -> Utils.refreshIfUnauthenticated(this::execute, callCredentialsProvider), + executeBackoff); + } // If no response from Execute(), use WaitExecution() in a "loop" which is implemented // inside the retry block. @@ -177,7 +179,7 @@ ExecuteResponse execute() throws IOException { try { Iterator operationStream = executeFunction.apply(request); - return handleOperationStream(operationStream); + return handleOperationStream(operationStream, /* waitExecution= */ false); } catch (Throwable e) { // If lastOperation is not null, we know the execution request is accepted by the server. In // this case, we will fallback to WaitExecution() loop when the stream is broken. @@ -197,33 +199,43 @@ ExecuteResponse waitExecution() throws IOException { WaitExecutionRequest.newBuilder().setName(lastOperation.getName()).build(); try { Iterator operationStream = waitExecutionFunction.apply(request); - return handleOperationStream(operationStream); + return handleOperationStream(operationStream, /* waitExecution= */ true); + } catch (StatusRuntimeException e) { + throw new IOException(e); } catch (Throwable e) { - // A NOT_FOUND error means Operation was lost on the server, retry Execute(). - // - // However, we only retry Execute() if executeBackoff should retry. Also increase the retry - // counter at the same time (done by nextDelayMillis()). - if (e instanceof StatusRuntimeException) { - StatusRuntimeException sre = (StatusRuntimeException) e; - if (sre.getStatus().getCode() == Code.NOT_FOUND - && executeBackoff.nextDelayMillis(sre) >= 0) { - lastOperation = null; - return null; - } - } + lastOperation = null; throw new IOException(e); } } /** Process a stream of operations from Execute() or WaitExecution(). */ - ExecuteResponse handleOperationStream(Iterator operationStream) throws IOException { + @Nullable + ExecuteResponse handleOperationStream( + Iterator operationStream, boolean waitExecution) throws IOException { try { while (operationStream.hasNext()) { Operation operation = operationStream.next(); - ExecuteResponse response = extractResponseOrThrowIfError(operation); - // At this point, we successfully received a response that is not an error. - lastOperation = operation; + // Either done or should be repeated + lastOperation = operation.getDone() ? null : operation; + + ExecuteResponse response; + try { + response = extractResponseOrThrowIfError(operation); + } catch (StatusRuntimeException e) { + // An operation error means Operation has been terminally completed, retry Execute(). + // + // However, we only retry Execute() if executeBackoff should retry. Also increase the + // retry + // counter at the same time (done by nextDelayMillis()). + if (waitExecution + && (retrier.isRetriable(e) || e.getStatus().getCode() == Code.NOT_FOUND) + && executeBackoff.nextDelayMillis(e) >= 0) { + lastOperation = null; + return null; + } + throw e; + } // We don't want to reset executeBackoff since if there is an error: // 1. If happened before we received a first response, we want to ensure the retry @@ -258,8 +270,8 @@ ExecuteResponse handleOperationStream(Iterator operationStream) throw } } - // The operation completed successfully but without a result. - throw new IOException("Remote server error: execution terminated with no result"); + // The operation stream completed successfully but without a result. + return null; } finally { close(operationStream); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java index 8e4e95afd56810..bc2f6ec59940d9 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java @@ -13,10 +13,11 @@ // limitations under the License. package com.google.devtools.build.lib.remote; -import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection; import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool; @@ -28,9 +29,12 @@ import io.netty.util.ReferenceCounted; import io.reactivex.rxjava3.annotations.CheckReturnValue; import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; import io.reactivex.rxjava3.core.SingleSource; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.functions.Function; import java.io.IOException; +import java.util.concurrent.ExecutionException; /** * A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count. @@ -80,19 +84,48 @@ public ListenableFuture withChannelFuture( } public T withChannelBlocking(Function source) - throws IOException, InterruptedException { + throws ExecutionException, IOException, InterruptedException { try { - return withChannel(channel -> Single.just(source.apply(channel))).blockingGet(); - } catch (RuntimeException e) { + return withChannelBlockingGet(source); + } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (cause != null) { - throwIfInstanceOf(cause, IOException.class); - throwIfInstanceOf(cause, InterruptedException.class); - } + Throwables.throwIfInstanceOf(cause, IOException.class); + Throwables.throwIfUnchecked(cause); throw e; } } + // prevents rxjava silent possible wrap of RuntimeException and misinterpretation + private T withChannelBlockingGet(Function source) + throws ExecutionException, InterruptedException { + SettableFuture future = SettableFuture.create(); + withChannel(channel -> Single.just(source.apply(channel))) + .subscribe( + new SingleObserver() { + @Override + public void onError(Throwable t) { + future.setException(t); + } + + @Override + public void onSuccess(T t) { + future.set(t); + } + + @Override + public void onSubscribe(Disposable d) { + future.addListener( + () -> { + if (future.isCancelled()) { + d.dispose(); + } + }, + directExecutor()); + } + }); + return future.get(); + } + @CheckReturnValue public Single withChannel(Function> source) { return dynamicConnectionPool diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 2795b133a1224f..18db228f3e254b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -188,6 +188,7 @@ private static boolean shouldEnableRemoteDownloader(RemoteOptions options) { return !Strings.isNullOrEmpty(options.remoteDownloader); } + @Nullable private static ServerCapabilities getAndVerifyServerCapabilities( RemoteOptions remoteOptions, ReferenceCountedChannel channel, @@ -535,9 +536,12 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { digestUtil, ServerCapabilitiesRequirement.CACHE); } - } catch (IOException e) { + } catch (AbruptExitException e) { + throw e; // prevent abrupt interception + } catch (Exception e) { String errorMessage = - "Failed to query remote execution capabilities: " + Utils.grpcAwareErrorMessage(e); + "Failed to query remote execution capabilities: " + + Utils.grpcAwareErrorMessage(e, verboseFailures); if (remoteOptions.remoteLocalFallback) { if (verboseFailures) { errorMessage += System.lineSeparator() + Throwables.getStackTraceAsString(e); @@ -559,12 +563,12 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) { try { remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority); - } catch (IOException e) { + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; - } catch (InterruptedException e) { - handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE); - return; } if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) { remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName; @@ -587,7 +591,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { !remoteOptions.remoteOutputsMode.downloadAllOutputs(), digestUtil, cacheClient); - } catch (IOException e) { + } catch (Exception e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; } @@ -652,7 +656,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { !remoteOptions.remoteOutputsMode.downloadAllOutputs(), digestUtil, cacheClient); - } catch (IOException e) { + } catch (Exception e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; } @@ -698,7 +702,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { } private static void handleInitFailure( - CommandEnvironment env, IOException e, Code remoteExecutionCode) { + CommandEnvironment env, Exception e, Code remoteExecutionCode) { env.getReporter().handle(Event.error(e.getMessage())); env.getBlazeModuleEnvironment() .exit( @@ -793,7 +797,7 @@ private static void checkClientServerCompatibility( } @Override - public void afterCommand() throws AbruptExitException { + public void afterCommand() { Preconditions.checkNotNull(blockWaitingModule, "blockWaitingModule must not be null"); // Some cleanup tasks must wait until every other BlazeModule's afterCommand() has run, as diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java index a69d4f60e1faea..e13a973ba9bd54 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java @@ -31,7 +31,6 @@ import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import io.grpc.CallCredentials; import io.grpc.Channel; -import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; @@ -73,21 +72,17 @@ public ServerCapabilities get(String buildRequestId, String commandId) RequestMetadata metadata = TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "capabilities", null); RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata); - try { - GetCapabilitiesRequest request = - instanceName == null - ? GetCapabilitiesRequest.getDefaultInstance() - : GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build(); - return retrier.execute( - () -> - channel.withChannelBlocking( - channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request))); - } catch (StatusRuntimeException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - throw new IOException(e); - } + GetCapabilitiesRequest request = + instanceName == null + ? GetCapabilitiesRequest.getDefaultInstance() + : GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build(); + ServerCapabilities caps = + retrier.execute( + () -> + channel.withChannelBlocking( + channel -> + capabilitiesBlockingStub(context, channel).getCapabilities(request))); + return caps; } static class ClientServerCompatibilityStatus { diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java index fd4205bf2925c0..6207d11edce643 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java @@ -19,7 +19,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; -import com.google.common.base.Throwables; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.FileArtifactValue; @@ -141,13 +140,7 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context) if (BulkTransferException.allCausedByCacheNotFoundException(e)) { // Intentionally left blank } else { - String errorMessage; - if (!verboseFailures) { - errorMessage = Utils.grpcAwareErrorMessage(e); - } else { - // On --verbose_failures print the whole stack trace - errorMessage = "\n" + Throwables.getStackTraceAsString(e); - } + String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures); if (isNullOrEmpty(errorMessage)) { errorMessage = e.getClass().getSimpleName(); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index ec7e7d7e86d6e2..4be0774599b770 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -30,7 +30,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.CommandLines.ParamFileActionInput; @@ -573,11 +572,7 @@ private SpawnResult handleError( catastrophe = false; } - String errorMessage = Utils.grpcAwareErrorMessage(exception); - if (verboseFailures) { - // On --verbose_failures print the whole stack trace - errorMessage += "\n" + Throwables.getStackTraceAsString(exception); - } + String errorMessage = Utils.grpcAwareErrorMessage(exception, verboseFailures); if (exception.getCause() instanceof ExecutionStatusException) { ExecutionStatusException e = (ExecutionStatusException) exception.getCause(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java index 720560b8e925af..4ca78d304adb7a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java @@ -350,7 +350,7 @@ private static String executionStatusExceptionErrorMessage(ExecutionStatusExcept + errorDetailsMessage(status.getDetailsList()); } - public static String grpcAwareErrorMessage(IOException e) { + private static String grpcAwareErrorMessage(IOException e) { io.grpc.Status errStatus = io.grpc.Status.fromThrowable(e); if (e.getCause() instanceof ExecutionStatusException) { // Display error message returned by the remote service. diff --git a/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java index d42021a4027141..bbfcd8a3a7e5b8 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java @@ -17,258 +17,63 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertThrows; -import build.bazel.remote.execution.v2.ActionResult; -import build.bazel.remote.execution.v2.Digest; -import build.bazel.remote.execution.v2.ExecuteRequest; import build.bazel.remote.execution.v2.ExecuteResponse; -import build.bazel.remote.execution.v2.ExecutionCapabilities; -import build.bazel.remote.execution.v2.OutputFile; -import build.bazel.remote.execution.v2.RequestMetadata; import build.bazel.remote.execution.v2.ServerCapabilities; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; import com.google.devtools.build.lib.remote.common.OperationObserver; -import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; -import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; -import com.google.devtools.build.lib.remote.options.RemoteOptions; +import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.util.TestUtils; -import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; -import com.google.devtools.common.options.Options; -import com.google.longrunning.Operation; -import com.google.rpc.Code; -import io.grpc.ManagedChannel; -import io.grpc.Server; import io.grpc.Status; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import io.reactivex.rxjava3.core.Single; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.Executors; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** Tests for {@link ExperimentalGrpcRemoteExecutor}. */ @RunWith(JUnit4.class) -public class ExperimentalGrpcRemoteExecutorTest { - - private RemoteActionExecutionContext context; - private FakeExecutionService executionService; - private RemoteOptions remoteOptions; - private Server fakeServer; +public class ExperimentalGrpcRemoteExecutorTest extends GrpcRemoteExecutorTestBase { private ListeningScheduledExecutorService retryService; - ExperimentalGrpcRemoteExecutor executor; - - private static final int MAX_RETRY_ATTEMPTS = 5; - - private static final OutputFile DUMMY_OUTPUT = - OutputFile.newBuilder() - .setPath("dummy.txt") - .setDigest( - Digest.newBuilder() - .setHash("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855") - .setSizeBytes(0) - .build()) - .build(); - - private static final ExecuteRequest DUMMY_REQUEST = - ExecuteRequest.newBuilder() - .setInstanceName("dummy") - .setActionDigest( - Digest.newBuilder() - .setHash("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855") - .setSizeBytes(0) - .build()) - .build(); - - private static final ExecuteResponse DUMMY_RESPONSE = - ExecuteResponse.newBuilder() - .setResult(ActionResult.newBuilder().addOutputFiles(DUMMY_OUTPUT).build()) - .build(); - - @Before - public final void setUp() throws Exception { - context = RemoteActionExecutionContext.create(RequestMetadata.getDefaultInstance()); - - executionService = new FakeExecutionService(); - - String fakeServerName = "fake server for " + getClass(); - // Use a mutable service registry for later registering the service impl for each test case. - fakeServer = - InProcessServerBuilder.forName(fakeServerName) - .addService(executionService) - .directExecutor() - .build() - .start(); - remoteOptions = Options.getDefaults(RemoteOptions.class); - remoteOptions.remoteMaxRetryAttempts = MAX_RETRY_ATTEMPTS; - - retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + @Override + protected RemoteExecutionClient createExecutionService( + ServerCapabilities caps, ReferenceCountedChannel channel) throws Exception { RemoteRetrier retrier = TestUtils.newRemoteRetrier( () -> new ExponentialBackoff(remoteOptions), RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService); - ReferenceCountedChannel channel = - new ReferenceCountedChannel( - new ChannelConnectionFactory() { - @Override - public Single create() { - ManagedChannel ch = - InProcessChannelBuilder.forName(fakeServerName) - .intercept(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions)) - .directExecutor() - .build(); - return Single.just(new ChannelConnection(ch)); - } - - @Override - public int maxConcurrency() { - return 100; - } - }); - ServerCapabilities caps = - ServerCapabilities.newBuilder() - .setExecutionCapabilities( - ExecutionCapabilities.newBuilder().setExecEnabled(true).build()) - .build(); + return new ExperimentalGrpcRemoteExecutor( + caps, remoteOptions, channel, CallCredentialsProvider.NO_CREDENTIALS, retrier); + } - executor = - new ExperimentalGrpcRemoteExecutor( - caps, remoteOptions, channel, CallCredentialsProvider.NO_CREDENTIALS, retrier); + @Override + public void setUp() throws Exception { + retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + super.setUp(); } - @After + @Override public void tearDown() throws Exception { retryService.shutdownNow(); retryService.awaitTermination( com.google.devtools.build.lib.testutil.TestUtils.WAIT_TIMEOUT_SECONDS, SECONDS); - - fakeServer.shutdownNow(); - fakeServer.awaitTermination(); - - executor.close(); - } - - @Test - public void executeRemotely_smoke() throws Exception { - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenAck().thenDone(DUMMY_RESPONSE); - - ExecuteResponse response = - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - - assertThat(response).isEqualTo(DUMMY_RESPONSE); - assertThat(executionService.getExecTimes()).isEqualTo(1); - } - - @Test - public void executeRemotely_errorInOperation_retryExecute() throws Exception { - executionService.whenExecute(DUMMY_REQUEST).thenError(new RuntimeException("Unavailable")); - executionService.whenExecute(DUMMY_REQUEST).thenError(Code.UNAVAILABLE); - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); - - ExecuteResponse response = - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - - assertThat(executionService.getExecTimes()).isEqualTo(3); - assertThat(response).isEqualTo(DUMMY_RESPONSE); - } - - @Test - public void executeRemotely_errorInResponse_retryExecute() throws Exception { - executionService - .whenExecute(DUMMY_REQUEST) - .thenDone( - ExecuteResponse.newBuilder() - .setStatus(com.google.rpc.Status.newBuilder().setCode(Code.UNAVAILABLE_VALUE)) - .build()); - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); - - ExecuteResponse response = - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - - assertThat(executionService.getExecTimes()).isEqualTo(2); - assertThat(response).isEqualTo(DUMMY_RESPONSE); - } - - @Test - public void executeRemotely_unretriableErrorInResponse_reportError() { - executionService - .whenExecute(DUMMY_REQUEST) - .thenDone( - ExecuteResponse.newBuilder() - .setStatus(com.google.rpc.Status.newBuilder().setCode(Code.INVALID_ARGUMENT_VALUE)) - .build()); - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); - - IOException e = - assertThrows( - IOException.class, - () -> { - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - }); - - assertThat(e).hasMessageThat().contains("INVALID_ARGUMENT"); - assertThat(executionService.getExecTimes()).isEqualTo(1); - } - - @Test - public void executeRemotely_retryExecuteAndFail() { - for (int i = 0; i <= MAX_RETRY_ATTEMPTS * 2; ++i) { - executionService.whenExecute(DUMMY_REQUEST).thenError(Code.UNAVAILABLE); - } - - IOException exception = - assertThrows( - IOException.class, - () -> { - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - }); - - assertThat(executionService.getExecTimes()).isEqualTo(MAX_RETRY_ATTEMPTS + 1); - assertThat(exception).hasMessageThat().contains("UNAVAILABLE"); - } - - @Test - public void executeRemotely_executeAndWait() throws Exception { - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); - executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE); - - ExecuteResponse response = - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - - assertThat(executionService.getExecTimes()).isEqualTo(1); - assertThat(executionService.getWaitTimes()).isEqualTo(1); - assertThat(response).isEqualTo(DUMMY_RESPONSE); - } - - @Test - public void executeRemotely_executeAndRetryWait() throws Exception { - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); - executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE); - - ExecuteResponse response = - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - - assertThat(executionService.getExecTimes()).isEqualTo(1); - assertThat(executionService.getWaitTimes()).isEqualTo(1); - assertThat(response).isEqualTo(DUMMY_RESPONSE); + super.tearDown(); } @Test public void executeRemotely_executeAndRetryWait_forever() throws Exception { - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); + executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); int errorTimes = MAX_RETRY_ATTEMPTS * 2; for (int i = 0; i < errorTimes; ++i) { - executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.DEADLINE_EXCEEDED); + executionService + .whenWaitExecution(DUMMY_REQUEST) + .thenAck() + .thenError(Status.DEADLINE_EXCEEDED.asRuntimeException()); } executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE); @@ -282,9 +87,11 @@ public void executeRemotely_executeAndRetryWait_forever() throws Exception { @Test public void executeRemotely_executeAndRetryWait_failForConsecutiveErrors() { - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); + executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); for (int i = 0; i < MAX_RETRY_ATTEMPTS * 2; ++i) { - executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.UNAVAILABLE); + executionService + .whenWaitExecution(DUMMY_REQUEST) + .thenError(Status.UNAVAILABLE.asRuntimeException()); } assertThrows( @@ -340,24 +147,14 @@ public void executeRemotely_responseWithoutResult_shouldNotCrash() { assertThat(executionService.getExecTimes()).isEqualTo(1); } - @Test - public void executeRemotely_retryExecuteWhenUnauthenticated() - throws IOException, InterruptedException { - executionService.whenExecute(DUMMY_REQUEST).thenError(Code.UNAUTHENTICATED); - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); - - ExecuteResponse response = - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - - assertThat(executionService.getExecTimes()).isEqualTo(2); - assertThat(response).isEqualTo(DUMMY_RESPONSE); - } - @Test public void executeRemotely_retryWaitExecutionWhenUnauthenticated() throws IOException, InterruptedException { - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); - executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.UNAUTHENTICATED); + executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); + executionService + .whenWaitExecution(DUMMY_REQUEST) + .thenAck() + .thenError(Status.UNAUTHENTICATED.asRuntimeException()); executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); ExecuteResponse response = @@ -367,53 +164,4 @@ public void executeRemotely_retryWaitExecutionWhenUnauthenticated() assertThat(executionService.getWaitTimes()).isEqualTo(2); assertThat(response).isEqualTo(DUMMY_RESPONSE); } - - @Test - public void executeRemotely_retryExecuteIfNotFound() throws IOException, InterruptedException { - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); - executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.NOT_FOUND); - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); - executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE); - - ExecuteResponse response = - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - - assertThat(executionService.getExecTimes()).isEqualTo(2); - assertThat(executionService.getWaitTimes()).isEqualTo(2); - assertThat(response).isEqualTo(DUMMY_RESPONSE); - } - - @Test - public void executeRemotely_notFoundLoop_reportError() { - for (int i = 0; i <= MAX_RETRY_ATTEMPTS * 2; ++i) { - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); - executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.NOT_FOUND); - } - - IOException e = - assertThrows( - IOException.class, - () -> { - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - }); - - assertThat(e).hasCauseThat().isInstanceOf(ExecutionStatusException.class); - ExecutionStatusException executionStatusException = (ExecutionStatusException) e.getCause(); - assertThat(executionStatusException.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND); - assertThat(executionService.getExecTimes()).isEqualTo(MAX_RETRY_ATTEMPTS + 1); - assertThat(executionService.getWaitTimes()).isEqualTo(MAX_RETRY_ATTEMPTS + 1); - } - - @Test - public void executeRemotely_notifyObserver() throws IOException, InterruptedException { - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); - - List notified = new ArrayList<>(); - executor.executeRemotely(context, DUMMY_REQUEST, notified::add); - - assertThat(notified) - .containsExactly( - FakeExecutionService.ackOperation(DUMMY_REQUEST), - FakeExecutionService.doneOperation(DUMMY_REQUEST, DUMMY_RESPONSE)); - } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java b/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java index 4c9d93989449dc..59170db624241b 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java +++ b/src/test/java/com/google/devtools/build/lib/remote/FakeExecutionService.java @@ -115,12 +115,17 @@ public void thenDone(ExecuteResponse response) { } public void thenError(Code code) { + // From REAPI Spec: + // > Errors discovered during creation of the `Operation` will be reported + // > as gRPC Status errors, while errors that occurred while running the + // > action will be reported in the `status` field of the `ExecuteResponse`. The + // > server MUST NOT set the `error` field of the `Operation` proto. Operation operation = - Operation.newBuilder() - .setName(getResourceName(request)) - .setDone(true) - .setError(Status.newBuilder().setCode(code.getNumber())) - .build(); + doneOperation( + request, + ExecuteResponse.newBuilder() + .setStatus(Status.newBuilder().setCode(code.getNumber())) + .build()); operations.add(() -> operation); finish(); } @@ -133,7 +138,7 @@ public void thenError(RuntimeException e) { finish(); } - private void finish() { + public void finish() { String name = getResourceName(request); provider.append(name, ImmutableList.copyOf(operations)); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTest.java new file mode 100644 index 00000000000000..b5af2b64a0a5c7 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTest.java @@ -0,0 +1,103 @@ +// Copyright 2023 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.assertThrows; + +import build.bazel.remote.execution.v2.ExecuteResponse; +import build.bazel.remote.execution.v2.ServerCapabilities; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; +import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; +import com.google.devtools.build.lib.remote.common.OperationObserver; +import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; +import com.google.devtools.build.lib.remote.util.TestUtils; +import com.google.rpc.Code; +import java.io.IOException; +import java.util.concurrent.Executors; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GrpcRemoteExecutor}. */ +@RunWith(JUnit4.class) +public class GrpcRemoteExecutorTest extends GrpcRemoteExecutorTestBase { + private ListeningScheduledExecutorService retryService; + + @Override + public void setUp() throws Exception { + retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + super.setUp(); + } + + @Override + protected RemoteExecutionClient createExecutionService( + ServerCapabilities caps, ReferenceCountedChannel channel) throws Exception { + RemoteRetrier retrier = + TestUtils.newRemoteRetrier( + () -> new ExponentialBackoff(remoteOptions), + RemoteRetrier.RETRIABLE_GRPC_EXEC_ERRORS, + retryService); + + return new GrpcRemoteExecutor(caps, channel, CallCredentialsProvider.NO_CREDENTIALS, retrier); + } + + @Override + public void tearDown() throws Exception { + retryService.shutdownNow(); + retryService.awaitTermination( + com.google.devtools.build.lib.testutil.TestUtils.WAIT_TIMEOUT_SECONDS, SECONDS); + super.tearDown(); + } + + @Test + public void executeRemotely_operationWithoutResult_crashes() { + executionService.whenExecute(DUMMY_REQUEST).thenDone(); + + assertThrows( + IllegalStateException.class, + () -> executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP)); + // Shouldn't retry in this case + assertThat(executionService.getExecTimes()).isEqualTo(1); + } + + @Test + public void executeRemotely_responseWithoutResult_crashes() { + executionService.whenExecute(DUMMY_REQUEST).thenDone(ExecuteResponse.getDefaultInstance()); + + assertThrows( + IllegalStateException.class, + () -> executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP)); + + assertThat(executionService.getExecTimes()).isEqualTo(1); + } + + @Test + public void executeRemotely_retryWaitExecutionWhenUnauthenticated() + throws IOException, InterruptedException { + executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); + executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.UNAUTHENTICATED); + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(executionService.getExecTimes()).isEqualTo(2); + assertThat(executionService.getWaitTimes()).isEqualTo(1); + assertThat(response).isEqualTo(DUMMY_RESPONSE); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTestBase.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTestBase.java new file mode 100644 index 00000000000000..9094b5f5d45446 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTestBase.java @@ -0,0 +1,330 @@ +// Copyright 2023 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import build.bazel.remote.execution.v2.ActionResult; +import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.ExecuteRequest; +import build.bazel.remote.execution.v2.ExecuteResponse; +import build.bazel.remote.execution.v2.ExecutionCapabilities; +import build.bazel.remote.execution.v2.OutputFile; +import build.bazel.remote.execution.v2.RequestMetadata; +import build.bazel.remote.execution.v2.ServerCapabilities; +import com.google.devtools.build.lib.remote.common.OperationObserver; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; +import com.google.devtools.build.lib.remote.options.RemoteOptions; +import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; +import com.google.devtools.common.options.Options; +import com.google.longrunning.Operation; +import com.google.rpc.Code; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.reactivex.rxjava3.core.Single; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** Base test class for {@link RemoteExecutionClient} gRPC implementations. */ +public abstract class GrpcRemoteExecutorTestBase { + + protected RemoteActionExecutionContext context; + protected FakeExecutionService executionService; + protected RemoteOptions remoteOptions; + private Server fakeServer; + protected RemoteExecutionClient executor; + + protected static final int MAX_RETRY_ATTEMPTS = 5; + + private static final OutputFile DUMMY_OUTPUT = + OutputFile.newBuilder() + .setPath("dummy.txt") + .setDigest( + Digest.newBuilder() + .setHash("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855") + .setSizeBytes(0) + .build()) + .build(); + + protected static final ExecuteRequest DUMMY_REQUEST = + ExecuteRequest.newBuilder() + .setInstanceName("dummy") + .setActionDigest( + Digest.newBuilder() + .setHash("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855") + .setSizeBytes(0) + .build()) + .build(); + + protected static final ExecuteResponse DUMMY_RESPONSE = + ExecuteResponse.newBuilder() + .setResult(ActionResult.newBuilder().addOutputFiles(DUMMY_OUTPUT).build()) + .build(); + + protected abstract RemoteExecutionClient createExecutionService( + ServerCapabilities caps, ReferenceCountedChannel channel) throws Exception; + + @Before + public void setUp() throws Exception { + context = RemoteActionExecutionContext.create(RequestMetadata.getDefaultInstance()); + + executionService = new FakeExecutionService(); + + String fakeServerName = "fake server for " + getClass(); + // Use a mutable service registry for later registering the service impl for each test case. + fakeServer = + InProcessServerBuilder.forName(fakeServerName) + .addService(executionService) + .directExecutor() + .build() + .start(); + + remoteOptions = Options.getDefaults(RemoteOptions.class); + remoteOptions.remoteMaxRetryAttempts = MAX_RETRY_ATTEMPTS; + + ReferenceCountedChannel channel = + new ReferenceCountedChannel( + new ChannelConnectionFactory() { + @Override + public Single create() { + ManagedChannel ch = + InProcessChannelBuilder.forName(fakeServerName) + .intercept(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions)) + .directExecutor() + .build(); + return Single.just(new ChannelConnection(ch)); + } + + @Override + public int maxConcurrency() { + return 100; + } + }); + + ServerCapabilities caps = + ServerCapabilities.newBuilder() + .setExecutionCapabilities( + ExecutionCapabilities.newBuilder().setExecEnabled(true).build()) + .build(); + + executor = createExecutionService(caps, channel); + } + + @After + public void tearDown() throws Exception { + fakeServer.shutdownNow(); + fakeServer.awaitTermination(); + + executor.close(); + } + + @Test + public void executeRemotely_smoke() throws Exception { + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenAck().thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(response).isEqualTo(DUMMY_RESPONSE); + assertThat(executionService.getExecTimes()).isEqualTo(1); + } + + @Test + public void executeRemotely_errorInOperation_retryExecute() throws Exception { + executionService.whenExecute(DUMMY_REQUEST).thenError(new RuntimeException("Unavailable")); + executionService.whenExecute(DUMMY_REQUEST).thenError(Code.UNAVAILABLE); + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(executionService.getExecTimes()).isEqualTo(3); + assertThat(response).isEqualTo(DUMMY_RESPONSE); + } + + @Test + public void executeRemotely_errorInResponse_retryExecute() throws Exception { + executionService + .whenExecute(DUMMY_REQUEST) + .thenDone( + ExecuteResponse.newBuilder() + .setStatus(com.google.rpc.Status.newBuilder().setCode(Code.UNAVAILABLE_VALUE)) + .build()); + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(executionService.getExecTimes()).isEqualTo(2); + assertThat(response).isEqualTo(DUMMY_RESPONSE); + } + + @Test + public void executeRemotely_unretriableErrorInResponse_reportError() { + executionService + .whenExecute(DUMMY_REQUEST) + .thenDone( + ExecuteResponse.newBuilder() + .setStatus(com.google.rpc.Status.newBuilder().setCode(Code.INVALID_ARGUMENT_VALUE)) + .build()); + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); + + IOException e = + assertThrows( + IOException.class, + () -> executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP)); + + assertThat(e).hasMessageThat().contains("INVALID_ARGUMENT"); + assertThat(executionService.getExecTimes()).isEqualTo(1); + } + + @Test + public void executeRemotely_retryExecuteAndFail() { + for (int i = 0; i <= MAX_RETRY_ATTEMPTS * 2; ++i) { + executionService.whenExecute(DUMMY_REQUEST).thenError(Code.UNAVAILABLE); + } + + IOException exception = + assertThrows( + IOException.class, + () -> executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP)); + + assertThat(executionService.getExecTimes()).isEqualTo(MAX_RETRY_ATTEMPTS + 1); + assertThat(exception).hasMessageThat().contains("UNAVAILABLE"); + } + + @Test + public void executeRemotely_executeAndWait() throws Exception { + executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); + executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(executionService.getExecTimes()).isEqualTo(1); + assertThat(executionService.getWaitTimes()).isEqualTo(1); + assertThat(response).isEqualTo(DUMMY_RESPONSE); + } + + @Test + public void executeRemotely_executeAndRetryWait() throws Exception { + executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); + executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(executionService.getExecTimes()).isEqualTo(1); + assertThat(executionService.getWaitTimes()).isEqualTo(1); + assertThat(response).isEqualTo(DUMMY_RESPONSE); + } + + @Test + public void executeRemotely_retryExecuteWhenUnauthenticated() + throws IOException, InterruptedException { + executionService.whenExecute(DUMMY_REQUEST).thenError(Code.UNAUTHENTICATED); + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(executionService.getExecTimes()).isEqualTo(2); + assertThat(response).isEqualTo(DUMMY_RESPONSE); + } + + @Test + public void executeRemotely_retryExecuteIfNotFound() throws IOException, InterruptedException { + executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); + executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.NOT_FOUND); + executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); + executionService.whenWaitExecution(DUMMY_REQUEST).thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(executionService.getExecTimes()).isEqualTo(2); + assertThat(executionService.getWaitTimes()).isEqualTo(2); + assertThat(response).isEqualTo(DUMMY_RESPONSE); + } + + @Test + public void executeRemotely_retryExecuteOnFinish() throws IOException, InterruptedException { + executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); + executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().finish(); + executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(executionService.getExecTimes()).isEqualTo(1); + assertThat(executionService.getWaitTimes()).isEqualTo(2); + assertThat(response).isEqualTo(DUMMY_RESPONSE); + } + + @Test + public void executeRemotely_notFoundLoop_reportError() { + for (int i = 0; i <= MAX_RETRY_ATTEMPTS; ++i) { + executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); + executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.NOT_FOUND); + } + + IOException e = + assertThrows( + IOException.class, + () -> executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP)); + + assertThat(e).hasCauseThat().isInstanceOf(ExecutionStatusException.class); + ExecutionStatusException executionStatusException = (ExecutionStatusException) e.getCause(); + assertThat(executionStatusException.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND); + assertThat(executionService.getExecTimes()).isEqualTo(MAX_RETRY_ATTEMPTS + 1); + assertThat(executionService.getWaitTimes()).isEqualTo(MAX_RETRY_ATTEMPTS + 1); + } + + @Test + public void executeRemotely_notifyObserver() throws IOException, InterruptedException { + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); + + List notified = new ArrayList<>(); + var unused = executor.executeRemotely(context, DUMMY_REQUEST, notified::add); + + assertThat(notified) + .containsExactly( + FakeExecutionService.ackOperation(DUMMY_REQUEST), + FakeExecutionService.doneOperation(DUMMY_REQUEST, DUMMY_RESPONSE)); + } + + @Test + public void executeRemotely_retryExecuteOnNoResultDoneOperation() + throws IOException, InterruptedException { + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(executionService.getExecTimes()).isEqualTo(2); + assertThat(executionService.getWaitTimes()).isEqualTo(0); + assertThat(response).isEqualTo(DUMMY_RESPONSE); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/UtilsTest.java b/src/test/java/com/google/devtools/build/lib/remote/UtilsTest.java index 6dcb2d53fdcb8b..c99d88bb2b9d3c 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/UtilsTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/UtilsTest.java @@ -36,14 +36,35 @@ public class UtilsTest { @Test - public void testGrpcAwareErrorMessages() { + public void testGrpcAwareErrorMessage() { IOException ioError = new IOException("io error"); IOException wrappedGrpcError = new IOException( "wrapped error", Status.ABORTED.withDescription("grpc error").asRuntimeException()); - assertThat(Utils.grpcAwareErrorMessage(ioError)).isEqualTo("io error"); - assertThat(Utils.grpcAwareErrorMessage(wrappedGrpcError)).isEqualTo("ABORTED: grpc error"); + assertThat(Utils.grpcAwareErrorMessage(ioError, /* verboseFailures= */ false)) + .isEqualTo("io error"); + assertThat(Utils.grpcAwareErrorMessage(wrappedGrpcError, /* verboseFailures= */ false)) + .isEqualTo("ABORTED: grpc error"); + } + + @Test + public void testGrpcAwareErrorMessage_verboseFailures() { + IOException ioError = new IOException("io error"); + IOException wrappedGrpcError = + new IOException( + "wrapped error", Status.ABORTED.withDescription("grpc error").asRuntimeException()); + + assertThat(Utils.grpcAwareErrorMessage(ioError, /* verboseFailures= */ true)) + .startsWith( + "io error\n" + + "java.io.IOException: io error\n" + + "\tat com.google.devtools.build.lib.remote.UtilsTest.testGrpcAwareErrorMessage_verboseFailures"); + assertThat(Utils.grpcAwareErrorMessage(wrappedGrpcError, /* verboseFailures= */ true)) + .startsWith( + "ABORTED: grpc error\n" + + "java.io.IOException: wrapped error\n" + + "\tat com.google.devtools.build.lib.remote.UtilsTest.testGrpcAwareErrorMessage_verboseFailures"); } @Test