-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable query cancellation for MSQE + cancel using client-provided id #14823
base: master
Are you sure you want to change the base?
Changes from all commits
f7a9488
39a4f94
c969abd
8162bc6
aa8c120
7a5f713
a9d1e49
97e7b5d
65f73a0
e3a9a5e
fe5c846
ae3260c
2eb506e
5dd5409
e9bbdac
9dcc393
5ac7d9e
a46659c
110fb16
47fb9bb
52998d3
e2678af
c201cf4
c60b953
6042ad2
b458a5b
9d0f335
1e00bf6
d3061ba
a0e1e83
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,18 +19,24 @@ | |
package org.apache.pinot.broker.requesthandler; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.collect.Maps; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Executor; | ||
import javax.annotation.Nullable; | ||
import javax.annotation.concurrent.ThreadSafe; | ||
import javax.ws.rs.WebApplicationException; | ||
import javax.ws.rs.core.HttpHeaders; | ||
import javax.ws.rs.core.MultivaluedMap; | ||
import javax.ws.rs.core.Response; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.hc.client5.http.io.HttpClientConnectionManager; | ||
import org.apache.pinot.broker.api.AccessControl; | ||
import org.apache.pinot.broker.api.RequesterIdentity; | ||
import org.apache.pinot.broker.broker.AccessControlFactory; | ||
|
@@ -51,6 +57,7 @@ | |
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory; | ||
import org.apache.pinot.spi.exception.BadQueryRequestException; | ||
import org.apache.pinot.spi.trace.RequestContext; | ||
import org.apache.pinot.spi.utils.CommonConstants; | ||
import org.apache.pinot.spi.utils.CommonConstants.Broker; | ||
import org.apache.pinot.sql.parsers.SqlNodeAndOptions; | ||
import org.slf4j.Logger; | ||
|
@@ -74,6 +81,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { | |
protected final QueryLogger _queryLogger; | ||
@Nullable | ||
protected final String _enableNullHandling; | ||
// maps broker-generated query id to the query string | ||
protected final Map<Long, String> _queriesById; | ||
// maps broker-generated query id to client-provided query id | ||
protected final Map<Long, String> _clientQueryIds; | ||
|
||
public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, | ||
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) { | ||
|
@@ -90,6 +101,16 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok | |
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS); | ||
_queryLogger = new QueryLogger(config); | ||
_enableNullHandling = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING); | ||
|
||
boolean enableQueryCancellation = | ||
Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); | ||
if (enableQueryCancellation) { | ||
_queriesById = new ConcurrentHashMap<>(); | ||
_clientQueryIds = new ConcurrentHashMap<>(); | ||
} else { | ||
_queriesById = null; | ||
_clientQueryIds = null; | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -179,6 +200,13 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq | |
@Nullable HttpHeaders httpHeaders, AccessControl accessControl) | ||
throws Exception; | ||
|
||
/** | ||
* Attemps to cancel an ongoing query identified by its broker-generated id. | ||
* @return true if the query was successfully cancelled, false otherwise. | ||
*/ | ||
protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need javadoc here to explain how it should work. At least we should say that queryId may be a client or pinot generated id. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually the Added some minimal javadoc here: 52998d3 I tried to mimic the current code design for
To increase confusion, neither of the two methods have a javadoc. This probably could get better designed if we move forward with the proposed shared state design. |
||
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception; | ||
|
||
protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) { | ||
statistics.setNumRowsResultSet(response.getNumRowsResultSet()); | ||
// TODO: Add partial result flag to RequestContext | ||
|
@@ -223,4 +251,66 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons | |
statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments()); | ||
statistics.setTraceInfo(response.getTraceInfo()); | ||
} | ||
|
||
@Override | ||
public Map<Long, String> getRunningQueries() { | ||
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); | ||
return Collections.unmodifiableMap(_queriesById); | ||
} | ||
|
||
@Override | ||
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, | ||
Map<String, Integer> serverResponses) | ||
throws Exception { | ||
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); | ||
try { | ||
return handleCancel(queryId, timeoutMs, executor, connMgr, serverResponses); | ||
} finally { | ||
onQueryFinish(queryId); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, | ||
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) | ||
throws Exception { | ||
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); | ||
Optional<Long> requestId = _clientQueryIds.entrySet().stream() | ||
.filter(e -> clientQueryId.equals(e.getValue())).map(Map.Entry::getKey).findFirst(); | ||
if (requestId.isPresent()) { | ||
return cancelQuery(requestId.get(), timeoutMs, executor, connMgr, serverResponses); | ||
} else { | ||
LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId); | ||
return false; | ||
Comment on lines
+283
to
+284
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should't we throw here to notify the caller that the query id is incorrect? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just tried to be consistent with the behavior when we cancel using a broker-generated id: it returns
|
||
} | ||
} | ||
|
||
protected String extractClientRequestId(SqlNodeAndOptions sqlNodeAndOptions) { | ||
return sqlNodeAndOptions.getOptions() != null | ||
? sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.CLIENT_QUERY_ID) : null; | ||
} | ||
|
||
protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) { | ||
if (isQueryCancellationEnabled()) { | ||
_queriesById.put(requestId, query); | ||
if (StringUtils.isNotBlank(clientRequestId)) { | ||
_clientQueryIds.put(requestId, clientRequestId); | ||
LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId); | ||
} else { | ||
LOGGER.debug("Keep track of running query: {}", requestId); | ||
} | ||
} | ||
} | ||
|
||
protected void onQueryFinish(long requestId) { | ||
if (isQueryCancellationEnabled()) { | ||
_queriesById.remove(requestId); | ||
_clientQueryIds.remove(requestId); | ||
LOGGER.debug("Remove track of running query: {}", requestId); | ||
} | ||
} | ||
|
||
protected boolean isQueryCancellationEnabled() { | ||
return _queriesById != null; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not something we introduced in this PR, but something I think we need to take care of in the future:
We use BaseBrokerRequestHandler as the root/common state for the broker, probably for historical reasons. But that is not true. A single broker may have SSE, MSE, GRPC and even TSE queries running at the same time. It would be a better design to have a shared state between them instead of the trick we do with the delegate.
This is something we need to improve in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with the shared state refactor, we should write it down somewhere so we actually do it ;-)