Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework MSE query throttling to take into account estimated number of threads used by a query #14847

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,11 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}

Timer queryTimer = new Timer(queryTimeoutMs);
int estimatedNumQueryThreads = dispatchableSubPlan.getEstimatedNumQueryThreads();
try {
// It's fine to block in this thread because we use a separate thread pool from the main Jersey server to process
// these requests.
if (!_queryThrottler.tryAcquire(queryTimeoutMs, TimeUnit.MILLISECONDS)) {
if (!_queryThrottler.tryAcquire(estimatedNumQueryThreads, queryTimeoutMs, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Timed out waiting to execute request {}: {}", requestId, query);
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
Expand Down Expand Up @@ -311,7 +312,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO

return brokerResponse;
} finally {
_queryThrottler.release();
_queryThrottler.release(estimatedNumQueryThreads);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@

/**
* This class helps limit the number of multi-stage queries being executed concurrently. Note that the cluster
* configuration is a "per server" value and the broker currently simply assumes that a query will be across all
* servers. Another assumption here is that queries are evenly distributed across brokers.
* configuration is a "per server" value and the broker currently computes the max server query threads as
* <em>CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS * numServers / numBrokers</em>. Note that the config value,
* number of servers, and number of brokers are all dynamically updated here.
* <p>
* Another assumption made here is that queries are evenly distributed across brokers.
* <p>
* This is designed to limit the number of multi-stage queries being concurrently executed across a cluster and is not
* intended to prevent individual large queries from being executed.
*/
public class MultiStageQueryThrottler implements ClusterChangeHandler {

Expand All @@ -50,10 +56,10 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
private int _numBrokers;
private int _numServers;
/**
* If _maxConcurrentQueries is <= 0, it means that the cluster is not configured to limit the number of multi-stage
* If _maxServerQueryThreads is <= 0, it means that the cluster is not configured to limit the number of multi-stage
* queries that can be executed concurrently. In this case, we should not block the query.
*/
private int _maxConcurrentQueries;
private int _maxServerQueryThreads;
private AdjustableSemaphore _semaphore;

@Override
Expand All @@ -63,11 +69,11 @@ public void init(HelixManager helixManager) {
_helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_helixManager.getClusterName()).build();

_maxConcurrentQueries = Integer.parseInt(
_maxServerQueryThreads = Integer.parseInt(
_helixAdmin.getConfig(_helixConfigScope,
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
CommonConstants.Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS));

List<String> clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName());
_numBrokers = Math.max(1, (int) clusterInstances.stream()
Expand All @@ -77,36 +83,49 @@ public void init(HelixManager helixManager) {
.filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))
.count());

if (_maxConcurrentQueries > 0) {
_semaphore = new AdjustableSemaphore(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers), true);
if (_maxServerQueryThreads > 0) {
_semaphore = new AdjustableSemaphore(Math.max(1, _maxServerQueryThreads * _numServers / _numBrokers), true);
}
}

/**
* Returns true if the query can be executed (waiting until it can be executed if necessary), false otherwise.
* <p>
* {@link #release()} should be called after the query is done executing. It is the responsibility of the caller to
* ensure that {@link #release()} is called exactly once for each call to this method.
* {@link #release(int)} should be called after the query is done executing. It is the responsibility of the caller to
* ensure that {@link #release(int)} is called exactly once for each call to this method.
*
* @param numQueryThreads the estimated number of query server threads
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
*
* @throws InterruptedException if the current thread is interrupted
* @throws RuntimeException if the query can never be dispatched due to the number of estimated query server threads
* being greater than the maximum number of server query threads calculated on the basis of
* <em>CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS * numServers / numBrokers</em>
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
public boolean tryAcquire(int numQueryThreads, long timeout, TimeUnit unit)
throws InterruptedException {
if (_maxConcurrentQueries <= 0) {
if (_maxServerQueryThreads <= 0) {
return true;
}
return _semaphore.tryAcquire(timeout, unit);

if (numQueryThreads > _semaphore.getTotalPermits()) {
throw new RuntimeException(
"Can't dispatch query because the estimated number of server threads for this query is too large for the "
+ "configured value of '" + CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS
+ "'. Consider increasing the value of this configuration");
}

return _semaphore.tryAcquire(numQueryThreads, timeout, unit);
}

/**
* Should be called after the query is done executing. It is the responsibility of the caller to ensure that this
* method is called exactly once for each call to {@link #tryAcquire(long, TimeUnit)}.
* method is called exactly once for each call to {@link #tryAcquire(int, long, TimeUnit)}.
*/
public void release() {
if (_maxConcurrentQueries > 0) {
_semaphore.release();
public void release(int numQueryThreads) {
if (_maxServerQueryThreads > 0) {
_semaphore.release(numQueryThreads);
}
}

Expand All @@ -128,34 +147,33 @@ public void processClusterChange(HelixConstants.ChangeType changeType) {
if (numBrokers != _numBrokers || numServers != _numServers) {
_numBrokers = numBrokers;
_numServers = numServers;
if (_maxConcurrentQueries > 0) {
_semaphore.setPermits(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers));
if (_maxServerQueryThreads > 0) {
_semaphore.setPermits(Math.max(1, _maxServerQueryThreads * _numServers / _numBrokers));
}
}
} else {
int maxConcurrentQueries = Integer.parseInt(
_helixAdmin.getConfig(_helixConfigScope,
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
CommonConstants.Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));
int maxServerQueryThreads = Integer.parseInt(_helixAdmin.getConfig(_helixConfigScope,
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS));

if (_maxConcurrentQueries == maxConcurrentQueries) {
if (_maxServerQueryThreads == maxServerQueryThreads) {
return;
}

if (_maxConcurrentQueries <= 0 && maxConcurrentQueries > 0
|| _maxConcurrentQueries > 0 && maxConcurrentQueries <= 0) {
if (_maxServerQueryThreads <= 0 && maxServerQueryThreads > 0
|| _maxServerQueryThreads > 0 && maxServerQueryThreads <= 0) {
// This operation isn't safe to do while queries are running so we require a restart of the broker for this
// change to take effect.
LOGGER.warn("Enabling or disabling limitation of the maximum number of multi-stage queries running "
+ "concurrently requires a restart of the broker to take effect");
return;
}

if (maxConcurrentQueries > 0) {
_semaphore.setPermits(Math.max(1, maxConcurrentQueries * _numServers / _numBrokers));
if (maxServerQueryThreads > 0) {
_semaphore.setPermits(Math.max(1, maxServerQueryThreads * _numServers / _numBrokers));
}
_maxConcurrentQueries = maxConcurrentQueries;
_maxServerQueryThreads = maxServerQueryThreads;
}
}

Expand Down
Loading
Loading