Skip to content

Commit 62da203

Browse files
authored
Fix Bug Where Weight Is Set to 0 in Ramping-Up Strategy (#6014)
Motivation: When the original weight is less than 10, the initial weight is incorrectly set to 0, potentially leading to `EndpointSelectionTimeoutException`. Modifications: - Ensured a minimum weight of 1 is set when the original weight is greater than 1 in the ramping-up strategy. - Added debugging logs for selector and selection strategy to facilitate troubleshooting. Result: - Fix a bug where weights could unintentionally be set to 0 in ramping-up strategies.
1 parent 5d346b2 commit 62da203

File tree

11 files changed

+206
-16
lines changed

11 files changed

+206
-16
lines changed

core/src/main/java/com/linecorp/armeria/client/endpoint/DynamicEndpointGroup.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static com.google.common.base.Preconditions.checkArgument;
1919
import static com.google.common.collect.ImmutableList.toImmutableList;
20+
import static com.linecorp.armeria.internal.client.endpoint.EndpointToStringUtil.toShortString;
2021
import static com.linecorp.armeria.internal.common.util.CollectionUtil.truncate;
2122
import static java.util.Objects.requireNonNull;
2223

@@ -32,6 +33,9 @@
3233
import java.util.concurrent.locks.Lock;
3334
import java.util.function.Consumer;
3435

36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
3539
import com.google.common.collect.ImmutableList;
3640
import com.google.common.collect.Iterables;
3741
import com.google.common.collect.Lists;
@@ -51,6 +55,8 @@
5155
*/
5256
public class DynamicEndpointGroup extends AbstractEndpointGroup implements ListenableAsyncCloseable {
5357

58+
private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointGroup.class);
59+
5460
/**
5561
* Returns a newly created builder.
5662
*/
@@ -223,6 +229,8 @@ protected final void addEndpoint(Endpoint e) {
223229
final List<Endpoint> newEndpointsUnsorted = Lists.newArrayList(endpoints);
224230
newEndpointsUnsorted.add(e);
225231
endpoints = newEndpoints = ImmutableList.sortedCopyOf(newEndpointsUnsorted);
232+
logger.info("An endpoint has been added: {}. Current endpoints: {}",
233+
toShortString(e), toShortString(newEndpoints));
226234
} finally {
227235
endpointsLock.unlock();
228236
}
@@ -238,12 +246,17 @@ protected final void removeEndpoint(Endpoint e) {
238246
final List<Endpoint> newEndpoints;
239247
endpointsLock.lock();
240248
try {
241-
if (!allowEmptyEndpoints && endpoints.size() == 1) {
249+
final List<Endpoint> oldEndpoints = endpoints;
250+
if (!allowEmptyEndpoints && oldEndpoints.size() == 1) {
242251
return;
243252
}
244-
endpoints = newEndpoints = endpoints.stream()
245-
.filter(endpoint -> !endpoint.equals(e))
246-
.collect(toImmutableList());
253+
endpoints = newEndpoints = oldEndpoints.stream()
254+
.filter(endpoint -> !endpoint.equals(e))
255+
.collect(toImmutableList());
256+
if (endpoints.size() != oldEndpoints.size()) {
257+
logger.info("An endpoint has been removed: {}. Current endpoints: {}",
258+
toShortString(e), toShortString(newEndpoints));
259+
}
247260
} finally {
248261
endpointsLock.unlock();
249262
}
@@ -266,6 +279,7 @@ protected final void setEndpoints(Iterable<Endpoint> endpoints) {
266279
return;
267280
}
268281
this.endpoints = newEndpoints;
282+
logger.info("New endpoints have been set: {}", toShortString(newEndpoints));
269283
} finally {
270284
endpointsLock.unlock();
271285
}
@@ -376,7 +390,7 @@ public String toString() {
376390
protected final String toString(Consumer<? super StringBuilder> builderMutator) {
377391
final StringBuilder buf = new StringBuilder();
378392
buf.append(getClass().getSimpleName());
379-
buf.append("{selectionStrategy=").append(selectionStrategy.getClass());
393+
buf.append("{selector=").append(toStringSelector());
380394
buf.append(", allowsEmptyEndpoints=").append(allowEmptyEndpoints);
381395
buf.append(", initialized=").append(initialEndpointsFuture.isDone());
382396
buf.append(", numEndpoints=").append(endpoints.size());
@@ -385,6 +399,21 @@ protected final String toString(Consumer<? super StringBuilder> builderMutator)
385399
return buf.append('}').toString();
386400
}
387401

402+
/**
403+
* Returns the string representation of the {@link EndpointSelector} of this {@link DynamicEndpointGroup}.
404+
* If the {@link EndpointSelector} is not created yet, it returns the class name of the
405+
* {@link EndpointSelectionStrategy}.
406+
*/
407+
protected String toStringSelector() {
408+
final EndpointSelector endpointSelector = selector.get();
409+
if (endpointSelector == null) {
410+
// Return selection strategy if selector is not created yet.
411+
return selectionStrategy.getClass().toString();
412+
}
413+
414+
return endpointSelector.toString();
415+
}
416+
388417
private class InitialEndpointsFuture extends EventLoopCheckingFuture<List<Endpoint>> {
389418

390419
@Override

core/src/main/java/com/linecorp/armeria/client/endpoint/RoundRobinStrategy.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.List;
2020
import java.util.concurrent.atomic.AtomicInteger;
2121

22+
import com.google.common.base.MoreObjects;
23+
2224
import com.linecorp.armeria.client.ClientRequestContext;
2325
import com.linecorp.armeria.client.Endpoint;
2426
import com.linecorp.armeria.common.annotation.Nullable;
@@ -57,5 +59,12 @@ public Endpoint selectNow(ClientRequestContext ctx) {
5759
final int currentSequence = sequence.getAndIncrement();
5860
return endpoints.get(Math.abs(currentSequence % endpoints.size()));
5961
}
62+
63+
@Override
64+
public String toString() {
65+
return MoreObjects.toStringHelper(this)
66+
.add("endpoints", group().endpoints())
67+
.toString();
68+
}
6069
}
6170
}

core/src/main/java/com/linecorp/armeria/client/endpoint/StickyEndpointSelectionStrategy.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import java.util.function.ToLongFunction;
2222

23+
import com.google.common.base.MoreObjects;
2324
import com.google.common.hash.Hashing;
2425

2526
import com.linecorp.armeria.client.ClientRequestContext;
@@ -84,7 +85,6 @@ private static final class StickyEndpointSelector extends AbstractEndpointSelect
8485
@Nullable
8586
@Override
8687
public Endpoint selectNow(ClientRequestContext ctx) {
87-
8888
final List<Endpoint> endpoints = group().endpoints();
8989
if (endpoints.isEmpty()) {
9090
return null;
@@ -94,5 +94,12 @@ public Endpoint selectNow(ClientRequestContext ctx) {
9494
final int nearest = Hashing.consistentHash(key, endpoints.size());
9595
return endpoints.get(nearest);
9696
}
97+
98+
@Override
99+
public String toString() {
100+
return MoreObjects.toStringHelper(this)
101+
.add("endpoints", group().endpoints())
102+
.toString();
103+
}
97104
}
98105
}

core/src/main/java/com/linecorp/armeria/client/endpoint/WeightRampingUpStrategy.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@
2222
import static com.linecorp.armeria.client.endpoint.WeightRampingUpStrategyBuilder.defaultTransition;
2323
import static com.linecorp.armeria.internal.client.endpoint.EndpointAttributeKeys.createdAtNanos;
2424
import static com.linecorp.armeria.internal.client.endpoint.EndpointAttributeKeys.hasCreatedAtNanos;
25+
import static com.linecorp.armeria.internal.client.endpoint.EndpointToStringUtil.toShortString;
2526
import static java.util.Objects.requireNonNull;
2627

27-
import java.util.ArrayDeque;
2828
import java.util.ArrayList;
29-
import java.util.Deque;
3029
import java.util.HashMap;
3130
import java.util.HashSet;
3231
import java.util.Iterator;
@@ -37,6 +36,9 @@
3736
import java.util.concurrent.TimeUnit;
3837
import java.util.function.Supplier;
3938

39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
4042
import com.google.common.annotations.VisibleForTesting;
4143
import com.google.common.base.MoreObjects;
4244
import com.google.common.collect.ImmutableList;
@@ -76,6 +78,8 @@
7678
*/
7779
final class WeightRampingUpStrategy implements EndpointSelectionStrategy {
7880

81+
private static final Logger logger = LoggerFactory.getLogger(WeightRampingUpStrategy.class);
82+
7983
private static final Ticker defaultTicker = Ticker.systemTicker();
8084
private static final WeightedRandomDistributionEndpointSelector EMPTY_SELECTOR =
8185
new WeightedRandomDistributionEndpointSelector(ImmutableList.of());
@@ -130,8 +134,6 @@ final class RampingUpEndpointWeightSelector extends AbstractEndpointSelector {
130134

131135
private final List<Endpoint> endpointsFinishedRampingUp = new ArrayList<>();
132136

133-
@VisibleForTesting
134-
final Deque<EndpointsRampingUpEntry> endpointsRampingUp = new ArrayDeque<>();
135137
@VisibleForTesting
136138
final Map<Long, EndpointsRampingUpEntry> rampingUpWindowsMap = new HashMap<>();
137139
private Object2LongOpenHashMap<Endpoint> endpointCreatedTimestamps = new Object2LongOpenHashMap<>();
@@ -233,7 +235,25 @@ private void buildEndpointSelector() {
233235
endpointAndStep.endpoint().withWeight(endpointAndStep.currentWeight()));
234236
}
235237
}
236-
endpointSelector = new WeightedRandomDistributionEndpointSelector(targetEndpointsBuilder.build());
238+
final List<Endpoint> endpoints = targetEndpointsBuilder.build();
239+
if (rampingUpWindowsMap.isEmpty()) {
240+
logger.info("Finished ramping up. endpoints: {}", toShortString(endpoints));
241+
} else {
242+
logger.debug("Ramping up. endpoints: {}", toShortString(endpoints));
243+
}
244+
245+
boolean found = false;
246+
for (Endpoint endpoint : endpoints) {
247+
if (endpoint.weight() > 0) {
248+
found = true;
249+
break;
250+
}
251+
}
252+
if (!found) {
253+
logger.warn("No valid endpoint with weight > 0. endpoints: {}", toShortString(endpoints));
254+
}
255+
256+
endpointSelector = new WeightedRandomDistributionEndpointSelector(endpoints);
237257
}
238258

239259
@VisibleForTesting
@@ -288,6 +308,15 @@ private void close() {
288308
lock.unlock();
289309
}
290310
}
311+
312+
@Override
313+
public String toString() {
314+
return MoreObjects.toStringHelper(this)
315+
.add("endpointSelector", endpointSelector)
316+
.add("endpointsFinishedRampingUp", endpointsFinishedRampingUp)
317+
.add("rampingUpWindowsMap", rampingUpWindowsMap)
318+
.toString();
319+
}
291320
}
292321

293322
private static int numStep(long rampingUpIntervalNanos, Ticker ticker, long createTimestamp) {

core/src/main/java/com/linecorp/armeria/client/endpoint/WeightRampingUpStrategyBuilder.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,17 @@ public final class WeightRampingUpStrategyBuilder {
4343
static final int DEFAULT_TOTAL_STEPS = 10;
4444
static final int DEFAULT_RAMPING_UP_TASK_WINDOW_MILLIS = 500;
4545
static final EndpointWeightTransition DEFAULT_LINEAR_TRANSITION =
46-
(endpoint, currentStep, totalSteps) ->
47-
// currentStep is never greater than totalSteps so we can cast long to int.
48-
Ints.saturatedCast((long) endpoint.weight() * currentStep / totalSteps);
46+
(endpoint, currentStep, totalSteps) -> {
47+
// currentStep is never greater than totalSteps so we can cast long to int.
48+
final int currentWeight =
49+
Ints.saturatedCast((long) endpoint.weight() * currentStep / totalSteps);
50+
if (endpoint.weight() > 0 && currentWeight == 0) {
51+
// If the original weight is not 0,
52+
// we should return 1 to make sure the endpoint is selected.
53+
return 1;
54+
}
55+
return currentWeight;
56+
};
4957
static final EndpointWeightTransition defaultTransition = EndpointWeightTransition.linear();
5058

5159
private EndpointWeightTransition transition = defaultTransition;

core/src/main/java/com/linecorp/armeria/client/endpoint/WeightedRoundRobinStrategy.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717
package com.linecorp.armeria.client.endpoint;
1818

1919
import static com.google.common.collect.ImmutableList.toImmutableList;
20+
import static com.linecorp.armeria.internal.client.endpoint.EndpointToStringUtil.toShortString;
2021

2122
import java.util.Comparator;
2223
import java.util.List;
2324
import java.util.concurrent.atomic.AtomicInteger;
2425

26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import com.google.common.base.MoreObjects;
2530
import com.google.common.collect.ImmutableList;
2631
import com.google.common.collect.Streams;
2732

@@ -31,6 +36,8 @@
3136

3237
final class WeightedRoundRobinStrategy implements EndpointSelectionStrategy {
3338

39+
private static final Logger logger = LoggerFactory.getLogger(WeightedRoundRobinStrategy.class);
40+
3441
static final WeightedRoundRobinStrategy INSTANCE = new WeightedRoundRobinStrategy();
3542

3643
private WeightedRoundRobinStrategy() {}
@@ -63,6 +70,17 @@ private static final class WeightedRoundRobinSelector extends AbstractEndpointSe
6370

6471
@Override
6572
protected void updateNewEndpoints(List<Endpoint> endpoints) {
73+
boolean found = false;
74+
for (Endpoint endpoint : endpoints) {
75+
if (endpoint.weight() > 0) {
76+
found = true;
77+
break;
78+
}
79+
}
80+
if (!found) {
81+
logger.warn("No valid endpoint with weight > 0. endpoints: {}", toShortString(endpoints));
82+
}
83+
6684
final EndpointsAndWeights endpointsAndWeights = this.endpointsAndWeights;
6785
if (endpointsAndWeights == null || endpointsAndWeights.endpoints != endpoints) {
6886
this.endpointsAndWeights = new EndpointsAndWeights(endpoints);
@@ -94,6 +112,13 @@ private static final class EndpointsGroupByWeight {
94112
}
95113
}
96114

115+
@Override
116+
public String toString() {
117+
return MoreObjects.toStringHelper(this)
118+
.add("endpointsAndWeights", endpointsAndWeights)
119+
.toString();
120+
}
121+
97122
//
98123
// In general, assume the weights are w0 < w1 < ... < wM where M = N - 1, N is number of endpoints.
99124
//
@@ -228,6 +253,16 @@ Endpoint selectEndpoint(int currentSequence) {
228253

229254
return endpoints.get(Math.abs(currentSequence % endpoints.size()));
230255
}
256+
257+
@Override
258+
public String toString() {
259+
return MoreObjects.toStringHelper(this)
260+
.add("endpoints", endpoints)
261+
.add("weighted", weighted)
262+
.add("totalWeight", totalWeight)
263+
.add("accumulatedGroups", accumulatedGroups)
264+
.toString();
265+
}
231266
}
232267
}
233268
}

core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ public String toString() {
378378
.add("numEndpoints", endpoints.size())
379379
.add("candidates", truncate(delegateEndpoints, 10))
380380
.add("numCandidates", delegateEndpoints.size())
381-
.add("selectionStrategy", selectionStrategy().getClass())
381+
.add("selector", toStringSelector())
382382
.add("initialized", whenReady().isDone())
383383
.add("initialSelectionTimeoutMillis", initialSelectionTimeoutMillis)
384384
.add("selectionTimeoutMillis", selectionTimeoutMillis)

core/src/main/java/com/linecorp/armeria/common/metric/MeterIdPrefixFunction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public interface MeterIdPrefixFunction {
5555
* <li>Client-side tags:<ul>
5656
* <li>{@code method} - RPC method name or {@link HttpMethod#name()} if RPC method name is not
5757
* available</li>
58+
* <li>{@code service} - RPC service name or innermost service class name</li>
5859
* <li>{@code httpStatus} - {@link HttpStatus#code()}</li>
5960
* </ul></li>
6061
* </ul>

0 commit comments

Comments
 (0)