diff --git a/README.md b/README.md index a0e5d6a1..981d1492 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,9 @@ mysql -uroot -proot123 -h127.0.0.1 -Dprestogateway Once logged in to mysql console, please run [gateway-ha-persistence.sql](/gateway-ha/src/main/resources/gateway-ha-persistence.sql) to populate the tables. ### Build and run + +Please note these steps have been verified with JDK 8 and 11. Higher versions of Java might run into unexpected issues. + run `mvn clean install` to build `presto-gateway` Edit the [config file](/gateway-ha/gateway-ha-config.yml) and update the mysql db information. @@ -46,6 +49,9 @@ Remove `TLSv1, TLSv1.1` and redo the above steps to build and run `presto-gatewa Now you can access load balanced presto at localhost:8080 port. We will refer to this as `prestogateway.lyft.com` +If you see test failures while building `presto-gateway` or in an IDE, please run `mvn process-classes` to instrument javalite models +which are used by the tests. See [javalite-examples](https://github.com/javalite/javalite-examples/tree/master/simple-example#instrumentation) for more details. 
+ ## Gateway API ### Add or update a backend diff --git a/baseapp/pom.xml b/baseapp/pom.xml index 38d7910a..148eec29 100644 --- a/baseapp/pom.xml +++ b/baseapp/pom.xml @@ -7,7 +7,7 @@ com.lyft.data prestogateway-parent - 1.8.8-SNAPSHOT + 1.8.9 ../ diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index c89718a9..4fe7c473 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -8,7 +8,7 @@ com.lyft.data prestogateway-parent - 1.8.8-SNAPSHOT + 1.8.9 ../ diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/PrestoQueueLengthChecker.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/PrestoQueueLengthChecker.java index 6b9a2212..bdb51d34 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/PrestoQueueLengthChecker.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/PrestoQueueLengthChecker.java @@ -20,6 +20,8 @@ public PrestoQueueLengthChecker(PrestoQueueLengthRoutingTable routingManager) { @Override public void observe(List stats) { Map> clusterQueueMap = new HashMap>(); + Map> clusterRunningMap + = new HashMap>(); for (ClusterStats stat : stats) { if (!clusterQueueMap.containsKey(stat.getRoutingGroup())) { @@ -29,12 +31,20 @@ public void observe(List stats) { } } ); + clusterRunningMap.put(stat.getRoutingGroup(), new HashMap() { + { + put(stat.getClusterId(), stat.getRunningQueryCount()); + } + } + ); } else { clusterQueueMap.get(stat.getRoutingGroup()).put(stat.getClusterId(), stat.getQueuedQueryCount()); + clusterRunningMap.get(stat.getRoutingGroup()).put(stat.getClusterId(), + stat.getRunningQueryCount()); } } - routingManager.updateRoutingTable(clusterQueueMap); + routingManager.updateRoutingTable(clusterQueueMap, clusterRunningMap); } } diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java index cd1b49aa..6bec8382 100644 
--- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java @@ -2,6 +2,7 @@ import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -26,17 +27,19 @@ */ @Slf4j public class PrestoQueueLengthRoutingTable extends HaRoutingManager { + private static final Random RANDOM = new Random(); private static final int MIN_WT = 1; private static final int MAX_WT = 100; private final Object lockObject = new Object(); private ConcurrentHashMap routingGroupWeightSum; private ConcurrentHashMap> clusterQueueLengthMap; + private Map> weightedDistributionRouting; /** * A Routing Manager that distributes queries according to assigned weights based on - * Presto cluster queue length. + * Presto cluster queue length and falls back to Running Count if queue lengths are equal. */ public PrestoQueueLengthRoutingTable(GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager) { @@ -179,7 +182,7 @@ private void computeWeightsBasedOnQueueLength(ConcurrentHashMap backends) { @@ -208,25 +211,34 @@ public void updateRoutingTable(String routingGroup, Set backends) { /** * Update routing Table with new Queue Lengths. 
*/ - public void updateRoutingTable(Map> updatedQueueLengthMap) { + public void updateRoutingTable(Map> updatedQueueLengthMap, + Map> updatedRunningLengthMap) { synchronized (lockObject) { log.debug("Update Routing table with new cluster queue lengths : [{}]", - updatedQueueLengthMap.toString()); + updatedQueueLengthMap.toString()); clusterQueueLengthMap.clear(); for (String grp : updatedQueueLengthMap.keySet()) { if (grp == null) { continue; } - ConcurrentHashMap queueMap = new ConcurrentHashMap<>(); - queueMap.putAll(updatedQueueLengthMap.get(grp)); + + int maxQueueLen = Collections.max(updatedQueueLengthMap.get(grp).values()); + int minQueueLen = Collections.min(updatedQueueLengthMap.get(grp).values()); + + if (minQueueLen == maxQueueLen && updatedQueueLengthMap.get(grp).size() > 1) { + log.info("Queue lengths equal: {} for all clusters in the group {}." + + " Falling back to Running Counts : {}", maxQueueLen, grp, + updatedRunningLengthMap.get(grp)); + queueMap.putAll(updatedRunningLengthMap.get(grp)); + } else { + queueMap.putAll(updatedQueueLengthMap.get(grp)); + } clusterQueueLengthMap.put(grp, queueMap); } - computeWeightsBasedOnQueueLength(clusterQueueLengthMap); } - } /** diff --git a/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java b/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java index 37a6ad7b..c9d38d8d 100644 --- a/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java +++ b/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java @@ -26,6 +26,7 @@ public class TestPrestoQueueLengthRoutingTable { String[] mockRoutingGroups = {"adhoc", "scheduled"}; String mockRoutingGroup = "adhoc"; Map> clusterQueueMap; + Map> clusterRunningMap; @BeforeClass(alwaysRun = true) public void setUp() { @@ -52,6 +53,7 @@ private void deactiveAllBackends() { 
backendManager.deactivateBackend(mockRoutingGroup + i); } clusterQueueMap = new HashMap<>(); + clusterRunningMap = new HashMap<>(); } private void addMockBackends(String groupName, int numBackends, @@ -70,60 +72,49 @@ private void addMockBackends(String groupName, int numBackends, } } - private void registerBackEndsWithRandomQueueLength(String groupName, int numBackends) { - int mockQueueLength = 0; - String backend; - int maxLength = 200; - Random generator = new Random(); - - Map queueLenghts = new HashMap<>(); - - for (int i = 0; i < numBackends; i++) { - backend = groupName + i; - backendManager.activateBackend(backend); - queueLenghts.put(backend, generator.nextInt(maxLength)); - } - - clusterQueueMap.put(groupName, queueLenghts); - routingTable.updateRoutingTable(clusterQueueMap); - } - private void registerBackEndsWithRandomQueueLengths(String groupName, int numBackends) { int mockQueueLength = 0; String backend; Random rand = new Random(); - Map queueLenghts = new HashMap<>(); + Map queueLengths = new HashMap<>(); for (int i = 0; i < numBackends; i++) { backend = groupName + i; backendManager.activateBackend(backend); - queueLenghts.put(backend, mockQueueLength += rand.nextInt(100)); + queueLengths.put(backend, mockQueueLength += rand.nextInt(100)); } - clusterQueueMap.put(groupName, queueLenghts); - routingTable.updateRoutingTable(clusterQueueMap); + clusterQueueMap.put(groupName, queueLengths); + // Running counts don't matter if queue lengths are random. 
+ routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap); } private void registerBackEnds(String groupName, int numBackends, - int queueLengthDistributiveFactor) { + int queueLengthDistributiveFactor, + int runningLenDistributiveFactor) { int mockQueueLength = 0; + int mockRunningLength = 0; String backend; - Map queueLenghts = new HashMap<>(); + Map queueLengths = new HashMap<>(); + Map runningLengths = new HashMap<>(); for (int i = 0; i < numBackends; i++) { backend = groupName + i; backendManager.activateBackend(backend); - queueLenghts.put(backend, mockQueueLength += queueLengthDistributiveFactor); + queueLengths.put(backend, mockQueueLength += queueLengthDistributiveFactor); + runningLengths.put(backend, mockRunningLength += runningLenDistributiveFactor); } - clusterQueueMap.put(groupName, queueLenghts); - routingTable.updateRoutingTable(clusterQueueMap); + clusterQueueMap.put(groupName, queueLengths); + clusterRunningMap.put(groupName, runningLengths); + routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap); } - private void resetBackends(String groupName, int numBk, int queueDistribution) { + private void resetBackends(String groupName, int numBk, + int queueDistribution, int runningDistribution) { deactiveAllBackends(); - registerBackEnds(groupName, numBk, queueDistribution); + registerBackEnds(groupName, numBk, queueDistribution, runningDistribution); } private Map routeQueries(String groupName, int numRequests) { @@ -149,11 +140,12 @@ private Map routeQueries(String groupName, int numRequests) { public void testRoutingWithEvenWeightDistribution() { int queueDistribution = 3; + int runningDistribution = 0; for (int numRequests : QUERY_VOLUMES) { for (int numBk = 1; numBk <= NUM_BACKENDS; numBk++) { - resetBackends(mockRoutingGroup, numBk, queueDistribution); + resetBackends(mockRoutingGroup, numBk, queueDistribution, runningDistribution); Map routingDistribution = routeQueries(mockRoutingGroup, numRequests); // Useful for 
debugging @@ -201,8 +193,6 @@ public void testRoutingWithSkewedWeightDistribution() { assert routingDistribution.values().stream().mapToInt(Integer::intValue).sum() == numRequests; } - - } } } @@ -210,11 +200,11 @@ public void testRoutingWithSkewedWeightDistribution() { @Test public void testRoutingWithEqualWeightDistribution() { int queueDistribution = 0; - + int runningDistribution = 0; for (int numRequests : QUERY_VOLUMES) { for (int numBk = 1; numBk <= NUM_BACKENDS; numBk++) { - resetBackends(mockRoutingGroup, numBk, queueDistribution); + resetBackends(mockRoutingGroup, numBk, queueDistribution, runningDistribution); Map routingDistribution = routeQueries(mockRoutingGroup, numRequests); //Useful Debugging Info @@ -235,6 +225,37 @@ public void testRoutingWithEqualWeightDistribution() { } } + + @Test + public void testRoutingWithEqualQueueSkewedRunningDistribution() { + int queueDistribution = 0; + int runningDistribution = 100; + + for (int numRequests : QUERY_VOLUMES) { + for (int numBk = 1; numBk <= NUM_BACKENDS; numBk++) { + + resetBackends(mockRoutingGroup, numBk, queueDistribution, runningDistribution); + Map routingDistribution = routeQueries(mockRoutingGroup, numRequests); + + //Useful Debugging Info + /* + System.out.println("Input :" + clusterRunningMap.toString() + " Num of Requests:" + + numRequests + + " Internal Routing table: " + routingTable.getInternalWeightedRoutingTable + (mockRoutingGroup).toString() + + " Distribution: " + routingDistribution.toString()); + */ + if (numBk > 2 && routingDistribution.containsKey(mockRoutingGroup + (numBk - 1))) { + assert routingDistribution.get(mockRoutingGroup + (numBk - 1)) + <= Math.ceil(numRequests / numBk); + } else { + assert routingDistribution.values().stream().mapToInt(Integer::intValue).sum() + == numRequests; + } + } + } + } + @Test public void testRoutingWithMultipleGroups() { int queueDistribution = 10; @@ -242,7 +263,7 @@ public void testRoutingWithMultipleGroups() { int numBk = 3; for (String 
grp : mockRoutingGroups) { - resetBackends(grp, numBk, queueDistribution); + resetBackends(grp, numBk, queueDistribution, 0); Map routingDistribution = routeQueries(grp, numRequests); // Useful for debugging @@ -295,10 +316,10 @@ public void testActiveClusterMonitorUpdateAndRouting() throws InterruptedExcepti } globalToggle.set(!globalToggle.get()); clusterQueueMap.put(mockRoutingGroup, queueLenghts); - routingTable.updateRoutingTable(clusterQueueMap); + routingTable.updateRoutingTable(clusterQueueMap, clusterQueueMap); }; - resetBackends(mockRoutingGroup, numBk, 0); + resetBackends(mockRoutingGroup, numBk, 0, 0); scheduler.scheduleAtFixedRate(activeClusterMonitor, 0, 1, SECONDS); @@ -308,7 +329,7 @@ public void testActiveClusterMonitorUpdateAndRouting() throws InterruptedExcepti totalDistribution.putAll(routingDistribution); } else { for (String key : routingDistribution.keySet()) { - sum = totalDistribution.get(key) + routingDistribution.get(key); + sum = totalDistribution.getOrDefault(key, 0) + routingDistribution.get(key); totalDistribution.put(key, sum); } } diff --git a/pom.xml b/pom.xml index c64a1b00..886b7390 100644 --- a/pom.xml +++ b/pom.xml @@ -9,8 +9,8 @@ prestogateway-parent prestogateway-parent pom - 1.8.8-SNAPSHOT - + 1.8.9 + 1.8 1.8 @@ -19,7 +19,7 @@ 9.4.9.v20180320 1.7.25 - 1.18.10 + 1.18.22 6.10 1.2.1 diff --git a/proxyserver/pom.xml b/proxyserver/pom.xml index 3fea8750..a4472f93 100644 --- a/proxyserver/pom.xml +++ b/proxyserver/pom.xml @@ -8,7 +8,7 @@ com.lyft.data prestogateway-parent - 1.8.8-SNAPSHOT + 1.8.9 ../