Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-17086 Warnings added for long lazy SQL queries #11405

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.running;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
Expand Down Expand Up @@ -254,6 +255,11 @@ public void setResultSetSizeThresholdMultiplier(int rsSizeThresholdMult) {
this.rsSizeThresholdMult = rsSizeThresholdMult <= 1 ? 1 : rsSizeThresholdMult;
}

/** */
public Set<TrackableQuery> getQueries() {
return qrys.keySet();
}

/**
* Holds timeout settings for the specified query.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ public class H2QueryInfo implements TrackableQuery {
/** Begin timestamp. */
private final long beginTs;

/** The most recent point in time when the tracking of a long query was suspended. */
private volatile long lastSuspendTs;

/** External wait time. */
private volatile long extWait;

/** Long query time tracking suspension flag. */
private volatile boolean isSuspended;

/** Query schema. */
private final String schema;

Expand Down Expand Up @@ -112,6 +121,11 @@ public String plan() {
return stmt.getPlanSQL();
}

/** */
public long extWait() {
return extWait;
}

/**
* Print info specified by children.
*
Expand All @@ -123,7 +137,25 @@ protected void printInfo(StringBuilder msg) {

/** {@inheritDoc} */
@Override public long time() {
return U.currentTimeMillis() - beginTs;
return (isSuspended ? lastSuspendTs : U.currentTimeMillis()) - beginTs - extWait;
}

/** */
public synchronized void suspendTracking() {
if (!isSuspended) {
isSuspended = true;

lastSuspendTs = U.currentTimeMillis();
}
}

/** */
public synchronized void resumeTracking() {
if (isSuspended) {
isSuspended = false;

extWait += U.currentTimeMillis() - lastSuspendTs;
}
}

/**
Expand Down Expand Up @@ -156,6 +188,11 @@ protected void printInfo(StringBuilder msg) {
return msgSb.toString();
}

/** */
public boolean isSuspended() {
return isSuspended;
}

/**
* Query type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl
/** */
private final H2QueryInfo qryInfo;

/** */
final IgniteH2Indexing h2;

/**
* @param data Data array.
* @param log Logger.
Expand All @@ -141,6 +144,7 @@ protected H2ResultSetIterator(
this.data = data;
this.tracing = tracing;
this.qryInfo = qryInfo;
this.h2 = h2;

try {
res = (ResultInterface)RESULT_FIELD.get(data);
Expand Down Expand Up @@ -325,6 +329,9 @@ public void onClose() throws IgniteCheckedException {

lockTables();

if (qryInfo != null)
h2.heavyQueriesTracker().stopTracking(qryInfo, null);

try {
resultSetChecker.checkOnClose();

Expand Down Expand Up @@ -391,7 +398,7 @@ private synchronized void closeInternal() throws IgniteCheckedException {
if (closed)
return false;

return hasRow || (hasRow = fetchNext());
return hasRow || (hasRow = h2.executeWithResumableTimeTracking(this::fetchNext, qryInfo));
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -412,6 +413,8 @@ private GridQueryFieldsResult executeSelectLocal(
@Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
H2PooledConnection conn = connections().connection(qryDesc.schemaName());

H2QueryInfo qryInfo = null;

try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) {
H2Utils.setupConnection(conn, qctx,
qryDesc.distributedJoins(), qryDesc.enforceJoinOrder(), qryParams.lazy());
Expand All @@ -434,9 +437,11 @@ private GridQueryFieldsResult executeSelectLocal(

H2Utils.bindParameters(stmt, F.asList(params));

H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
ctx.localNodeId(), qryId);

heavyQryTracker.startTracking(qryInfo);

if (ctx.performanceStatistics().enabled()) {
ctx.performanceStatistics().queryProperty(
GridCacheQueryType.SQL_FIELDS,
Expand All @@ -447,13 +452,16 @@ private GridQueryFieldsResult executeSelectLocal(
);
}

ResultSet rs = executeSqlQueryWithTimer(
stmt,
conn,
qry,
timeout,
cancel,
qryParams.dataPageScanEnabled(),
ResultSet rs = executeWithResumableTimeTracking(
() -> executeSqlQueryWithTimer(
stmt,
conn,
qry,
timeout,
cancel,
qryParams.dataPageScanEnabled(),
null
),
qryInfo
);

Expand All @@ -470,6 +478,9 @@ private GridQueryFieldsResult executeSelectLocal(
catch (IgniteCheckedException | RuntimeException | Error e) {
conn.close();

if (qryInfo != null)
heavyQryTracker.stopTracking(qryInfo, e);

throw e;
}
}
Expand Down Expand Up @@ -2253,4 +2264,25 @@ public HeavyQueriesTracker heavyQueriesTracker() {
public DistributedIndexingConfiguration distributedConfiguration() {
return distrCfg;
}

/**
* Resumes time tracking before the task (if needed) and suspends time tracking after the task is finished.
*
* @param task Query/fetch to execute.
* @param qryInfo Query info.
* @throws IgniteCheckedException If failed.
*/
public <T> T executeWithResumableTimeTracking(
IgniteThrowableSupplier<T> task,
final H2QueryInfo qryInfo
) throws IgniteCheckedException {
qryInfo.resumeTracking();

try {
return task.get();
}
finally {
qryInfo.suspendTracking();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ private void onQueryRequest0(

qryResults.addResult(qryIdx, res);

MapH2QueryInfo qryInfo = null;

try {
res.lock();

Expand All @@ -460,7 +462,9 @@ private void onQueryRequest0(

H2Utils.bindParameters(stmt, params0);

MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId);
qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId);

h2.heavyQueriesTracker().startTracking(qryInfo);

if (performanceStatsEnabled) {
ctx.performanceStatistics().queryProperty(
Expand All @@ -472,14 +476,20 @@ private void onQueryRequest0(
);
}

ResultSet rs = h2.executeSqlQueryWithTimer(
stmt,
conn,
sql,
timeout,
qryResults.queryCancel(qryIdx),
dataPageScanEnabled,
qryInfo);
GridQueryCancel qryCancel = qryResults.queryCancel(qryIdx);

ResultSet rs = h2.executeWithResumableTimeTracking(
() -> h2.executeSqlQueryWithTimer(
stmt,
conn,
sql,
timeout,
qryCancel,
dataPageScanEnabled,
null
),
qryInfo
);

if (evt) {
ctx.event().record(new CacheQueryExecutedEvent<>(
Expand Down Expand Up @@ -507,14 +517,21 @@ private void onQueryRequest0(

res.openResult(rs, qryInfo);

final GridQueryNextPageResponse msg = prepareNextPage(
nodeRess,
node,
qryResults,
qryIdx,
segmentId,
pageSize,
dataPageScanEnabled
MapQueryResults qryResults0 = qryResults;

int qryIdx0 = qryIdx;

final GridQueryNextPageResponse msg = h2.executeWithResumableTimeTracking(
() -> prepareNextPage(
nodeRess,
node,
qryResults0,
qryIdx0,
segmentId,
pageSize,
dataPageScanEnabled
),
qryInfo
);

if (msg != null)
Expand All @@ -528,6 +545,12 @@ private void onQueryRequest0(

qryIdx++;
}
catch (Throwable e) {
if (qryInfo != null)
h2.heavyQueriesTracker().stopTracking(qryInfo, e);

throw e;
}
finally {
try {
res.unlockTables();
Expand Down Expand Up @@ -843,13 +866,15 @@ else if (nodeRess.cancelled(reqId)) {

final MapQueryResults qryResults = nodeRess.get(reqId, req.segmentId());

MapQueryResult res = null;

if (qryResults == null)
sendError(node, reqId, new CacheException("No query result found for request: " + req));
else if (qryResults.cancelled())
sendQueryCancel(node, reqId);
else {
try {
MapQueryResult res = qryResults.result(req.query());
res = qryResults.result(req.query());

assert res != null;

Expand All @@ -862,14 +887,18 @@ else if (qryResults.cancelled())

Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags());

GridQueryNextPageResponse msg = prepareNextPage(
nodeRess,
node,
qryResults,
req.query(),
req.segmentId(),
req.pageSize(),
dataPageScanEnabled);
GridQueryNextPageResponse msg = h2.executeWithResumableTimeTracking(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracking is not stopped in case of exception here

Copy link
Contributor Author

@oleg-vlsk oleg-vlsk Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stopping the time tracking is added to the catch section.

() -> prepareNextPage(
nodeRess,
node,
qryResults,
req.query(),
req.segmentId(),
req.pageSize(),
dataPageScanEnabled
),
res.qryInfo()
);

if (msg != null)
sendNextPage(node, msg);
Expand All @@ -884,6 +913,9 @@ else if (qryResults.cancelled())
}
}
catch (Exception e) {
if (res.qryInfo() != null)
h2.heavyQueriesTracker().stopTracking(res.qryInfo(), e);

QueryRetryException retryEx = X.cause(e, QueryRetryException.class);

if (retryEx != null)
Expand Down Expand Up @@ -939,6 +971,9 @@ private GridQueryNextPageResponse prepareNextPage(
if (last) {
qr.closeResult(qry);

if (res.qryInfo() != null)
h2.heavyQueriesTracker().stopTracking(res.qryInfo(), null);

if (qr.isAllClosed()) {
nodeRess.remove(qr.queryRequestId(), segmentId, qr);

Expand Down
Loading
Loading