diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java index a257637de22..6b8537fd658 100644 --- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java @@ -19,17 +19,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import io.grpc.ChannelLogger.ChannelLogLevel; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.FixedResultPicker; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; -import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.Subchannel; -import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver.ConfigOrError; @@ -110,7 +108,8 @@ Status tryAcceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { defaultProvider = getProviderOrThrow(defaultPolicy, "using default policy"); } catch (PolicyException e) { Status s = Status.INTERNAL.withDescription(e.getMessage()); - helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s)); + helper.updateBalancingState( + ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(s))); delegate.shutdown(); delegateProvider = null; delegate = new NoopLoadBalancer(); @@ -122,7 +121,8 @@ Status tryAcceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (delegateProvider == null || !policySelection.provider.getPolicyName().equals(delegateProvider.getPolicyName())) { - helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker()); + helper.updateBalancingState( + ConnectivityState.CONNECTING, new FixedResultPicker(PickResult.withNoResult())); delegate.shutdown(); delegateProvider = policySelection.provider; LoadBalancer old = delegate; @@ -236,30 +236,4 @@ private PolicyException(String msg) { super(msg); } } - - private static final class EmptyPicker extends SubchannelPicker { - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withNoResult(); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(EmptyPicker.class).toString(); - } - } - - private static final class FailingPicker extends SubchannelPicker { - private final Status failure; - - FailingPicker(Status failure) { - this.failure = failure; - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withError(failure); - } - } } diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index 71973ed5d64..b2272a89672 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -38,8 +38,8 @@ import io.grpc.InternalLogId; import io.grpc.InternalWithLogId; import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.FixedResultPicker; import io.grpc.LoadBalancer.PickResult; -import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.ManagedChannel; @@ -182,23 +182,7 @@ public Object getInternalSubchannel() { } }; - final class OobSubchannelPicker extends SubchannelPicker { - final PickResult result = PickResult.withSubchannel(subchannelImpl); - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return result; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(OobSubchannelPicker.class) - .add("result", result) - .toString(); - } - } - - subchannelPicker = new OobSubchannelPicker(); + subchannelPicker = new FixedResultPicker(PickResult.withSubchannel(subchannelImpl)); delayedTransport.reprocess(subchannelPicker); } @@ -270,23 +254,8 @@ void handleSubchannelStateChange(final ConnectivityStateInfo newState) { delayedTransport.reprocess(subchannelPicker); break; case TRANSIENT_FAILURE: - final class OobErrorPicker extends SubchannelPicker { - final PickResult errorResult = PickResult.withError(newState.getStatus()); - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return errorResult; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(OobErrorPicker.class) - .add("errorResult", errorResult) - .toString(); - } - } - - delayedTransport.reprocess(new OobErrorPicker()); + delayedTransport.reprocess( + new FixedResultPicker(PickResult.withError(newState.getStatus()))); break; default: // Do nothing diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index 2689d7d2308..935214a94fd 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -24,7 +24,6 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import io.grpc.Attributes; @@ -164,7 +163,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { if (noOldAddrs) { // Make tests happy; they don't properly assume starting in CONNECTING rawConnectivityState = CONNECTING; - updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); } if (rawConnectivityState == READY) { @@ -237,7 +236,7 @@ public void handleNameResolutionError(Status error) { subchannels.clear(); addressIndex.updateGroups(ImmutableList.of()); rawConnectivityState = TRANSIENT_FAILURE; - updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); + updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); } void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) { @@ -290,7 +289,7 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo case CONNECTING: rawConnectivityState = CONNECTING; - updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); break; case READY: @@ -322,7 +321,7 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo if (isPassComplete()) { rawConnectivityState = TRANSIENT_FAILURE; updateBalancingState(TRANSIENT_FAILURE, - new Picker(PickResult.withError(stateInfo.getStatus()))); + new FixedResultPicker(PickResult.withError(stateInfo.getStatus()))); // Refresh Name Resolution, but only when all 3 conditions are met // * We are at the end of addressIndex @@ -385,11 +384,11 @@ private void updateHealthCheckedState(SubchannelData subchannelData) { updateBalancingState(READY, new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel))); } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) { - updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError( + updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError( subchannelData.healthStateInfo.getStatus()))); } else if (concludedState != TRANSIENT_FAILURE) { updateBalancingState(subchannelData.getHealthState(), - new Picker(PickResult.withNoResult())); + new FixedResultPicker(PickResult.withNoResult())); } } @@ -593,28 +592,6 @@ ConnectivityState getConcludedConnectivityState() { return this.concludedState; } - /** - * No-op picker which doesn't add any custom picking logic. It just passes already known result - * received in constructor. - */ - private static final class Picker extends SubchannelPicker { - private final PickResult result; - - Picker(PickResult result) { - this.result = checkNotNull(result, "result"); - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return result; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Picker.class).add("result", result).toString(); - } - } - /** * Picker that requests connection during the first pick, and returns noResult. */ diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index a23855e67ec..aa8b5a7e9a9 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -22,7 +22,6 @@ import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; -import com.google.common.base.MoreObjects; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; @@ -87,7 +86,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. - updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel))); + updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -105,7 +104,7 @@ public void handleNameResolutionError(Status error) { // NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine // for time being. - updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); + updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); } private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { @@ -139,13 +138,13 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo case CONNECTING: // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave // the current picker in-place. But ignoring the potential optimization is simpler. - picker = new Picker(PickResult.withNoResult()); + picker = new FixedResultPicker(PickResult.withNoResult()); break; case READY: - picker = new Picker(PickResult.withSubchannel(subchannel)); + picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); break; case TRANSIENT_FAILURE: - picker = new Picker(PickResult.withError(stateInfo.getStatus())); + picker = new FixedResultPicker(PickResult.withError(stateInfo.getStatus())); break; default: throw new IllegalArgumentException("Unsupported state:" + newState); @@ -173,28 +172,6 @@ public void requestConnection() { } } - /** - * No-op picker which doesn't add any custom picking logic. It just passes already known result - * received in constructor. - */ - private static final class Picker extends SubchannelPicker { - private final PickResult result; - - Picker(PickResult result) { - this.result = checkNotNull(result, "result"); - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return result; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Picker.class).add("result", result).toString(); - } - } - /** Picker that requests connection during the first pick, and returns noResult. */ private final class RequestConnectionPicker extends SubchannelPicker { private final AtomicBoolean connectionRequested = new AtomicBoolean(false); diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 3e0258f2e40..819293e070b 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -219,7 +219,7 @@ public void refreshNameResolutionAfterSubchannelConnectionBroken() { inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse(); inOrder.verify(mockSubchannel).requestConnection(); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); @@ -278,7 +278,7 @@ public void pickAfterResolvedAndChanged() throws Exception { assertThat(args.getAddresses()).isEqualTo(servers); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); - assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse(); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); @@ -300,7 +300,7 @@ public void pickAfterStateChangeAfterResolution() throws Exception { verify(mockSubchannel).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel(); + assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse(); reset(mockHelper); when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); @@ -317,7 +317,7 @@ public void pickAfterStateChangeAfterResolution() throws Exception { stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); - assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care verifyNoMoreInteractions(mockHelper); @@ -405,8 +405,7 @@ public void nameResolutionSuccessAfterError() throws Exception { inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); - assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs) - .getSubchannel()); + assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse(); assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), pickerCaptor.getValue().pickSubchannel(mockArgs)); diff --git a/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java b/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java index b49c856c4d0..4715b551524 100644 --- a/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java +++ b/examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java @@ -92,7 +92,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { }); this.subchannel = subchannel; - helper.updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); + helper.updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -107,7 +107,8 @@ public void handleNameResolutionError(Status error) { subchannel.shutdown(); subchannel = null; } - helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); + helper.updateBalancingState( + TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); } private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { @@ -125,13 +126,13 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo picker = new RequestConnectionPicker(); break; case CONNECTING: - picker = new Picker(PickResult.withNoResult()); + picker = new FixedResultPicker(PickResult.withNoResult()); break; case READY: - picker = new Picker(PickResult.withSubchannel(subchannel)); + picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); break; case TRANSIENT_FAILURE: - picker = new Picker(PickResult.withError(stateInfo.getStatus())); + picker = new FixedResultPicker(PickResult.withError(stateInfo.getStatus())); break; default: throw new IllegalArgumentException("Unsupported state:" + currentState); @@ -154,29 +155,6 @@ public void requestConnection() { } } - /** - * No-op picker which doesn't add any custom picking logic. It just passes already known result - * received in constructor. - */ - private static final class Picker extends SubchannelPicker { - - private final PickResult result; - - Picker(PickResult result) { - this.result = checkNotNull(result, "result"); - } - - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return result; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Picker.class).add("result", result).toString(); - } - } - /** * Picker that requests connection during the first pick, and returns noResult. */ diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 7ca20d58bce..2fc2a492ac8 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -187,6 +187,7 @@ enum Mode { private List dropList = Collections.emptyList(); // Contains only non-drop, i.e., backends from the round-robin list from the balancer. private List backendList = Collections.emptyList(); + private ConnectivityState currentState = ConnectivityState.CONNECTING; private RoundRobinPicker currentPicker = new RoundRobinPicker(Collections.emptyList(), Arrays.asList(BUFFER_ENTRY)); private boolean requestConnectionPending; @@ -937,10 +938,12 @@ private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) // Discard the new picker if we are sure it won't make any difference, in order to save // re-processing pending streams, and avoid unnecessary resetting of the pointer in // RoundRobinPicker. - if (picker.dropList.equals(currentPicker.dropList) + if (state.equals(currentState) + && picker.dropList.equals(currentPicker.dropList) && picker.pickList.equals(currentPicker.pickList)) { return; } + currentState = state; currentPicker = picker; helper.updateBalancingState(state, picker); } diff --git a/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java b/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java index 6e59e867e32..848199f50a8 100644 --- a/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java +++ b/rls/src/main/java/io/grpc/rls/RlsLoadBalancer.java @@ -19,7 +19,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; import io.grpc.ChannelLogger; import io.grpc.ChannelLogger.ChannelLogLevel; import io.grpc.ConnectivityState; @@ -93,27 +92,14 @@ public void requestConnection() { @Override public void handleNameResolutionError(final Status error) { - class ErrorPicker extends SubchannelPicker { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withError(error); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("error", error) - .toString(); - } - } - if (routeLookupClient != null) { logger.log(ChannelLogLevel.DEBUG, "closing the routeLookupClient on a name resolution error"); routeLookupClient.close(); routeLookupClient = null; lbPolicyConfiguration = null; } - helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker()); + helper.updateBalancingState( + ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error))); } @Override diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index 12483d60794..b349aecdbf3 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -1025,14 +1025,9 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { @Override public void handleNameResolutionError(final Status error) { - class ErrorPicker extends SubchannelPicker { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - return PickResult.withError(error); - } - } - - helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker()); + helper.updateBalancingState( + ConnectivityState.TRANSIENT_FAILURE, + new FixedResultPicker(PickResult.withError(error))); } @Override diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 8d16d1bd74c..188c99bcd5a 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -72,7 +72,6 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.FakeClock; import io.grpc.internal.JsonParser; -import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.internal.PickSubchannelArgsImpl; import io.grpc.internal.testing.StreamRecorder; import io.grpc.lookup.v1.RouteLookupServiceGrpc; @@ -212,12 +211,14 @@ public void lb_serverStatusCodeConversion() throws Exception { throw new RuntimeException(e); } }); + assertThat(subchannels.poll()).isNotNull(); // default target + assertThat(subchannels.poll()).isNull(); + // Warm-up pick; will be queued InOrder inOrder = inOrder(helper); inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); PickSubchannelArgs fakeSearchMethodArgs = newPickSubchannelArgs(fakeSearchMethod); - // Warm-up pick; will be queued PickResult res = picker.pickSubchannel(fakeSearchMethodArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); @@ -230,8 +231,7 @@ public void lb_serverStatusCodeConversion() throws Exception { subchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); res = picker.pickSubchannel(fakeSearchMethodArgs); assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK); - int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; - verifyLongCounterAdd("grpc.lb.rls.target_picks", expectedTimes, 1, "wilderness", "complete"); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); // Check on conversion Throwable cause = new Throwable("cause"); @@ -284,8 +284,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { res = picker.pickSubchannel(searchSubchannelArgs); assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); assertThat(res.getSubchannel()).isSameInstanceAs(searchSubchannel); - int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; - verifyLongCounterAdd("grpc.lb.rls.target_picks", expectedTimes, 1, "wilderness", "complete"); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); // rescue should be pending status although the overall channel state is READY res = picker.pickSubchannel(rescueSubchannelArgs); @@ -431,7 +430,7 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception { inOrder.verify(helper).getMetricRecorder(); inOrder.verify(helper).getChannelTarget(); inOrder.verifyNoMoreInteractions(); - int times = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; + int times = 1; verifyLongCounterAdd("grpc.lb.rls.default_target_picks", times, 1, "defaultTarget", "complete"); @@ -536,8 +535,7 @@ public void lb_working_withoutDefaultTarget() throws Exception { res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod)); assertThat(res.getStatus().isOk()).isFalse(); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); - int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; - verifyLongCounterAdd("grpc.lb.rls.target_picks", expectedTimes, 1, "wilderness", "complete"); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); res = picker.pickSubchannel(newPickSubchannelArgs(fakeRescueMethod)); assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); diff --git a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java index c4eb4c7bae5..10436407422 100644 --- a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java @@ -54,7 +54,6 @@ import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock.ScheduledTask; -import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.internal.TestUtils.StandardLoadBalancerProvider; import io.grpc.util.OutlierDetectionLoadBalancer.EndpointTracker; import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; @@ -568,9 +567,7 @@ public void successRateOneOutlier_configChange() { loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, servers)); - // The PickFirstLeafLB has an extra level of indirection because of health - int expectedStateChanges = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 8 : 12; - generateLoad(ImmutableMap.of(subchannel2, Status.DEADLINE_EXCEEDED), expectedStateChanges); + generateLoad(ImmutableMap.of(subchannel2, Status.DEADLINE_EXCEEDED), 8); // Move forward in time to a point where the detection timer has fired. forwardTime(config); @@ -604,8 +601,7 @@ public void successRateOneOutlier_unejected() { assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); // Now we produce more load, but the subchannel has started working and is no longer an outlier. - int expectedStateChanges = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 8 : 12; - generateLoad(ImmutableMap.of(), expectedStateChanges); + generateLoad(ImmutableMap.of(), 8); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.maxEjectionTimeNanos + 1, TimeUnit.NANOSECONDS); diff --git a/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java b/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java index dda2ad177e6..1f23f2a4af5 100644 --- a/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java @@ -54,7 +54,7 @@ final class LeastRequestLoadBalancer extends MultiChildLoadBalancer { private final ThreadSafeRandom random; - private SubchannelPicker currentPicker = new EmptyPicker(); + private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); private int choiceCount = DEFAULT_CHOICE_COUNT; LeastRequestLoadBalancer(Helper helper) { @@ -113,7 +113,7 @@ protected void updateOverallBalancingState() { } } if (isConnecting) { - updateBalancingState(CONNECTING, new EmptyPicker()); + updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); } else { // Give it all the failing children and let it randomly pick among them updateBalancingState(TRANSIENT_FAILURE, diff --git a/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java index 6fb6507fa4e..302faed95a4 100644 --- a/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java @@ -22,6 +22,7 @@ import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.LoadBalancerMatchers.pickerReturns; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -50,6 +51,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.CreateSubchannelArgs; +import io.grpc.LoadBalancer.FixedResultPicker; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; @@ -62,7 +64,6 @@ import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.util.AbstractTestHelper; import io.grpc.util.MultiChildLoadBalancer.ChildLbState; -import io.grpc.xds.LeastRequestLoadBalancer.EmptyPicker; import io.grpc.xds.LeastRequestLoadBalancer.LeastRequestConfig; import io.grpc.xds.LeastRequestLoadBalancer.LeastRequestLbState; import io.grpc.xds.LeastRequestLoadBalancer.ReadyPicker; @@ -238,7 +239,8 @@ public void pickAfterStateChange() throws Exception { ChildLbState childLbState = loadBalancer.getChildLbStates().iterator().next(); Subchannel subchannel = getSubchannel(servers.get(0)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(childLbState.getCurrentState()).isEqualTo(CONNECTING); deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); @@ -251,7 +253,8 @@ public void pickAfterStateChange() throws Exception { assertThat(childLbState.getCurrentState()).isEqualTo(TRANSIENT_FAILURE); assertThat(childLbState.getCurrentPicker().toString()).contains(error.toString()); refreshInvokedAndUpdateBS(inOrder, CONNECTING); - assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class); + assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs)) + .isEqualTo(PickResult.withNoResult()); deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(helper).refreshNameResolution(); @@ -302,7 +305,8 @@ public void ignoreShutdownSubchannelStateChange() { ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY) .build()); assertThat(addressesAcceptanceStatus.isOk()).isTrue(); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); List savedSubchannels = new ArrayList<>(subchannels.values()); loadBalancer.shutdown(); @@ -324,7 +328,8 @@ public void stayTransientFailureUntilReady() { .build()); assertThat(addressesAcceptanceStatus.isOk()).isTrue(); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // Simulate state transitions for each subchannel individually. List children = new ArrayList<>(loadBalancer.getChildLbStates()); @@ -384,7 +389,8 @@ public void refreshNameResolutionWhenSubchannelConnectionBroken() { assertThat(addressesAcceptanceStatus.isOk()).isTrue(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // Simulate state transitions for each subchannel individually. for (Subchannel sc : subchannels.values()) { @@ -399,7 +405,8 @@ public void refreshNameResolutionWhenSubchannelConnectionBroken() { deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(helper).refreshNameResolution(); verify(sc, times(2)).requestConnection(); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); + inOrder.verify(helper) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); } AbstractTestHelper.verifyNoMoreMeaningfulInteractions(helper); @@ -469,14 +476,6 @@ public void pickerLeastRequest() throws Exception { assertEquals(0, ((LeastRequestLbState) childLbStates.get(0)).getActiveRequests()); } - @Test - public void pickerEmptyList() throws Exception { - SubchannelPicker picker = new EmptyPicker(); - - assertNull(picker.pickSubchannel(mockArgs).getSubchannel()); - assertEquals(Status.OK, picker.pickSubchannel(mockArgs).getStatus()); - } - @Test public void nameResolutionErrorWithNoChannels() throws Exception { Status error = Status.NOT_FOUND.withDescription("nameResolutionError"); @@ -554,7 +553,8 @@ public void subchannelStateIsolation() throws Exception { Iterator pickers = pickerCaptor.getAllValues().iterator(); // The picker is incrementally updated as subchannels become READY assertEquals(CONNECTING, stateIterator.next()); - assertThat(pickers.next()).isInstanceOf(EmptyPicker.class); + assertThat(pickers.next().pickSubchannel(mockArgs)) + .isEqualTo(PickResult.withNoResult()); assertEquals(READY, stateIterator.next()); assertThat(getList(pickers.next())).containsExactly(sc1); assertEquals(READY, stateIterator.next()); @@ -585,8 +585,8 @@ public void readyPicker_emptyList() { @Test public void internalPickerComparisons() { - EmptyPicker empty1 = new EmptyPicker(); - EmptyPicker empty2 = new EmptyPicker(); + FixedResultPicker empty1 = new FixedResultPicker(PickResult.withNoResult()); + FixedResultPicker empty2 = new FixedResultPicker(PickResult.withNoResult()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());