Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,18 @@ default void afterComplete(AbstractInsertExecutor insertExecutor, StmtExecutor e
*/
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert, long jobId) {
this(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId, false);
}

/**
* constructor
*/
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx, boolean emptyInsert, long jobId, boolean needRegister) {
this.ctx = ctx;
this.database = table.getDatabase();
this.insertLoadJob = new InsertLoadJob(database.getId(), labelName, jobId);
// Do not add load job if job id is -1.
if (jobId != -1) {
if (needRegister) {
ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
}
this.coordinator = EnvFactory.getInstance().createCoordinator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,21 +447,31 @@ ExecutorFactory selectInsertExecutorFactory(
if (getLogicalQuery().containsType(InlineTable.class)) {
jobId = -1;
}
// Do not register internal group commit loads to LoadManager.
// Internal group commit uses 'SELECT * FROM group_commit(...)' as source,
// and the actual commit is managed by BE group commit mechanism.
// These jobs will never transition to a completed state through FE,
// causing a memory leak if registered.
if (getAllTVFRelation().stream().anyMatch(
rel -> "group_commit".equals(rel.getFunctionName()))) {
jobId = -1;
}
if (targetTableIf instanceof RemoteDorisExternalTable) {
executorFactory = ExecutorFactory.from(
planner,
dataSink,
physicalSink,
() -> new RemoteOlapInsertExecutor(
ctx, (RemoteOlapTable) olapTable, label, planner, insertCtx, emptyInsert, jobId)
ctx, (RemoteOlapTable) olapTable, label, planner, insertCtx, emptyInsert,
jobId)
);
} else {
executorFactory = ExecutorFactory.from(
planner,
dataSink,
physicalSink,
() -> new OlapInsertExecutor(
ctx, olapTable, label, planner, insertCtx, emptyInsert, jobId)
ctx, olapTable, label, planner, insertCtx, emptyInsert, jobId, jobId != -1)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
*/
public OlapInsertExecutor(ConnectContext ctx, Table table,
String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx, boolean emptyInsert,
long jobId) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId);
long jobId, boolean needRegister) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert, jobId, needRegister);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing OlapInsertExecutor to an 8-argument constructor breaks the subclasses that still call the old 7-argument super(...). In this patch, OlapTxnInsertExecutor (line 46), OlapGroupCommitInsertExecutor (line 69), and RemoteOlapInsertExecutor (line 81) still invoke the removed signature, so FE no longer compiles. Please either keep the old overload or update all subclasses in the same change.

this.olapTable = (OlapTable) table;
}

Expand Down Expand Up @@ -307,41 +307,36 @@ protected void onFail(Throwable t) {
}
String finalErrorMsg = InsertUtils.getFinalErrorMsg(errMsg, firstErrorMsgPart, urlPart);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, finalErrorMsg);
recordLoadJob(ctx.getCurrentUserIdentity());
}

@Override
protected void afterExec(StmtExecutor executor) {
// Go here, which means:
// 1. transaction is finished successfully (COMMITTED or VISIBLE), or
// 2. transaction failed but Config.using_old_load_usage_pattern is true.
// we will record the load job info for these 2 cases
try {
// the statement parsed by Nereids is saved at executor::parsedStmt.
StatementBase statement = executor.getParsedStmt();
UserIdentity userIdentity;
//if we use job scheduler, parse statement will not set user identity,so we need to get it from context
if (null == statement) {
userIdentity = ctx.getCurrentUserIdentity();
} else {
userIdentity = statement.getUserInfo();
}
EtlJobType etlJobType = EtlJobType.INSERT;
// Do not register job if job id is -1.
if (!Config.enable_nereids_load && jobId != -1) {
// just record for loadv2 here
private void recordLoadJob(UserIdentity userIdentity) {
if (!Config.enable_nereids_load && jobId != -1) {
try {
ctx.getEnv().getLoadManager()
.recordFinishedLoadJob(labelName, txnId, database.getFullName(),
table.getId(),
etlJobType, createTime, errMsg,
coordinator.getTrackingUrl(),
coordinator.getFirstErrorMsg(),
table.getId(), EtlJobType.INSERT, createTime, errMsg,
coordinator.getTrackingUrl(), coordinator.getFirstErrorMsg(),
userIdentity, insertLoadJob.getId());
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
errMsg = "Record info of insert load with error " + e.getMessage();
}
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
errMsg = "Record info of insert load with error " + e.getMessage();
}
}

@Override
protected void afterExec(StmtExecutor executor) {
// the statement parsed by Nereids is saved at executor::parsedStmt.
StatementBase statement = executor.getParsedStmt();
UserIdentity userIdentity;
// if we use job scheduler, parse statement will not set user identity, so we need to get it from context
if (null == statement) {
userIdentity = ctx.getCurrentUserIdentity();
} else {
userIdentity = statement.getUserInfo();
}
recordLoadJob(userIdentity);
setReturnInfo();
}

Expand Down
Loading