From b79b7b5d6034ad652c41c14a757496f8cd6aadfb Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Fri, 21 Jun 2024 08:50:33 +1000 Subject: [PATCH 01/14] IGNITE-17086 MapQueryResult#executeWithTimer added --- .../query/h2/twostep/MapQueryResult.java | 30 +++- .../query/LongQueryWarningTest.java | 139 ++++++++++++++++++ .../IgniteBinaryCacheQueryTestSuite4.java | 4 +- 3 files changed, 170 insertions(+), 3 deletions(-) create mode 100644 modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongQueryWarningTest.java diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java index d7d8736e43e44..f0c336496c249 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import org.apache.ignite.IgniteLogger; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; @@ -196,7 +197,7 @@ boolean fetchNextPage(List rows, int pageSize, Boolean dataPageScanEnab try { for (int i = 0; i < pageSize; i++) { - if (!res.res.next()) + if (!executeWithTimer(res.res::next)) return true; Value[] row = res.res.currentRow(); @@ -253,7 +254,7 @@ boolean fetchNextPage(List rows, int pageSize, Boolean dataPageScanEnab res.resultSetChecker.checkOnFetchNext(); } - return !res.res.hasNext(); + return !executeWithTimer(res.res::hasNext); } finally { CacheDataTree.setDataPageScanEnabled(false); @@ -324,6 +325,31 @@ public void checkTablesVersions() { GridH2Table.checkTablesVersions(ses); } + /** */ + public boolean executeWithTimer(Supplier supplier) { + HeavyQueriesTracker heavyQryTracker = h2.heavyQueriesTracker(); + + MapH2QueryInfo qryInfo = res.qryInfo; + + if (qryInfo != null) + heavyQryTracker.startTracking(qryInfo); + + Throwable err = null; + + try { + return supplier.get(); + } + catch (Throwable e) { + err = e; + + throw e; + } + finally { + if (qryInfo != null) + heavyQryTracker.stopTracking(qryInfo, err); + } + } + /** */ private class Result { /** */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongQueryWarningTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongQueryWarningTest.java new file mode 100644 index 0000000000000..6ef2253c86952 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongQueryWarningTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.SqlConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.GridAbstractTest; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG; +import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct; + +/** */ +@RunWith(Parameterized.class) +public class LongQueryWarningTest extends GridCommonAbstractTest { + /** */ + private static final long WARN_TIMEOUT = 1000; + + /** */ + private static final long SLEEP = 2 * WARN_TIMEOUT; + + /** */ + private LogListener lsnr; + + /** */ + @Parameterized.Parameter + public boolean lazy; + + /** */ + @Parameterized.Parameters(name = "lazy={0}") + public static Collection runConfig() { + return cartesianProduct(F.asList(false, true)); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + lsnr = LogListener.matches(LONG_QUERY_EXEC_MSG).atLeast(1).build(); + + ListeningTestLogger log = new ListeningTestLogger(GridAbstractTest.log); + + log.registerListener(lsnr); + + QueryEntity qryEntity = new QueryEntity() + .setTableName("test") + .setKeyType(Integer.class.getName()) + .setValueType(Integer.class.getName()); + + CacheConfiguration cacheCfg = new CacheConfiguration("test") + .setQueryEntities(Collections.singleton(qryEntity)) + .setSqlFunctionClasses(SleepFunction.class); + + return super.getConfiguration(name) + .setSqlConfiguration(new SqlConfiguration().setLongQueryWarningTimeout(WARN_TIMEOUT)) + .setGridLogger(log) + .setCacheConfiguration(cacheCfg); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Checks that warning message is printed when long query is executed in both lazy and non-lazy modes. + */ + @Test + public void test() throws Exception { + IgniteEx ignite = startGrid(getConfiguration("srv")); + + IgniteCache cache = ignite.cache("test"); + + for (int i = 0; i < 5; i++) + cache.put(i, i); + + long startTime = System.currentTimeMillis(); + + Iterator> iter = cache.query( + new SqlFieldsQuery("SELECT * FROM test WHERE _key < sleep(?)") + .setLazy(lazy) + .setPageSize(1) + .setArgs(SLEEP)).iterator(); + + assertTrue("Iterator with query results must not be empty.", !iter.next().isEmpty()); + + long resultSetTime = System.currentTimeMillis() - startTime; + + assertTrue("Expected long query: " + resultSetTime, resultSetTime >= WARN_TIMEOUT); + + assertTrue("Expected long query warning.", lsnr.check()); + } + + /** */ + public static class SleepFunction { + /** */ + @QuerySqlFunction + public static long sleep(long x) { + if (x >= 0) + try { + Thread.sleep(x); + } + catch (InterruptedException ignored) { + // No-op. + } + + return x; + } + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java index 3d2732584b35f..07af21b4aeb73 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.query.DmlBatchSizeDeadlockTest; import org.apache.ignite.internal.processors.query.IgniteSqlCreateTableTemplateTest; import org.apache.ignite.internal.processors.query.LocalQueryLazyTest; +import org.apache.ignite.internal.processors.query.LongQueryWarningTest; import org.apache.ignite.internal.processors.query.LongRunningQueryTest; import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest; import org.apache.ignite.internal.processors.query.SqlPartOfComplexPkLookupTest; @@ -93,8 +94,9 @@ SqlQueriesTopologyMappingTest.class, - IgniteCacheQueryReservationOnUnstableTopologyTest.class + IgniteCacheQueryReservationOnUnstableTopologyTest.class, + LongQueryWarningTest.class }) public class IgniteBinaryCacheQueryTestSuite4 { /** Setup lazy mode default. */ From 64b7ce789d84ef020854e66fe72d701ddeabdd03 Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Fri, 28 Jun 2024 16:33:55 +1000 Subject: [PATCH 02/14] IGNITE-17086 Execution with timer refactored for queries and fetches --- .../query/h2/H2ResultSetIterator.java | 14 +- .../processors/query/h2/IgniteH2Indexing.java | 71 ++++++--- .../h2/twostep/GridMapQueryExecutor.java | 32 +++- .../query/h2/twostep/MapQueryResult.java | 29 +--- .../query/LongQueryWarningTest.java | 139 ------------------ .../query/LongRunningQueryTest.java | 61 +++++++- .../IgniteBinaryCacheQueryTestSuite4.java | 4 +- 7 files changed, 155 insertions(+), 195 deletions(-) delete mode 100644 modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongQueryWarningTest.java diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java index 9529b4430b1b0..8776e5529bebc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java @@ -120,6 +120,9 @@ public abstract class H2ResultSetIterator extends GridIteratorAdapter impl /** */ private final H2QueryInfo qryInfo; + /** */ + final IgniteH2Indexing h2; + /** * @param data Data array. * @param log Logger. @@ -141,6 +144,7 @@ protected H2ResultSetIterator( this.data = data; this.tracing = tracing; this.qryInfo = qryInfo; + this.h2 = h2; try { res = (ResultInterface)RESULT_FIELD.get(data); @@ -310,6 +314,14 @@ private synchronized boolean fetchNext() throws IgniteCheckedException { return false; } + /** */ + private boolean fetchNextWithTimer() throws IgniteCheckedException { + return h2.executeWithTimer( + () -> fetchNext(), + qryInfo, + false); + } + /** * @return Row. */ @@ -391,7 +403,7 @@ private synchronized void closeInternal() throws IgniteCheckedException { if (closed) return false; - return hasRow || (hasRow = fetchNext()); + return hasRow || (hasRow = fetchNextWithTimer()); } /** {@inheritDoc} */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index f355dd7091922..aa02d68ebb156 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -117,6 +117,7 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.lang.IgniteSingletonIterator; +import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; @@ -772,30 +773,17 @@ public ResultSet executeSqlQueryWithTimer( Boolean dataPageScanEnabled, final H2QueryInfo qryInfo ) throws IgniteCheckedException { - if (qryInfo != null) - heavyQryTracker.startTracking(qryInfo); - - enableDataPageScan(dataPageScanEnabled); - - Throwable err = null; - try ( - TraceSurroundings ignored = MTC.support(ctx.tracing() - .create(SQL_QRY_EXECUTE, MTC.span()) - .addTag(SQL_QRY_TEXT, () -> sql)) - ) { - return executeSqlQuery(conn, stmt, timeoutMillis, cancel); - } - catch (Throwable e) { - err = e; - - throw e; - } - finally { - CacheDataTree.setDataPageScanEnabled(false); - - if (qryInfo != null) - heavyQryTracker.stopTracking(qryInfo, err); - } + return executeWithTimer( + () -> { + try ( + TraceSurroundings ignored = MTC.support(ctx.tracing() + .create(SQL_QRY_EXECUTE, MTC.span()) + .addTag(SQL_QRY_TEXT, () -> sql)) + ) { + return executeSqlQuery(conn, stmt, timeoutMillis, cancel); + }}, + qryInfo, + dataPageScanEnabled); } /** {@inheritDoc} */ @@ -2253,4 +2241,39 @@ public HeavyQueriesTracker heavyQueriesTracker() { public DistributedIndexingConfiguration distributedConfiguration() { return distrCfg; } + + /** + * Executes a query/fetch and prints a warning if it took too long to finish the task. + * + * @param task Query/fetch to execute. + * @param qryInfo Query info. + * @param dataPageScanEnabled Page scan enabled flag. + * @throws IgniteCheckedException If failed. + */ + public T executeWithTimer( + IgniteThrowableSupplier task, + final H2QueryInfo qryInfo, + Boolean dataPageScanEnabled + ) throws IgniteCheckedException { + if (qryInfo != null) + heavyQryTracker.startTracking(qryInfo); + + enableDataPageScan(dataPageScanEnabled); + + Throwable err = null; + try { + return task.get(); + } + catch (Throwable e) { + err = e; + + throw e; + } + finally { + CacheDataTree.setDataPageScanEnabled(false); + + if (qryInfo != null) + heavyQryTracker.stopTracking(qryInfo, err); + } + } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 3d388561840f2..57209a83c94fa 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -507,7 +507,7 @@ private void onQueryRequest0( res.openResult(rs, qryInfo); - final GridQueryNextPageResponse msg = prepareNextPage( + final GridQueryNextPageResponse msg = prepareNextPageWithTimer( nodeRess, node, qryResults, @@ -862,7 +862,7 @@ else if (qryResults.cancelled()) Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags()); - GridQueryNextPageResponse msg = prepareNextPage( + GridQueryNextPageResponse msg = prepareNextPageWithTimer( nodeRess, node, qryResults, @@ -963,6 +963,34 @@ private GridQueryNextPageResponse prepareNextPage( } } + /** + * Prepares next page for query and prints warning if it took too long. + * + * @param nodeRess Results. + * @param node Node. + * @param qr Query results. + * @param qry Query. + * @param segmentId Index segment ID. + * @param pageSize Page size. + * @param dataPageScanEnabled If data page scan is enabled. + * @return Next page. + * @throws IgniteCheckedException If failed. + */ + private GridQueryNextPageResponse prepareNextPageWithTimer( + MapNodeResults nodeRess, + ClusterNode node, + MapQueryResults qr, + int qry, + int segmentId, + int pageSize, + Boolean dataPageScanEnabled + ) throws IgniteCheckedException { + return h2.executeWithTimer( + () -> prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, dataPageScanEnabled), + qr.result(qry).qryInfo(), + dataPageScanEnabled); + } + /** * @param node Node. * @param msg Message to send. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java index f0c336496c249..f644a7805e1cc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; import org.apache.ignite.IgniteLogger; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; @@ -197,7 +196,7 @@ boolean fetchNextPage(List rows, int pageSize, Boolean dataPageScanEnab try { for (int i = 0; i < pageSize; i++) { - if (!executeWithTimer(res.res::next)) + if (!res.res.next()) return true; Value[] row = res.res.currentRow(); @@ -254,7 +253,7 @@ boolean fetchNextPage(List rows, int pageSize, Boolean dataPageScanEnab res.resultSetChecker.checkOnFetchNext(); } - return !executeWithTimer(res.res::hasNext); + return !res.res.hasNext(); } finally { CacheDataTree.setDataPageScanEnabled(false); @@ -326,28 +325,8 @@ public void checkTablesVersions() { } /** */ - public boolean executeWithTimer(Supplier supplier) { - HeavyQueriesTracker heavyQryTracker = h2.heavyQueriesTracker(); - - MapH2QueryInfo qryInfo = res.qryInfo; - - if (qryInfo != null) - heavyQryTracker.startTracking(qryInfo); - - Throwable err = null; - - try { - return supplier.get(); - } - catch (Throwable e) { - err = e; - - throw e; - } - finally { - if (qryInfo != null) - heavyQryTracker.stopTracking(qryInfo, err); - } + public MapH2QueryInfo qryInfo() { + return res.qryInfo; } /** */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongQueryWarningTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongQueryWarningTest.java deleted file mode 100644 index 6ef2253c86952..0000000000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongQueryWarningTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query; - -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.QueryEntity; -import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.cache.query.annotations.QuerySqlFunction; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.SqlConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.testframework.ListeningTestLogger; -import org.apache.ignite.testframework.LogListener; -import org.apache.ignite.testframework.junits.GridAbstractTest; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import static org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG; -import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct; - -/** */ -@RunWith(Parameterized.class) -public class LongQueryWarningTest extends GridCommonAbstractTest { - /** */ - private static final long WARN_TIMEOUT = 1000; - - /** */ - private static final long SLEEP = 2 * WARN_TIMEOUT; - - /** */ - private LogListener lsnr; - - /** */ - @Parameterized.Parameter - public boolean lazy; - - /** */ - @Parameterized.Parameters(name = "lazy={0}") - public static Collection runConfig() { - return cartesianProduct(F.asList(false, true)); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { - lsnr = LogListener.matches(LONG_QUERY_EXEC_MSG).atLeast(1).build(); - - ListeningTestLogger log = new ListeningTestLogger(GridAbstractTest.log); - - log.registerListener(lsnr); - - QueryEntity qryEntity = new QueryEntity() - .setTableName("test") - .setKeyType(Integer.class.getName()) - .setValueType(Integer.class.getName()); - - CacheConfiguration cacheCfg = new CacheConfiguration("test") - .setQueryEntities(Collections.singleton(qryEntity)) - .setSqlFunctionClasses(SleepFunction.class); - - return super.getConfiguration(name) - .setSqlConfiguration(new SqlConfiguration().setLongQueryWarningTimeout(WARN_TIMEOUT)) - .setGridLogger(log) - .setCacheConfiguration(cacheCfg); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** - * Checks that warning message is printed when long query is executed in both lazy and non-lazy modes. - */ - @Test - public void test() throws Exception { - IgniteEx ignite = startGrid(getConfiguration("srv")); - - IgniteCache cache = ignite.cache("test"); - - for (int i = 0; i < 5; i++) - cache.put(i, i); - - long startTime = System.currentTimeMillis(); - - Iterator> iter = cache.query( - new SqlFieldsQuery("SELECT * FROM test WHERE _key < sleep(?)") - .setLazy(lazy) - .setPageSize(1) - .setArgs(SLEEP)).iterator(); - - assertTrue("Iterator with query results must not be empty.", !iter.next().isEmpty()); - - long resultSetTime = System.currentTimeMillis() - startTime; - - assertTrue("Expected long query: " + resultSetTime, resultSetTime >= WARN_TIMEOUT); - - assertTrue("Expected long query warning.", lsnr.check()); - } - - /** */ - public static class SleepFunction { - /** */ - @QuerySqlFunction - public static long sleep(long x) { - if (x >= 0) - try { - Thread.sleep(x); - } - catch (InterruptedException ignored) { - // No-op. - } - - return x; - } - } -} diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java index 8ba7223c99809..d2852ba5243d4 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java @@ -30,6 +30,8 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.SqlConfiguration; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker; @@ -49,12 +51,24 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { /** Keys count. */ private static final int KEY_CNT = 1000; + /** Timeout for long query warnings. */ + private static final long WARN_TIMEOUT = 1000; + + /** Sleep duration for sql sleep function. */ + private static final long SLEEP = WARN_TIMEOUT * 2; + /** Local query mode. */ private boolean local; /** Lazy query mode. */ private boolean lazy; + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + return super.getConfiguration(name) + .setSqlConfiguration(new SqlConfiguration().setLongQueryWarningTimeout(WARN_TIMEOUT)); + } + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -109,6 +123,30 @@ public void testLongLocal() { checkFastQueries(); } + /** + * + */ + @Test + public void testLongDistributedLazy() { + local = false; + lazy = true; + + checkLongRunning(); + checkFastQueries(); + } + + /** + * + */ + @Test + public void testLongLocalLazy() { + local = true; + lazy = true; + + checkLongRunning(); + checkFastQueries(); + } + /** * Test checks the correctness of thread name when displaying errors * about long queries. @@ -184,7 +222,10 @@ private void checkLongRunning() { testLog.registerListener(lsnr); - sqlCheckLongRunning(); + if (lazy) + sqlCheckLongRunningLazy(); + else + sqlCheckLongRunning(); assertTrue(lsnr.check()); } @@ -239,6 +280,24 @@ private FieldsQueryCursor> sql(String sql, Object... args) { .setArgs(args), false); } + /** */ + private void sqlCheckLongRunningLazy() { + long startTime = System.currentTimeMillis(); + + Iterator> iter = grid().cache("test").query( + new SqlFieldsQuery("SELECT * FROM test WHERE _key < sleep_func(?)") + .setLazy(lazy) + .setPageSize(1) + .setLocal(local) + .setArgs(SLEEP)).iterator(); + + assertTrue("Iterator with query results must not be empty.", !iter.next().isEmpty()); + + long resultSetTime = System.currentTimeMillis() - startTime; + + assertTrue("Expected long query: " + resultSetTime, resultSetTime >= WARN_TIMEOUT); + } + /** * Utility class with custom SQL functions. */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java index 07af21b4aeb73..3d2732584b35f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java @@ -26,7 +26,6 @@ import org.apache.ignite.internal.processors.query.DmlBatchSizeDeadlockTest; import org.apache.ignite.internal.processors.query.IgniteSqlCreateTableTemplateTest; import org.apache.ignite.internal.processors.query.LocalQueryLazyTest; -import org.apache.ignite.internal.processors.query.LongQueryWarningTest; import org.apache.ignite.internal.processors.query.LongRunningQueryTest; import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest; import org.apache.ignite.internal.processors.query.SqlPartOfComplexPkLookupTest; @@ -94,9 +93,8 @@ SqlQueriesTopologyMappingTest.class, - IgniteCacheQueryReservationOnUnstableTopologyTest.class, + IgniteCacheQueryReservationOnUnstableTopologyTest.class - LongQueryWarningTest.class }) public class IgniteBinaryCacheQueryTestSuite4 { /** Setup lazy mode default. */ From ad0eda48731665075956022a3e850e5fc616ce78 Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Fri, 28 Jun 2024 18:43:42 +1000 Subject: [PATCH 03/14] IGNITE-17086 Test refactoring --- .../query/LongRunningQueryTest.java | 51 +++++-------------- 1 file changed, 14 insertions(+), 37 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java index d2852ba5243d4..2df5d8ab3a795 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java @@ -30,8 +30,6 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.SqlConfiguration; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker; @@ -51,24 +49,12 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { /** Keys count. */ private static final int KEY_CNT = 1000; - /** Timeout for long query warnings. */ - private static final long WARN_TIMEOUT = 1000; - - /** Sleep duration for sql sleep function. */ - private static final long SLEEP = WARN_TIMEOUT * 2; - /** Local query mode. */ private boolean local; /** Lazy query mode. */ private boolean lazy; - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { - return super.getConfiguration(name) - .setSqlConfiguration(new SqlConfiguration().setLongQueryWarningTimeout(WARN_TIMEOUT)); - } - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -222,10 +208,7 @@ private void checkLongRunning() { testLog.registerListener(lsnr); - if (lazy) - sqlCheckLongRunningLazy(); - else - sqlCheckLongRunning(); + sqlCheckLongRunning(); assertTrue(lsnr.check()); } @@ -259,11 +242,22 @@ private void sqlCheckLongRunning(String sql, Object... args) { GridTestUtils.assertThrowsAnyCause(log, () -> sql(sql, args).getAll(), QueryCancelledException.class, ""); } + /** + * @param sql SQL query. + * @param args Query parameters. + */ + private void sqlCheckLongRunningLazy(String sql, Object... args) { + assertFalse("Iterator with query results must not be empty.", sql(sql, args).iterator().next().isEmpty()); + } + /** * Execute long-running sql with a check for errors. */ private void sqlCheckLongRunning() { - sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS T2 where T0.id > ?", 0); + if (lazy) + sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < sleep_func(?)", 2000); + else + sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS T2 where T0.id > ?", 0); } /** @@ -277,27 +271,10 @@ private FieldsQueryCursor> sql(String sql, Object... args) { .setLocal(local) .setLazy(lazy) .setSchema("TEST") + .setPageSize(1) .setArgs(args), false); } - /** */ - private void sqlCheckLongRunningLazy() { - long startTime = System.currentTimeMillis(); - - Iterator> iter = grid().cache("test").query( - new SqlFieldsQuery("SELECT * FROM test WHERE _key < sleep_func(?)") - .setLazy(lazy) - .setPageSize(1) - .setLocal(local) - .setArgs(SLEEP)).iterator(); - - assertTrue("Iterator with query results must not be empty.", !iter.next().isEmpty()); - - long resultSetTime = System.currentTimeMillis() - startTime; - - assertTrue("Expected long query: " + resultSetTime, resultSetTime >= WARN_TIMEOUT); - } - /** * Utility class with custom SQL functions. */ From b32155bb15c3e914e2b12dccdb638cb8441ce01a Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Fri, 28 Jun 2024 19:18:06 +1000 Subject: [PATCH 04/14] IGNITE-17086 Test refactoring --- .../processors/query/LongRunningQueryTest.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java index 2df5d8ab3a795..0d43ad3deecea 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java @@ -41,6 +41,7 @@ import static java.lang.Thread.currentThread; import static org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG; +import static org.h2.engine.Constants.DEFAULT_PAGE_SIZE; /** * Tests for log print for long-running query. @@ -49,6 +50,9 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { /** Keys count. */ private static final int KEY_CNT = 1000; + /** Page size. */ + private int pageSize = DEFAULT_PAGE_SIZE; + /** Local query mode. */ private boolean local; @@ -254,8 +258,16 @@ private void sqlCheckLongRunningLazy(String sql, Object... args) { * Execute long-running sql with a check for errors. */ private void sqlCheckLongRunning() { - if (lazy) - sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < sleep_func(?)", 2000); + if (lazy) { + pageSize = 1; + + try { + sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < sleep_func(?)", 2000); + } + finally { + pageSize = DEFAULT_PAGE_SIZE; + } + } else sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS T2 where T0.id > ?", 0); } @@ -270,8 +282,8 @@ private FieldsQueryCursor> sql(String sql, Object... args) { .setTimeout(10, TimeUnit.SECONDS) .setLocal(local) .setLazy(lazy) + .setPageSize(pageSize) .setSchema("TEST") - .setPageSize(1) .setArgs(args), false); } From 088a45062205c421210b4172964df4589a18c3d8 Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Tue, 9 Jul 2024 17:25:26 +1000 Subject: [PATCH 05/14] IGNITE-17086 Long query tracking has been bound to H2ResultSetIterator and ReduceIndexIterator --- .../processors/query/h2/H2FieldsIterator.java | 6 +- .../query/h2/H2FullTrackableQuery.java | 124 +++++++++++++++ .../query/h2/H2ResultSetIterator.java | 21 +-- .../processors/query/h2/IgniteH2Indexing.java | 101 ++++++------ .../h2/twostep/GridMapQueryExecutor.java | 35 +---- .../h2/twostep/GridReduceQueryExecutor.java | 42 ++++- .../query/h2/twostep/MapQueryResult.java | 5 - .../query/h2/twostep/ReduceIndexIterator.java | 12 +- .../processors/query/LazyOnDmlTest.java | 5 +- .../query/LongRunningQueryTest.java | 145 +++++++++++++++--- .../query/h2/QueryDataPageScanTest.java | 5 +- 11 files changed, 372 insertions(+), 129 deletions(-) create mode 100644 modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java index 0174fa487fae2..a392f1f56cff9 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.tracing.Tracing; +import org.jetbrains.annotations.Nullable; /** * Special field set iterator based on database result set. @@ -49,10 +50,11 @@ public H2FieldsIterator( IgniteLogger log, IgniteH2Indexing h2, H2QueryInfo qryInfo, - Tracing tracing + Tracing tracing, + @Nullable H2FullTrackableQuery fullTrackableQry ) throws IgniteCheckedException { - super(data, pageSize, log, h2, qryInfo, tracing); + super(data, pageSize, log, h2, qryInfo, tracing, fullTrackableQry); assert conn != null; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java new file mode 100644 index 0000000000000..a57bede4b37a4 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java @@ -0,0 +1,124 @@ +package org.apache.ignite.internal.processors.query.h2; + +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.running.RunningQueryManager; +import org.apache.ignite.internal.processors.query.running.TrackableQuery; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** */ +public class H2FullTrackableQuery implements TrackableQuery { + /** Begin timestamp. */ + private final long beginTs; + + /** Query id. */ + private final long qryId; + + /** Local node id. */ + private final UUID locNodeId; + + /** Distributed joins flag. */ + private final boolean distributedJoin; + + /** Enforce join order flag. */ + private final boolean enforceJoinOrder; + + /** Lazy mode flag. */ + private final boolean lazy; + + /** Schema name. */ + private final String schema; + + /** Sql text. */ + private final String sql; + + /** Map nodes. */ + private final Collection mapNodes; + + /** + * @param beginTs Begin timestamp. + * @param qryId Query id. + * @param locNodeId Local node id. + * @param distributedJoin Distributed joins flag. + * @param enforceJoinOrder Enforce join order flag. + * @param lazy Lazy mode flag. + * @param schema Schema name. + * @param sql Sql text. + * @param mapNodes Map nodes. + */ + public H2FullTrackableQuery( + long beginTs, + long qryId, + UUID locNodeId, + boolean distributedJoin, + boolean enforceJoinOrder, + boolean lazy, + String schema, + String sql, + @Nullable Collection mapNodes + ) { + this.beginTs = beginTs; + this.qryId = qryId; + this.locNodeId = locNodeId; + this.distributedJoin = distributedJoin; + this.enforceJoinOrder = enforceJoinOrder; + this.lazy = lazy; + this.schema = schema; + this.sql = sql; + this.mapNodes = mapNodes; + } + + /** {@inheritDoc} */ + @Override public long time() { + return U.currentTimeMillis() - beginTs; + } + + /** {@inheritDoc} */ + @Override public String queryInfo(@Nullable String additionalInfo) { + StringBuilder msgSb = new StringBuilder(); + + if (qryId == RunningQueryManager.UNDEFINED_QUERY_ID) + msgSb.append(" [globalQueryId=(undefined), node=").append(locNodeId); + else + msgSb.append(" [globalQueryId=").append(QueryUtils.globalQueryId(locNodeId, qryId)); + + if (additionalInfo != null) + msgSb.append(", ").append(additionalInfo); + + msgSb.append(", duration=").append(time()).append("ms") + .append(", distributedJoin=").append(distributedJoin) + .append(", enforceJoinOrder=").append(enforceJoinOrder) + .append(", lazy=").append(lazy) + .append(", schema=").append(schema); + + if (mapNodes != null) + msgSb.append(", map nodes: ").append(getMapNodes(mapNodes)); + + msgSb.append(", sql='").append(sql).append("'") + .append(']'); + + return msgSb.toString(); + } + + /** + * @param mapNodes Map nodes. + * @return String representation of the list of all map nodes. + */ + private String getMapNodes(Collection mapNodes) { + StringBuilder nodesSb = new StringBuilder(); + + int i = 0; + + for (ClusterNode node : mapNodes) { + nodesSb.append(node.id()); + + if (i++ != mapNodes.size() - 1) + nodesSb.append(", "); + } + + return nodesSb.toString(); + } +} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java index 8776e5529bebc..e3d0f57a37cb2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java @@ -46,6 +46,7 @@ import org.h2.jdbc.JdbcResultSet; import org.h2.result.ResultInterface; import org.h2.value.Value; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_PAGE_ROWS; import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_CLOSE; @@ -123,6 +124,9 @@ public abstract class H2ResultSetIterator extends GridIteratorAdapter impl /** */ final IgniteH2Indexing h2; + /** */ + private final H2FullTrackableQuery fullTrackableQry; + /** * @param data Data array. * @param log Logger. @@ -137,7 +141,8 @@ protected H2ResultSetIterator( IgniteLogger log, IgniteH2Indexing h2, H2QueryInfo qryInfo, - Tracing tracing + Tracing tracing, + @Nullable H2FullTrackableQuery fullTrackableQry ) throws IgniteCheckedException { ctx = h2.ctx; this.pageSize = pageSize; @@ -145,6 +150,7 @@ protected H2ResultSetIterator( this.tracing = tracing; this.qryInfo = qryInfo; this.h2 = h2; + this.fullTrackableQry = fullTrackableQry; try { res = (ResultInterface)RESULT_FIELD.get(data); @@ -314,14 +320,6 @@ private synchronized boolean fetchNext() throws IgniteCheckedException { return false; } - /** */ - private boolean fetchNextWithTimer() throws IgniteCheckedException { - return h2.executeWithTimer( - () -> fetchNext(), - qryInfo, - false); - } - /** * @return Row. */ @@ -337,6 +335,9 @@ public void onClose() throws IgniteCheckedException { lockTables(); + if (fullTrackableQry != null) + h2.heavyQueriesTracker().stopTracking(fullTrackableQry, null); + try { resultSetChecker.checkOnClose(); @@ -403,7 +404,7 @@ private synchronized void closeInternal() throws IgniteCheckedException { if (closed) return false; - return hasRow || (hasRow = fetchNextWithTimer()); + return hasRow || (hasRow = fetchNext()); } /** {@inheritDoc} */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index aa02d68ebb156..070086d7f3a11 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -117,7 +117,6 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.lang.IgniteSingletonIterator; -import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; @@ -413,6 +412,20 @@ private GridQueryFieldsResult executeSelectLocal( @Override public GridCloseableIterator> iterator() throws IgniteCheckedException { H2PooledConnection conn = connections().connection(qryDesc.schemaName()); + H2FullTrackableQuery fullTrackableQry = new H2FullTrackableQuery( + System.currentTimeMillis(), + qryId, + ctx.localNodeId(), + qryDesc.distributedJoins(), + qryDesc.enforceJoinOrder(), + qryParams.lazy(), + qryDesc.schemaName(), + qry, + null + ); + + heavyQryTracker.startTracking(fullTrackableQry); + try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) { H2Utils.setupConnection(conn, qctx, qryDesc.distributedJoins(), qryDesc.enforceJoinOrder(), qryParams.lazy()); @@ -455,7 +468,8 @@ private GridQueryFieldsResult executeSelectLocal( timeout, cancel, qryParams.dataPageScanEnabled(), - qryInfo + qryInfo, + false ); return new H2FieldsIterator( @@ -465,12 +479,15 @@ private GridQueryFieldsResult executeSelectLocal( log, IgniteH2Indexing.this, qryInfo, - ctx.tracing() + ctx.tracing(), + fullTrackableQry ); } catch (IgniteCheckedException | RuntimeException | Error e) { conn.close(); + heavyQryTracker.stopTracking(fullTrackableQry, e); + throw e; } } @@ -733,14 +750,15 @@ public ResultSet executeSqlQueryWithTimer( int timeoutMillis, @Nullable GridQueryCancel cancel, Boolean dataPageScanEnabled, - final H2QueryInfo qryInfo + final H2QueryInfo qryInfo, + boolean executeWithTracker ) throws IgniteCheckedException { PreparedStatement stmt = conn.prepareStatementNoCache(sql); H2Utils.bindParameters(stmt, params); return executeSqlQueryWithTimer(stmt, - conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo); + conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo, executeWithTracker); } /** @@ -771,19 +789,33 @@ public ResultSet executeSqlQueryWithTimer( int timeoutMillis, @Nullable GridQueryCancel cancel, Boolean dataPageScanEnabled, - final H2QueryInfo qryInfo + final H2QueryInfo qryInfo, + boolean executeWithTracker ) throws IgniteCheckedException { - return executeWithTimer( - () -> { - try ( - TraceSurroundings ignored = MTC.support(ctx.tracing() - .create(SQL_QRY_EXECUTE, MTC.span()) - .addTag(SQL_QRY_TEXT, () -> sql)) - ) { - return executeSqlQuery(conn, stmt, timeoutMillis, cancel); - }}, - qryInfo, - dataPageScanEnabled); + if (executeWithTracker && qryInfo != null) + heavyQryTracker.startTracking(qryInfo); + + enableDataPageScan(dataPageScanEnabled); + + Throwable err = null; + try ( + TraceSurroundings ignored = MTC.support(ctx.tracing() + .create(SQL_QRY_EXECUTE, MTC.span()) + .addTag(SQL_QRY_TEXT, () -> sql)) + ) { + return executeSqlQuery(conn, stmt, timeoutMillis, cancel); + } + catch (Throwable e) { + err = e; + + throw e; + } + finally { + CacheDataTree.setDataPageScanEnabled(false); + + if (executeWithTracker && qryInfo != null) + heavyQryTracker.stopTracking(qryInfo, err); + } } /** {@inheritDoc} */ @@ -2241,39 +2273,4 @@ public HeavyQueriesTracker heavyQueriesTracker() { public DistributedIndexingConfiguration distributedConfiguration() { return distrCfg; } - - /** - * Executes a query/fetch and prints a warning if it took too long to finish the task. - * - * @param task Query/fetch to execute. - * @param qryInfo Query info. - * @param dataPageScanEnabled Page scan enabled flag. - * @throws IgniteCheckedException If failed. - */ - public T executeWithTimer( - IgniteThrowableSupplier task, - final H2QueryInfo qryInfo, - Boolean dataPageScanEnabled - ) throws IgniteCheckedException { - if (qryInfo != null) - heavyQryTracker.startTracking(qryInfo); - - enableDataPageScan(dataPageScanEnabled); - - Throwable err = null; - try { - return task.get(); - } - catch (Throwable e) { - err = e; - - throw e; - } - finally { - CacheDataTree.setDataPageScanEnabled(false); - - if (qryInfo != null) - heavyQryTracker.stopTracking(qryInfo, err); - } - } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 57209a83c94fa..ac8f2a46edef7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -479,7 +479,8 @@ private void onQueryRequest0( timeout, qryResults.queryCancel(qryIdx), dataPageScanEnabled, - qryInfo); + qryInfo, + false); if (evt) { ctx.event().record(new CacheQueryExecutedEvent<>( @@ -507,7 +508,7 @@ private void onQueryRequest0( res.openResult(rs, qryInfo); - final GridQueryNextPageResponse msg = prepareNextPageWithTimer( + final GridQueryNextPageResponse msg = prepareNextPage( nodeRess, node, qryResults, @@ -862,7 +863,7 @@ else if (qryResults.cancelled()) Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags()); - GridQueryNextPageResponse msg = prepareNextPageWithTimer( + GridQueryNextPageResponse msg = prepareNextPage( nodeRess, node, qryResults, @@ -963,34 +964,6 @@ private GridQueryNextPageResponse prepareNextPage( } } - /** - * Prepares next page for query and prints warning if it took too long. - * - * @param nodeRess Results. - * @param node Node. - * @param qr Query results. - * @param qry Query. - * @param segmentId Index segment ID. - * @param pageSize Page size. - * @param dataPageScanEnabled If data page scan is enabled. - * @return Next page. - * @throws IgniteCheckedException If failed. - */ - private GridQueryNextPageResponse prepareNextPageWithTimer( - MapNodeResults nodeRess, - ClusterNode node, - MapQueryResults qr, - int qry, - int segmentId, - int pageSize, - Boolean dataPageScanEnabled - ) throws IgniteCheckedException { - return h2.executeWithTimer( - () -> prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, dataPageScanEnabled), - qr.result(qry).qryInfo(), - dataPageScanEnabled); - } - /** * @param node Node. * @param msg Message to send. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 3fdadabe361e5..01094890a6288 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.h2.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator; +import org.apache.ignite.internal.processors.query.h2.H2FullTrackableQuery; import org.apache.ignite.internal.processors.query.h2.H2PooledConnection; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; @@ -77,6 +78,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; +import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker; import org.apache.ignite.internal.processors.tracing.MTC; import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; import org.apache.ignite.internal.util.typedef.C2; @@ -413,6 +415,19 @@ public Iterator> query( boolean release = true; + H2FullTrackableQuery fullTrackableQry = new H2FullTrackableQuery( + qryStartTime, + qryId, + ctx.localNodeId(), + qry.distributedJoins(), + enforceJoinOrder, + lazy, + schemaName, + qry.originalSql(), + nodes); + + heavyQryTracker().startTracking(fullTrackableQry); + try { final ReduceQueryRun r = createReduceQueryRun(conn, mapQueries, nodes, pageSize, nodeToSegmentsCnt, skipMergeTbl, qry.explain(), dataPageScanEnabled); @@ -482,7 +497,8 @@ else if (QueryUtils.wasCancelled(err)) r, qryReqId, qry.distributedJoins(), - ctx.tracing()); + ctx.tracing(), + fullTrackableQry); release = false; @@ -500,8 +516,11 @@ else if (QueryUtils.wasCancelled(err)) H2Utils.setupConnection(conn, qctx, false, enforceJoinOrder); - if (qry.explain()) + if (qry.explain()) { + heavyQryTracker().stopTracking(fullTrackableQry, null); + return explainPlan(conn, qry, params); + } GridCacheSqlQuery rdc = qry.reduceQuery(); @@ -528,7 +547,8 @@ else if (QueryUtils.wasCancelled(err)) timeoutMillis, cancel, dataPageScanEnabled, - qryInfo + qryInfo, + false ); resIter = new H2FieldsIterator( @@ -538,7 +558,8 @@ else if (QueryUtils.wasCancelled(err)) log, h2, qryInfo, - ctx.tracing() + ctx.tracing(), + fullTrackableQry ); conn = null; @@ -549,6 +570,9 @@ else if (QueryUtils.wasCancelled(err)) catch (IgniteCheckedException | RuntimeException e) { release = true; + if (fullTrackableQry != null) + heavyQryTracker().stopTracking(fullTrackableQry, e); + if (e instanceof CacheException) { if (QueryUtils.wasCancelled(e)) throw new CacheException("Failed to run reduce query locally.", @@ -1120,7 +1144,8 @@ private Iterator> explainPlan(H2PooledConnection c, GridCacheTwoStepQuer 0, null, null, - null); + null, + true); lists.add(F.asList(getPlan(rs))); } @@ -1140,7 +1165,7 @@ private Iterator> explainPlan(H2PooledConnection c, GridCacheTwoStepQuer F.asList(rdc.parameters(params)), 0, null, - null, null); + null, null, true); lists.add(F.asList(getPlan(rs))); @@ -1382,4 +1407,9 @@ private List prepareMapQueryForSinglePartition(GridCacheTwoSt return Collections.singletonList(originalQry); } + + /** */ + public HeavyQueriesTracker heavyQryTracker() { + return h2.heavyQueriesTracker(); + } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java index f644a7805e1cc..d7d8736e43e44 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -324,11 +324,6 @@ public void checkTablesVersions() { GridH2Table.checkTablesVersions(ses); } - /** */ - public MapH2QueryInfo qryInfo() { - return res.qryInfo; - } - /** */ private class Result { /** */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java index dc39ac73c7910..40d8fb01edf87 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java @@ -23,11 +23,13 @@ import java.util.List; import java.util.NoSuchElementException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.query.running.TrackableQuery; import org.apache.ignite.internal.processors.tracing.MTC; import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; import org.apache.ignite.internal.processors.tracing.Tracing; import org.h2.index.Cursor; import org.h2.result.Row; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_CLOSE; @@ -65,6 +67,9 @@ public class ReduceIndexIterator implements Iterator>, AutoCloseable { /** Tracing processor. */ private final Tracing tracing; + /** Trackable query. */ + private final TrackableQuery fullTrackableQry; + /** * Constructor. * @@ -80,7 +85,8 @@ public ReduceIndexIterator(GridReduceQueryExecutor rdcExec, ReduceQueryRun run, long qryReqId, boolean distributedJoins, - Tracing tracing + Tracing tracing, + @Nullable TrackableQuery fullTrackableQry ) { this.rdcExec = rdcExec; this.nodes = nodes; @@ -88,6 +94,7 @@ public ReduceIndexIterator(GridReduceQueryExecutor rdcExec, this.qryReqId = qryReqId; this.distributedJoins = distributedJoins; this.tracing = tracing; + this.fullTrackableQry = fullTrackableQry; rdcIter = run.reducers().iterator(); @@ -118,6 +125,9 @@ public ReduceIndexIterator(GridReduceQueryExecutor rdcExec, /** {@inheritDoc} */ @Override public void close() throws Exception { + if (fullTrackableQry != null) + rdcExec.heavyQryTracker().stopTracking(fullTrackableQry, null); + try (TraceSurroundings ignored = MTC.support(tracing.create(SQL_ITER_CLOSE, MTC.span()))) { releaseIfNeeded(); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyOnDmlTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyOnDmlTest.java index 0690cb3c0d005..9a4098a5e7b1f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyOnDmlTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyOnDmlTest.java @@ -299,7 +299,7 @@ private static class CheckLazyIndexing extends IgniteH2Indexing { /** {@inheritDoc} */ @Override public ResultSet executeSqlQueryWithTimer(PreparedStatement stmt, H2PooledConnection conn, String sql, int timeoutMillis, @Nullable GridQueryCancel cancel, Boolean dataPageScanEnabled, - H2QueryInfo qryInfo) throws IgniteCheckedException { + H2QueryInfo qryInfo, boolean executeWithTracker) throws IgniteCheckedException { if (expectedLazy != null) { assertEquals( "Unexpected lazy flag [sql=" + sql + ']', @@ -310,7 +310,8 @@ private static class CheckLazyIndexing extends IgniteH2Indexing { qryCnt++; - return super.executeSqlQueryWithTimer(stmt, conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo); + return super.executeSqlQueryWithTimer(stmt, conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo, + executeWithTracker); } /** */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java index 0d43ad3deecea..cbe3f4dc86912 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java @@ -22,12 +22,14 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; @@ -59,15 +61,23 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { /** Lazy query mode. */ private boolean lazy; + /** Using merge table flag. */ + private boolean withMergeTable; + + /** Distributed joins flag. */ + private boolean distributedJoins; + + /** Ignite instance. */ + private Ignite ignite; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); - startGrid(); + ignite = startGrid(); IgniteCache c = grid().createCache(new CacheConfiguration() .setName("test") - .setSqlSchema("TEST") .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class) .setTableName("test") .addQueryField("id", Long.class.getName(), null) @@ -89,6 +99,18 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { super.afterTest(); } + /** + * @param name Name. + * @param idxTypes Index types. + */ + @SuppressWarnings("unchecked") + private static CacheConfiguration cacheConfig(String name, Class... idxTypes) { + return new CacheConfiguration() + .setName(name) + .setIndexedTypes(idxTypes) + .setSqlFunctionClasses(TestSQLFunctions.class); + } + /** * */ @@ -125,6 +147,24 @@ public void testLongDistributedLazy() { checkFastQueries(); } + /** + * + */ + @Test + public void testLongDistributedLazyWithMergeTable() { + local = false; + lazy = true; + + withMergeTable = true; + + try { + checkLongRunning(); + } + finally { + withMergeTable = false; + } + } + /** * */ @@ -194,7 +234,7 @@ private void checkFastQueries() { // Several fast queries. for (int i = 0; i < 10; ++i) - sql("SELECT * FROM test").getAll(); + sql("test", "SELECT * FROM test").getAll(); assertFalse(lsnr.check()); } @@ -228,7 +268,7 @@ private void checkBigResultSet() throws Exception { testLog.registerListener(lsnr); - try (FieldsQueryCursor cur = sql("SELECT T0.id FROM test AS T0, test AS T1")) { + try (FieldsQueryCursor cur = sql("test", "SELECT T0.id FROM test AS T0, test AS T1")) { Iterator it = cur.iterator(); while (it.hasNext()) @@ -243,7 +283,7 @@ private void checkBigResultSet() throws Exception { * @param args Query parameters. */ private void sqlCheckLongRunning(String sql, Object... args) { - GridTestUtils.assertThrowsAnyCause(log, () -> sql(sql, args).getAll(), QueryCancelledException.class, ""); + GridTestUtils.assertThrowsAnyCause(log, () -> sql("test", sql, args).getAll(), QueryCancelledException.class, ""); } /** @@ -251,40 +291,75 @@ private void sqlCheckLongRunning(String sql, Object... args) { * @param args Query parameters. */ private void sqlCheckLongRunningLazy(String sql, Object... args) { - assertFalse("Iterator with query results must not be empty.", sql(sql, args).iterator().next().isEmpty()); + pageSize = 1; + + try { + assertFalse(sql("test", sql, args).iterator().next().isEmpty()); + } + finally { + pageSize = DEFAULT_PAGE_SIZE; + } + } + + /** + * @param sql SQL query. + * @param args Query parameters. + */ + private void sqlCheckLongRunningLazyWithMergeTable(String sql, Object... args) { + distributedJoins = true; + + try { + CacheConfiguration ccfg1 = cacheConfig("pers", Integer.class, Person.class); + CacheConfiguration ccfg2 = cacheConfig("org", Integer.class, Organization.class); + + IgniteCache cache1 = ignite.getOrCreateCache(ccfg1); + IgniteCache cache2 = ignite.getOrCreateCache(ccfg2); + + cache2.put(1, new Organization("o1")); + cache2.put(2, new Organization("o2")); + cache1.put(3, new Person(1, "p1")); + cache1.put(4, new Person(2, "p2")); + cache1.put(5, new Person(3, "p3")); + + assertFalse(sql("pers", sql, args).getAll().isEmpty()); + } + finally { + distributedJoins = false; + } } /** * Execute long-running sql with a check for errors. */ private void sqlCheckLongRunning() { - if (lazy) { - pageSize = 1; + if (lazy && withMergeTable) { + String select = "select o.name n1, p.name n2 from Person p, \"org\".Organization o" + + " where p.orgId = o._key and o._key=1 and o._key < sleep_func(?)" + + " union select o.name n1, p.name n2 from Person p, \"org\".Organization o" + + " where p.orgId = o._key and o._key=2"; - try { - sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < sleep_func(?)", 2000); - } - finally { - pageSize = DEFAULT_PAGE_SIZE; - } + sqlCheckLongRunningLazyWithMergeTable(select, 2000); } + else if (lazy && !withMergeTable) + sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < sleep_func(?)", 2000); else sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS T2 where T0.id > ?", 0); } /** + * @param cacheName Cache name. * @param sql SQL query. * @param args Query parameters. * @return Results cursor. */ - private FieldsQueryCursor> sql(String sql, Object... args) { - return grid().context().query().querySqlFields(new SqlFieldsQuery(sql) + private FieldsQueryCursor> sql(String cacheName, String sql, Object... args) { + return ignite.cache(cacheName).query(new SqlFieldsQuery(sql) .setTimeout(10, TimeUnit.SECONDS) .setLocal(local) .setLazy(lazy) .setPageSize(pageSize) - .setSchema("TEST") - .setArgs(args), false); + .setDistributedJoins(distributedJoins) + .setArgs(args)); } /** @@ -335,4 +410,38 @@ private ListeningTestLogger testLog() { private HeavyQueriesTracker heavyQueriesTracker() { return ((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker(); } + + /** */ + private static class Person { + /** */ + @QuerySqlField(index = true) + int orgId; + + /** */ + @QuerySqlField(index = true) + String name; + + /** + * @param orgId Organization ID. + * @param name Name. + */ + public Person(int orgId, String name) { + this.orgId = orgId; + this.name = name; + } + } + + /** */ + private static class Organization { + /** */ + @QuerySqlField + String name; + + /** + * @param name Organization name. + */ + public Organization(String name) { + this.name = name; + } + } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java index 0f424a50d4416..3800f1f831f15 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java @@ -498,13 +498,14 @@ static class DirectPageScanIndexing extends IgniteH2Indexing { int timeoutMillis, @Nullable GridQueryCancel cancel, Boolean dataPageScanEnabled, - final H2QueryInfo qryInfo + final H2QueryInfo qryInfo, + boolean executeWithTracker ) throws IgniteCheckedException { callsCnt.incrementAndGet(); assertEquals(expectedDataPageScanEnabled, dataPageScanEnabled); return super.executeSqlQueryWithTimer(stmt, conn, sql, timeoutMillis, - cancel, dataPageScanEnabled, qryInfo); + cancel, dataPageScanEnabled, qryInfo, executeWithTracker); } } From 607f6402f18cb5d2a24f7b787968c9cf118e0a6e Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Tue, 9 Jul 2024 18:46:59 +1000 Subject: [PATCH 06/14] IGNITE-17086 Minor refactoring --- .../internal/processors/query/h2/H2ResultSetIterator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java index e3d0f57a37cb2..8de4bb28198fb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java @@ -122,7 +122,7 @@ public abstract class H2ResultSetIterator extends GridIteratorAdapter impl private final H2QueryInfo qryInfo; /** */ - final IgniteH2Indexing h2; + private final IgniteH2Indexing h2; /** */ private final H2FullTrackableQuery fullTrackableQry; From 35f430dc5ba600a3b5dc1c1ef5f7285e40c4515d Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Wed, 10 Jul 2024 15:00:20 +1000 Subject: [PATCH 07/14] IGNITE-17086 Minor refactoring --- .../processors/query/h2/H2FullTrackableQuery.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java index a57bede4b37a4..7d0c577c01d2c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java @@ -17,8 +17,8 @@ public class H2FullTrackableQuery implements TrackableQuery { /** Query id. */ private final long qryId; - /** Local node id. */ - private final UUID locNodeId; + /** Aggregator node id. */ + private final UUID aggrNodeId; /** Distributed joins flag. */ private final boolean distributedJoin; @@ -41,7 +41,7 @@ public class H2FullTrackableQuery implements TrackableQuery { /** * @param beginTs Begin timestamp. * @param qryId Query id. - * @param locNodeId Local node id. + * @param aggrNodeId Aggregator node id. * @param distributedJoin Distributed joins flag. * @param enforceJoinOrder Enforce join order flag. * @param lazy Lazy mode flag. @@ -52,7 +52,7 @@ public class H2FullTrackableQuery implements TrackableQuery { public H2FullTrackableQuery( long beginTs, long qryId, - UUID locNodeId, + UUID aggrNodeId, boolean distributedJoin, boolean enforceJoinOrder, boolean lazy, @@ -62,7 +62,7 @@ public H2FullTrackableQuery( ) { this.beginTs = beginTs; this.qryId = qryId; - this.locNodeId = locNodeId; + this.aggrNodeId = aggrNodeId; this.distributedJoin = distributedJoin; this.enforceJoinOrder = enforceJoinOrder; this.lazy = lazy; @@ -81,9 +81,9 @@ public H2FullTrackableQuery( StringBuilder msgSb = new StringBuilder(); if (qryId == RunningQueryManager.UNDEFINED_QUERY_ID) - msgSb.append(" [globalQueryId=(undefined), node=").append(locNodeId); + msgSb.append(" [globalQueryId=(undefined), node=").append(aggrNodeId); else - msgSb.append(" [globalQueryId=").append(QueryUtils.globalQueryId(locNodeId, qryId)); + msgSb.append(" [globalQueryId=").append(QueryUtils.globalQueryId(aggrNodeId, qryId)); if (additionalInfo != null) msgSb.append(", ").append(additionalInfo); From b96f1d83884b361ed89341d61f6c7c7f6403dc48 Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Thu, 18 Jul 2024 12:39:21 +1000 Subject: [PATCH 08/14] Revert "IGNITE-17086 Long query tracking has been bound to H2ResultSetIterator and ReduceIndexIterator" This reverts commits 088a45062205c421210b4172964df4589a18c3d8, 607f6402f18cb5d2a24f7b787968c9cf118e0a6e, 35f430dc5ba600a3b5dc1c1ef5f7285e40c4515d --- .../processors/query/h2/H2FieldsIterator.java | 6 +- .../query/h2/H2FullTrackableQuery.java | 124 --------------- .../query/h2/H2ResultSetIterator.java | 23 ++- .../processors/query/h2/IgniteH2Indexing.java | 101 ++++++------ .../h2/twostep/GridMapQueryExecutor.java | 35 ++++- .../h2/twostep/GridReduceQueryExecutor.java | 42 +---- .../query/h2/twostep/MapQueryResult.java | 5 + .../query/h2/twostep/ReduceIndexIterator.java | 12 +- .../processors/query/LazyOnDmlTest.java | 5 +- .../query/LongRunningQueryTest.java | 145 +++--------------- .../query/h2/QueryDataPageScanTest.java | 5 +- 11 files changed, 130 insertions(+), 373 deletions(-) delete mode 100644 modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java index a392f1f56cff9..0174fa487fae2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java @@ -24,7 +24,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.tracing.Tracing; -import org.jetbrains.annotations.Nullable; /** * Special field set iterator based on database result set. @@ -50,11 +49,10 @@ public H2FieldsIterator( IgniteLogger log, IgniteH2Indexing h2, H2QueryInfo qryInfo, - Tracing tracing, - @Nullable H2FullTrackableQuery fullTrackableQry + Tracing tracing ) throws IgniteCheckedException { - super(data, pageSize, log, h2, qryInfo, tracing, fullTrackableQry); + super(data, pageSize, log, h2, qryInfo, tracing); assert conn != null; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java deleted file mode 100644 index 7d0c577c01d2c..0000000000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FullTrackableQuery.java +++ /dev/null @@ -1,124 +0,0 @@ -package org.apache.ignite.internal.processors.query.h2; - -import java.util.Collection; -import java.util.UUID; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.internal.processors.query.running.RunningQueryManager; -import org.apache.ignite.internal.processors.query.running.TrackableQuery; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -/** */ -public class H2FullTrackableQuery implements TrackableQuery { - /** Begin timestamp. */ - private final long beginTs; - - /** Query id. */ - private final long qryId; - - /** Aggregator node id. */ - private final UUID aggrNodeId; - - /** Distributed joins flag. */ - private final boolean distributedJoin; - - /** Enforce join order flag. */ - private final boolean enforceJoinOrder; - - /** Lazy mode flag. */ - private final boolean lazy; - - /** Schema name. */ - private final String schema; - - /** Sql text. */ - private final String sql; - - /** Map nodes. */ - private final Collection mapNodes; - - /** - * @param beginTs Begin timestamp. - * @param qryId Query id. - * @param aggrNodeId Aggregator node id. - * @param distributedJoin Distributed joins flag. - * @param enforceJoinOrder Enforce join order flag. - * @param lazy Lazy mode flag. - * @param schema Schema name. - * @param sql Sql text. - * @param mapNodes Map nodes. - */ - public H2FullTrackableQuery( - long beginTs, - long qryId, - UUID aggrNodeId, - boolean distributedJoin, - boolean enforceJoinOrder, - boolean lazy, - String schema, - String sql, - @Nullable Collection mapNodes - ) { - this.beginTs = beginTs; - this.qryId = qryId; - this.aggrNodeId = aggrNodeId; - this.distributedJoin = distributedJoin; - this.enforceJoinOrder = enforceJoinOrder; - this.lazy = lazy; - this.schema = schema; - this.sql = sql; - this.mapNodes = mapNodes; - } - - /** {@inheritDoc} */ - @Override public long time() { - return U.currentTimeMillis() - beginTs; - } - - /** {@inheritDoc} */ - @Override public String queryInfo(@Nullable String additionalInfo) { - StringBuilder msgSb = new StringBuilder(); - - if (qryId == RunningQueryManager.UNDEFINED_QUERY_ID) - msgSb.append(" [globalQueryId=(undefined), node=").append(aggrNodeId); - else - msgSb.append(" [globalQueryId=").append(QueryUtils.globalQueryId(aggrNodeId, qryId)); - - if (additionalInfo != null) - msgSb.append(", ").append(additionalInfo); - - msgSb.append(", duration=").append(time()).append("ms") - .append(", distributedJoin=").append(distributedJoin) - .append(", enforceJoinOrder=").append(enforceJoinOrder) - .append(", lazy=").append(lazy) - .append(", schema=").append(schema); - - if (mapNodes != null) - msgSb.append(", map nodes: ").append(getMapNodes(mapNodes)); - - msgSb.append(", sql='").append(sql).append("'") - .append(']'); - - return msgSb.toString(); - } - - /** - * @param mapNodes Map nodes. - * @return String representation of the list of all map nodes. - */ - private String getMapNodes(Collection mapNodes) { - StringBuilder nodesSb = new StringBuilder(); - - int i = 0; - - for (ClusterNode node : mapNodes) { - nodesSb.append(node.id()); - - if (i++ != mapNodes.size() - 1) - nodesSb.append(", "); - } - - return nodesSb.toString(); - } -} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java index 8de4bb28198fb..8776e5529bebc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java @@ -46,7 +46,6 @@ import org.h2.jdbc.JdbcResultSet; import org.h2.result.ResultInterface; import org.h2.value.Value; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_PAGE_ROWS; import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_CLOSE; @@ -122,10 +121,7 @@ public abstract class H2ResultSetIterator extends GridIteratorAdapter impl private final H2QueryInfo qryInfo; /** */ - private final IgniteH2Indexing h2; - - /** */ - private final H2FullTrackableQuery fullTrackableQry; + final IgniteH2Indexing h2; /** * @param data Data array. @@ -141,8 +137,7 @@ protected H2ResultSetIterator( IgniteLogger log, IgniteH2Indexing h2, H2QueryInfo qryInfo, - Tracing tracing, - @Nullable H2FullTrackableQuery fullTrackableQry + Tracing tracing ) throws IgniteCheckedException { ctx = h2.ctx; this.pageSize = pageSize; @@ -150,7 +145,6 @@ protected H2ResultSetIterator( this.tracing = tracing; this.qryInfo = qryInfo; this.h2 = h2; - this.fullTrackableQry = fullTrackableQry; try { res = (ResultInterface)RESULT_FIELD.get(data); @@ -320,6 +314,14 @@ private synchronized boolean fetchNext() throws IgniteCheckedException { return false; } + /** */ + private boolean fetchNextWithTimer() throws IgniteCheckedException { + return h2.executeWithTimer( + () -> fetchNext(), + qryInfo, + false); + } + /** * @return Row. */ @@ -335,9 +337,6 @@ public void onClose() throws IgniteCheckedException { lockTables(); - if (fullTrackableQry != null) - h2.heavyQueriesTracker().stopTracking(fullTrackableQry, null); - try { resultSetChecker.checkOnClose(); @@ -404,7 +403,7 @@ private synchronized void closeInternal() throws IgniteCheckedException { if (closed) return false; - return hasRow || (hasRow = fetchNext()); + return hasRow || (hasRow = fetchNextWithTimer()); } /** {@inheritDoc} */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 070086d7f3a11..aa02d68ebb156 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -117,6 +117,7 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.lang.IgniteSingletonIterator; +import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; @@ -412,20 +413,6 @@ private GridQueryFieldsResult executeSelectLocal( @Override public GridCloseableIterator> iterator() throws IgniteCheckedException { H2PooledConnection conn = connections().connection(qryDesc.schemaName()); - H2FullTrackableQuery fullTrackableQry = new H2FullTrackableQuery( - System.currentTimeMillis(), - qryId, - ctx.localNodeId(), - qryDesc.distributedJoins(), - qryDesc.enforceJoinOrder(), - qryParams.lazy(), - qryDesc.schemaName(), - qry, - null - ); - - heavyQryTracker.startTracking(fullTrackableQry); - try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) { H2Utils.setupConnection(conn, qctx, qryDesc.distributedJoins(), qryDesc.enforceJoinOrder(), qryParams.lazy()); @@ -468,8 +455,7 @@ private GridQueryFieldsResult executeSelectLocal( timeout, cancel, qryParams.dataPageScanEnabled(), - qryInfo, - false + qryInfo ); return new H2FieldsIterator( @@ -479,15 +465,12 @@ private GridQueryFieldsResult executeSelectLocal( log, IgniteH2Indexing.this, qryInfo, - ctx.tracing(), - fullTrackableQry + ctx.tracing() ); } catch (IgniteCheckedException | RuntimeException | Error e) { conn.close(); - heavyQryTracker.stopTracking(fullTrackableQry, e); - throw e; } } @@ -750,15 +733,14 @@ public ResultSet executeSqlQueryWithTimer( int timeoutMillis, @Nullable GridQueryCancel cancel, Boolean dataPageScanEnabled, - final H2QueryInfo qryInfo, - boolean executeWithTracker + final H2QueryInfo qryInfo ) throws IgniteCheckedException { PreparedStatement stmt = conn.prepareStatementNoCache(sql); H2Utils.bindParameters(stmt, params); return executeSqlQueryWithTimer(stmt, - conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo, executeWithTracker); + conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo); } /** @@ -789,33 +771,19 @@ public ResultSet executeSqlQueryWithTimer( int timeoutMillis, @Nullable GridQueryCancel cancel, Boolean dataPageScanEnabled, - final H2QueryInfo qryInfo, - boolean executeWithTracker + final H2QueryInfo qryInfo ) throws IgniteCheckedException { - if (executeWithTracker && qryInfo != null) - heavyQryTracker.startTracking(qryInfo); - - enableDataPageScan(dataPageScanEnabled); - - Throwable err = null; - try ( - TraceSurroundings ignored = MTC.support(ctx.tracing() - .create(SQL_QRY_EXECUTE, MTC.span()) - .addTag(SQL_QRY_TEXT, () -> sql)) - ) { - return executeSqlQuery(conn, stmt, timeoutMillis, cancel); - } - catch (Throwable e) { - err = e; - - throw e; - } - finally { - CacheDataTree.setDataPageScanEnabled(false); - - if (executeWithTracker && qryInfo != null) - heavyQryTracker.stopTracking(qryInfo, err); - } + return executeWithTimer( + () -> { + try ( + TraceSurroundings ignored = MTC.support(ctx.tracing() + .create(SQL_QRY_EXECUTE, MTC.span()) + .addTag(SQL_QRY_TEXT, () -> sql)) + ) { + return executeSqlQuery(conn, stmt, timeoutMillis, cancel); + }}, + qryInfo, + dataPageScanEnabled); } /** {@inheritDoc} */ @@ -2273,4 +2241,39 @@ public HeavyQueriesTracker heavyQueriesTracker() { public DistributedIndexingConfiguration distributedConfiguration() { return distrCfg; } + + /** + * Executes a query/fetch and prints a warning if it took too long to finish the task. + * + * @param task Query/fetch to execute. + * @param qryInfo Query info. + * @param dataPageScanEnabled Page scan enabled flag. + * @throws IgniteCheckedException If failed. + */ + public T executeWithTimer( + IgniteThrowableSupplier task, + final H2QueryInfo qryInfo, + Boolean dataPageScanEnabled + ) throws IgniteCheckedException { + if (qryInfo != null) + heavyQryTracker.startTracking(qryInfo); + + enableDataPageScan(dataPageScanEnabled); + + Throwable err = null; + try { + return task.get(); + } + catch (Throwable e) { + err = e; + + throw e; + } + finally { + CacheDataTree.setDataPageScanEnabled(false); + + if (qryInfo != null) + heavyQryTracker.stopTracking(qryInfo, err); + } + } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index ac8f2a46edef7..57209a83c94fa 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -479,8 +479,7 @@ private void onQueryRequest0( timeout, qryResults.queryCancel(qryIdx), dataPageScanEnabled, - qryInfo, - false); + qryInfo); if (evt) { ctx.event().record(new CacheQueryExecutedEvent<>( @@ -508,7 +507,7 @@ private void onQueryRequest0( res.openResult(rs, qryInfo); - final GridQueryNextPageResponse msg = prepareNextPage( + final GridQueryNextPageResponse msg = prepareNextPageWithTimer( nodeRess, node, qryResults, @@ -863,7 +862,7 @@ else if (qryResults.cancelled()) Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags()); - GridQueryNextPageResponse msg = prepareNextPage( + GridQueryNextPageResponse msg = prepareNextPageWithTimer( nodeRess, node, qryResults, @@ -964,6 +963,34 @@ private GridQueryNextPageResponse prepareNextPage( } } + /** + * Prepares next page for query and prints warning if it took too long. + * + * @param nodeRess Results. + * @param node Node. + * @param qr Query results. + * @param qry Query. + * @param segmentId Index segment ID. + * @param pageSize Page size. + * @param dataPageScanEnabled If data page scan is enabled. + * @return Next page. + * @throws IgniteCheckedException If failed. + */ + private GridQueryNextPageResponse prepareNextPageWithTimer( + MapNodeResults nodeRess, + ClusterNode node, + MapQueryResults qr, + int qry, + int segmentId, + int pageSize, + Boolean dataPageScanEnabled + ) throws IgniteCheckedException { + return h2.executeWithTimer( + () -> prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, dataPageScanEnabled), + qr.result(qry).qryInfo(), + dataPageScanEnabled); + } + /** * @param node Node. * @param msg Message to send. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 01094890a6288..3fdadabe361e5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -61,7 +61,6 @@ import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.h2.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator; -import org.apache.ignite.internal.processors.query.h2.H2FullTrackableQuery; import org.apache.ignite.internal.processors.query.h2.H2PooledConnection; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; @@ -78,7 +77,6 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; -import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker; import org.apache.ignite.internal.processors.tracing.MTC; import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; import org.apache.ignite.internal.util.typedef.C2; @@ -415,19 +413,6 @@ public Iterator> query( boolean release = true; - H2FullTrackableQuery fullTrackableQry = new H2FullTrackableQuery( - qryStartTime, - qryId, - ctx.localNodeId(), - qry.distributedJoins(), - enforceJoinOrder, - lazy, - schemaName, - qry.originalSql(), - nodes); - - heavyQryTracker().startTracking(fullTrackableQry); - try { final ReduceQueryRun r = createReduceQueryRun(conn, mapQueries, nodes, pageSize, nodeToSegmentsCnt, skipMergeTbl, qry.explain(), dataPageScanEnabled); @@ -497,8 +482,7 @@ else if (QueryUtils.wasCancelled(err)) r, qryReqId, qry.distributedJoins(), - ctx.tracing(), - fullTrackableQry); + ctx.tracing()); release = false; @@ -516,11 +500,8 @@ else if (QueryUtils.wasCancelled(err)) H2Utils.setupConnection(conn, qctx, false, enforceJoinOrder); - if (qry.explain()) { - heavyQryTracker().stopTracking(fullTrackableQry, null); - + if (qry.explain()) return explainPlan(conn, qry, params); - } GridCacheSqlQuery rdc = qry.reduceQuery(); @@ -547,8 +528,7 @@ else if (QueryUtils.wasCancelled(err)) timeoutMillis, cancel, dataPageScanEnabled, - qryInfo, - false + qryInfo ); resIter = new H2FieldsIterator( @@ -558,8 +538,7 @@ else if (QueryUtils.wasCancelled(err)) log, h2, qryInfo, - ctx.tracing(), - fullTrackableQry + ctx.tracing() ); conn = null; @@ -570,9 +549,6 @@ else if (QueryUtils.wasCancelled(err)) catch (IgniteCheckedException | RuntimeException e) { release = true; - if (fullTrackableQry != null) - heavyQryTracker().stopTracking(fullTrackableQry, e); - if (e instanceof CacheException) { if (QueryUtils.wasCancelled(e)) throw new CacheException("Failed to run reduce query locally.", @@ -1144,8 +1120,7 @@ private Iterator> explainPlan(H2PooledConnection c, GridCacheTwoStepQuer 0, null, null, - null, - true); + null); lists.add(F.asList(getPlan(rs))); } @@ -1165,7 +1140,7 @@ private Iterator> explainPlan(H2PooledConnection c, GridCacheTwoStepQuer F.asList(rdc.parameters(params)), 0, null, - null, null, true); + null, null); lists.add(F.asList(getPlan(rs))); @@ -1407,9 +1382,4 @@ private List prepareMapQueryForSinglePartition(GridCacheTwoSt return Collections.singletonList(originalQry); } - - /** */ - public HeavyQueriesTracker heavyQryTracker() { - return h2.heavyQueriesTracker(); - } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java index d7d8736e43e44..f644a7805e1cc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -324,6 +324,11 @@ public void checkTablesVersions() { GridH2Table.checkTablesVersions(ses); } + /** */ + public MapH2QueryInfo qryInfo() { + return res.qryInfo; + } + /** */ private class Result { /** */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java index 40d8fb01edf87..dc39ac73c7910 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java @@ -23,13 +23,11 @@ import java.util.List; import java.util.NoSuchElementException; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.query.running.TrackableQuery; import org.apache.ignite.internal.processors.tracing.MTC; import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; import org.apache.ignite.internal.processors.tracing.Tracing; import org.h2.index.Cursor; import org.h2.result.Row; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_CLOSE; @@ -67,9 +65,6 @@ public class ReduceIndexIterator implements Iterator>, AutoCloseable { /** Tracing processor. */ private final Tracing tracing; - /** Trackable query. */ - private final TrackableQuery fullTrackableQry; - /** * Constructor. * @@ -85,8 +80,7 @@ public ReduceIndexIterator(GridReduceQueryExecutor rdcExec, ReduceQueryRun run, long qryReqId, boolean distributedJoins, - Tracing tracing, - @Nullable TrackableQuery fullTrackableQry + Tracing tracing ) { this.rdcExec = rdcExec; this.nodes = nodes; @@ -94,7 +88,6 @@ public ReduceIndexIterator(GridReduceQueryExecutor rdcExec, this.qryReqId = qryReqId; this.distributedJoins = distributedJoins; this.tracing = tracing; - this.fullTrackableQry = fullTrackableQry; rdcIter = run.reducers().iterator(); @@ -125,9 +118,6 @@ public ReduceIndexIterator(GridReduceQueryExecutor rdcExec, /** {@inheritDoc} */ @Override public void close() throws Exception { - if (fullTrackableQry != null) - rdcExec.heavyQryTracker().stopTracking(fullTrackableQry, null); - try (TraceSurroundings ignored = MTC.support(tracing.create(SQL_ITER_CLOSE, MTC.span()))) { releaseIfNeeded(); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyOnDmlTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyOnDmlTest.java index 9a4098a5e7b1f..0690cb3c0d005 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyOnDmlTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyOnDmlTest.java @@ -299,7 +299,7 @@ private static class CheckLazyIndexing extends IgniteH2Indexing { /** {@inheritDoc} */ @Override public ResultSet executeSqlQueryWithTimer(PreparedStatement stmt, H2PooledConnection conn, String sql, int timeoutMillis, @Nullable GridQueryCancel cancel, Boolean dataPageScanEnabled, - H2QueryInfo qryInfo, boolean executeWithTracker) throws IgniteCheckedException { + H2QueryInfo qryInfo) throws IgniteCheckedException { if (expectedLazy != null) { assertEquals( "Unexpected lazy flag [sql=" + sql + ']', @@ -310,8 +310,7 @@ private static class CheckLazyIndexing extends IgniteH2Indexing { qryCnt++; - return super.executeSqlQueryWithTimer(stmt, conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo, - executeWithTracker); + return super.executeSqlQueryWithTimer(stmt, conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo); } /** */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java index cbe3f4dc86912..0d43ad3deecea 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java @@ -22,14 +22,12 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; @@ -61,23 +59,15 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { /** Lazy query mode. */ private boolean lazy; - /** Using merge table flag. */ - private boolean withMergeTable; - - /** Distributed joins flag. */ - private boolean distributedJoins; - - /** Ignite instance. */ - private Ignite ignite; - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); - ignite = startGrid(); + startGrid(); IgniteCache c = grid().createCache(new CacheConfiguration() .setName("test") + .setSqlSchema("TEST") .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class) .setTableName("test") .addQueryField("id", Long.class.getName(), null) @@ -99,18 +89,6 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { super.afterTest(); } - /** - * @param name Name. - * @param idxTypes Index types. - */ - @SuppressWarnings("unchecked") - private static CacheConfiguration cacheConfig(String name, Class... idxTypes) { - return new CacheConfiguration() - .setName(name) - .setIndexedTypes(idxTypes) - .setSqlFunctionClasses(TestSQLFunctions.class); - } - /** * */ @@ -147,24 +125,6 @@ public void testLongDistributedLazy() { checkFastQueries(); } - /** - * - */ - @Test - public void testLongDistributedLazyWithMergeTable() { - local = false; - lazy = true; - - withMergeTable = true; - - try { - checkLongRunning(); - } - finally { - withMergeTable = false; - } - } - /** * */ @@ -234,7 +194,7 @@ private void checkFastQueries() { // Several fast queries. for (int i = 0; i < 10; ++i) - sql("test", "SELECT * FROM test").getAll(); + sql("SELECT * FROM test").getAll(); assertFalse(lsnr.check()); } @@ -268,7 +228,7 @@ private void checkBigResultSet() throws Exception { testLog.registerListener(lsnr); - try (FieldsQueryCursor cur = sql("test", "SELECT T0.id FROM test AS T0, test AS T1")) { + try (FieldsQueryCursor cur = sql("SELECT T0.id FROM test AS T0, test AS T1")) { Iterator it = cur.iterator(); while (it.hasNext()) @@ -283,7 +243,7 @@ private void checkBigResultSet() throws Exception { * @param args Query parameters. */ private void sqlCheckLongRunning(String sql, Object... args) { - GridTestUtils.assertThrowsAnyCause(log, () -> sql("test", sql, args).getAll(), QueryCancelledException.class, ""); + GridTestUtils.assertThrowsAnyCause(log, () -> sql(sql, args).getAll(), QueryCancelledException.class, ""); } /** @@ -291,75 +251,40 @@ private void sqlCheckLongRunning(String sql, Object... args) { * @param args Query parameters. */ private void sqlCheckLongRunningLazy(String sql, Object... args) { - pageSize = 1; - - try { - assertFalse(sql("test", sql, args).iterator().next().isEmpty()); - } - finally { - pageSize = DEFAULT_PAGE_SIZE; - } - } - - /** - * @param sql SQL query. - * @param args Query parameters. - */ - private void sqlCheckLongRunningLazyWithMergeTable(String sql, Object... args) { - distributedJoins = true; - - try { - CacheConfiguration ccfg1 = cacheConfig("pers", Integer.class, Person.class); - CacheConfiguration ccfg2 = cacheConfig("org", Integer.class, Organization.class); - - IgniteCache cache1 = ignite.getOrCreateCache(ccfg1); - IgniteCache cache2 = ignite.getOrCreateCache(ccfg2); - - cache2.put(1, new Organization("o1")); - cache2.put(2, new Organization("o2")); - cache1.put(3, new Person(1, "p1")); - cache1.put(4, new Person(2, "p2")); - cache1.put(5, new Person(3, "p3")); - - assertFalse(sql("pers", sql, args).getAll().isEmpty()); - } - finally { - distributedJoins = false; - } + assertFalse("Iterator with query results must not be empty.", sql(sql, args).iterator().next().isEmpty()); } /** * Execute long-running sql with a check for errors. */ private void sqlCheckLongRunning() { - if (lazy && withMergeTable) { - String select = "select o.name n1, p.name n2 from Person p, \"org\".Organization o" + - " where p.orgId = o._key and o._key=1 and o._key < sleep_func(?)" + - " union select o.name n1, p.name n2 from Person p, \"org\".Organization o" + - " where p.orgId = o._key and o._key=2"; + if (lazy) { + pageSize = 1; - sqlCheckLongRunningLazyWithMergeTable(select, 2000); + try { + sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < sleep_func(?)", 2000); + } + finally { + pageSize = DEFAULT_PAGE_SIZE; + } } - else if (lazy && !withMergeTable) - sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < sleep_func(?)", 2000); else sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS T2 where T0.id > ?", 0); } /** - * @param cacheName Cache name. * @param sql SQL query. * @param args Query parameters. * @return Results cursor. */ - private FieldsQueryCursor> sql(String cacheName, String sql, Object... args) { - return ignite.cache(cacheName).query(new SqlFieldsQuery(sql) + private FieldsQueryCursor> sql(String sql, Object... args) { + return grid().context().query().querySqlFields(new SqlFieldsQuery(sql) .setTimeout(10, TimeUnit.SECONDS) .setLocal(local) .setLazy(lazy) .setPageSize(pageSize) - .setDistributedJoins(distributedJoins) - .setArgs(args)); + .setSchema("TEST") + .setArgs(args), false); } /** @@ -410,38 +335,4 @@ private ListeningTestLogger testLog() { private HeavyQueriesTracker heavyQueriesTracker() { return ((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker(); } - - /** */ - private static class Person { - /** */ - @QuerySqlField(index = true) - int orgId; - - /** */ - @QuerySqlField(index = true) - String name; - - /** - * @param orgId Organization ID. - * @param name Name. - */ - public Person(int orgId, String name) { - this.orgId = orgId; - this.name = name; - } - } - - /** */ - private static class Organization { - /** */ - @QuerySqlField - String name; - - /** - * @param name Organization name. - */ - public Organization(String name) { - this.name = name; - } - } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java index 3800f1f831f15..0f424a50d4416 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java @@ -498,14 +498,13 @@ static class DirectPageScanIndexing extends IgniteH2Indexing { int timeoutMillis, @Nullable GridQueryCancel cancel, Boolean dataPageScanEnabled, - final H2QueryInfo qryInfo, - boolean executeWithTracker + final H2QueryInfo qryInfo ) throws IgniteCheckedException { callsCnt.incrementAndGet(); assertEquals(expectedDataPageScanEnabled, dataPageScanEnabled); return super.executeSqlQueryWithTimer(stmt, conn, sql, timeoutMillis, - cancel, dataPageScanEnabled, qryInfo, executeWithTracker); + cancel, dataPageScanEnabled, qryInfo); } } From 11c3b6c51faeeda14ad851826fee94f06c31d398 Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Fri, 28 Jun 2024 19:18:06 +1000 Subject: [PATCH 09/14] IGNITE-17086 Separate tracking for Map and Reduce phases, introduction of suspension/resuming for long-running queries tracking --- .../processors/query/h2/H2QueryInfo.java | 41 ++++- .../query/h2/H2ResultSetIterator.java | 13 +- .../processors/query/h2/IgniteH2Indexing.java | 86 ++++++----- .../h2/twostep/GridMapQueryExecutor.java | 108 ++++++------- .../h2/twostep/GridReduceQueryExecutor.java | 27 +++- .../query/LongRunningQueryTest.java | 145 +++++++++++++++--- 6 files changed, 294 insertions(+), 126 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java index 7cdbed157fe33..f7f7e4101a6cd 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java @@ -41,6 +41,15 @@ public class H2QueryInfo implements TrackableQuery { /** Begin timestamp. */ private final long beginTs; + /** The most recent point in time when the tracking of a long query was suspended. */ + private volatile long lastSuspendTs; + + /** External wait time. */ + private volatile long extWait; + + /** Long query time tracking suspension flag. */ + private volatile boolean isSuspended; + /** Query schema. */ private final String schema; @@ -65,6 +74,9 @@ public class H2QueryInfo implements TrackableQuery { /** Query id. */ private final long queryId; + /** Lock object. */ + private final Object lock = new Object(); + /** * @param type Query type. * @param stmt Query statement. @@ -123,7 +135,29 @@ protected void printInfo(StringBuilder msg) { /** {@inheritDoc} */ @Override public long time() { - return U.currentTimeMillis() - beginTs; + return (isSuspended ? lastSuspendTs : U.currentTimeMillis()) - beginTs - extWait; + } + + /** */ + public void suspendTracking() { + synchronized (lock) { + if (!isSuspended) { + isSuspended = true; + + lastSuspendTs = U.currentTimeMillis(); + } + } + } + + /** */ + public void resumeTracking() { + synchronized (lock) { + if (isSuspended) { + isSuspended = false; + + extWait += U.currentTimeMillis() - lastSuspendTs; + } + } } /** @@ -156,6 +190,11 @@ protected void printInfo(StringBuilder msg) { return msgSb.toString(); } + /** */ + public boolean isSuspended() { + return isSuspended; + } + /** * Query type. */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java index 8776e5529bebc..18cbac6b494af 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java @@ -314,14 +314,6 @@ private synchronized boolean fetchNext() throws IgniteCheckedException { return false; } - /** */ - private boolean fetchNextWithTimer() throws IgniteCheckedException { - return h2.executeWithTimer( - () -> fetchNext(), - qryInfo, - false); - } - /** * @return Row. */ @@ -337,6 +329,9 @@ public void onClose() throws IgniteCheckedException { lockTables(); + if (qryInfo != null) + h2.heavyQueriesTracker().stopTracking(qryInfo, null); + try { resultSetChecker.checkOnClose(); @@ -403,7 +398,7 @@ private synchronized void closeInternal() throws IgniteCheckedException { if (closed) return false; - return hasRow || (hasRow = fetchNextWithTimer()); + return hasRow || (hasRow = h2.executeWithResumableTimeTracking(this::fetchNext, qryInfo)); } /** {@inheritDoc} */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index aa02d68ebb156..3e4748beaa221 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -413,6 +413,8 @@ private GridQueryFieldsResult executeSelectLocal( @Override public GridCloseableIterator> iterator() throws IgniteCheckedException { H2PooledConnection conn = connections().connection(qryDesc.schemaName()); + H2QueryInfo qryInfo = null; + try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) { H2Utils.setupConnection(conn, qctx, qryDesc.distributedJoins(), qryDesc.enforceJoinOrder(), qryParams.lazy()); @@ -435,9 +437,11 @@ private GridQueryFieldsResult executeSelectLocal( H2Utils.bindParameters(stmt, F.asList(params)); - H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry, + qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry, ctx.localNodeId(), qryId); + heavyQryTracker.startTracking(qryInfo); + if (ctx.performanceStatistics().enabled()) { ctx.performanceStatistics().queryProperty( GridCacheQueryType.SQL_FIELDS, @@ -448,13 +452,16 @@ private GridQueryFieldsResult executeSelectLocal( ); } - ResultSet rs = executeSqlQueryWithTimer( - stmt, - conn, - qry, - timeout, - cancel, - qryParams.dataPageScanEnabled(), + ResultSet rs = executeWithResumableTimeTracking( + () -> executeSqlQueryWithTimer( + stmt, + conn, + qry, + timeout, + cancel, + qryParams.dataPageScanEnabled(), + null + ), qryInfo ); @@ -471,6 +478,9 @@ private GridQueryFieldsResult executeSelectLocal( catch (IgniteCheckedException | RuntimeException | Error e) { conn.close(); + if (qryInfo != null) + heavyQryTracker.stopTracking(qryInfo, e); + throw e; } } @@ -773,17 +783,30 @@ public ResultSet executeSqlQueryWithTimer( Boolean dataPageScanEnabled, final H2QueryInfo qryInfo ) throws IgniteCheckedException { - return executeWithTimer( - () -> { - try ( - TraceSurroundings ignored = MTC.support(ctx.tracing() - .create(SQL_QRY_EXECUTE, MTC.span()) - .addTag(SQL_QRY_TEXT, () -> sql)) - ) { - return executeSqlQuery(conn, stmt, timeoutMillis, cancel); - }}, - qryInfo, - dataPageScanEnabled); + if (qryInfo != null) + heavyQryTracker.startTracking(qryInfo); + + enableDataPageScan(dataPageScanEnabled); + + Throwable err = null; + try ( + TraceSurroundings ignored = MTC.support(ctx.tracing() + .create(SQL_QRY_EXECUTE, MTC.span()) + .addTag(SQL_QRY_TEXT, () -> sql)) + ) { + return executeSqlQuery(conn, stmt, timeoutMillis, cancel); + } + catch (Throwable e) { + err = e; + + throw e; + } + finally { + CacheDataTree.setDataPageScanEnabled(false); + + if (qryInfo != null) + heavyQryTracker.stopTracking(qryInfo, err); + } } /** {@inheritDoc} */ @@ -2243,37 +2266,24 @@ public DistributedIndexingConfiguration distributedConfiguration() { } /** - * Executes a query/fetch and prints a warning if it took too long to finish the task. + * Resumes time tracking before the task (if needed) and suspends time tracking after the task is finished. * * @param task Query/fetch to execute. * @param qryInfo Query info. - * @param dataPageScanEnabled Page scan enabled flag. * @throws IgniteCheckedException If failed. */ - public T executeWithTimer( + public T executeWithResumableTimeTracking( IgniteThrowableSupplier task, - final H2QueryInfo qryInfo, - Boolean dataPageScanEnabled + final H2QueryInfo qryInfo ) throws IgniteCheckedException { - if (qryInfo != null) - heavyQryTracker.startTracking(qryInfo); - - enableDataPageScan(dataPageScanEnabled); + if (qryInfo.isSuspended()) + qryInfo.resumeTracking(); - Throwable err = null; try { return task.get(); } - catch (Throwable e) { - err = e; - - throw e; - } finally { - CacheDataTree.setDataPageScanEnabled(false); - - if (qryInfo != null) - heavyQryTracker.stopTracking(qryInfo, err); + qryInfo.suspendTracking(); } } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 57209a83c94fa..e0a295b50ebe8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -446,6 +446,8 @@ private void onQueryRequest0( qryResults.addResult(qryIdx, res); + MapH2QueryInfo qryInfo = null; + try { res.lock(); @@ -460,7 +462,9 @@ private void onQueryRequest0( H2Utils.bindParameters(stmt, params0); - MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId); + qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId); + + h2.heavyQueriesTracker().startTracking(qryInfo); if (performanceStatsEnabled) { ctx.performanceStatistics().queryProperty( @@ -472,14 +476,20 @@ private void onQueryRequest0( ); } - ResultSet rs = h2.executeSqlQueryWithTimer( - stmt, - conn, - sql, - timeout, - qryResults.queryCancel(qryIdx), - dataPageScanEnabled, - qryInfo); + GridQueryCancel qryCancel = qryResults.queryCancel(qryIdx); + + ResultSet rs = h2.executeWithResumableTimeTracking( + () -> h2.executeSqlQueryWithTimer( + stmt, + conn, + sql, + timeout, + qryCancel, + dataPageScanEnabled, + null + ), + qryInfo + ); if (evt) { ctx.event().record(new CacheQueryExecutedEvent<>( @@ -507,14 +517,21 @@ private void onQueryRequest0( res.openResult(rs, qryInfo); - final GridQueryNextPageResponse msg = prepareNextPageWithTimer( - nodeRess, - node, - qryResults, - qryIdx, - segmentId, - pageSize, - dataPageScanEnabled + MapQueryResults qryResults0 = qryResults; + + int qryIdx0 = qryIdx; + + final GridQueryNextPageResponse msg = h2.executeWithResumableTimeTracking( + () -> prepareNextPage( + nodeRess, + node, + qryResults0, + qryIdx0, + segmentId, + pageSize, + dataPageScanEnabled + ), + qryInfo ); if (msg != null) @@ -528,6 +545,12 @@ private void onQueryRequest0( qryIdx++; } + catch (Throwable e) { + if (qryInfo != null) + h2.heavyQueriesTracker().stopTracking(qryInfo, e); + + throw e; + } finally { try { res.unlockTables(); @@ -862,14 +885,18 @@ else if (qryResults.cancelled()) Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags()); - GridQueryNextPageResponse msg = prepareNextPageWithTimer( - nodeRess, - node, - qryResults, - req.query(), - req.segmentId(), - req.pageSize(), - dataPageScanEnabled); + GridQueryNextPageResponse msg = h2.executeWithResumableTimeTracking( + () -> prepareNextPage( + nodeRess, + node, + qryResults, + req.query(), + req.segmentId(), + req.pageSize(), + dataPageScanEnabled + ), + res.qryInfo() + ); if (msg != null) sendNextPage(node, msg); @@ -939,6 +966,9 @@ private GridQueryNextPageResponse prepareNextPage( if (last) { qr.closeResult(qry); + if (res.qryInfo() != null) + h2.heavyQueriesTracker().stopTracking(res.qryInfo(), null); + if (qr.isAllClosed()) { nodeRess.remove(qr.queryRequestId(), segmentId, qr); @@ -963,34 +993,6 @@ private GridQueryNextPageResponse prepareNextPage( } } - /** - * Prepares next page for query and prints warning if it took too long. - * - * @param nodeRess Results. - * @param node Node. - * @param qr Query results. - * @param qry Query. - * @param segmentId Index segment ID. - * @param pageSize Page size. - * @param dataPageScanEnabled If data page scan is enabled. - * @return Next page. - * @throws IgniteCheckedException If failed. - */ - private GridQueryNextPageResponse prepareNextPageWithTimer( - MapNodeResults nodeRess, - ClusterNode node, - MapQueryResults qr, - int qry, - int segmentId, - int pageSize, - Boolean dataPageScanEnabled - ) throws IgniteCheckedException { - return h2.executeWithTimer( - () -> prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, dataPageScanEnabled), - qr.result(qry).qryInfo(), - dataPageScanEnabled); - } - /** * @param node Node. * @param msg Message to send. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 3fdadabe361e5..cffa0dfd9b8e6 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -419,6 +419,8 @@ public Iterator> query( runs.put(qryReqId, r); + ReduceH2QueryInfo qryInfo = null; + try { cancel.add(() -> send(nodes, new GridQueryCancelRequest(qryReqId), null, true)); @@ -509,9 +511,11 @@ else if (QueryUtils.wasCancelled(err)) H2Utils.bindParameters(stmt, F.asList(rdc.parameters(params))); - ReduceH2QueryInfo qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(), + qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(), ctx.localNodeId(), qryId, qryReqId); + h2.heavyQueriesTracker().startTracking(qryInfo); + if (ctx.performanceStatistics().enabled()) { ctx.performanceStatistics().queryProperty( GridCacheQueryType.SQL_FIELDS, @@ -522,12 +526,18 @@ else if (QueryUtils.wasCancelled(err)) ); } - ResultSet res = h2.executeSqlQueryWithTimer(stmt, - conn, - rdc.query(), - timeoutMillis, - cancel, - dataPageScanEnabled, + H2PooledConnection conn0 = conn; + + ResultSet res = h2.executeWithResumableTimeTracking( + () -> h2.executeSqlQueryWithTimer( + stmt, + conn0, + rdc.query(), + timeoutMillis, + cancel, + dataPageScanEnabled, + null + ), qryInfo ); @@ -549,6 +559,9 @@ else if (QueryUtils.wasCancelled(err)) catch (IgniteCheckedException | RuntimeException e) { release = true; + if (qryInfo != null) + h2.heavyQueriesTracker().stopTracking(qryInfo, e); + if (e instanceof CacheException) { if (QueryUtils.wasCancelled(e)) throw new CacheException("Failed to run reduce query locally.", diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java index 0d43ad3deecea..2095550f008c7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java @@ -22,12 +22,14 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; @@ -59,15 +61,23 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { /** Lazy query mode. */ private boolean lazy; + /** Merge table usage flag. */ + private boolean withMergeTable; + + /** Distributed joins flag. */ + private boolean distributedJoins; + + /** Ignite instance. */ + private Ignite ignite; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); - startGrid(); + ignite = startGrid(); IgniteCache c = grid().createCache(new CacheConfiguration() .setName("test") - .setSqlSchema("TEST") .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class) .setTableName("test") .addQueryField("id", Long.class.getName(), null) @@ -89,6 +99,18 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { super.afterTest(); } + /** + * @param name Name. + * @param idxTypes Index types. + */ + @SuppressWarnings("unchecked") + private static CacheConfiguration cacheConfig(String name, Class... idxTypes) { + return new CacheConfiguration() + .setName(name) + .setIndexedTypes(idxTypes) + .setSqlFunctionClasses(TestSQLFunctions.class); + } + /** * */ @@ -125,6 +147,24 @@ public void testLongDistributedLazy() { checkFastQueries(); } + /** + * + */ + @Test + public void testLongDistributedLazyWithMergeTable() { + local = false; + lazy = true; + + withMergeTable = true; + + try { + checkLongRunning(); + } + finally { + withMergeTable = false; + } + } + /** * */ @@ -194,7 +234,7 @@ private void checkFastQueries() { // Several fast queries. for (int i = 0; i < 10; ++i) - sql("SELECT * FROM test").getAll(); + sql("test", "SELECT * FROM test").getAll(); assertFalse(lsnr.check()); } @@ -228,7 +268,7 @@ private void checkBigResultSet() throws Exception { testLog.registerListener(lsnr); - try (FieldsQueryCursor cur = sql("SELECT T0.id FROM test AS T0, test AS T1")) { + try (FieldsQueryCursor cur = sql("test", "SELECT T0.id FROM test AS T0, test AS T1")) { Iterator it = cur.iterator(); while (it.hasNext()) @@ -243,7 +283,7 @@ private void checkBigResultSet() throws Exception { * @param args Query parameters. */ private void sqlCheckLongRunning(String sql, Object... args) { - GridTestUtils.assertThrowsAnyCause(log, () -> sql(sql, args).getAll(), QueryCancelledException.class, ""); + GridTestUtils.assertThrowsAnyCause(log, () -> sql("test", sql, args).getAll(), QueryCancelledException.class, ""); } /** @@ -251,40 +291,75 @@ private void sqlCheckLongRunning(String sql, Object... args) { * @param args Query parameters. */ private void sqlCheckLongRunningLazy(String sql, Object... args) { - assertFalse("Iterator with query results must not be empty.", sql(sql, args).iterator().next().isEmpty()); + pageSize = 1; + + try { + assertFalse(sql("test", sql, args).iterator().next().isEmpty()); + } + finally { + pageSize = DEFAULT_PAGE_SIZE; + } + } + + /** + * @param sql SQL query. + * @param args Query parameters. + */ + private void sqlCheckLongRunningLazyWithMergeTable(String sql, Object... args) { + distributedJoins = true; + + try { + CacheConfiguration ccfg1 = cacheConfig("pers", Integer.class, Person.class); + CacheConfiguration ccfg2 = cacheConfig("org", Integer.class, Organization.class); + + IgniteCache cache1 = ignite.getOrCreateCache(ccfg1); + IgniteCache cache2 = ignite.getOrCreateCache(ccfg2); + + cache2.put(1, new Organization("o1")); + cache2.put(2, new Organization("o2")); + cache1.put(3, new Person(1, "p1")); + cache1.put(4, new Person(2, "p2")); + cache1.put(5, new Person(3, "p3")); + + assertFalse(sql("pers", sql, args).getAll().isEmpty()); + } + finally { + distributedJoins = false; + } } /** * Execute long-running sql with a check for errors. */ private void sqlCheckLongRunning() { - if (lazy) { - pageSize = 1; + if (lazy && withMergeTable) { + String select = "select o.name n1, p.name n2 from Person p, \"org\".Organization o" + + " where p.orgId = o._key and o._key=1 and o._key < sleep_func(?)" + + " union select o.name n1, p.name n2 from Person p, \"org\".Organization o" + + " where p.orgId = o._key and o._key=2"; - try { - sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < sleep_func(?)", 2000); - } - finally { - pageSize = DEFAULT_PAGE_SIZE; - } + sqlCheckLongRunningLazyWithMergeTable(select, 2000); } + else if (lazy && !withMergeTable) + sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < sleep_func(?)", 2000); else sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS T2 where T0.id > ?", 0); } /** + * @param cacheName Cache name. * @param sql SQL query. * @param args Query parameters. * @return Results cursor. */ - private FieldsQueryCursor> sql(String sql, Object... args) { - return grid().context().query().querySqlFields(new SqlFieldsQuery(sql) + private FieldsQueryCursor> sql(String cacheName, String sql, Object... args) { + return ignite.cache(cacheName).query(new SqlFieldsQuery(sql) .setTimeout(10, TimeUnit.SECONDS) .setLocal(local) .setLazy(lazy) .setPageSize(pageSize) - .setSchema("TEST") - .setArgs(args), false); + .setDistributedJoins(distributedJoins) + .setArgs(args)); } /** @@ -335,4 +410,38 @@ private ListeningTestLogger testLog() { private HeavyQueriesTracker heavyQueriesTracker() { return ((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker(); } + + /** */ + private static class Person { + /** */ + @QuerySqlField(index = true) + int orgId; + + /** */ + @QuerySqlField(index = true) + String name; + + /** + * @param orgId Organization ID. + * @param name Name. + */ + public Person(int orgId, String name) { + this.orgId = orgId; + this.name = name; + } + } + + /** */ + private static class Organization { + /** */ + @QuerySqlField + String name; + + /** + * @param name Organization name. + */ + public Organization(String name) { + this.name = name; + } + } } From 36a6d9fadc76cc76fb37ffe71229f32f70552712 Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Fri, 19 Jul 2024 16:12:54 +1000 Subject: [PATCH 10/14] IGNITE-17086 Minor refactoring --- .../ignite/internal/processors/query/h2/IgniteH2Indexing.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 3e4748beaa221..b4a05713640e1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -2276,8 +2276,7 @@ public T executeWithResumableTimeTracking( IgniteThrowableSupplier task, final H2QueryInfo qryInfo ) throws IgniteCheckedException { - if (qryInfo.isSuspended()) - qryInfo.resumeTracking(); + qryInfo.resumeTracking(); try { return task.get(); From 1650a4d092bdfb2e33ca0b8d6c30cebb05f66abf Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Sun, 21 Jul 2024 16:08:40 +1000 Subject: [PATCH 11/14] IGNITE-17086 LongRunningQueryTest#testLazyWithExternalWait added --- .../query/LongRunningQueryTest.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java index 2095550f008c7..51b0df54c9cf2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.ignite.Ignite; @@ -33,6 +34,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; +import org.apache.ignite.internal.processors.query.h2.H2QueryInfo; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker; import org.apache.ignite.internal.util.worker.GridWorker; @@ -52,6 +54,12 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { /** Keys count. */ private static final int KEY_CNT = 1000; + /** External wait time. */ + private static final int EXT_WAIT_TIME = 2000; + + /** External wait time relative error. */ + private static final int EXT_WAIT_REL_ERR = (int)(EXT_WAIT_TIME * 0.01); + /** Page size. */ private int pageSize = DEFAULT_PAGE_SIZE; @@ -177,6 +185,46 @@ public void testLongLocalLazy() { checkFastQueries(); } + /** + * Test checks that no long-running queries warnings are printed in case of external waits. + */ + @Test + public void testLazyWithExternalWait() throws InterruptedException { + local = false; + lazy = true; + + pageSize = 1; + + LogListener lsnr = LogListener + .matches(LONG_QUERY_EXEC_MSG) + .build(); + + testLog().registerListener(lsnr); + + try { + Iterator> it = sql("test", "select * from test").iterator(); + + it.next(); + + Thread.sleep(EXT_WAIT_TIME); + + it.next(); + + ConcurrentHashMap qrys = GridTestUtils.getFieldValue(heavyQueriesTracker(), "qrys"); + + H2QueryInfo qry = (H2QueryInfo)qrys.keySet().iterator().next(); + + long extWait = GridTestUtils.getFieldValue(qry, "extWait"); + + assertTrue(extWait >= EXT_WAIT_TIME - EXT_WAIT_REL_ERR); + + assertFalse(lsnr.check()); + } + finally { + pageSize = DEFAULT_PAGE_SIZE; + } + } + /** * Test checks the correctness of thread name when displaying errors * about long queries. From 3d769316eddf4780b5113ea09e884139a523364b Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Mon, 22 Jul 2024 13:49:15 +1000 Subject: [PATCH 12/14] IGNITE-17086 External wait checks added for local queries --- .../query/LongRunningQueryTest.java | 78 ++++++++++++------- 1 file changed, 48 insertions(+), 30 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java index 51b0df54c9cf2..0794e8d968e92 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java @@ -186,43 +186,27 @@ public void testLongLocalLazy() { } /** - * Test checks that no long-running queries warnings are printed in case of external waits. + * Test checks that no long-running queries warnings are printed in case of external waits during + * the execution of distributed queries. */ @Test - public void testLazyWithExternalWait() throws InterruptedException { + public void testDistributedLazyWithExternalWait() throws InterruptedException { local = false; lazy = true; - pageSize = 1; - - LogListener lsnr = LogListener - .matches(LONG_QUERY_EXEC_MSG) - .build(); - - testLog().registerListener(lsnr); - - try { - Iterator> it = sql("test", "select * from test").iterator(); - - it.next(); - - Thread.sleep(EXT_WAIT_TIME); - - it.next(); - - ConcurrentHashMap qrys = GridTestUtils.getFieldValue(heavyQueriesTracker(), "qrys"); - - H2QueryInfo qry = (H2QueryInfo)qrys.keySet().iterator().next(); - - long extWait = GridTestUtils.getFieldValue(qry, "extWait"); + checkLazyWithExternalWait(); + } - assertTrue(extWait >= EXT_WAIT_TIME - EXT_WAIT_REL_ERR); + /** + * Test checks that no long-running queries warnings are printed in case of external waits during + * the execution of local queries. + */ + @Test + public void testlocalLazyWithExternalWait() throws InterruptedException { + local = true; + lazy = true; - assertFalse(lsnr.check()); - } - finally { - pageSize = DEFAULT_PAGE_SIZE; - } + checkLazyWithExternalWait(); } /** @@ -410,6 +394,40 @@ private FieldsQueryCursor> sql(String cacheName, String sql, Object... a .setArgs(args)); } + /** */ + public void checkLazyWithExternalWait() throws InterruptedException { + pageSize = 1; + + LogListener lsnr = LogListener + .matches(LONG_QUERY_EXEC_MSG) + .build(); + + testLog().registerListener(lsnr); + + try { + Iterator> it = sql("test", "select * from test").iterator(); + + it.next(); + + Thread.sleep(EXT_WAIT_TIME); + + it.next(); + + ConcurrentHashMap qrys = GridTestUtils.getFieldValue(heavyQueriesTracker(), "qrys"); + + H2QueryInfo qry = (H2QueryInfo)qrys.keySet().iterator().next(); + + long extWait = GridTestUtils.getFieldValue(qry, "extWait"); + + assertTrue(extWait >= EXT_WAIT_TIME - EXT_WAIT_REL_ERR); + + assertFalse(lsnr.check()); + } + finally { + pageSize = DEFAULT_PAGE_SIZE; + } + } + /** * Utility class with custom SQL functions. */ From 1c455fb2a5dc5c99c2183e20268304a57c18f9de Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Thu, 25 Jul 2024 09:32:14 +1000 Subject: [PATCH 13/14] IGNITE-17086 Corrections as per review --- .../query/running/HeavyQueriesTracker.java | 6 ++++ .../processors/query/h2/H2QueryInfo.java | 28 +++++++++---------- .../h2/twostep/GridMapQueryExecutor.java | 6 ++++ .../query/LongRunningQueryTest.java | 24 +++++++--------- 4 files changed, 35 insertions(+), 29 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java index d7a0679334083..1a3654a415aa9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.running; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; @@ -254,6 +255,11 @@ public void setResultSetSizeThresholdMultiplier(int rsSizeThresholdMult) { this.rsSizeThresholdMult = rsSizeThresholdMult <= 1 ? 1 : rsSizeThresholdMult; } + /** */ + public Set getQueries() { + return qrys.keySet(); + } + /** * Holds timeout settings for the specified query. */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java index f7f7e4101a6cd..95ce6c1bf7e64 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java @@ -74,9 +74,6 @@ public class H2QueryInfo implements TrackableQuery { /** Query id. */ private final long queryId; - /** Lock object. */ - private final Object lock = new Object(); - /** * @param type Query type. * @param stmt Query statement. @@ -124,6 +121,11 @@ public String plan() { return stmt.getPlanSQL(); } + /** */ + public long extWait() { + return extWait; + } + /** * Print info specified by children. * @@ -139,24 +141,20 @@ protected void printInfo(StringBuilder msg) { } /** */ - public void suspendTracking() { - synchronized (lock) { - if (!isSuspended) { - isSuspended = true; + public synchronized void suspendTracking() { + if (!isSuspended) { + isSuspended = true; - lastSuspendTs = U.currentTimeMillis(); - } + lastSuspendTs = U.currentTimeMillis(); } } /** */ - public void resumeTracking() { - synchronized (lock) { - if (isSuspended) { - isSuspended = false; + public synchronized void resumeTracking() { + if (isSuspended) { + isSuspended = false; - extWait += U.currentTimeMillis() - lastSuspendTs; - } + extWait += U.currentTimeMillis() - lastSuspendTs; } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index e0a295b50ebe8..56afe5f3404ef 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -901,6 +901,12 @@ else if (qryResults.cancelled()) if (msg != null) sendNextPage(node, msg); } + catch (Throwable e) { + if (res.qryInfo() != null) + h2.heavyQueriesTracker().stopTracking(res.qryInfo(), e); + + throw e; + } finally { try { res.unlockTables(); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java index 0794e8d968e92..a7faf240c2da8 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java @@ -20,7 +20,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.ignite.Ignite; @@ -37,6 +36,7 @@ import org.apache.ignite.internal.processors.query.h2.H2QueryInfo; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; @@ -57,9 +57,6 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { /** External wait time. */ private static final int EXT_WAIT_TIME = 2000; - /** External wait time relative error. */ - private static final int EXT_WAIT_REL_ERR = (int)(EXT_WAIT_TIME * 0.01); - /** Page size. */ private int pageSize = DEFAULT_PAGE_SIZE; @@ -190,7 +187,7 @@ public void testLongLocalLazy() { * the execution of distributed queries. */ @Test - public void testDistributedLazyWithExternalWait() throws InterruptedException { + public void testDistributedLazyWithExternalWait() { local = false; lazy = true; @@ -202,7 +199,7 @@ public void testDistributedLazyWithExternalWait() throws InterruptedException { * the execution of local queries. */ @Test - public void testlocalLazyWithExternalWait() throws InterruptedException { + public void testlocalLazyWithExternalWait() { local = true; lazy = true; @@ -395,7 +392,7 @@ private FieldsQueryCursor> sql(String cacheName, String sql, Object... a } /** */ - public void checkLazyWithExternalWait() throws InterruptedException { + public void checkLazyWithExternalWait() { pageSize = 1; LogListener lsnr = LogListener @@ -409,17 +406,16 @@ public void checkLazyWithExternalWait() throws InterruptedException { it.next(); - Thread.sleep(EXT_WAIT_TIME); - - it.next(); + long sleepStartTs = U.currentTimeMillis(); - ConcurrentHashMap qrys = GridTestUtils.getFieldValue(heavyQueriesTracker(), "qrys"); + while (U.currentTimeMillis() - sleepStartTs <= EXT_WAIT_TIME) + doSleep(100L); - H2QueryInfo qry = (H2QueryInfo)qrys.keySet().iterator().next(); + it.next(); - long extWait = GridTestUtils.getFieldValue(qry, "extWait"); + H2QueryInfo qry = (H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next(); - assertTrue(extWait >= EXT_WAIT_TIME - EXT_WAIT_REL_ERR); + assertTrue(qry.extWait() >= EXT_WAIT_TIME); assertFalse(lsnr.check()); } From 1454fc0997073f3018216a4579cc13f1bb682365 Mon Sep 17 00:00:00 2001 From: oleg-vlsk Date: Thu, 25 Jul 2024 09:48:09 +1000 Subject: [PATCH 14/14] IGNITE-17086 Minor refactoring of GridMapQueryExecutor#onNextPageRequest --- .../query/h2/twostep/GridMapQueryExecutor.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 56afe5f3404ef..78a0e0aeb9657 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -866,13 +866,15 @@ else if (nodeRess.cancelled(reqId)) { final MapQueryResults qryResults = nodeRess.get(reqId, req.segmentId()); + MapQueryResult res = null; + if (qryResults == null) sendError(node, reqId, new CacheException("No query result found for request: " + req)); else if (qryResults.cancelled()) sendQueryCancel(node, reqId); else { try { - MapQueryResult res = qryResults.result(req.query()); + res = qryResults.result(req.query()); assert res != null; @@ -901,12 +903,6 @@ else if (qryResults.cancelled()) if (msg != null) sendNextPage(node, msg); } - catch (Throwable e) { - if (res.qryInfo() != null) - h2.heavyQueriesTracker().stopTracking(res.qryInfo(), e); - - throw e; - } finally { try { res.unlockTables(); @@ -917,6 +913,9 @@ else if (qryResults.cancelled()) } } catch (Exception e) { + if (res.qryInfo() != null) + h2.heavyQueriesTracker().stopTracking(res.qryInfo(), e); + QueryRetryException retryEx = X.cause(e, QueryRetryException.class); if (retryEx != null)