-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable query cancellation for MSQE + cancel using client-provided id #14823
base: master
Are you sure you want to change the base?
Conversation
} | ||
String clientQueryId = extractClientQueryId(sqlNodeAndOptions); | ||
if (StringUtils.isBlank(clientQueryId)) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) in general we don't recommend returning NULL as a coding practice
Is ClientQueryID a new concept? Is it same as requestID ? How does the support added here improve the existing Query Cancellation (which is also exposed to user IIRC) ? |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #14823 +/- ##
============================================
+ Coverage 61.75% 63.68% +1.93%
- Complexity 207 1483 +1276
============================================
Files 2436 2712 +276
Lines 133233 152210 +18977
Branches 20636 23518 +2882
============================================
+ Hits 82274 96936 +14662
- Misses 44911 47977 +3066
- Partials 6048 7297 +1249
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Hi Siddharth, AFAIK, the current cancellation feature depends on the internal requestId generated by the broker itself. That request id is not returned until the query completes, so an external user requires first to ask for the active running queries, determine from the responded array the requestId assigned to the one he's interested in (just comparing the query body) and finally use the cancel operation to abort it. That's two back-and-forth trips between the user and the cluster. With a client-provided requestId he can skip one step, going straight to the cancel operation using his own ID to abort the query. |
As some extra context, the endgame of this is enable on UI a "Cancel" button the customer can use to abort an ongoing query. Using a query id provided by the customer or the UI itself, that can be done without need of any internal id retrieval. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm adding several comments but I wasn't able to read the whole PR.
Although I'm asking for changes, it is a good PR overall. We just need to finish the last mile.
} catch (InterruptedException e) { | ||
//TODO: handle interruption | ||
//Thread.currentThread().interrupt(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to fix this TODO before merging
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any suggestion on how we should deal with an interruption here? Just warn it and skip the sleep or propagate the error?
public String cancelClientQuery( | ||
@ApiParam(value = "ClientQueryId given by the client", required = true) | ||
@PathParam("clientQueryId") String clientQueryId, | ||
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") | ||
@DefaultValue("3000") int timeoutMs, | ||
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") | ||
boolean verbose) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use the same endpoint we already have?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could expand the already existing endpoint by adding a @QueryParam
to determine if the provided id is either internal or client-based, being internal as default.
The only drawback here is that internal ids are long
whereas client ids are string
, so type validation could no longer been done by Jersey but by the method itself.
The Controller scenario is different, though. There the existing endpoint is DELETE /query/{brokerId}/{queryId}
, but for clientid-based cancellations we do not want to know the exact broker where the query felt into, so we need an endpoint such as DELETE /clientQuery/{clientQueryId}
. Can't see how to unify these two.
boolean enableQueryCancellation = | ||
Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); | ||
if (enableQueryCancellation) { | ||
_queriesById = new ConcurrentHashMap<>(); | ||
_clientQueryIds = new ConcurrentHashMap<>(); | ||
} else { | ||
_queriesById = null; | ||
_clientQueryIds = null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not something we introduced in this PR, but something I think we need to take care of in the future:
We use BaseBrokerRequestHandler as the root/common state for the broker, probably for historical reasons. But that is not true. A single broker may have SSE, MSE, GRPC and even TSE queries running at the same time. It would be a better design to have a shared state between them instead of the trick we do with the delegate.
This is something we need to improve in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with the shared state refactor, we should write it down somewhere so we actually do it ;-)
@@ -179,6 +198,9 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq | |||
@Nullable HttpHeaders httpHeaders, AccessControl accessControl) | |||
throws Exception; | |||
|
|||
protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need javadoc here to explain how it should work. At least we should say that queryId may be a client or pinot generated id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the queryId
received here always refers to a broker-generated internal id. The clientQueryId -> brokerQueryId
translation is done by BaseBrokerRequestHandler.cancelQueryByClientId
.
Added some minimal javadoc here: 52998d3
I tried to mimic the current code design for handleRequest
, but it is a bit confusing the existance of two handleRequest
methods here:
- A public method implemented by the interface and called from the endpoint layer that receives a
SqlNodeAndOptions
parameter. - A protected method called from the previous one and already receiving a
requestId
and thequery
's string itself.
To increase confusion, neither of the two methods have a javadoc.
This probably could get better designed if we move forward with the proposed shared state design.
LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId); | ||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should't we throw here to notify the caller that the query id is incorrect?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just tried to be consistent with the behavior when we cancel using a broker-generated id: it returns true
or false
depending on if the query exists or not. If we do not found a match for the given clientQueryId
, isn't that the same as saying that the query does not exist?
PinotClientRequest
is the one that receives the false
value and decides to raise a WebApplicationException
enclosing aHTTP 500
response.
@@ -810,14 +813,17 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO | |||
// can always list the running queries and cancel query again until it ends. Just that such race | |||
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to | |||
// servers takes time, but will address later if needed. | |||
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); | |||
LOGGER.debug("Keep track of running query: {}", requestId); | |||
String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another reason to split this method is that we may want to set brokerRespose.setClientRequestId
even if query cancellation is disabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already convinced, no need for more reasons :-)
public Map<Long, String> getRunningQueries() { | ||
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); | ||
return new HashMap<>(_queriesById); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is safer and better practice to return an immutable view of the map instead. In the delegate we can create a copy we can modify.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, makes total sense c60b953
@@ -547,6 +547,17 @@ public static long now() { | |||
return System.currentTimeMillis(); | |||
} | |||
|
|||
@ScalarFunction | |||
public static long sleep(long millis) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like the sleep trick. I mean, to have a function that wait some MS or until some epoch per row is great for tests. But given we didn't find a way to make evaluation lazy (sleep with constant arguments is executed at optimization phase) we had to call sleep as sleep(col + constant)
. That is the trick I don't like.
Instead, I suggest including an option or something like that that can be understood by different parts of the code so we can sleep whenever we want (in the broker, in the leaf operator or in SSE).
The sleep function can also be used to create attacks (see https://www.sqlinjection.net/time-based/). I understand the sleep function is not the topic of this PR but just a utility to test the PR, so I don't think it is correct to block the PR until we have a perfect sleep function. Therefore my suggestion is to at least change the implementation so this function only works if tests are enabled. We can do that by using this horrible Java trick:
boolean assertEnabled = false;
assert assertEnabled = true;
if (assertEnabled) {
Thread.sleep(millis);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with that minimal compromise so we can move forward the PR.
Ugly but funny trick, didn't think about it.
Personally I think that if we manage to force the lazy evaluation, that would be enough. Without the column+constant hack, it is less painful to the eyes.
public void testSleepFunction() { | ||
long startTime = System.currentTimeMillis(); | ||
testFunction("sleep(500)", Collections.emptyList(), new GenericRow(), result -> { | ||
assertTrue((long) result >= 500); | ||
}); | ||
long endTime = System.currentTimeMillis(); | ||
assertTrue(endTime - startTime >= 500); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can reduce time to something in the order of tens of millis
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
50 ms, final offer
String clientRequestId = UUID.randomUUID().toString(); | ||
// tricky query: use sleep with some column data to avoid Calcite from optimizing it on compile time | ||
String sqlQuery = | ||
"SET " + CommonConstants.Broker.Request.QueryOptionKey.CLIENT_QUERY_ID + "='" + clientRequestId + "'; " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: directly use the option name instead of the const to make it easier to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, in case we change the const the test will break anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review, @gortiz
Besides my doubts on how to handle the sleep interruption (is it really necessary now that we only enable it during tests?) and some future tasks and refactors derived from the PR, I believe I follow your advice on all your suggestions.
} catch (InterruptedException e) { | ||
//TODO: handle interruption | ||
//Thread.currentThread().interrupt(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any suggestion on how we should deal with an interruption here? Just warn it and skip the sleep or propagate the error?
public String cancelClientQuery( | ||
@ApiParam(value = "ClientQueryId given by the client", required = true) | ||
@PathParam("clientQueryId") String clientQueryId, | ||
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") | ||
@DefaultValue("3000") int timeoutMs, | ||
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") | ||
boolean verbose) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could expand the already existing endpoint by adding a @QueryParam
to determine if the provided id is either internal or client-based, being internal as default.
The only drawback here is that internal ids are long
whereas client ids are string
, so type validation could no longer been done by Jersey but by the method itself.
The Controller scenario is different, though. There the existing endpoint is DELETE /query/{brokerId}/{queryId}
, but for clientid-based cancellations we do not want to know the exact broker where the query felt into, so we need an endpoint such as DELETE /clientQuery/{clientQueryId}
. Can't see how to unify these two.
boolean enableQueryCancellation = | ||
Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); | ||
if (enableQueryCancellation) { | ||
_queriesById = new ConcurrentHashMap<>(); | ||
_clientQueryIds = new ConcurrentHashMap<>(); | ||
} else { | ||
_queriesById = null; | ||
_clientQueryIds = null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with the shared state refactor, we should write it down somewhere so we actually do it ;-)
@@ -179,6 +198,9 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq | |||
@Nullable HttpHeaders httpHeaders, AccessControl accessControl) | |||
throws Exception; | |||
|
|||
protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the queryId
received here always refers to a broker-generated internal id. The clientQueryId -> brokerQueryId
translation is done by BaseBrokerRequestHandler.cancelQueryByClientId
.
Added some minimal javadoc here: 52998d3
I tried to mimic the current code design for handleRequest
, but it is a bit confusing the existance of two handleRequest
methods here:
- A public method implemented by the interface and called from the endpoint layer that receives a
SqlNodeAndOptions
parameter. - A protected method called from the previous one and already receiving a
requestId
and thequery
's string itself.
To increase confusion, neither of the two methods have a javadoc.
This probably could get better designed if we move forward with the proposed shared state design.
LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId); | ||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just tried to be consistent with the behavior when we cancel using a broker-generated id: it returns true
or false
depending on if the query exists or not. If we do not found a match for the given clientQueryId
, isn't that the same as saying that the query does not exist?
PinotClientRequest
is the one that receives the false
value and decides to raise a WebApplicationException
enclosing aHTTP 500
response.
@@ -810,14 +813,17 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO | |||
// can always list the running queries and cancel query again until it ends. Just that such race | |||
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to | |||
// servers takes time, but will address later if needed. | |||
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); | |||
LOGGER.debug("Keep track of running query: {}", requestId); | |||
String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already convinced, no need for more reasons :-)
public Map<Long, String> getRunningQueries() { | ||
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); | ||
return new HashMap<>(_queriesById); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, makes total sense c60b953
@@ -547,6 +547,17 @@ public static long now() { | |||
return System.currentTimeMillis(); | |||
} | |||
|
|||
@ScalarFunction | |||
public static long sleep(long millis) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with that minimal compromise so we can move forward the PR.
Ugly but funny trick, didn't think about it.
Personally I think that if we manage to force the lazy evaluation, that would be enough. Without the column+constant hack, it is less painful to the eyes.
public void testSleepFunction() { | ||
long startTime = System.currentTimeMillis(); | ||
testFunction("sleep(500)", Collections.emptyList(), new GenericRow(), result -> { | ||
assertTrue((long) result >= 500); | ||
}); | ||
long endTime = System.currentTimeMillis(); | ||
assertTrue(endTime - startTime >= 500); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
50 ms, final offer
String clientRequestId = UUID.randomUUID().toString(); | ||
// tricky query: use sleep with some column data to avoid Calcite from optimizing it on compile time | ||
String sqlQuery = | ||
"SET " + CommonConstants.Broker.Request.QueryOptionKey.CLIENT_QUERY_ID + "='" + clientRequestId + "'; " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, in case we change the const the test will break anyway.
clientQueryId
query option that can be used when using theclientQuery/{clientQueryId}
endpoint.sleep(ms)
function, as for today only recommended for testing purposes.Some refactor involved to reuse as much as possible cancellation logic between SSQE and MSQE.