diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index fd45621e09d298..1a65000341545d 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2051,6 +2051,12 @@ public class Config extends ConfigBase { + "old records will be discarded."}) public static int max_streaming_task_show_count = 100; + @ConfField(masterOnly = true, mutable = true, description = { + "Max auto resume retry count for streaming jobs. " + + "After exceeding, the failure reason is rewritten to CANNOT_RESUME_ERR " + + "and the job requires manual intervention."}) + public static int streaming_job_max_auto_resume_count = 10; + /* job test config */ /** * If set to true, we will allow the interval unit to be set to second, when creating a recurring job. diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 0b2760a0c364d0..eb2ffed7b891fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -108,6 +108,9 @@ public abstract class AbstractJob implements Job> impl .add(new Column("ErrorMsg", ScalarType.createStringType())) .add(new Column("JobRuntimeMsg", ScalarType.createStringType())) .add(new Column("Lag", ScalarType.createStringType())) + .add(new Column("LastTaskSuccessTime", ScalarType.createStringType())) .build(); public static final ShowResultSetMetaData TASK_META_DATA = @@ -571,6 +572,8 @@ public TRow getTvfInfo() { trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg())); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal(lastTaskSuccessTime > 0 + ? TimeUtils.longToTimeString(lastTaskSuccessTime) : FeConstants.null_string)); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index cea2c54b5872ab..bd220a70f1b7e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -435,6 +435,18 @@ public void resetFailureInfo(FailureReason reason) { this.setFailureReason(reason); // Currently, only delayMsg is present here, which needs to be cleared when the status changes. this.setJobRuntimeMsg(""); + // Clearing the failure (reason == null) means either a task just succeeded + // or the user is RESUME-ing the job — in both cases, the retry budget should + // be restored so exponential backoff starts fresh, aligning with the + // semantics of a fresh job's first failure. + if (reason == null) { + this.setAutoResumeCount(0); + this.setLatestAutoResumeTimestamp(0); + } + } + + public long getMaxAutoResumeCount() { + return Config.streaming_job_max_auto_resume_count; } @Override @@ -643,6 +655,7 @@ public void onStreamTaskSuccess(AbstractStreamingTask task) throws JobException try { resetFailureInfo(null); succeedTaskCount.incrementAndGet(); + lastTaskSuccessTime = System.currentTimeMillis(); //update metric if (MetricRepo.isInit) { MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L); @@ -781,6 +794,7 @@ public void replayOnUpdated(StreamingInsertJob replayJob) { setSucceedTaskCount(replayJob.getSucceedTaskCount()); setFailedTaskCount(replayJob.getFailedTaskCount()); setCanceledTaskCount(replayJob.getCanceledTaskCount()); + setLastTaskSuccessTime(replayJob.getLastTaskSuccessTime()); } /** @@ -873,12 +887,19 @@ public TRow getTvfInfo() { trow.addToColumnValue(new TCell().setStringVal( nonTxnJobStatistic == null ? "" : nonTxnJobStatistic.toJson())); } + // ErrorMsg column returns the full FailureReason as JSON ({"code":"...","msg":"..."}) + // for streaming insert jobs only, so the frontend can switch on the structured code + // instead of grepping the message. One-time insert jobs (InsertJob.getTvfInfo) keep + // returning the plain FailMsg.msg string — they use a different error taxonomy + // (FailMsg/CancelType) that is out of scope for this PR. trow.addToColumnValue(new TCell().setStringVal(failureReason == null - ? "" : failureReason.getMsg())); + ? "" : GsonUtils.GSON.toJson(failureReason))); trow.addToColumnValue(new TCell().setStringVal(jobRuntimeMsg == null ? "" : jobRuntimeMsg)); trow.addToColumnValue(new TCell().setStringVal( offsetProvider != null ? offsetProvider.getLag() : "")); + trow.addToColumnValue(new TCell().setStringVal(lastTaskSuccessTime > 0 + ? TimeUtils.longToTimeString(lastTaskSuccessTime) : "")); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java index 95ace617a7e5e1..b1948dd4bd51e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java @@ -75,7 +75,6 @@ private void handlePendingState() throws JobException { streamingInsertJob.createStreamingTask(); streamingInsertJob.setSampleStartTime(System.currentTimeMillis()); streamingInsertJob.updateJobStatus(JobStatus.RUNNING); - streamingInsertJob.setAutoResumeCount(0); } private void handleRunningState() throws JobException { @@ -85,25 +84,37 @@ private void handleRunningState() throws JobException { private void autoResumeHandler() throws JobException { final FailureReason failureReason = streamingInsertJob.getFailureReason(); - final long latestAutoResumeTimestamp = streamingInsertJob.getLatestAutoResumeTimestamp(); + if (failureReason == null + || failureReason.getCode() == InternalErrorCode.MANUAL_PAUSE_ERR + || failureReason.getCode() == InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR + || failureReason.getCode() == InternalErrorCode.CANNOT_RESUME_ERR) { + return; + } + final long autoResumeCount = streamingInsertJob.getAutoResumeCount(); - final long current = System.currentTimeMillis(); + final long maxAutoResumeCount = streamingInsertJob.getMaxAutoResumeCount(); + // Retry budget exhausted: rewrite the failure reason so this handler + // short-circuits on subsequent ticks and the job effectively requires + // manual RESUME to try again. + if (autoResumeCount >= maxAutoResumeCount) { + streamingInsertJob.setFailureReason(new FailureReason( + InternalErrorCode.CANNOT_RESUME_ERR, + "Auto resume failed after " + autoResumeCount + + " attempts. Last error: " + failureReason.getMsg())); + return; + } - if (failureReason != null - && failureReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR - && failureReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR - && failureReason.getCode() != InternalErrorCode.CANNOT_RESUME_ERR) { - long autoResumeIntervalTimeSec = autoResumeCount < 5 - ? Math.min((long) Math.pow(2, autoResumeCount) * BACK_OFF_BASIC_TIME_SEC, - MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC; - if (current - latestAutoResumeTimestamp > autoResumeIntervalTimeSec * 1000L) { - streamingInsertJob.setLatestAutoResumeTimestamp(current); - if (autoResumeCount < Long.MAX_VALUE) { - streamingInsertJob.setAutoResumeCount(autoResumeCount + 1); - } - streamingInsertJob.updateJobStatus(JobStatus.PENDING); - return; + final long latestAutoResumeTimestamp = streamingInsertJob.getLatestAutoResumeTimestamp(); + final long current = System.currentTimeMillis(); + long autoResumeIntervalTimeSec = autoResumeCount < 5 + ? Math.min((long) Math.pow(2, autoResumeCount) * BACK_OFF_BASIC_TIME_SEC, + MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC; + if (current - latestAutoResumeTimestamp > autoResumeIntervalTimeSec * 1000L) { + streamingInsertJob.setLatestAutoResumeTimestamp(current); + if (autoResumeCount < Long.MAX_VALUE) { + streamingInsertJob.setAutoResumeCount(autoResumeCount + 1); } + streamingInsertJob.updateJobStatus(JobStatus.PENDING); } } diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy index d8058ba5fbc805..691079eb121447 100644 --- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy @@ -82,6 +82,11 @@ suite("test_streaming_insert_job") { def jobResult = sql """select * from jobs("type"="insert") where Name='${jobName}'""" log.info("show success job: " + jobResult) + // LastTaskSuccessTime should be populated after a successful task commit. + def lastSuccessTime = sql """select LastTaskSuccessTime from jobs("type"="insert") where Name='${jobName}'""" + assert lastSuccessTime.get(0).get(0) != null && lastSuccessTime.get(0).get(0) != "", + "LastTaskSuccessTime should be set after a successful task, got: " + lastSuccessTime.get(0).get(0) + qt_select """ SELECT * FROM ${tableName} order by c1 """ sql """ diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_job_max_retry.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_job_max_retry.groovy new file mode 100644 index 00000000000000..30fc6eba1c17f5 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_job_max_retry.groovy @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Verify that after exceeding streaming_job_max_auto_resume_count, the streaming +// job's failure reason is rewritten to CANNOT_RESUME_ERR so the auto-resume +// handler stops trying. The job stays in PAUSED but effectively requires a +// manual RESUME to try again. +suite("test_streaming_job_max_retry", "nonConcurrent") { + def tableName = "test_streaming_job_max_retry_tbl" + def jobName = "test_streaming_job_max_retry_job" + + sql """drop table if exists `${tableName}` force""" + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `c1` int NULL, + `c2` string NULL, + `c3` int NULL, + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`c1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + // Capture the original value so other suites (or a non-default env) are not clobbered. + def originalMaxRetry = sql """ADMIN SHOW FRONTEND CONFIG LIKE 'streaming_job_max_auto_resume_count'""" + def originalMaxRetryValue = originalMaxRetry.get(0).get(1) + // Shrink the retry budget so the test finishes quickly. + sql "ADMIN SET FRONTEND CONFIG ('streaming_job_max_auto_resume_count' = '3')" + + GetDebugPoint().enableDebugPointForAllFEs('StreamingJob.scheduleTask.exception') + try { + sql """ + CREATE JOB ${jobName} + ON STREAMING DO INSERT INTO ${tableName} + SELECT * FROM S3 + ( + "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv", + "format" = "csv", + "provider" = "${getS3Provider()}", + "column_separator" = ",", + "s3.endpoint" = "${getS3Endpoint()}", + "s3.region" = "${getS3Region()}", + "s3.access_key" = "${getS3AK()}", + "s3.secret_key" = "${getS3SK()}" + ); + """ + + // Wait until the retry budget is exhausted and the failure reason + // is rewritten to CANNOT_RESUME_ERR. Status stays PAUSED throughout. + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def jobRes = sql """ select Status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobRes waiting for CANNOT_RESUME_ERR: " + jobRes) + if (jobRes.size() != 1 || !'PAUSED'.equals(jobRes.get(0).get(0))) { + return false + } + def errMsg = jobRes.get(0).get(1) as String + return errMsg != null && errMsg.contains("CANNOT_RESUME_ERR") + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + log.info("show job: " + showjob) + throw ex + } + + // ErrorMsg is a FailureReason JSON blob containing both code and msg, + // so the frontend can switch on the structured code instead of grepping the message. + def jobInfo = sql """select Status, ErrorMsg from jobs("type"="insert") where Name='${jobName}'""" + assert jobInfo.get(0).get(0) == "PAUSED" + def errorMsgJson = jobInfo.get(0).get(1) as String + assert errorMsgJson.contains("\"code\"") && errorMsgJson.contains("\"msg\""), + "ErrorMsg should be a FailureReason JSON with code and msg fields, got: " + errorMsgJson + assert errorMsgJson.contains("CANNOT_RESUME_ERR"), + "ErrorMsg should contain CANNOT_RESUME_ERR code, got: " + errorMsgJson + assert errorMsgJson.contains("Auto resume failed after"), + "ErrorMsg should contain the burn-out message, got: " + errorMsgJson + + } finally { + GetDebugPoint().disableDebugPointForAllFEs('StreamingJob.scheduleTask.exception') + sql "ADMIN SET FRONTEND CONFIG ('streaming_job_max_auto_resume_count' = '${originalMaxRetryValue}')" + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + } +} diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy index 973535d69284ef..73fa04be2da2a0 100644 --- a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy @@ -86,6 +86,13 @@ suite("test_streaming_job_restart_fe", "docker") { throw ex; } + // Capture LastTaskSuccessTime before restart to verify it survives editlog replay. + def lastSuccessBefore = sql """ + select LastTaskSuccessTime from jobs("type"="insert") where Name='${jobName}' + """ + assert lastSuccessBefore.get(0).get(0) != null && lastSuccessBefore.get(0).get(0) != "", + "LastTaskSuccessTime should be set before restart" + sql """ PAUSE JOB where jobname = '${jobName}' """ @@ -124,6 +131,13 @@ suite("test_streaming_job_restart_fe", "docker") { assert loadStatAfter.fileNumber == 2 assert loadStatAfter.fileSize == 256 + // Verify LastTaskSuccessTime survived the FE restart (replayOnUpdated must copy it). + def lastSuccessAfter = sql """ + select LastTaskSuccessTime from jobs("type"="insert") where Name='${jobName}' + """ + assert lastSuccessAfter.get(0).get(0) != null && lastSuccessAfter.get(0).get(0) != "", + "LastTaskSuccessTime should survive FE restart, got: " + lastSuccessAfter.get(0).get(0) + sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """ sql """drop table if exists `${tableName}` force""" }