Skip to content

Commit

Permalink
Merge pull request #173 from lyft/akhurana
Browse files Browse the repository at this point in the history
QueueBasedRouting: Use Running Count as fallback logic
  • Loading branch information
akhurana001 authored Jul 14, 2022
2 parents aeacef6 + e23d927 commit 3d3159e
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 53 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ mysql -uroot -proot123 -h127.0.0.1 -Dprestogateway
Once logged in to mysql console, please run [gateway-ha-persistence.sql](/gateway-ha/src/main/resources/gateway-ha-persistence.sql) to populate the tables.

### Build and run

Please note these steps have been verified with JDK 8 and 11. Higher versions of Java might run into unexpected issues.

run `mvn clean install` to build `presto-gateway`

Edit the [config file](/gateway-ha/gateway-ha-config.yml) and update the mysql db information.
Expand All @@ -46,6 +49,9 @@ Remove `TLSv1, TLSv1.1` and redo the above steps to build and run `presto-gatewa

Now you can access load balanced presto at localhost:8080 port. We will refer to this as `prestogateway.lyft.com`

If you see test failures while building `presto-gateway` or in an IDE, please run `mvn process-classes` to instrument javalite models
which are used by the tests . Ref [javalite-examples](https://github.com/javalite/javalite-examples/tree/master/simple-example#instrumentation) for more details.

## Gateway API

### Add or update a backend
Expand Down
2 changes: 1 addition & 1 deletion baseapp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.lyft.data</groupId>
<artifactId>prestogateway-parent</artifactId>
<version>1.8.8-SNAPSHOT</version>
<version>1.8.9</version>
<relativePath>../</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gateway-ha/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>com.lyft.data</groupId>
<artifactId>prestogateway-parent</artifactId>
<version>1.8.8-SNAPSHOT</version>
<version>1.8.9</version>
<relativePath>../</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public PrestoQueueLengthChecker(PrestoQueueLengthRoutingTable routingManager) {
@Override
public void observe(List<ClusterStats> stats) {
Map<String, Map<String, Integer>> clusterQueueMap = new HashMap<String, Map<String, Integer>>();
Map<String, Map<String, Integer>> clusterRunningMap
= new HashMap<String, Map<String, Integer>>();

for (ClusterStats stat : stats) {
if (!clusterQueueMap.containsKey(stat.getRoutingGroup())) {
Expand All @@ -29,12 +31,20 @@ public void observe(List<ClusterStats> stats) {
}
}
);
clusterRunningMap.put(stat.getRoutingGroup(), new HashMap<String, Integer>() {
{
put(stat.getClusterId(), stat.getRunningQueryCount());
}
}
);
} else {
clusterQueueMap.get(stat.getRoutingGroup()).put(stat.getClusterId(),
stat.getQueuedQueryCount());
clusterRunningMap.get(stat.getRoutingGroup()).put(stat.getClusterId(),
stat.getRunningQueryCount());
}
}

routingManager.updateRoutingTable(clusterQueueMap);
routingManager.updateRoutingTable(clusterQueueMap, clusterRunningMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -26,17 +27,19 @@
*/
@Slf4j
public class PrestoQueueLengthRoutingTable extends HaRoutingManager {

private static final Random RANDOM = new Random();
private static final int MIN_WT = 1;
private static final int MAX_WT = 100;
private final Object lockObject = new Object();
private ConcurrentHashMap<String, Integer> routingGroupWeightSum;
private ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>> clusterQueueLengthMap;

private Map<String, TreeMap<Integer, String>> weightedDistributionRouting;

/**
* A Routing Manager that distributes queries according to assigned weights based on
* Presto cluster queue length.
* Presto cluster queue length and falls back to Running Count if queue length are equal.
*/
public PrestoQueueLengthRoutingTable(GatewayBackendManager gatewayBackendManager,
QueryHistoryManager queryHistoryManager) {
Expand Down Expand Up @@ -179,7 +182,7 @@ private void computeWeightsBasedOnQueueLength(ConcurrentHashMap<String,
/**
* Update the Routing Table only if a previously known backend has been deactivated.
* Newly added backends are handled through
* {@link PrestoQueueLengthRoutingTable#updateRoutingTable(Map)}
* {@link PrestoQueueLengthRoutingTable#updateRoutingTable(Map, Map)}
* updateRoutingTable}
*/
public void updateRoutingTable(String routingGroup, Set<String> backends) {
Expand Down Expand Up @@ -208,25 +211,34 @@ public void updateRoutingTable(String routingGroup, Set<String> backends) {
/**
* Update routing Table with new Queue Lengths.
*/
public void updateRoutingTable(Map<String, Map<String, Integer>> updatedQueueLengthMap) {
public void updateRoutingTable(Map<String, Map<String, Integer>> updatedQueueLengthMap,
Map<String, Map<String, Integer>> updatedRunningLengthMap) {
synchronized (lockObject) {
log.debug("Update Routing table with new cluster queue lengths : [{}]",
updatedQueueLengthMap.toString());
updatedQueueLengthMap.toString());
clusterQueueLengthMap.clear();

for (String grp : updatedQueueLengthMap.keySet()) {
if (grp == null) {
continue;
}

ConcurrentHashMap<String, Integer> queueMap = new ConcurrentHashMap<>();
queueMap.putAll(updatedQueueLengthMap.get(grp));

int maxQueueLen = Collections.max(updatedQueueLengthMap.get(grp).values());
int minQueueLen = Collections.min(updatedQueueLengthMap.get(grp).values());

if (minQueueLen == maxQueueLen && updatedQueueLengthMap.get(grp).size() > 1) {
log.info("Queue lengths equal: {} for all clusters in the group {}."
+ " Falling back to Running Counts : {}", maxQueueLen, grp,
updatedRunningLengthMap.get(grp));
queueMap.putAll(updatedRunningLengthMap.get(grp));
} else {
queueMap.putAll(updatedQueueLengthMap.get(grp));
}
clusterQueueLengthMap.put(grp, queueMap);
}

computeWeightsBasedOnQueueLength(clusterQueueLengthMap);
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class TestPrestoQueueLengthRoutingTable {
String[] mockRoutingGroups = {"adhoc", "scheduled"};
String mockRoutingGroup = "adhoc";
Map<String, Map<String, Integer>> clusterQueueMap;
Map<String, Map<String, Integer>> clusterRunningMap;

@BeforeClass(alwaysRun = true)
public void setUp() {
Expand All @@ -52,6 +53,7 @@ private void deactiveAllBackends() {
backendManager.deactivateBackend(mockRoutingGroup + i);
}
clusterQueueMap = new HashMap<>();
clusterRunningMap = new HashMap<>();
}

private void addMockBackends(String groupName, int numBackends,
Expand All @@ -70,60 +72,49 @@ private void addMockBackends(String groupName, int numBackends,
}
}

private void registerBackEndsWithRandomQueueLength(String groupName, int numBackends) {
int mockQueueLength = 0;
String backend;
int maxLength = 200;
Random generator = new Random();

Map<String, Integer> queueLenghts = new HashMap<>();

for (int i = 0; i < numBackends; i++) {
backend = groupName + i;
backendManager.activateBackend(backend);
queueLenghts.put(backend, generator.nextInt(maxLength));
}

clusterQueueMap.put(groupName, queueLenghts);
routingTable.updateRoutingTable(clusterQueueMap);
}

private void registerBackEndsWithRandomQueueLengths(String groupName, int numBackends) {
int mockQueueLength = 0;
String backend;
Random rand = new Random();
Map<String, Integer> queueLenghts = new HashMap<>();
Map<String, Integer> queueLengths = new HashMap<>();

for (int i = 0; i < numBackends; i++) {
backend = groupName + i;
backendManager.activateBackend(backend);
queueLenghts.put(backend, mockQueueLength += rand.nextInt(100));
queueLengths.put(backend, mockQueueLength += rand.nextInt(100));
}

clusterQueueMap.put(groupName, queueLenghts);
routingTable.updateRoutingTable(clusterQueueMap);
clusterQueueMap.put(groupName, queueLengths);
// Running counts don't matter if queue lengths are random.
routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap);
}

private void registerBackEnds(String groupName, int numBackends,
int queueLengthDistributiveFactor) {
int queueLengthDistributiveFactor,
int runningLenDistributiveFactor) {
int mockQueueLength = 0;
int mockRunningLength = 0;
String backend;

Map<String, Integer> queueLenghts = new HashMap<>();
Map<String, Integer> queueLengths = new HashMap<>();
Map<String, Integer> runningLengths = new HashMap<>();

for (int i = 0; i < numBackends; i++) {
backend = groupName + i;
backendManager.activateBackend(backend);
queueLenghts.put(backend, mockQueueLength += queueLengthDistributiveFactor);
queueLengths.put(backend, mockQueueLength += queueLengthDistributiveFactor);
runningLengths.put(backend, mockRunningLength += runningLenDistributiveFactor);
}

clusterQueueMap.put(groupName, queueLenghts);
routingTable.updateRoutingTable(clusterQueueMap);
clusterQueueMap.put(groupName, queueLengths);
clusterRunningMap.put(groupName, runningLengths);
routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap);
}

private void resetBackends(String groupName, int numBk, int queueDistribution) {
private void resetBackends(String groupName, int numBk,
int queueDistribution, int runningDistribution) {
deactiveAllBackends();
registerBackEnds(groupName, numBk, queueDistribution);
registerBackEnds(groupName, numBk, queueDistribution, runningDistribution);
}

private Map<String, Integer> routeQueries(String groupName, int numRequests) {
Expand All @@ -149,11 +140,12 @@ private Map<String, Integer> routeQueries(String groupName, int numRequests) {
public void testRoutingWithEvenWeightDistribution() {

int queueDistribution = 3;
int runningDistribution = 0;

for (int numRequests : QUERY_VOLUMES) {
for (int numBk = 1; numBk <= NUM_BACKENDS; numBk++) {

resetBackends(mockRoutingGroup, numBk, queueDistribution);
resetBackends(mockRoutingGroup, numBk, queueDistribution, runningDistribution);
Map<String, Integer> routingDistribution = routeQueries(mockRoutingGroup, numRequests);

// Useful for debugging
Expand Down Expand Up @@ -201,20 +193,18 @@ public void testRoutingWithSkewedWeightDistribution() {
assert routingDistribution.values().stream().mapToInt(Integer::intValue).sum()
== numRequests;
}


}
}
}

@Test
public void testRoutingWithEqualWeightDistribution() {
int queueDistribution = 0;

int runningDistribution = 0;
for (int numRequests : QUERY_VOLUMES) {
for (int numBk = 1; numBk <= NUM_BACKENDS; numBk++) {

resetBackends(mockRoutingGroup, numBk, queueDistribution);
resetBackends(mockRoutingGroup, numBk, queueDistribution, runningDistribution);
Map<String, Integer> routingDistribution = routeQueries(mockRoutingGroup, numRequests);

//Useful Debugging Info
Expand All @@ -235,14 +225,45 @@ public void testRoutingWithEqualWeightDistribution() {
}
}


@Test
public void testRoutingWithEqualQueueSkewedRunningDistribution() {
int queueDistribution = 0;
int runningDistribution = 100;

for (int numRequests : QUERY_VOLUMES) {
for (int numBk = 1; numBk <= NUM_BACKENDS; numBk++) {

resetBackends(mockRoutingGroup, numBk, queueDistribution, runningDistribution);
Map<String, Integer> routingDistribution = routeQueries(mockRoutingGroup, numRequests);

//Useful Debugging Info
/*
System.out.println("Input :" + clusterRunningMap.toString() + " Num of Requests:" +
numRequests
+ " Internal Routing table: " + routingTable.getInternalWeightedRoutingTable
(mockRoutingGroup).toString()
+ " Distribution: " + routingDistribution.toString());
*/
if (numBk > 2 && routingDistribution.containsKey(mockRoutingGroup + (numBk - 1))) {
assert routingDistribution.get(mockRoutingGroup + (numBk - 1))
<= Math.ceil(numRequests / numBk);
} else {
assert routingDistribution.values().stream().mapToInt(Integer::intValue).sum()
== numRequests;
}
}
}
}

@Test
public void testRoutingWithMultipleGroups() {
int queueDistribution = 10;
int numRequests = 15;
int numBk = 3;

for (String grp : mockRoutingGroups) {
resetBackends(grp, numBk, queueDistribution);
resetBackends(grp, numBk, queueDistribution, 0);
Map<String, Integer> routingDistribution = routeQueries(grp, numRequests);

// Useful for debugging
Expand Down Expand Up @@ -295,10 +316,10 @@ public void testActiveClusterMonitorUpdateAndRouting() throws InterruptedExcepti
}
globalToggle.set(!globalToggle.get());
clusterQueueMap.put(mockRoutingGroup, queueLenghts);
routingTable.updateRoutingTable(clusterQueueMap);
routingTable.updateRoutingTable(clusterQueueMap, clusterQueueMap);
};

resetBackends(mockRoutingGroup, numBk, 0);
resetBackends(mockRoutingGroup, numBk, 0, 0);
scheduler.scheduleAtFixedRate(activeClusterMonitor, 0, 1, SECONDS);


Expand All @@ -308,7 +329,7 @@ public void testActiveClusterMonitorUpdateAndRouting() throws InterruptedExcepti
totalDistribution.putAll(routingDistribution);
} else {
for (String key : routingDistribution.keySet()) {
sum = totalDistribution.get(key) + routingDistribution.get(key);
sum = totalDistribution.getOrDefault(key, 0) + routingDistribution.get(key);
totalDistribution.put(key, sum);
}
}
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
<artifactId>prestogateway-parent</artifactId>
<name>prestogateway-parent</name>
<packaging>pom</packaging>
<version>1.8.8-SNAPSHOT</version>

<version>1.8.9</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
Expand All @@ -19,7 +19,7 @@

<jetty.version>9.4.9.v20180320</jetty.version>
<slf4j.version>1.7.25</slf4j.version>
<lombok.version>1.18.10</lombok.version>
<lombok.version>1.18.22</lombok.version>
<testng.version>6.10</testng.version>
<mockwebserver.version>1.2.1</mockwebserver.version>

Expand Down
2 changes: 1 addition & 1 deletion proxyserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>com.lyft.data</groupId>
<artifactId>prestogateway-parent</artifactId>
<version>1.8.8-SNAPSHOT</version>
<version>1.8.9</version>
<relativePath>../</relativePath>
</parent>

Expand Down

0 comments on commit 3d3159e

Please sign in to comment.