Skip to content

Commit

Permalink
v2.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
arnett, stu committed Jun 24, 2015
1 parent 44daedf commit 956f6de
Show file tree
Hide file tree
Showing 19 changed files with 1,283 additions and 157 deletions.
111 changes: 84 additions & 27 deletions src/main/java/com/emc/rest/smart/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.log4j.LogMF;
import org.apache.log4j.Logger;

import java.util.ArrayDeque;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;

/**
Expand All @@ -46,12 +46,13 @@
public class Host implements HostStats {
private static final Logger l4j = Logger.getLogger(Host.class);

protected static final int DEFAULT_RESPONSE_WINDOW_SIZE = 20;
protected static final int DEFAULT_ERROR_COOL_DOWN_SECS = 30;
public static final int DEFAULT_RESPONSE_WINDOW_SIZE = 25;
public static final int DEFAULT_ERROR_COOL_DOWN_SECS = 10;

private String name;
protected int responseWindowSize;
protected int errorCoolDownSecs;
private boolean healthy = true;
protected int responseWindowSize = DEFAULT_RESPONSE_WINDOW_SIZE;
protected int errorCoolDownSecs = DEFAULT_ERROR_COOL_DOWN_SECS;

protected int openConnections;
protected long lastConnectionTime;
Expand All @@ -60,24 +61,13 @@ public class Host implements HostStats {
protected long consecutiveErrors;
protected long responseQueueAverage;

protected Queue<Long> responseQueue = new LinkedList<Long>();
protected Queue<Long> responseQueue = new ArrayDeque<Long>();

/**
* Uses a default error cool down of 30 secs and a response window size of 20.
* @param name the host name or IP address of this host
*/
public Host(String name) {
this(name, DEFAULT_RESPONSE_WINDOW_SIZE, DEFAULT_ERROR_COOL_DOWN_SECS);
}

/**
* @param name the host name or IP address of this host
* @param errorCoolDownSecs the cool down period for errors (number of seconds after an error when the host is
* considered normalized). compounded for multiple consecutive errors
*/
public Host(String name, int responseWindowSize, int errorCoolDownSecs) {
this.name = name;
this.responseWindowSize = responseWindowSize;
this.errorCoolDownSecs = errorCoolDownSecs;
}

public synchronized void connectionOpened() {
Expand All @@ -102,7 +92,7 @@ public synchronized void callComplete(long duration, boolean isError) {

// log response time
responseQueue.add(duration);
if (responseQueue.size() > responseWindowSize)
while (responseQueue.size() > responseWindowSize)
responseQueue.remove();

// recalculate average
Expand All @@ -120,17 +110,39 @@ public String getName() {
return name;
}

public synchronized long getResponseIndex() {
// error adjustment adjust the index up based on the number of consecutive errors
long errorAdjustment = consecutiveErrors * errorCoolDownSecs * 1000;
public boolean isHealthy() {
return healthy;
}

public void setHealthy(boolean healthy) {
this.healthy = healthy;
}

public long getResponseIndex() {
long currentTime = System.currentTimeMillis();

// open connection adjustment adjusts the index up based on the number of open connections to the host
long openConnectionAdjustment = openConnections * errorCoolDownSecs; // cool down secs as ms instead
synchronized (this) {
// error adjustment adjust the index up based on the number of consecutive errors
long errorAdjustment = consecutiveErrors * errorCoolDownSecs * 1000;

// dormant adjustment adjusts the index down based on how long it's been since the host was last used
long msSinceLastUse = System.currentTimeMillis() - lastConnectionTime;
// open connection adjustment adjusts the index up based on the number of open connections to the host
long openConnectionAdjustment = openConnections * errorCoolDownSecs; // cool down secs as ms instead

return responseQueueAverage + errorAdjustment + openConnectionAdjustment - msSinceLastUse;
// dormant adjustment adjusts the index down based on how long it's been since the host was last used
long msSinceLastUse = currentTime - lastConnectionTime;

return responseQueueAverage + errorAdjustment + openConnectionAdjustment - msSinceLastUse;
}
}

/**
* Resets historical metrics. Use with care!
*/
public synchronized void resetStats() {
totalConnections = openConnections;
totalErrors = 0;
consecutiveErrors = 0;
responseQueueAverage = 0;
}

@Override
Expand Down Expand Up @@ -158,9 +170,54 @@ public long getResponseQueueAverage() {
return responseQueueAverage;
}

public long getConsecutiveErrors() {
return consecutiveErrors;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Host)) return false;

Host host = (Host) o;

return getName().equals(host.getName());
}

@Override
public int hashCode() {
return getName().hashCode();
}

@Override
public String toString() {
return String.format("%s{totalConnections=%d, totalErrors=%d, openConnections=%d, lastConnectionTime=%s, responseQueueAverage=%d}",
name, totalConnections, totalErrors, openConnections, new Date(lastConnectionTime).toString(), responseQueueAverage);
}

public int getResponseWindowSize() {
return responseWindowSize;
}

public void setResponseWindowSize(int responseWindowSize) {
this.responseWindowSize = responseWindowSize;
}

public int getErrorCoolDownSecs() {
return errorCoolDownSecs;
}

public void setErrorCoolDownSecs(int errorCoolDownSecs) {
this.errorCoolDownSecs = errorCoolDownSecs;
}

public Host withResponseWindowSize(int responseWindowSize) {
setResponseWindowSize(responseWindowSize);
return this;
}

public Host withErrorCoolDownSecs(int errorCoolDownSecs) {
setErrorCoolDownSecs(errorCoolDownSecs);
return this;
}
}
9 changes: 8 additions & 1 deletion src/main/java/com/emc/rest/smart/HostListProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,12 @@
import java.util.List;

public interface HostListProvider {
public abstract List<String> getHostList();
List<Host> getHostList();

/**
* If this completes without throwing an exception, the host is considered healthy
* (<code>host.setHealthy(true)</code> is called). Otherwise, the host is considered unhealthy/down
* (<code>host.setHealthy(false)</code> is called).
*/
void runHealthCheck(Host host);
}
33 changes: 33 additions & 0 deletions src/main/java/com/emc/rest/smart/HostVetoRule.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2015, EMC Corporation.
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* + Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* + Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* + The name of EMC Corporation may not be used to endorse or promote
* products derived from this software without specific prior written
* permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package com.emc.rest.smart;

import java.util.Map;

public interface HostVetoRule {
boolean shouldVeto(Host host, Map<String, Object> requestProperties);
}
77 changes: 58 additions & 19 deletions src/main/java/com/emc/rest/smart/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,35 @@
*/
package com.emc.rest.smart;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.*;

public class LoadBalancer {
private final Queue<Host> hosts = new ConcurrentLinkedQueue<Host>();
private final Queue<Host> hosts = new ArrayDeque<Host>();
private List<HostVetoRule> vetoRules;

public LoadBalancer(List<String> initialHosts) {
public LoadBalancer(List<Host> initialHosts) {

// seed the host map
for (String host : initialHosts) {
hosts.add(new Host(host));
}
hosts.addAll(initialHosts);
}

/**
* Returns the host with the lowest response index.
*/
public Host getTopHost() {
public Host getTopHost(Map<String, Object> requestProperties) {
Host topHost = null;

long lowestIndex = Long.MAX_VALUE;

synchronized (hosts) {
for (Host host : hosts) {

// apply any veto rules
if (shouldVeto(host, requestProperties)) continue;

// if the host is unhealthy/down, ignore it
if (!host.isHealthy()) continue;

// get response index for a host
long hostIndex = host.getResponseIndex();

Expand All @@ -72,16 +73,41 @@ public Host getTopHost() {
return topHost;
}

protected boolean shouldVeto(Host host, Map<String, Object> requestProperties) {
if (vetoRules != null) {
for (HostVetoRule vetoRule : vetoRules) {
if (vetoRule.shouldVeto(host, requestProperties)) return true;
}
}
return false;
}

/**
* Returns a list of all known hosts. This list is a clone; modification will not affect the load balancer
*/
public synchronized List<Host> getAllHosts() {
return new ArrayList<Host>(hosts);
}

/**
* Returns stats for all active hosts in this load balancer
*/
public HostStats[] getHostStats() {
public synchronized HostStats[] getHostStats() {
return hosts.toArray(new HostStats[hosts.size()]);
}

protected void updateHosts(List<String> updatedHosts) throws Exception {
/**
* Resets connection metrics. Use with care!
*/
public void resetStats() {
for (Host host : getAllHosts()) {
host.resetStats();
}
}

protected void updateHosts(List<Host> updatedHosts) throws Exception {
// don't modify the parameter
List<String> hostList = new ArrayList<String>(updatedHosts);
List<Host> hostList = new ArrayList<Host>(updatedHosts);

synchronized (hosts) {
// remove hosts from stored list that are not present in updated list
Expand All @@ -90,10 +116,10 @@ protected void updateHosts(List<String> updatedHosts) throws Exception {
while (hostI.hasNext()) {
Host host = hostI.next();
boolean stillThere = false;
Iterator<String> hostListI = hostList.iterator();
Iterator<Host> hostListI = hostList.iterator();
while (hostListI.hasNext()) {
String hostFromUpdate = hostListI.next();
if (host.getName().equalsIgnoreCase(hostFromUpdate)) {
Host hostFromUpdate = hostListI.next();
if (host.equals(hostFromUpdate)) {

// this host is in both the stored list and the updated list
stillThere = true;
Expand All @@ -107,9 +133,22 @@ protected void updateHosts(List<String> updatedHosts) throws Exception {
}

// what's left in the updated list are new hosts, so add them
for (String newHost : hostList) {
hosts.add(new Host(newHost));
for (Host newHost : hostList) {
hosts.add(newHost);
}
}
}

public List<HostVetoRule> getVetoRules() {
return vetoRules;
}

public void setVetoRules(List<HostVetoRule> vetoRules) {
this.vetoRules = vetoRules;
}

public LoadBalancer withVetoRules(HostVetoRule... vetoRules) {
setVetoRules(Arrays.asList(vetoRules));
return this;
}
}
31 changes: 25 additions & 6 deletions src/main/java/com/emc/rest/smart/PollingDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,43 @@ public void run() {
LoadBalancer loadBalancer = smartConfig.getLoadBalancer();
HostListProvider hostListProvider = smartConfig.getHostListProvider();

if (smartConfig.isDisablePolling()) {
l4j.info("host polling is disabled; not updating hosts");
if (!smartConfig.isHostUpdateEnabled()) {
l4j.info("host update is disabled; not updating hosts");
} else if (hostListProvider == null) {
l4j.info("no host list provider; not updating hosts");
} else {
try {
loadBalancer.updateHosts(hostListProvider.getHostList());
} catch (Throwable t) {
l4j.warn("Unable to enumerate servers", t);
l4j.warn("unable to enumerate servers", t);
}
}

if (!smartConfig.isHealthCheckEnabled()) {
l4j.info("health check is disabled; not checking hosts");
} else if (hostListProvider == null) {
l4j.info("no host list provider; not checking hosts");
} else {
for (Host host : loadBalancer.getAllHosts()) {
try {
hostListProvider.runHealthCheck(host);
host.setHealthy(true);
LogMF.debug(l4j, "health check successful for {0}; host is marked healthy", host.getName());
} catch (Throwable t) {
host.setHealthy(false);
l4j.warn("health check failed for " + host.getName() + "; host is marked unhealthy", t);
}
}
}

long callTime = System.currentTimeMillis() - start;
try {
LogMF.debug(l4j, "polling daemon finished; sleeping for {0} seconds..", smartConfig.getPollInterval());
Thread.sleep(smartConfig.getPollInterval() * 1000 - callTime);
long sleepTime = smartConfig.getPollInterval() * 1000 - callTime;
if (sleepTime < 0) sleepTime = 0;
LogMF.debug(l4j, "polling daemon finished; will poll again in {0}ms..", sleepTime);
if (sleepTime > 0) Thread.sleep(sleepTime);
} catch (InterruptedException e) {
l4j.warn("Interrupted while sleeping");
l4j.warn("interrupted while sleeping", e);
}
}
}
Expand Down
Loading

0 comments on commit 956f6de

Please sign in to comment.