diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index 8aeb56c7246b68..df26c19eae3158 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -97,11 +97,18 @@ default void afterComplete(AbstractInsertExecutor insertExecutor, StmtExecutor e */ public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner, Optional insertCtx, boolean emptyInsert, long jobId) { + this(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId, false); + } + + /** + * constructor + */ + public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner, + Optional insertCtx, boolean emptyInsert, long jobId, boolean needRegister) { this.ctx = ctx; this.database = table.getDatabase(); this.insertLoadJob = new InsertLoadJob(database.getId(), labelName, jobId); - // Do not add load job if job id is -1. - if (jobId != -1) { + if (needRegister) { ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob); } this.coordinator = EnvFactory.getInstance().createCoordinator( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index c2fe0b555a2100..b5eb1f3b1d7f8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -63,6 +63,7 @@ import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.commands.insert.AbstractInsertExecutor.InsertExecutorListener; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; @@ -447,13 +448,22 @@ ExecutorFactory selectInsertExecutorFactory( if (getLogicalQuery().containsType(InlineTable.class)) { jobId = -1; } + // Do not register internal group commit loads to LoadManager. + // Internal group commit is identified by DMLCommandType.GROUP_COMMIT, which is set + // by the parser when the target table is specified via tableId (doris_internal_table_id). + // The actual commit is managed by BE group commit mechanism, so these jobs will + // never transition to a completed state through FE, causing a memory leak if registered. + if (((PhysicalOlapTableSink) physicalSink).getDmlCommandType() == DMLCommandType.GROUP_COMMIT) { + jobId = -1; + } if (targetTableIf instanceof RemoteDorisExternalTable) { executorFactory = ExecutorFactory.from( planner, dataSink, physicalSink, () -> new RemoteOlapInsertExecutor( - ctx, (RemoteOlapTable) olapTable, label, planner, insertCtx, emptyInsert, jobId) + ctx, (RemoteOlapTable) olapTable, label, planner, insertCtx, emptyInsert, + jobId) ); } else { executorFactory = ExecutorFactory.from( @@ -461,7 +471,7 @@ ExecutorFactory selectInsertExecutorFactory( dataSink, physicalSink, () -> new OlapInsertExecutor( - ctx, olapTable, label, planner, insertCtx, emptyInsert, jobId) + ctx, olapTable, label, planner, insertCtx, emptyInsert, jobId, jobId != -1) ); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 6532aee8008089..63e6e51efeb90b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -88,7 +88,16 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { public OlapInsertExecutor(ConnectContext ctx, Table table, String labelName, NereidsPlanner planner, Optional insertCtx, boolean emptyInsert, long jobId) { - super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId); + this(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId, false); + } + + /** + * constructor + */ + public OlapInsertExecutor(ConnectContext ctx, Table table, + String labelName, NereidsPlanner planner, Optional insertCtx, boolean emptyInsert, + long jobId, boolean needRegister) { + super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId, needRegister); this.olapTable = (OlapTable) table; } @@ -307,41 +316,36 @@ protected void onFail(Throwable t) { } String finalErrorMsg = InsertUtils.getFinalErrorMsg(errMsg, firstErrorMsgPart, urlPart); ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, finalErrorMsg); + recordLoadJob(ctx.getCurrentUserIdentity()); } - @Override - protected void afterExec(StmtExecutor executor) { - // Go here, which means: - // 1. transaction is finished successfully (COMMITTED or VISIBLE), or - // 2. transaction failed but Config.using_old_load_usage_pattern is true. - // we will record the load job info for these 2 cases - try { - // the statement parsed by Nereids is saved at executor::parsedStmt. - StatementBase statement = executor.getParsedStmt(); - UserIdentity userIdentity; - //if we use job scheduler, parse statement will not set user identity,so we need to get it from context - if (null == statement) { - userIdentity = ctx.getCurrentUserIdentity(); - } else { - userIdentity = statement.getUserInfo(); - } - EtlJobType etlJobType = EtlJobType.INSERT; - // Do not register job if job id is -1. - if (!Config.enable_nereids_load && jobId != -1) { - // just record for loadv2 here + private void recordLoadJob(UserIdentity userIdentity) { + if (!Config.enable_nereids_load && jobId != -1) { + try { ctx.getEnv().getLoadManager() .recordFinishedLoadJob(labelName, txnId, database.getFullName(), - table.getId(), - etlJobType, createTime, errMsg, - coordinator.getTrackingUrl(), - coordinator.getFirstErrorMsg(), + table.getId(), EtlJobType.INSERT, createTime, errMsg, + coordinator.getTrackingUrl(), coordinator.getFirstErrorMsg(), userIdentity, insertLoadJob.getId()); + } catch (MetaNotFoundException e) { + LOG.warn("Record info of insert load with error {}", e.getMessage(), e); + errMsg = "Record info of insert load with error " + e.getMessage(); } - } catch (MetaNotFoundException e) { - LOG.warn("Record info of insert load with error {}", e.getMessage(), e); - errMsg = "Record info of insert load with error " + e.getMessage(); } + } + @Override + protected void afterExec(StmtExecutor executor) { + // the statement parsed by Nereids is saved at executor::parsedStmt. + StatementBase statement = executor.getParsedStmt(); + UserIdentity userIdentity; + // if we use job scheduler, parse statement will not set user identity, so we need to get it from context + if (null == statement) { + userIdentity = ctx.getCurrentUserIdentity(); + } else { + userIdentity = statement.getUserInfo(); + } + recordLoadJob(userIdentity); setReturnInfo(); } diff --git a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy index 7e929450cd9fcb..d1a068d4ab0cb7 100644 --- a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy +++ b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy @@ -138,6 +138,12 @@ suite("test_iceberg_write_insert", "p0,external") { """ order_qt_q01 """ select * from iceberg_all_types_${format_compression}; """ + // external table insert should not register a load job in LoadManager + sql """ SWITCH internal """ + def showLoadResult = sql """ SHOW LOAD """ + assertEquals(0, showLoadResult.size()) + sql """ SWITCH ${catalog_name} """ + sql """ USE iceberg_write_test """ sql """ INSERT INTO iceberg_all_types_${format_compression} diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy index b83ad4f0b6f822..b160b478c8e06c 100644 --- a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy @@ -251,6 +251,10 @@ suite("test_group_commit_http_stream") { getRowCount(19) qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; " + + // group commit http stream (SELECT * FROM http_stream(...)) should not register load jobs + def showLoadResult = sql """ SHOW LOAD FROM ${db} """ + assertEquals(0, showLoadResult.size()) } finally { // try_sql("DROP TABLE ${tableName}") } diff --git a/regression-test/suites/load_p0/insert/test_insert_statistic.groovy b/regression-test/suites/load_p0/insert/test_insert_statistic.groovy index b70146d8eeb35f..62e8201f00fa9c 100644 --- a/regression-test/suites/load_p0/insert/test_insert_statistic.groovy +++ b/regression-test/suites/load_p0/insert/test_insert_statistic.groovy @@ -75,10 +75,11 @@ suite("test_insert_statistic", "p0") { "replication_num"="1" ); """ - sql """ - INSERT INTO ${insert_tbl}_2 select * from ${insert_tbl}_1 - """ - result = sql "SHOW LOAD FROM ${dbName}" + def selectLabel = "label_insert_select_" + System.currentTimeMillis() + sql """ INSERT INTO ${insert_tbl}_2 WITH LABEL ${selectLabel} select * from ${insert_tbl}_1 """ + result = sql "SHOW LOAD FROM ${dbName} WHERE LABEL = '${selectLabel}'" + assertEquals(1, result.size()) + assertEquals("FINISHED", result[0][2]) logger.info("JobDetails: " + result[0][14]) def json = parseJson(result[0][14]) assertEquals(json.ScannedRows, 3) @@ -86,6 +87,27 @@ suite("test_insert_statistic", "p0") { assertEquals(json.FileSize, 0) assertTrue(json.LoadBytes > 0) + // failed insert into select → job should be CANCELLED + sql """ DROP TABLE IF EXISTS ${insert_tbl}_fail""" + sql """ + CREATE TABLE ${insert_tbl}_fail ( + `k1` varchar(3) NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ("replication_num"="1"); + """ + sql """ set enable_insert_strict = true """ + try { + sql """ INSERT INTO ${insert_tbl}_fail SELECT 'this_value_is_too_long_for_varchar3' """ + } catch (Exception e) { + logger.info("Expected insert failure: " + e.getMessage()) + } + sql """ set enable_insert_strict = false """ + result = sql "SHOW LOAD FROM ${dbName}" + def cancelledJob = result.find { it[2] == "CANCELLED" } + assertNotNull(cancelledJob, "Expected a CANCELLED load job for failed insert") + // insert into s3 tvf String ak = getS3AK() String sk = getS3SK() @@ -107,7 +129,8 @@ suite("test_insert_statistic", "p0") { DISTRIBUTED BY HASH(`k1`) BUCKETS 3 PROPERTIES ("replication_allocation" = "tag.location.default: 1"); """ - sql """ insert into ${insert_tbl}_3 select * from S3 ( + def s3Label = "label_insert_s3_" + System.currentTimeMillis() + sql """ insert into ${insert_tbl}_3 WITH LABEL ${s3Label} select * from S3 ( "uri" = "http://${bucket}.${s3_endpoint}/regression/load/data/empty_field_as_null.csv", "ACCESS_KEY"= "${ak}", "SECRET_KEY" = "${sk}", @@ -117,12 +140,13 @@ suite("test_insert_statistic", "p0") { "region" = "${region}" ); """ - result = sql "SHOW LOAD FROM ${dbName}" - logger.info("JobDetails: " + result[1][14]) - json = parseJson(result[1][14]) + result = sql "SHOW LOAD FROM ${dbName} WHERE LABEL = '${s3Label}'" + assertEquals(1, result.size()) + assertEquals("FINISHED", result[0][2]) + logger.info("JobDetails: " + result[0][14]) + json = parseJson(result[0][14]) assertEquals(json.ScannedRows, 2) assertEquals(json.FileNumber, 1) assertEquals(json.FileSize, 86) - assertEquals(result.size(), 2) assertTrue(json.LoadBytes > 0) } \ No newline at end of file