Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable query cancellation for MSQE + cancel using client-provided id #14823

Open
wants to merge 30 commits into
base: master
Choose a base branch
from

Conversation

albertobastos
Copy link
Contributor

@albertobastos albertobastos commented Jan 15, 2025

  • Enables query cancellation feature for MSQE queries (wasn't supported until now).
  • Lets the client setting a clientQueryId query option that can be used when using the clientQuery/{clientQueryId} endpoint.
  • Creates a sleep(ms) function, as for today only recommended for testing purposes.

Some refactor involved to reuse as much as possible cancellation logic between SSQE and MSQE.

}
String clientQueryId = extractClientQueryId(sqlNodeAndOptions);
if (StringUtils.isBlank(clientQueryId)) {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) in general we don't recommend returning NULL as a coding practice

@siddharthteotia
Copy link
Contributor

Is ClientQueryID a new concept? Is it same as requestID ?

How does the support added here improve the existing Query Cancellation (which is also exposed to user IIRC) ?

@codecov-commenter
Copy link

codecov-commenter commented Jan 16, 2025

Codecov Report

Attention: Patch coverage is 27.75120% with 151 lines in your changes missing coverage. Please review.

Project coverage is 63.68%. Comparing base (59551e4) to head (a0e1e83).
Report is 1659 commits behind head on master.

Files with missing lines Patch % Lines
...oller/api/resources/PinotRunningQueryResource.java 0.00% 76 Missing ⚠️
.../pinot/query/service/dispatch/QueryDispatcher.java 40.62% 13 Missing and 6 partials ⚠️
...roker/requesthandler/BaseBrokerRequestHandler.java 52.94% 12 Missing and 4 partials ⚠️
...pinot/broker/api/resources/PinotClientRequest.java 0.00% 11 Missing ⚠️
...r/requesthandler/BrokerRequestHandlerDelegate.java 0.00% 9 Missing ⚠️
...sthandler/BaseSingleStageBrokerRequestHandler.java 70.00% 3 Missing and 3 partials ⚠️
...requesthandler/MultiStageBrokerRequestHandler.java 50.00% 5 Missing ⚠️
...common/response/broker/BrokerResponseNativeV2.java 0.00% 3 Missing ⚠️
...roker/requesthandler/TimeSeriesRequestHandler.java 0.00% 2 Missing ⚠️
...inot/common/function/scalar/DateTimeFunctions.java 60.00% 1 Missing and 1 partial ⚠️
... and 2 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14823      +/-   ##
============================================
+ Coverage     61.75%   63.68%   +1.93%     
- Complexity      207     1483    +1276     
============================================
  Files          2436     2712     +276     
  Lines        133233   152210   +18977     
  Branches      20636    23518    +2882     
============================================
+ Hits          82274    96936   +14662     
- Misses        44911    47977    +3066     
- Partials       6048     7297    +1249     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.63% <27.75%> (+1.92%) ⬆️
java-21 63.57% <27.75%> (+1.95%) ⬆️
skip-bytebuffers-false 63.67% <27.75%> (+1.93%) ⬆️
skip-bytebuffers-true 63.53% <27.75%> (+35.80%) ⬆️
temurin 63.68% <27.75%> (+1.93%) ⬆️
unittests 63.68% <27.75%> (+1.93%) ⬆️
unittests1 56.23% <44.68%> (+9.34%) ⬆️
unittests2 34.02% <19.61%> (+6.29%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@albertobastos
Copy link
Contributor Author

Is ClientQueryID a new concept? Is it same as requestID ?

How does the support added here improve the existing Query Cancellation (which is also exposed to user IIRC) ?

Hi Siddharth,

AFAIK, the current cancellation feature depends on the internal requestId generated by the broker itself. That request id is not returned until the query completes, so an external user requires first to ask for the active running queries, determine from the responded array the requestId assigned to the one he's interested in (just comparing the query body) and finally use the cancel operation to abort it. That's two back-and-forth trips between the user and the cluster.

With a client-provided requestId he can skip one step, going straight to the cancel operation using his own ID to abort the query.

@albertobastos
Copy link
Contributor Author

As some extra context, the endgame of this is enable on UI a "Cancel" button the customer can use to abort an ongoing query. Using a query id provided by the customer or the UI itself, that can be done without need of any internal id retrieval.

@albertobastos albertobastos changed the title add clientQueryId and its cancel operation Enable query cancellation for MSQE + cancel using client-provided id Jan 27, 2025
@albertobastos albertobastos marked this pull request as ready for review January 29, 2025 14:19
Copy link
Contributor

@gortiz gortiz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm adding several comments but I wasn't able to read the whole PR.

Although I'm asking for changes, it is a good PR overall. We just need to finish the last mile.

Comment on lines +554 to +557
} catch (InterruptedException e) {
//TODO: handle interruption
//Thread.currentThread().interrupt();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to fix this TODO before merging

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestion on how we should deal with an interruption here? Just warn it and skip the sleep or propagate the error?

Comment on lines 429 to 435
public String cancelClientQuery(
@ApiParam(value = "ClientQueryId given by the client", required = true)
@PathParam("clientQueryId") String clientQueryId,
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
@DefaultValue("3000") int timeoutMs,
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false")
boolean verbose) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the same endpoint we already have?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could expand the already existing endpoint by adding a @QueryParam to determine if the provided id is either internal or client-based, being internal as default.

The only drawback here is that internal ids are long whereas client ids are string, so type validation could no longer been done by Jersey but by the method itself.

47fb9bb786

The Controller scenario is different, though. There the existing endpoint is DELETE /query/{brokerId}/{queryId}, but for clientid-based cancellations we do not want to know the exact broker where the query felt into, so we need an endpoint such as DELETE /clientQuery/{clientQueryId}. Can't see how to unify these two.

Comment on lines +103 to +111
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
if (enableQueryCancellation) {
_queriesById = new ConcurrentHashMap<>();
_clientQueryIds = new ConcurrentHashMap<>();
} else {
_queriesById = null;
_clientQueryIds = null;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not something we introduced in this PR, but something I think we need to take care of in the future:

We use BaseBrokerRequestHandler as the root/common state for the broker, probably for historical reasons. But that is not true. A single broker may have SSE, MSE, GRPC and even TSE queries running at the same time. It would be a better design to have a shared state between them instead of the trick we do with the delegate.

This is something we need to improve in the future

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the shared state refactor, we should write it down somewhere so we actually do it ;-)

@@ -179,6 +198,9 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception;

protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need javadoc here to explain how it should work. At least we should say that queryId may be a client or pinot generated id.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the queryId received here always refers to a broker-generated internal id. The clientQueryId -> brokerQueryId translation is done by BaseBrokerRequestHandler.cancelQueryByClientId.

Added some minimal javadoc here: 52998d3

I tried to mimic the current code design for handleRequest, but it is a bit confusing the existance of two handleRequest methods here:

  • A public method implemented by the interface and called from the endpoint layer that receives a SqlNodeAndOptions parameter.
  • A protected method called from the previous one and already receiving a requestId and the query's string itself.

To increase confusion, neither of the two methods have a javadoc.

This probably could get better designed if we move forward with the proposed shared state design.

Comment on lines +277 to +278
LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId);
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should't we throw here to notify the caller that the query id is incorrect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried to be consistent with the behavior when we cancel using a broker-generated id: it returns true or false depending on if the query exists or not. If we do not found a match for the given clientQueryId, isn't that the same as saying that the query does not exist?

PinotClientRequest is the one that receives the false value and decides to raise a WebApplicationException enclosing aHTTP 500 response.

@@ -810,14 +813,17 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
// can always list the running queries and cancel query again until it ends. Just that such race
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
// servers takes time, but will address later if needed.
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
LOGGER.debug("Keep track of running query: {}", requestId);
String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another reason to split this method is that we may want to set brokerRespose.setClientRequestId even if query cancellation is disabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already convinced, no need for more reasons :-)

Comment on lines 250 to 253
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
return new HashMap<>(_queriesById);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is safer and better practice to return an immutable view of the map instead. In the delegate we can create a copy we can modify.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, makes total sense c60b953

@@ -547,6 +547,17 @@ public static long now() {
return System.currentTimeMillis();
}

@ScalarFunction
public static long sleep(long millis) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the sleep trick. I mean, to have a function that wait some MS or until some epoch per row is great for tests. But given we didn't find a way to make evaluation lazy (sleep with constant arguments is executed at optimization phase) we had to call sleep as sleep(col + constant). That is the trick I don't like.

Instead, I suggest including an option or something like that that can be understood by different parts of the code so we can sleep whenever we want (in the broker, in the leaf operator or in SSE).

The sleep function can also be used to create attacks (see https://www.sqlinjection.net/time-based/). I understand the sleep function is not the topic of this PR but just a utility to test the PR, so I don't think it is correct to block the PR until we have a perfect sleep function. Therefore my suggestion is to at least change the implementation so this function only works if tests are enabled. We can do that by using this horrible Java trick:

    boolean assertEnabled = false;
    assert assertEnabled = true;
    if (assertEnabled) {
      Thread.sleep(millis);
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with that minimal compromise so we can move forward the PR.

6042ad2

Ugly but funny trick, didn't think about it.

Personally I think that if we manage to force the lazy evaluation, that would be enough. Without the column+constant hack, it is less painful to the eyes.

Comment on lines 792 to 799
public void testSleepFunction() {
long startTime = System.currentTimeMillis();
testFunction("sleep(500)", Collections.emptyList(), new GenericRow(), result -> {
assertTrue((long) result >= 500);
});
long endTime = System.currentTimeMillis();
assertTrue(endTime - startTime >= 500);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can reduce time to something in the order of tens of millis

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

50 ms, final offer

b458a5b

String clientRequestId = UUID.randomUUID().toString();
// tricky query: use sleep with some column data to avoid Calcite from optimizing it on compile time
String sqlQuery =
"SET " + CommonConstants.Broker.Request.QueryOptionKey.CLIENT_QUERY_ID + "='" + clientRequestId + "'; "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: directly use the option name instead of the const to make it easier to read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, in case we change the const the test will break anyway.

9d0f335

Copy link
Contributor Author

@albertobastos albertobastos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, @gortiz

Besides my doubts on how to handle the sleep interruption (is it really necessary now that we only enable it during tests?) and some future tasks and refactors derived from the PR, I believe I follow your advice on all your suggestions.

Comment on lines +554 to +557
} catch (InterruptedException e) {
//TODO: handle interruption
//Thread.currentThread().interrupt();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestion on how we should deal with an interruption here? Just warn it and skip the sleep or propagate the error?

Comment on lines 429 to 435
public String cancelClientQuery(
@ApiParam(value = "ClientQueryId given by the client", required = true)
@PathParam("clientQueryId") String clientQueryId,
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
@DefaultValue("3000") int timeoutMs,
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false")
boolean verbose) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could expand the already existing endpoint by adding a @QueryParam to determine if the provided id is either internal or client-based, being internal as default.

The only drawback here is that internal ids are long whereas client ids are string, so type validation could no longer been done by Jersey but by the method itself.

47fb9bb786

The Controller scenario is different, though. There the existing endpoint is DELETE /query/{brokerId}/{queryId}, but for clientid-based cancellations we do not want to know the exact broker where the query felt into, so we need an endpoint such as DELETE /clientQuery/{clientQueryId}. Can't see how to unify these two.

Comment on lines +103 to +111
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
if (enableQueryCancellation) {
_queriesById = new ConcurrentHashMap<>();
_clientQueryIds = new ConcurrentHashMap<>();
} else {
_queriesById = null;
_clientQueryIds = null;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the shared state refactor, we should write it down somewhere so we actually do it ;-)

@@ -179,6 +198,9 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception;

protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the queryId received here always refers to a broker-generated internal id. The clientQueryId -> brokerQueryId translation is done by BaseBrokerRequestHandler.cancelQueryByClientId.

Added some minimal javadoc here: 52998d3

I tried to mimic the current code design for handleRequest, but it is a bit confusing the existance of two handleRequest methods here:

  • A public method implemented by the interface and called from the endpoint layer that receives a SqlNodeAndOptions parameter.
  • A protected method called from the previous one and already receiving a requestId and the query's string itself.

To increase confusion, neither of the two methods have a javadoc.

This probably could get better designed if we move forward with the proposed shared state design.

Comment on lines +277 to +278
LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId);
return false;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried to be consistent with the behavior when we cancel using a broker-generated id: it returns true or false depending on if the query exists or not. If we do not found a match for the given clientQueryId, isn't that the same as saying that the query does not exist?

PinotClientRequest is the one that receives the false value and decides to raise a WebApplicationException enclosing aHTTP 500 response.

@@ -810,14 +813,17 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
// can always list the running queries and cancel query again until it ends. Just that such race
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
// servers takes time, but will address later if needed.
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
LOGGER.debug("Keep track of running query: {}", requestId);
String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already convinced, no need for more reasons :-)

Comment on lines 250 to 253
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
return new HashMap<>(_queriesById);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, makes total sense c60b953

@@ -547,6 +547,17 @@ public static long now() {
return System.currentTimeMillis();
}

@ScalarFunction
public static long sleep(long millis) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with that minimal compromise so we can move forward the PR.

6042ad2

Ugly but funny trick, didn't think about it.

Personally I think that if we manage to force the lazy evaluation, that would be enough. Without the column+constant hack, it is less painful to the eyes.

Comment on lines 792 to 799
public void testSleepFunction() {
long startTime = System.currentTimeMillis();
testFunction("sleep(500)", Collections.emptyList(), new GenericRow(), result -> {
assertTrue((long) result >= 500);
});
long endTime = System.currentTimeMillis();
assertTrue(endTime - startTime >= 500);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

50 ms, final offer

b458a5b

String clientRequestId = UUID.randomUUID().toString();
// tricky query: use sleep with some column data to avoid Calcite from optimizing it on compile time
String sqlQuery =
"SET " + CommonConstants.Broker.Request.QueryOptionKey.CLIENT_QUERY_ID + "='" + clientRequestId + "'; "
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, in case we change the const the test will break anyway.

9d0f335

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants