Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable query cancellation for MSQE + cancel using client-provided id #14823

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f7a9488
add cancelClientQuery operation for SingleStageBroker (only numerical…
albertobastos Jan 15, 2025
39a4f94
avoid synchronized BiMap and checkstyle
albertobastos Jan 16, 2025
c969abd
Merge remote-tracking branch 'origin' into cancel-with-cqid
albertobastos Jan 16, 2025
8162bc6
Merge branch 'master' into cancel-with-cqid
albertobastos Jan 24, 2025
aa8c120
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 27, 2025
7a5f713
add cancel feature (with queryId and clientQueryId) to MSQE, some ref…
albertobastos Jan 27, 2025
a9d1e49
set and delete clientRequestId on MSQE
albertobastos Jan 27, 2025
97e7b5d
fix unimplemented method
albertobastos Jan 27, 2025
65f73a0
fix I/O parameter and related tests
albertobastos Jan 27, 2025
e3a9a5e
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 28, 2025
fe5c846
add clientRequestId on response test
albertobastos Jan 28, 2025
ae3260c
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 28, 2025
2eb506e
add sleep and random functions for further tests
albertobastos Jan 28, 2025
5dd5409
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 29, 2025
e9bbdac
override test server conf
albertobastos Jan 29, 2025
9dcc393
add missing superclass call
albertobastos Jan 29, 2025
5ac7d9e
add some cancel query test using internal sleep function with a trick
albertobastos Jan 29, 2025
a46659c
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 30, 2025
110fb16
bring master
albertobastos Jan 30, 2025
47fb9bb
reuse same broker endpoint for internal and client-based cancellation
albertobastos Jan 31, 2025
52998d3
add javadoc
albertobastos Jan 31, 2025
e2678af
add mapping comments
albertobastos Jan 31, 2025
c201cf4
refactor base broker methods
albertobastos Jan 31, 2025
c60b953
return immutable view instead of copy
albertobastos Jan 31, 2025
6042ad2
enable sleep(ms) function only during testing
albertobastos Jan 31, 2025
b458a5b
reduce unit test wait time
albertobastos Jan 31, 2025
9d0f335
replace constant with literal on test
albertobastos Jan 31, 2025
1e00bf6
linter
albertobastos Jan 31, 2025
d3061ba
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 31, 2025
a0e1e83
remove embarassing npe
albertobastos Jan 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -380,38 +380,48 @@ public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspe
}

@DELETE
@Path("query/{queryId}")
@Path("query/{id}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the "
+ "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as "
@ApiOperation(value = "Cancel a query as identified by the id", notes = "No effect if no query exists for the "
+ "given id on the requested broker. Query may continue to run for a short while after calling cancel as "
+ "it's done in a non-blocking manner. The cancel method can be called multiple times.")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 404, message = "Query not found on the requested broker")
})
public String cancelQuery(
@ApiParam(value = "QueryId as assigned by the broker", required = true) @PathParam("queryId") long queryId,
@ApiParam(value = "Query id", required = true) @PathParam("id") String id,
@ApiParam(value = "Determines is query id is internal or provided by the client") @QueryParam("client")
@DefaultValue("false") boolean isClient,
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
@DefaultValue("3000") int timeoutMs,
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false")
boolean verbose) {
try {
Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
String resp = "Cancelled query: " + queryId;
if (isClient && _requestHandler.cancelQueryByClientId(id, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
String resp = "Cancelled client query: " + id;
if (verbose) {
resp += " with responses from servers: " + serverResponses;
}
return resp;
} else if (_requestHandler.cancelQuery(Long.parseLong(id), timeoutMs, _executor, _httpConnMgr, serverResponses)) {
String resp = "Cancelled query: " + id;
if (verbose) {
resp += " with responses from servers: " + serverResponses;
}
return resp;
}
} catch (NumberFormatException e) {
Response.status(Response.Status.BAD_REQUEST).entity(String.format("Invalid internal query id: %s", id));
} catch (Exception e) {
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(String.format("Failed to cancel query: %s on the broker due to error: %s", queryId, e.getMessage()))
.entity(String.format("Failed to cancel query: %s on the broker due to error: %s", id, e.getMessage()))
.build());
}
throw new WebApplicationException(
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", queryId))
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", id))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,24 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
Expand All @@ -51,6 +57,7 @@
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
Expand All @@ -74,6 +81,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final QueryLogger _queryLogger;
@Nullable
protected final String _enableNullHandling;
// maps broker-generated query id to the query string
protected final Map<Long, String> _queriesById;
// maps broker-generated query id to client-provided query id
protected final Map<Long, String> _clientQueryIds;

public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
Expand All @@ -90,6 +101,16 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryLogger = new QueryLogger(config);
_enableNullHandling = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING);

boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
if (enableQueryCancellation) {
_queriesById = new ConcurrentHashMap<>();
_clientQueryIds = new ConcurrentHashMap<>();
} else {
_queriesById = null;
_clientQueryIds = null;
}
Comment on lines +105 to +113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not something we introduced in this PR, but something I think we need to take care of in the future:

We use BaseBrokerRequestHandler as the root/common state for the broker, probably for historical reasons. But that is not true. A single broker may have SSE, MSE, GRPC and even TSE queries running at the same time. It would be a better design to have a shared state between them instead of the trick we do with the delegate.

This is something we need to improve in the future

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the shared state refactor, we should write it down somewhere so we actually do it ;-)

}

@Override
Expand Down Expand Up @@ -179,6 +200,13 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception;

/**
* Attemps to cancel an ongoing query identified by its broker-generated id.
* @return true if the query was successfully cancelled, false otherwise.
*/
protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need javadoc here to explain how it should work. At least we should say that queryId may be a client or pinot generated id.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the queryId received here always refers to a broker-generated internal id. The clientQueryId -> brokerQueryId translation is done by BaseBrokerRequestHandler.cancelQueryByClientId.

Added some minimal javadoc here: 52998d3

I tried to mimic the current code design for handleRequest, but it is a bit confusing the existance of two handleRequest methods here:

  • A public method implemented by the interface and called from the endpoint layer that receives a SqlNodeAndOptions parameter.
  • A protected method called from the previous one and already receiving a requestId and the query's string itself.

To increase confusion, neither of the two methods have a javadoc.

This probably could get better designed if we move forward with the proposed shared state design.

HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception;

protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
// TODO: Add partial result flag to RequestContext
Expand Down Expand Up @@ -223,4 +251,66 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons
statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
statistics.setTraceInfo(response.getTraceInfo());
}

@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
return Collections.unmodifiableMap(_queriesById);
}

@Override
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
try {
return handleCancel(queryId, timeoutMs, executor, connMgr, serverResponses);
} finally {
onQueryFinish(queryId);
}
}

@Override
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
Optional<Long> requestId = _clientQueryIds.entrySet().stream()
.filter(e -> clientQueryId.equals(e.getValue())).map(Map.Entry::getKey).findFirst();
if (requestId.isPresent()) {
return cancelQuery(requestId.get(), timeoutMs, executor, connMgr, serverResponses);
} else {
LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId);
return false;
Comment on lines +283 to +284
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should't we throw here to notify the caller that the query id is incorrect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried to be consistent with the behavior when we cancel using a broker-generated id: it returns true or false depending on if the query exists or not. If we do not found a match for the given clientQueryId, isn't that the same as saying that the query does not exist?

PinotClientRequest is the one that receives the false value and decides to raise a WebApplicationException enclosing aHTTP 500 response.

}
}

protected String extractClientRequestId(SqlNodeAndOptions sqlNodeAndOptions) {
return sqlNodeAndOptions.getOptions() != null
? sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.CLIENT_QUERY_ID) : null;
}

protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) {
if (isQueryCancellationEnabled()) {
_queriesById.put(requestId, query);
if (StringUtils.isNotBlank(clientRequestId)) {
_clientQueryIds.put(requestId, clientRequestId);
LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId);
} else {
LOGGER.debug("Keep track of running query: {}", requestId);
}
}
}

protected void onQueryFinish(long requestId) {
if (isQueryCancellationEnabled()) {
_queriesById.remove(requestId);
_clientQueryIds.remove(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
}

protected boolean isQueryCancellationEnabled() {
return _queriesById != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.ws.rs.WebApplicationException;
Expand Down Expand Up @@ -142,9 +141,10 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
protected final boolean _enableQueryLimitOverride;
protected final boolean _enableDistinctCountBitmapOverride;
protected final int _queryResponseLimit;
// maps broker-generated query id with the servers that are running the query
protected final Map<Long, QueryServers> _serversById;
// if >= 0, then overrides default limit of 10, otherwise setting is ignored
protected final int _defaultQueryLimit;
protected final Map<Long, QueryServers> _queriesById;
protected final boolean _enableMultistageMigrationMetric;
protected ExecutorService _multistageCompileExecutor;
protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;
Expand All @@ -162,11 +162,15 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro
_config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false);
_queryResponseLimit =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
if (this.isQueryCancellationEnabled()) {
_serversById = new ConcurrentHashMap<>();
} else {
_serversById = null;
}
_defaultQueryLimit = config.getProperty(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT,
Broker.DEFAULT_BROKER_QUERY_LIMIT);
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;

_enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC);
Expand Down Expand Up @@ -215,31 +219,33 @@ public void shutDown() {
}
}

@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
}

@VisibleForTesting
Set<ServerInstance> getRunningServers(long requestId) {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
QueryServers queryServers = _serversById.get(requestId);
return queryServers != null ? queryServers._servers : Collections.emptySet();
}

@Override
public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
protected void onQueryFinish(long requestId) {
super.onQueryFinish(requestId);
if (isQueryCancellationEnabled()) {
_serversById.remove(requestId);
}
}

@Override
protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
QueryServers queryServers = _serversById.get(queryId);
if (queryServers == null) {
return false;
}

// TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for
// details
String globalQueryId = getGlobalQueryId(requestId);
String globalQueryId = getGlobalQueryId(queryId);
List<Pair<String, String>> serverUrls = new ArrayList<>();
for (ServerInstance serverInstance : queryServers._servers) {
serverUrls.add(Pair.of(String.format("%s/query/%s", serverInstance.getAdminEndpoint(), globalQueryId), null));
Expand Down Expand Up @@ -810,7 +816,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
}
BrokerResponseNative brokerResponse;
if (_queriesById != null) {
if (isQueryCancellationEnabled()) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any
// potential failures that could happen before sending it out, like failures to calculate the routing table etc.
// TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and
Expand All @@ -819,14 +825,16 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
// can always list the running queries and cancel query again until it ends. Just that such race
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
// servers takes time, but will address later if needed.
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
LOGGER.debug("Keep track of running query: {}", requestId);
String clientRequestId = extractClientRequestId(sqlNodeAndOptions);
onQueryStart(
requestId, clientRequestId, query, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
try {
brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest,
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
brokerResponse.setClientRequestId(clientRequestId);
} finally {
_queriesById.remove(requestId);
onQueryFinish(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
} else {
Expand Down Expand Up @@ -910,6 +918,14 @@ static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRouti
return errorMessage;
}

@Override
protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) {
super.onQueryStart(requestId, clientRequestId, query, extras);
if (isQueryCancellationEnabled() && extras.length > 0 && extras[0] instanceof QueryServers) {
_serversById.put(requestId, (QueryServers) extras[0]);
}
}

private static String getRoutingPolicy(TableConfig tableConfig) {
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
if (routingConfig == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,19 @@ default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, Strin
boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception;

/**
* Cancel a query as identified by the clientQueryId provided externally. This method is non-blocking so the query may
* still run for a while after calling this method. This cancel method can be called multiple times.
* @param clientQueryId the Id assigned to the query by the client
* @param timeoutMs timeout to wait for servers to respond the cancel requests
* @param executor to send cancel requests to servers in parallel
* @param connMgr to provide the http connections
* @param serverResponses to collect cancel responses from all servers if a map is provided
* @return true if there is a running query for the given clientQueryId.
*/
boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,37 @@ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String

@Override
public Map<Long, String> getRunningQueries() {
// TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its
// running queries with those from singleStaged engine. Both engines share the same request Id generator, so
// the query will have unique ids across the two engines.
return _singleStageBrokerRequestHandler.getRunningQueries();
// Both engines share the same request Id generator, so the query will have unique ids across the two engines.
Map<Long, String> queries = _singleStageBrokerRequestHandler.getRunningQueries();
if (_multiStageBrokerRequestHandler != null) {
queries.putAll(_multiStageBrokerRequestHandler.getRunningQueries());
}
return queries;
}

@Override
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
// TODO: add support for multiStaged engine, basically try to cancel the query on multiStaged engine firstly; if
// not found, try on the singleStaged engine.
if (_multiStageBrokerRequestHandler != null && _multiStageBrokerRequestHandler.cancelQuery(
queryId, timeoutMs, executor, connMgr, serverResponses)) {
return true;
}
return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses);
}

@Override
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
throws Exception {
if (_multiStageBrokerRequestHandler != null && _multiStageBrokerRequestHandler.cancelQueryByClientId(
clientQueryId, timeoutMs, executor, connMgr, serverResponses)) {
return true;
}
return _singleStageBrokerRequestHandler.cancelQueryByClientId(
clientQueryId, timeoutMs, executor, connMgr, serverResponses);
}

private CursorResponse getCursorResponse(Integer numRows, BrokerResponse response)
throws Exception {
if (numRows == null) {
Expand Down
Loading
Loading