Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix application qps quota stalls. #14859

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
* - broker added or removed from cluster
*/
public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {

// Minimum 'working' value for app quota. If actual value is less than this (e.g. 0.0), it is considered as disabled.
private static final double MIN_APP_QUOTA = Math.nextUp(0.0);
// standard value meaning - no app quota limit set
private static final double DISABLED_APP_QUOTA = -1;

private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1;
private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60;
Expand Down Expand Up @@ -130,9 +136,10 @@ private void initializeApplicationQpsQuotas() {

String appName = entry.getKey();
double appQpsQuota =
entry.getValue() != null && entry.getValue() != -1.0d ? entry.getValue() : _defaultQpsQuotaForApplication;
entry.getValue() != null && entry.getValue() >= MIN_APP_QUOTA ? entry.getValue()
: _defaultQpsQuotaForApplication;

if (appQpsQuota < 0) {
if (appQpsQuota < MIN_APP_QUOTA) {
buildEmptyOrResetApplicationRateLimiter(appName);
continue;
}
Expand Down Expand Up @@ -348,16 +355,38 @@ private synchronized void createOrUpdateDatabaseRateLimiter(List<String> databas
}

public synchronized void createOrUpdateApplicationRateLimiter(String applicationName) {
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), DISABLED_APP_QUOTA);
}

public synchronized void createOrUpdateApplicationRateLimiter(String applicationName, double override) {
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), override);
}

// Caller method need not worry about getting lock on _applicationRateLimiterMap
// as this method will do idempotent updates to the application rate limiters
private synchronized void createOrUpdateApplicationRateLimiter(List<String> applicationNames) {
private synchronized void createOrUpdateApplicationRateLimiter(List<String> applicationNames, double override) {
ExternalView brokerResource = getBrokerResource();
Map<String, Double> quotas = null;
boolean quotasInitialized = false;

for (String appName : applicationNames) {
double qpsQuota = getEffectiveQueryQuotaOnApplication(appName);
if (qpsQuota < 0) {
double qpsQuota;
if (override >= MIN_APP_QUOTA) {
qpsQuota = override;
} else {
if (!quotasInitialized) {
quotas = ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
quotasInitialized = true;
}

if (quotas != null && quotas.get(appName) != null && quotas.get(appName) >= MIN_APP_QUOTA) {
qpsQuota = quotas.get(appName);
} else {
qpsQuota = _defaultQpsQuotaForApplication;
}
}

if (qpsQuota < MIN_APP_QUOTA) {
buildEmptyOrResetApplicationRateLimiter(appName);
continue;
}
Expand Down Expand Up @@ -436,22 +465,6 @@ private double getEffectiveQueryQuotaOnDatabase(String databaseName) {
return _defaultQpsQuotaForDatabase;
}

/**
* Utility to get the effective query quota being imposed on an application. It is computed based on the default quota
* set at cluster config.
*
* @param applicationName application name to get the query quota on.
* @return effective query quota limit being applied
*/
private double getEffectiveQueryQuotaOnApplication(String applicationName) {
Map<String, Double> quotas =
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore());
if (quotas != null && quotas.get(applicationName) != null && quotas.get(applicationName) != -1.0d) {
return quotas.get(applicationName);
}
return _defaultQpsQuotaForApplication;
}

/**
* Creates a new database rate limiter. Will not update the database rate limiter if it already exists.
* @param databaseName database name for which rate limiter needs to be created
Expand All @@ -472,7 +485,7 @@ public void createApplicationRateLimiter(String applicationName) {
if (_applicationRateLimiterMap.containsKey(applicationName)) {
return;
}
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName));
createOrUpdateApplicationRateLimiter(applicationName);
}

/**
Expand Down Expand Up @@ -579,10 +592,12 @@ public boolean acquireApplication(String applicationName) {
}
QueryQuotaEntity queryQuota = _applicationRateLimiterMap.get(applicationName);
if (queryQuota == null) {
if (getDefaultQueryQuotaForApplication() < 0) {
// do not create a new rate limiter because that could lead to OOM if client floods us with many unique app names
if (_defaultQpsQuotaForApplication < MIN_APP_QUOTA) {
return true;
} else {
createOrUpdateApplicationRateLimiter(applicationName);
// create limiter without querying ZK
createOrUpdateApplicationRateLimiter(applicationName, _defaultQpsQuotaForApplication);
queryQuota = _applicationRateLimiterMap.get(applicationName);
}
}
Expand Down Expand Up @@ -809,8 +824,8 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke
if (quota.getNumOnlineBrokers() != onlineBrokerCount) {
quota.setNumOnlineBrokers(onlineBrokerCount);
}
if (quota.getOverallRate() > 0) {
double qpsQuota = quota.getOverallRate() / onlineBrokerCount;
if (quota.getOverallRate() >= MIN_APP_QUOTA) {
double qpsQuota = Math.max(quota.getOverallRate() / onlineBrokerCount, MIN_APP_QUOTA);
quota.setRateLimiter(RateLimiter.create(qpsQuota));
}
}
Expand Down Expand Up @@ -843,7 +858,7 @@ public void processApplicationQueryRateLimitingClusterConfigChange() {
if (oldQpsQuota == _defaultQpsQuotaForApplication) {
return;
}
createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet()));
createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet()), DISABLED_APP_QUOTA);
}

private double getDefaultQueryQuotaForDatabase() {
Expand All @@ -857,11 +872,16 @@ private double getDefaultQueryQuotaForDatabase() {

private double getDefaultQueryQuotaForApplication() {
HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_helixManager.getClusterName()).build();
return Double.parseDouble(helixAdmin.getConfig(configScope,
HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
.forCluster(_helixManager.getClusterName()).build();
String value = helixAdmin.getConfig(configScope,
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1"));
.get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND);
if (value != null) {
return Double.parseDouble(value);
} else {
return DISABLED_APP_QUOTA;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class PinotApplicationQuotaRestletResource {
PinotHelixResourceManager _pinotHelixResourceManager;

/**
* API to get application quota configs. Will return null if application quotas are not defined
* API to get application quota configs. Will return empty map if application quotas are not defined at all.
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
Expand All @@ -88,7 +88,7 @@ public Map<String, Double> getApplicationQuotas(@Context HttpHeaders httpHeaders
}

/**
* API to get application quota configs. Will return null if application quotas are not defined
* API to get application quota config. Will return null if application quotas is not defined.
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
Expand All @@ -104,15 +104,16 @@ public Double getApplicationQuota(@Context HttpHeaders httpHeaders, @PathParam("

HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_pinotHelixResourceManager.getHelixClusterName()).build();

HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
String defaultQuota =
helixAdmin.getConfig(scope, Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND))
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, null);
.get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND);
return defaultQuota != null ? Double.parseDouble(defaultQuota) : null;
}

/**
* API to update the quota configs for application
* API to update the quota config for application.
*/
@POST
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ private void verifyQuotaUpdate(float quotaQps) {
} catch (IOException e) {
throw new RuntimeException(e);
}
}, 5000, "Failed to reflect query quota on rate limiter in 5s.");
}, 10000, "Failed to reflect query quota on rate limiter in 5s.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not strictly necessary, so I'll revert it.

} catch (AssertionError ae) {
throw new AssertionError(
ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + _quota + " set on: " + _quotaSource, ae);
Expand Down
Loading