@@ -2051,6 +2051,12 @@ public class Config extends ConfigBase {
+ "old records will be discarded."})
public static int max_streaming_task_show_count = 100;

@ConfField(masterOnly = true, mutable = true, description = {
"Max auto resume retry count for streaming jobs. "
+ "After exceeding, the failure reason is rewritten to CANNOT_RESUME_ERR "
+ "and the job requires manual intervention."})
public static int streaming_job_max_auto_resume_count = 10;

/* job test config */
/**
* If set to true, we will allow the interval unit to be set to second, when creating a recurring job.
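The new knob is declared mutable and masterOnly, so it can be changed at runtime on the master FE without a restart. A minimal usage sketch, reusing the exact statements the regression test in this PR issues (the value '3' is only an example):

    ADMIN SHOW FRONTEND CONFIG LIKE 'streaming_job_max_auto_resume_count';
    ADMIN SET FRONTEND CONFIG ('streaming_job_max_auto_resume_count' = '3');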
@@ -108,6 +108,9 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
@SerializedName(value = "ctc")
protected AtomicLong canceledTaskCount = new AtomicLong(0);

@SerializedName(value = "ltst")
protected long lastTaskSuccessTime = 0;

public AbstractJob() {
}

@@ -363,6 +366,7 @@ public void onTaskFail(T task) throws JobException {
@Override
public void onTaskSuccess(T task) throws JobException {
succeedTaskCount.incrementAndGet();
lastTaskSuccessTime = System.currentTimeMillis();
updateJobStatusIfEnd(true, task.getTaskType());
runningTasks.remove(task);
logUpdateOperation();
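The new lastTaskSuccessTime field carries a @SerializedName key ("ltst"), which is what lets it ride along in the job's Gson-serialized metadata and come back intact on replay. A minimal standalone sketch of that round-trip — illustrative code, not Doris's actual persistence path; it assumes only stock Gson behavior:

    import com.google.gson.Gson;
    import com.google.gson.annotations.SerializedName;

    public class SerializedNameRoundTrip {
        static class JobStateDemo {              // stand-in for the job class
            @SerializedName(value = "ltst")
            long lastTaskSuccessTime = 1700000000000L;
        }

        public static void main(String[] args) {
            Gson gson = new Gson();
            // Serialize: the field is written under its short key "ltst".
            String json = gson.toJson(new JobStateDemo());
            System.out.println(json);            // {"ltst":1700000000000}
            // Replay: deserializing restores the field from the same key.
            JobStateDemo restored = gson.fromJson(json, JobStateDemo.class);
            System.out.println(restored.lastTaskSuccessTime);  // 1700000000000
        }
    }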
@@ -106,6 +106,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
.add(new Column("ErrorMsg", ScalarType.createStringType()))
.add(new Column("JobRuntimeMsg", ScalarType.createStringType()))
.add(new Column("Lag", ScalarType.createStringType()))
.add(new Column("LastTaskSuccessTime", ScalarType.createStringType()))
.build();

public static final ShowResultSetMetaData TASK_META_DATA =
@@ -571,6 +572,8 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(lastTaskSuccessTime > 0
? TimeUtils.longToTimeString(lastTaskSuccessTime) : FeConstants.null_string));
return trow;
}

@@ -435,6 +435,18 @@ public void resetFailureInfo(FailureReason reason) {
this.setFailureReason(reason);
// Currently, only delayMsg is present here, which needs to be cleared when the status changes.
this.setJobRuntimeMsg("");
// Clearing the failure (reason == null) means either a task just succeeded
// or the user is RESUME-ing the job — in both cases, the retry budget should
// be restored so exponential backoff starts fresh, aligning with the
// semantics of a fresh job's first failure.
if (reason == null) {
this.setAutoResumeCount(0);
this.setLatestAutoResumeTimestamp(0);
}
}

public long getMaxAutoResumeCount() {
return Config.streaming_job_max_auto_resume_count;
}

@Override
@@ -643,6 +655,7 @@ public void onStreamTaskSuccess(AbstractStreamingTask task) throws JobException
try {
resetFailureInfo(null);
succeedTaskCount.incrementAndGet();
lastTaskSuccessTime = System.currentTimeMillis();
// update metrics
if (MetricRepo.isInit) {
MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
@@ -781,6 +794,7 @@ public void replayOnUpdated(StreamingInsertJob replayJob) {
setSucceedTaskCount(replayJob.getSucceedTaskCount());
setFailedTaskCount(replayJob.getFailedTaskCount());
setCanceledTaskCount(replayJob.getCanceledTaskCount());
setLastTaskSuccessTime(replayJob.getLastTaskSuccessTime());
}

/**
@@ -873,12 +887,19 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(
nonTxnJobStatistic == null ? "" : nonTxnJobStatistic.toJson()));
}
// ErrorMsg column returns the full FailureReason as JSON ({"code":"...","msg":"..."})
// for streaming insert jobs only, so the frontend can switch on the structured code
// instead of grepping the message. One-time insert jobs (InsertJob.getTvfInfo) keep
// returning the plain FailMsg.msg string — they use a different error taxonomy
// (FailMsg/CancelType) that is out of scope for this PR.
trow.addToColumnValue(new TCell().setStringVal(failureReason == null
? "" : failureReason.getMsg()));
? "" : GsonUtils.GSON.toJson(failureReason)));
trow.addToColumnValue(new TCell().setStringVal(jobRuntimeMsg == null
? "" : jobRuntimeMsg));
trow.addToColumnValue(new TCell().setStringVal(
offsetProvider != null ? offsetProvider.getLag() : ""));
trow.addToColumnValue(new TCell().setStringVal(lastTaskSuccessTime > 0
? TimeUtils.longToTimeString(lastTaskSuccessTime) : ""));
return trow;
}

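Taken together with the comment above, a paused streaming job that has burned through its retry budget would surface an ErrorMsg along these lines (illustrative only — the exact field order and the elided tail depend on FailureReason's Gson serialization and on the original error):

    {"code":"CANNOT_RESUME_ERR","msg":"Auto resume failed after 3 attempts. Last error: ..."}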
@@ -75,7 +75,6 @@ private void handlePendingState() throws JobException {
streamingInsertJob.createStreamingTask();
streamingInsertJob.setSampleStartTime(System.currentTimeMillis());
streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
streamingInsertJob.setAutoResumeCount(0);
}

private void handleRunningState() throws JobException {
@@ -85,25 +84,37 @@ private void handleRunningState() throws JobException {

private void autoResumeHandler() throws JobException {
final FailureReason failureReason = streamingInsertJob.getFailureReason();
final long latestAutoResumeTimestamp = streamingInsertJob.getLatestAutoResumeTimestamp();
if (failureReason == null
|| failureReason.getCode() == InternalErrorCode.MANUAL_PAUSE_ERR
|| failureReason.getCode() == InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
|| failureReason.getCode() == InternalErrorCode.CANNOT_RESUME_ERR) {
return;
}

final long autoResumeCount = streamingInsertJob.getAutoResumeCount();
final long current = System.currentTimeMillis();
final long maxAutoResumeCount = streamingInsertJob.getMaxAutoResumeCount();
// Retry budget exhausted: rewrite the failure reason so this handler
// short-circuits on subsequent ticks and the job effectively requires
// manual RESUME to try again.
if (autoResumeCount >= maxAutoResumeCount) {
streamingInsertJob.setFailureReason(new FailureReason(
InternalErrorCode.CANNOT_RESUME_ERR,
"Auto resume failed after " + autoResumeCount
+ " attempts. Last error: " + failureReason.getMsg()));
return;
}

if (failureReason != null
&& failureReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR
&& failureReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
&& failureReason.getCode() != InternalErrorCode.CANNOT_RESUME_ERR) {
long autoResumeIntervalTimeSec = autoResumeCount < 5
? Math.min((long) Math.pow(2, autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC;
if (current - latestAutoResumeTimestamp > autoResumeIntervalTimeSec * 1000L) {
streamingInsertJob.setLatestAutoResumeTimestamp(current);
if (autoResumeCount < Long.MAX_VALUE) {
streamingInsertJob.setAutoResumeCount(autoResumeCount + 1);
}
streamingInsertJob.updateJobStatus(JobStatus.PENDING);
return;
final long latestAutoResumeTimestamp = streamingInsertJob.getLatestAutoResumeTimestamp();
final long current = System.currentTimeMillis();
long autoResumeIntervalTimeSec = autoResumeCount < 5
? Math.min((long) Math.pow(2, autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC;
if (current - latestAutoResumeTimestamp > autoResumeIntervalTimeSec * 1000L) {
streamingInsertJob.setLatestAutoResumeTimestamp(current);
if (autoResumeCount < Long.MAX_VALUE) {
streamingInsertJob.setAutoResumeCount(autoResumeCount + 1);
}
streamingInsertJob.updateJobStatus(JobStatus.PENDING);
}
}

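For reference, the backoff gate above only fires a resume once the elapsed time since the last attempt exceeds an exponentially growing interval. A self-contained sketch of that schedule — the two constants are placeholders, since their real values are defined elsewhere in the scheduler and do not appear in this diff:

    public class BackoffScheduleDemo {
        // Assumed values; the real BACK_OFF_BASIC_TIME_SEC / MAX_BACK_OFF_TIME_SEC
        // live in the scheduler class and are not shown in this diff.
        static final long BACK_OFF_BASIC_TIME_SEC = 5;
        static final long MAX_BACK_OFF_TIME_SEC = 60;

        // Same expression as autoResumeHandler: double the wait for the first
        // five attempts, then pin it at the cap.
        static long intervalSec(long autoResumeCount) {
            return autoResumeCount < 5
                    ? Math.min((long) Math.pow(2, autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
                            MAX_BACK_OFF_TIME_SEC)
                    : MAX_BACK_OFF_TIME_SEC;
        }

        public static void main(String[] args) {
            // With the assumed constants this prints 5, 10, 20, 40, 60, 60, 60.
            for (long n = 0; n <= 6; n++) {
                System.out.println("attempt " + n + " -> wait " + intervalSec(n) + "s");
            }
        }
    }

With the default streaming_job_max_auto_resume_count of 10 added in this PR, the handler therefore retries ten times on this schedule before rewriting the failure to CANNOT_RESUME_ERR.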
@@ -82,6 +82,11 @@ suite("test_streaming_insert_job") {
def jobResult = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
log.info("show success job: " + jobResult)

// LastTaskSuccessTime should be populated after a successful task commit.
def lastSuccessTime = sql """select LastTaskSuccessTime from jobs("type"="insert") where Name='${jobName}'"""
assert lastSuccessTime.get(0).get(0) != null && lastSuccessTime.get(0).get(0) != "",
"LastTaskSuccessTime should be set after a successful task, got: " + lastSuccessTime.get(0).get(0)

qt_select """ SELECT * FROM ${tableName} order by c1 """

sql """
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

// Verify that after exceeding streaming_job_max_auto_resume_count, the streaming
// job's failure reason is rewritten to CANNOT_RESUME_ERR so the auto-resume
// handler stops trying. The job stays in PAUSED but effectively requires a
// manual RESUME to try again.
suite("test_streaming_job_max_retry", "nonConcurrent") {
def tableName = "test_streaming_job_max_retry_tbl"
def jobName = "test_streaming_job_max_retry_job"

sql """drop table if exists `${tableName}` force"""
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
"""

sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`c1` int NULL,
`c2` string NULL,
`c3` int NULL
) ENGINE=OLAP
DUPLICATE KEY(`c1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c1`) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""

// Capture the original value so other suites (or a non-default env) are not clobbered.
def originalMaxRetry = sql """ADMIN SHOW FRONTEND CONFIG LIKE 'streaming_job_max_auto_resume_count'"""
def originalMaxRetryValue = originalMaxRetry.get(0).get(1)
// Shrink the retry budget so the test finishes quickly.
sql "ADMIN SET FRONTEND CONFIG ('streaming_job_max_auto_resume_count' = '3')"

GetDebugPoint().enableDebugPointForAllFEs('StreamingJob.scheduleTask.exception')
try {
sql """
CREATE JOB ${jobName}
ON STREAMING DO INSERT INTO ${tableName}
SELECT * FROM S3
(
"uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
"format" = "csv",
"provider" = "${getS3Provider()}",
"column_separator" = ",",
"s3.endpoint" = "${getS3Endpoint()}",
"s3.region" = "${getS3Region()}",
"s3.access_key" = "${getS3AK()}",
"s3.secret_key" = "${getS3SK()}"
);
"""

// Wait until the retry budget is exhausted and the failure reason
// is rewritten to CANNOT_RESUME_ERR. Status stays PAUSED throughout.
try {
Awaitility.await().atMost(300, SECONDS)
.pollInterval(2, SECONDS).until(
{
def jobRes = sql """ select Status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobRes waiting for CANNOT_RESUME_ERR: " + jobRes)
if (jobRes.size() != 1 || !'PAUSED'.equals(jobRes.get(0).get(0))) {
return false
}
def errMsg = jobRes.get(0).get(1) as String
return errMsg != null && errMsg.contains("CANNOT_RESUME_ERR")
}
)
} catch (Exception ex) {
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
log.info("show job: " + showjob)
throw ex
}

// ErrorMsg is a FailureReason JSON blob containing both code and msg,
// so the frontend can switch on the structured code instead of grepping the message.
def jobInfo = sql """select Status, ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
assert jobInfo.get(0).get(0) == "PAUSED"
def errorMsgJson = jobInfo.get(0).get(1) as String
assert errorMsgJson.contains("\"code\"") && errorMsgJson.contains("\"msg\""),
"ErrorMsg should be a FailureReason JSON with code and msg fields, got: " + errorMsgJson
assert errorMsgJson.contains("CANNOT_RESUME_ERR"),
"ErrorMsg should contain CANNOT_RESUME_ERR code, got: " + errorMsgJson
assert errorMsgJson.contains("Auto resume failed after"),
"ErrorMsg should contain the burn-out message, got: " + errorMsgJson

} finally {
GetDebugPoint().disableDebugPointForAllFEs('StreamingJob.scheduleTask.exception')
sql "ADMIN SET FRONTEND CONFIG ('streaming_job_max_auto_resume_count' = '${originalMaxRetryValue}')"
sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
}
}
@@ -86,6 +86,13 @@ suite("test_streaming_job_restart_fe", "docker") {
throw ex;
}

// Capture LastTaskSuccessTime before restart to verify it survives editlog replay.
def lastSuccessBefore = sql """
select LastTaskSuccessTime from jobs("type"="insert") where Name='${jobName}'
"""
assert lastSuccessBefore.get(0).get(0) != null && lastSuccessBefore.get(0).get(0) != "",
"LastTaskSuccessTime should be set before restart"

sql """
PAUSE JOB where jobname = '${jobName}'
"""
Expand Down Expand Up @@ -124,6 +131,13 @@ suite("test_streaming_job_restart_fe", "docker") {
assert loadStatAfter.fileNumber == 2
assert loadStatAfter.fileSize == 256

// Verify LastTaskSuccessTime survived the FE restart (replayOnUpdated must copy it).
def lastSuccessAfter = sql """
select LastTaskSuccessTime from jobs("type"="insert") where Name='${jobName}'
"""
assert lastSuccessAfter.get(0).get(0) != null && lastSuccessAfter.get(0).get(0) != "",
"LastTaskSuccessTime should survive FE restart, got: " + lastSuccessAfter.get(0).get(0)

sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
sql """drop table if exists `${tableName}` force"""
}