proxyMap = new HashMap<>();
for (ProxyBackendConfiguration backend : backends) {
@@ -298,7 +345,7 @@ public String provideBackendForRoutingGroup(String routingGroup) {
}
updateRoutingTable(routingGroup, proxyMap.keySet());
- String clusterId = getEligibleBackEnd(routingGroup);
+ String clusterId = getEligibleBackEnd(routingGroup, user);
log.debug("Routing to eligible backend : [{}] for routing group: [{}]",
clusterId, routingGroup);
@@ -318,7 +365,7 @@ public String provideBackendForRoutingGroup(String routingGroup) {
* d.
*/
@Override
- public String provideAdhocBackend() {
+ public String provideAdhocBackend(String user) {
Map proxyMap = new HashMap<>();
List backends = getGatewayBackendManager().getActiveAdhocBackends();
if (backends.size() == 0) {
@@ -331,7 +378,7 @@ public String provideAdhocBackend() {
updateRoutingTable("adhoc", proxyMap.keySet());
- String clusterId = getEligibleBackEnd("adhoc");
+ String clusterId = getEligibleBackEnd("adhoc", user);
log.debug("Routing to eligible backend : " + clusterId + " for routing group: adhoc");
if (clusterId != null) {
return proxyMap.get(clusterId);
diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/RoutingManager.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/RoutingManager.java
index 0ae6441c..d7b44f2c 100644
--- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/RoutingManager.java
+++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/RoutingManager.java
@@ -61,7 +61,7 @@ public void setBackendForQueryId(String queryId, String backend) {
*
* @return
*/
- public String provideAdhocBackend() {
+ public String provideAdhocBackend(String user) {
List backends = this.gatewayBackendManager.getActiveAdhocBackends();
if (backends.size() == 0) {
throw new IllegalStateException("Number of active backends found zero");
@@ -76,11 +76,11 @@ public String provideAdhocBackend() {
*
* @return
*/
- public String provideBackendForRoutingGroup(String routingGroup) {
+ public String provideBackendForRoutingGroup(String routingGroup, String user) {
List backends =
gatewayBackendManager.getActiveBackends(routingGroup);
if (backends.isEmpty()) {
- return provideAdhocBackend();
+ return provideAdhocBackend(user);
}
int backendId = Math.abs(RANDOM.nextInt()) % backends.size();
return backends.get(backendId).getProxyTo();
diff --git a/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java b/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java
index c9d38d8d..96ae2e59 100644
--- a/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java
+++ b/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java
@@ -1,13 +1,17 @@
package com.lyft.data.gateway.ha.router;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertEquals;
import com.lyft.data.gateway.ha.HaGatewayTestUtils;
import com.lyft.data.gateway.ha.config.DataStoreConfiguration;
import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration;
import com.lyft.data.gateway.ha.persistence.JdbcConnectionManager;
import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
@@ -25,6 +29,9 @@ public class TestPrestoQueueLengthRoutingTable {
QueryHistoryManager historyManager;
String[] mockRoutingGroups = {"adhoc", "scheduled"};
String mockRoutingGroup = "adhoc";
+
+ String mockUser = "user";
+
Map> clusterQueueMap;
Map> clusterRunningMap;
@@ -86,7 +93,7 @@ private void registerBackEndsWithRandomQueueLengths(String groupName, int numBac
clusterQueueMap.put(groupName, queueLengths);
// Running counts don't matter if queue lengths are random.
- routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap);
+ routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap, null);
}
private void registerBackEnds(String groupName, int numBackends,
@@ -108,7 +115,39 @@ private void registerBackEnds(String groupName, int numBackends,
clusterQueueMap.put(groupName, queueLengths);
clusterRunningMap.put(groupName, runningLengths);
- routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap);
+ routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap, null);
+ }
+
+
+
+ private void registerBackEndsWithUserQueue(String groupName, int numBackends,
+ List userQueues) {
+
+ deactiveAllBackends();
+ int mockQueueLength = 0;
+ int mockRunningLength = 0;
+ String backend;
+
+ Map queueLengths = new HashMap<>();
+ Map runningLengths = new HashMap<>();
+ Map> userClusterQueue = new HashMap<>();
+
+ for (int i = 0; i < numBackends; i++) {
+ backend = groupName + i;
+ backendManager.activateBackend(backend);
+ queueLengths.put(backend, mockQueueLength);
+ runningLengths.put(backend, mockRunningLength);
+ if (userQueues.size() > i) {
+ Map userQueueMap =
+ userClusterQueue.getOrDefault(mockUser, new HashMap<>());
+ userQueueMap.put(backend, userQueues.get(i));
+ userClusterQueue.put(mockUser, userQueueMap);
+ }
+ }
+
+ clusterQueueMap.put(groupName, queueLengths);
+ clusterRunningMap.put(groupName, runningLengths);
+ routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap, userClusterQueue);
}
private void resetBackends(String groupName, int numBk,
@@ -124,7 +163,7 @@ private Map routeQueries(String groupName, int numRequests) {
for (int i = 0; i < numRequests; i++) {
- eligibleBackend = routingTable.getEligibleBackEnd(groupName);
+ eligibleBackend = routingTable.getEligibleBackEnd(groupName, null);
if (!routingDistribution.containsKey(eligibleBackend)) {
routingDistribution.put(eligibleBackend, 1);
@@ -197,6 +236,70 @@ public void testRoutingWithSkewedWeightDistribution() {
}
}
+ @Test
+ public void testRoutingWithUserQueuedLength() {
+ int numBackends = 2;
+ int queryVolume = 10000;
+
+ // Case 1: All user queue counts Present.
+ // Validate always routed to cluster with lowest user queue
+ registerBackEndsWithUserQueue(mockRoutingGroup, numBackends, Arrays.asList(1, 2));
+ for (int i = 0; i < queryVolume; i++) {
+ assertEquals(routingTable.getEligibleBackEnd(mockRoutingGroup, mockUser),
+ mockRoutingGroup + "0");
+ }
+
+ // Case 2: Not all user queue counts Present.
+ // Validate always routed to cluster with zero queue length i.e. the missing cluster.
+ registerBackEndsWithUserQueue(mockRoutingGroup, numBackends, Arrays.asList(1));
+ for (int i = 0; i < queryVolume; i++) {
+ assertEquals(routingTable.getEligibleBackEnd(mockRoutingGroup, mockUser),
+ mockRoutingGroup + "1");
+ }
+
+ // Case 3: All user queue counts Present but equal
+ // Validate equally routed to all clusters.
+ registerBackEndsWithUserQueue(mockRoutingGroup, numBackends, Arrays.asList(2, 2));
+ Map counts = new HashMap<>();
+ for (int i = 0; i < queryVolume; i++) {
+ String cluster = routingTable.getEligibleBackEnd(mockRoutingGroup, mockUser);
+ counts.put(cluster, counts.getOrDefault(cluster, 0) + 1);
+ }
+ double variance = 0.1;
+ double expectedLowerBound = (queryVolume / numBackends) * (1 - variance);
+ double expectedUpperBound = (queryVolume / numBackends) * (1 + variance);
+
+ for (Integer c : counts.values()) {
+ assert c >= expectedLowerBound && c <= expectedUpperBound;
+ }
+
+ // Case 4: NO user queue lengths present
+ // Validate equally routed to all clusters.
+ registerBackEndsWithUserQueue(mockRoutingGroup, numBackends, new ArrayList<>());
+ counts = new HashMap<>();
+ for (int i = 0; i < queryVolume; i++) {
+ String cluster = routingTable.getEligibleBackEnd(mockRoutingGroup, mockUser);
+ counts.put(cluster, counts.getOrDefault(cluster, 0) + 1);
+ }
+ for (Integer c : counts.values()) {
+ assert c >= expectedLowerBound && c <= expectedUpperBound;
+ }
+
+ // Case 5: Null or empty users
+ // Validate equally routed to all clusters.
+ registerBackEndsWithUserQueue(mockRoutingGroup, numBackends, new ArrayList<>());
+ counts = new HashMap<>();
+ for (int i = 0; i < queryVolume; i++) {
+ String cluster = routingTable.getEligibleBackEnd(mockRoutingGroup, null);
+ counts.put(cluster, counts.getOrDefault(cluster, 0) + 1);
+ }
+ for (Integer c : counts.values()) {
+ assert c >= expectedLowerBound && c <= expectedUpperBound;
+ }
+
+
+ }
+
@Test
public void testRoutingWithEqualWeightDistribution() {
int queueDistribution = 0;
@@ -316,7 +419,7 @@ public void testActiveClusterMonitorUpdateAndRouting() throws InterruptedExcepti
}
globalToggle.set(!globalToggle.get());
clusterQueueMap.put(mockRoutingGroup, queueLenghts);
- routingTable.updateRoutingTable(clusterQueueMap, clusterQueueMap);
+ routingTable.updateRoutingTable(clusterQueueMap, clusterQueueMap, null);
};
resetBackends(mockRoutingGroup, numBk, 0, 0);
diff --git a/pom.xml b/pom.xml
index 886b7390..c7a9dcdd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,8 +9,8 @@
prestogateway-parent
prestogateway-parent
pom
- 1.8.9
-
+ 1.9.0
+
1.8
1.8
diff --git a/proxyserver/pom.xml b/proxyserver/pom.xml
index a4472f93..9d94a5bb 100644
--- a/proxyserver/pom.xml
+++ b/proxyserver/pom.xml
@@ -8,7 +8,7 @@
com.lyft.data
prestogateway-parent
- 1.8.9
+ 1.9.0
../