From b15a610cddd86ea9d723be44a14fbe688d39b9d1 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 27 Apr 2026 10:15:03 +0800 Subject: [PATCH] [fix](streaming-job) propagate ALTER source/target properties to runtime and persistence (#62553) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? ALTER JOB on a non-TVF streaming CDC job could silently fail in two ways: 1. **Persistence**: `replayOnUpdated` didn't sync `sourceProperties` / `targetProperties`, so the values were lost after FE checkpoint/restart or on a follower FE. 2. **Runtime**: `JdbcSourceOffsetProvider` held an independent copy of `sourceProperties` seeded at init time. ALTER on credentials (password, driver_url, etc.) never reached the provider's BE RPCs — the job would keep trying the old credentials. ### Fix 1. `replayOnUpdated` now copies `sourceProperties` / `targetProperties` from the replay job and drops the cached converted properties so they are rebuilt on next access. 2. The job caches a `convertedSourceProperties` (transient) that's refreshed on ALTER; `fetchMeta` and `cleanup` push it to the provider via a new `ensureInitialized` override, and `createStreamingMultiTblTask` passes the same cached map to each new task. 3. `AlterJobCommand` now rejects changes to `snapshot_parallelism`, `snapshot_split_size`, and per-table `exclude_columns` / `target_table`, because the runtime reads them only at first initialization and an ALTER would be a silent no-op. 
--- .../insert/streaming/StreamingInsertJob.java | 77 ++++--- .../offset/jdbc/JdbcSourceOffsetProvider.java | 16 +- .../trees/plans/commands/AlterJobCommand.java | 30 +++ .../test_streaming_mysql_job_alter_cred.out | 6 + ...test_streaming_mysql_job_alter_cred.groovy | 160 ++++++++++++++ ...ng_mysql_job_alter_props_restart_fe.groovy | 206 ++++++++++++++++++ ...st_streaming_mysql_job_create_alter.groovy | 50 ++++- 7 files changed, 514 insertions(+), 31 deletions(-) create mode 100644 regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_cred.out create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_cred.groovy create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_props_restart_fe.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index d098ff5d83c25f..67841aa9d2bf66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -51,7 +51,6 @@ import org.apache.doris.job.offset.SourceOffsetProvider; import org.apache.doris.job.offset.SourceOffsetProviderFactory; import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider; -import org.apache.doris.job.offset.jdbc.JdbcTvfSourceOffsetProvider; import org.apache.doris.job.util.StreamingJobUtils; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; @@ -165,6 +164,8 @@ public class StreamingInsertJob extends AbstractJob targetProperties; + // Converted form of sourceProperties; must be refreshed whenever sourceProperties changes. + private transient Map convertedSourceProperties; // The sampling window starts at the beginning of the sampling window. 
// If the error rate exceeds `max_filter_ratio` within the window, the sampling fails. @@ -238,7 +239,7 @@ private void initSourceJob() { StreamingJobUtils.resolveAndValidateSource( dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls); this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType, - StreamingJobUtils.convertCertFile(getDbId(), sourceProperties)); + getConvertedSourceProperties()); JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider; rdsOffsetProvider.splitChunks(createTbls); } catch (Exception ex) { @@ -418,7 +419,14 @@ public void alterJob(AlterJobCommand alterJobCommand) throws AnalysisException, // update source properties if (!alterJobCommand.getSourceProperties().isEmpty()) { - this.sourceProperties.putAll(alterJobCommand.getSourceProperties()); + // Convert on a merged copy first; validateSource() only checks ssl_rootcert is + // non-empty, so cert-file lookup can still fail here. Commit to fields only on success. 
+ Map mergedSourceProperties = new HashMap<>(this.sourceProperties); + mergedSourceProperties.putAll(alterJobCommand.getSourceProperties()); + Map newConvertedSourceProperties = + StreamingJobUtils.convertCertFile(getDbId(), mergedSourceProperties); + this.sourceProperties = mergedSourceProperties; + this.convertedSourceProperties = newConvertedSourceProperties; logParts.add("source properties: " + alterJobCommand.getSourceProperties()); } @@ -525,18 +533,33 @@ protected AbstractStreamingTask createStreamingTask() throws JobException { * @return */ private AbstractStreamingTask createStreamingMultiTblTask() throws JobException { - Map convertSourceProps = StreamingJobUtils.convertCertFile(getDbId(), sourceProperties); return new StreamingMultiTblTask(getJobId(), Env.getCurrentEnv().getNextId(), dataSourceType, - offsetProvider, convertSourceProps, targetDb, targetProperties, jobProperties, getCreateUser()); + offsetProvider, getConvertedSourceProperties(), targetDb, targetProperties, jobProperties, + getCreateUser()); } - protected AbstractStreamingTask createStreamingInsertTask() { + private Map getConvertedSourceProperties() throws JobException { + if (convertedSourceProperties == null) { + this.convertedSourceProperties = StreamingJobUtils.convertCertFile(getDbId(), sourceProperties); + } + return convertedSourceProperties; + } + + private Map getOriginTvfProps() { if (originTvfProps == null) { this.originTvfProps = getCurrentTvf().getProperties().getMap(); } + return originTvfProps; + } + + private Map getProviderProps() throws JobException { + return tvfType != null ? 
getOriginTvfProps() : getConvertedSourceProperties(); + } + + protected AbstractStreamingTask createStreamingInsertTask() { return new StreamingInsertTask(getJobId(), Env.getCurrentEnv().getNextId(), getExecuteSql(), - offsetProvider, getCurrentDbName(), jobProperties, originTvfProps, getCreateUser()); + offsetProvider, getCurrentDbName(), jobProperties, getOriginTvfProps(), getCreateUser()); } public void recordTasks(AbstractStreamingTask task) { @@ -567,16 +590,10 @@ public List queryAllStreamTasks() { protected void fetchMeta() throws JobException { long start = System.currentTimeMillis(); try { - if (tvfType != null) { - if (originTvfProps == null) { - this.originTvfProps = getCurrentTvf().getProperties().getMap(); - } - // when fe restart, offsetProvider.jobId may be null - offsetProvider.ensureInitialized(getJobId(), originTvfProps); - offsetProvider.fetchRemoteMeta(originTvfProps); - } else { - offsetProvider.fetchRemoteMeta(new HashMap<>()); - } + // when fe restart, offsetProvider.jobId may be null + Map props = getProviderProps(); + offsetProvider.ensureInitialized(getJobId(), props); + offsetProvider.fetchRemoteMeta(props); } catch (Exception ex) { log.warn("fetch remote meta failed, job id: {}", getJobId(), ex); if (this.getFailureReason() == null @@ -793,7 +810,18 @@ public void replayOnUpdated(StreamingInsertJob replayJob) { if (replayJob.getNonTxnJobStatistic() != null) { setNonTxnJobStatistic(replayJob.getNonTxnJobStatistic()); } + if (replayJob.getSourceProperties() != null) { + this.sourceProperties = replayJob.getSourceProperties(); + // Drop caches a former-master role may have populated; lazy re-init on next access. + this.convertedSourceProperties = null; + } + if (replayJob.getTargetProperties() != null) { + this.targetProperties = replayJob.getTargetProperties(); + } setExecuteSql(replayJob.getExecuteSql()); + // SQL-derived caches must be invalidated together so the next parse uses the replayed SQL. 
+ this.baseCommand = null; + this.originTvfProps = null; setSucceedTaskCount(replayJob.getSucceedTaskCount()); setFailedTaskCount(replayJob.getFailedTaskCount()); setCanceledTaskCount(replayJob.getCanceledTaskCount()); @@ -1360,16 +1388,13 @@ public void cleanup() throws JobException { } } - // For TVF path, provider fields may be null after FE restart - if (this.offsetProvider instanceof JdbcTvfSourceOffsetProvider) { - if (originTvfProps == null) { - this.originTvfProps = getCurrentTvf().getProperties().getMap(); - } - offsetProvider.ensureInitialized(getJobId(), originTvfProps); - } - if (this.offsetProvider instanceof JdbcSourceOffsetProvider) { - // jdbc clean chunk meta table + // Best-effort refresh; don't block DROP JOB if SSL cert resolution fails. + try { + offsetProvider.ensureInitialized(getJobId(), getProviderProps()); + } catch (JobException ex) { + log.warn("refresh provider props before cleanMeta failed, job id: {}", getJobId(), ex); + } ((JdbcSourceOffsetProvider) this.offsetProvider).cleanMeta(getJobId()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index 605de01bd2223c..cfa2fe3273b6fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -114,6 +114,15 @@ public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType, Map newProps) throws JobException { + this.sourceProperties = newProps; + this.snapshotParallelism = Integer.parseInt( + newProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM, + DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT)); + } + @Override public String getSourceType() { return "jdbc"; @@ -237,11 +246,12 @@ public void fetchRemoteMeta(Map properties) throws Exception { new TypeReference>>() { } ); - if (endBinlogOffset 
!= null - && !endBinlogOffset.equals(responseObj.getData())) { + Map newEndOffset = responseObj.getData(); + // null→value also counts as a change: upstream may have advanced while fetch was blocked. + if (endBinlogOffset == null || !endBinlogOffset.equals(newEndOffset)) { hasMoreData = true; } - endBinlogOffset = responseObj.getData(); + endBinlogOffset = newEndOffset; } catch (JsonProcessingException e) { log.warn("Failed to parse end offset response: {}", response); throw new JobException(response); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java index 88bbc9adc836b9..7886ff65ee3c92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java @@ -222,6 +222,36 @@ private void checkUnmodifiableSourceProperties(Map originSourceP + "Use PROPERTIES('offset'='{...}') to alter offset"); } + // Reject keys that the runtime reads only at first initialize and never refreshes, + // so ALTER would be a silent no-op. See JdbcSourceOffsetProvider / DebeziumJsonDeserializer. 
+ if (sourceProperties.containsKey(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) { + Preconditions.checkArgument(Objects.equals( + originSourceProperties.get(DataSourceConfigKeys.SNAPSHOT_PARALLELISM), + sourceProperties.get(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)), + "The " + DataSourceConfigKeys.SNAPSHOT_PARALLELISM + + " property cannot be modified in ALTER JOB"); + } + if (sourceProperties.containsKey(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)) { + Preconditions.checkArgument(Objects.equals( + originSourceProperties.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE), + sourceProperties.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)), + "The " + DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE + + " property cannot be modified in ALTER JOB"); + } + String tablePrefix = DataSourceConfigKeys.TABLE + "."; + for (String key : sourceProperties.keySet()) { + if (!key.startsWith(tablePrefix)) { + continue; + } + if (key.endsWith("." + DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX) + || key.endsWith("." + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX)) { + Preconditions.checkArgument(Objects.equals( + originSourceProperties.get(key), + sourceProperties.get(key)), + "The " + key + " property cannot be modified in ALTER JOB"); + } + } + // slot_name / publication_name decide Doris-vs-user ownership at create time; flipping // them afterwards would orphan Doris-created resources or let Doris drop user-owned ones. if (sourceProperties.containsKey(DataSourceConfigKeys.SLOT_NAME)) { diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_cred.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_cred.out new file mode 100644 index 00000000000000..400f0acddd4966 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_cred.out @@ -0,0 +1,6 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !final_data -- +A1 1 +B1 2 +C1 3 + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_cred.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_cred.groovy new file mode 100644 index 00000000000000..6287b08a232d21 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_cred.groovy @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Verify ALTER JOB on source credentials reaches JdbcSourceOffsetProvider at runtime +// (no FE restart). Without the fix, provider keeps a stale copy seeded at init time +// and would continue connecting upstream with the original password. 
+suite("test_streaming_mysql_job_alter_cred", + "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_alter_cred" + def currentDb = (sql "select database()")[0][0] + def table1 = "alter_cred_user_info" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `name` varchar(200) NOT NULL, + `age` int DEFAULT NULL, + PRIMARY KEY (`name`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 1)""" + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 2)""" + } + + // Create with correct credentials. 
+ sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({ + def count = sql """SELECT count(1) FROM ${currentDb}.${table1}""" + log.info("initial row count: " + count) + (count.get(0).get(0) as int) == 2 + }) + } catch (Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'""")) + throw ex + } + + sql """PAUSE JOB where jobname = '${jobName}'""" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ + def s = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + s.size() == 1 && s.get(0).get(0) == "PAUSED" + }) + + // ALTER to a wrong password. If the provider still holds the original credentials + // (Bug A), resume will succeed; with the fix, provider picks up the bad password + // and fetchMeta / task RPC fail, auto-pausing the job. + sql """ALTER JOB ${jobName} + FROM MYSQL ( + "password" = "wrong_password_for_test" + ) + TO DATABASE ${currentDb} + """ + + sql """RESUME JOB where jobname = '${jobName}'""" + + // Auto-resume cycles PAUSED→PENDING→RUNNING→PAUSED, so the PAUSED state is transient. + // FailureReason is only cleared by onStreamTaskSuccess, so ErrorMsg is sticky across cycles. 
+ try { + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def r = sql """select ErrorMsg from jobs("type"="insert") where Name='${jobName}'""" + String msg = r.get(0).get(0) + log.info("ErrorMsg after wrong cred resume: " + msg) + return msg != null && !msg.isEmpty() + }) + } catch (Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + throw ex + } + + // ALTER back to the correct password and verify recovery. + sql """ALTER JOB ${jobName} + FROM MYSQL ( + "password" = "123456" + ) + TO DATABASE ${currentDb} + """ + + sql """RESUME JOB where jobname = '${jobName}'""" + + try { + Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until({ + def s = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + log.info("status after correct cred resume: " + s) + s.size() == 1 && s.get(0).get(0) == "RUNNING" + }) + } catch (Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + throw ex + } + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('C1', 3)""" + } + + try { + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def count = sql """SELECT count(1) FROM ${currentDb}.${table1}""" + log.info("row count after recovery: " + count) + (count.get(0).get(0) as int) == 3 + }) + } catch (Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'""")) + throw ex + } + + qt_final_data """SELECT * FROM ${currentDb}.${table1} ORDER BY name""" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + } +} diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_props_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_props_restart_fe.groovy new file mode 100644 index 00000000000000..6ebdfdcfa17ae2 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_alter_props_restart_fe.groovy @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.apache.doris.regression.suite.ClusterOptions +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +/** + * Verify that ALTERed source/target properties survive FE restart. + * + * Before this fix, replayOnUpdated() synced properties (job props), executeSql, + * offsetProviderPersist, etc., but did NOT sync sourceProperties and + * targetProperties, so after FE restart the CDC job reverted to the original + * source/target properties written at CREATE time. + * + * This test: + * 1. Creates a CDC job with only "replication_num" in target properties + * 2. ALTERs target properties to add "load.max_filter_ratio" + * 3. ALTERs source properties to change "user" to a different value + * 4. Restarts FE + * 5. 
Asserts that ExecuteSql (built from sourceProperties/targetProperties) + * still contains the ALTERed values + */ +suite("test_streaming_mysql_job_alter_props_restart_fe", "docker,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_alter_props_restart_fe" + def options = new ClusterOptions() + options.setFeNum(1) + options.cloudMode = null + + docker(options) { + def currentDb = (sql "select database()")[0][0] + def table1 = "alter_props_restart_user" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // Prepare upstream MySQL data + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `name` varchar(200) NOT NULL, + `age` int DEFAULT NULL, + PRIMARY KEY (`name`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 1);""" + sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 2);""" + } + + // Step 1: Create CDC job with initial target properties + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = 
"${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + // Step 2: Wait for initial data to be consumed + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSucceedCount: " + jobSucceedCount) + jobSucceedCount.size() == 1 && '2' <= jobSucceedCount.get(0).get(0) + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + // Step 3: PAUSE the job + sql """PAUSE JOB where jobname = '${jobName}'""" + Awaitility.await().atMost(30, SECONDS) + .pollInterval(1, SECONDS).until( + { + def st = sql """ select status from jobs("type"="insert") where Name = '${jobName}' """ + st.get(0).get(0) == "PAUSED" + } + ) + + // Step 4: ALTER source properties (change user to a non-existent value) + // and target properties (add load.max_filter_ratio). + // Using a distinct user value gives the assertion discriminating power: the + // original CREATE also sets user=root, so same-value ALTER would not prove + // that replayOnUpdated actually propagated the new map. 
+ sql """ALTER JOB ${jobName} + FROM MYSQL ( + "user" = "alter_restart_probe_user" + ) + TO DATABASE ${currentDb} ( + "load.max_filter_ratio" = "0.5" + ) + """ + + // Step 5: Verify ALTER took effect before restart + def execSqlBeforeRestart = sql """ + select ExecuteSql from jobs("type"="insert") where Name='${jobName}' + """ + log.info("ExecuteSql before restart: " + execSqlBeforeRestart.get(0).get(0)) + assert execSqlBeforeRestart.get(0).get(0).contains("max_filter_ratio") : + "target properties should contain max_filter_ratio after ALTER" + assert execSqlBeforeRestart.get(0).get(0).contains("alter_restart_probe_user") : + "source properties should contain the altered user after ALTER" + + // Step 6: Restart FE + cluster.restartFrontends() + sleep(60000) + context.reconnectFe() + + // Step 7: Verify properties survived restart + def jobInfoAfterRestart = sql """ + select status, ExecuteSql from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfoAfterRestart: " + jobInfoAfterRestart) + assert jobInfoAfterRestart.get(0).get(0) == "PAUSED" + + def execSqlAfterRestart = jobInfoAfterRestart.get(0).get(1) + log.info("ExecuteSql after restart: " + execSqlAfterRestart) + // If replayOnUpdated does not sync targetProperties, max_filter_ratio will be lost + assert execSqlAfterRestart.contains("max_filter_ratio") : + "targetProperties lost after FE restart — replayOnUpdated did not sync targetProperties" + // If replayOnUpdated does not sync sourceProperties, the altered user will be lost + assert execSqlAfterRestart.contains("alter_restart_probe_user") : + "sourceProperties lost after FE restart — replayOnUpdated did not sync sourceProperties" + + // Step 8: ALTER user back so RESUME can authenticate upstream + sql """ALTER JOB ${jobName} + FROM MYSQL ( + "user" = "root" + ) + TO DATABASE ${currentDb} + """ + + // Step 9: RESUME and verify job can still run with new data + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql 
"""INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('C1', 3);""" + } + + sql """RESUME JOB where jobname = '${jobName}'""" + + try { + Awaitility.await().atMost(120, SECONDS) + .pollInterval(2, SECONDS).until( + { + def loadStat = sql """ select loadStatistic from jobs("type"="insert") where Name = '${jobName}' """ + def stat = parseJson(loadStat.get(0).get(0)) + log.info("scannedRows after restart resume: " + stat.scannedRows) + stat.scannedRows == 3 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job after restart resume: " + showjob) + log.info("show task after restart resume: " + showtask) + throw ex; + } + + def result = sql """select * from ${currentDb}.${table1} order by name""" + log.info("final result: " + result) + assert result.size() == 3 + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + } + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy index 0b3a2d2eee85e9..64cce88fb97963 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy @@ -339,8 +339,8 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke "user" = "root", "password" = "123456", "database" = "${mysqlDb}", - "include_tables" = "${table1}", - "exclude_tables" = "xxxx", + "include_tables" = "${table1}", + "exclude_tables" = "xxxx", "offset" = "initial" ) TO DATABASE ${currentDb} ( @@ -350,6 +350,52 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke exception "The exclude_tables property cannot be modified in ALTER JOB" } + // 
snapshot_parallelism is cached in BE reader's pollExecutor on first initialize; + // reject to avoid silent staleness + test { + sql """ALTER JOB ${jobName} + FROM MYSQL ( + "snapshot_parallelism" = "4" + ) + TO DATABASE ${currentDb} + """ + exception "The snapshot_parallelism property cannot be modified in ALTER JOB" + } + + // snapshot_split_size only affects the initial splitChunks; subsequent restarts + // restore persisted splits, so ALTER would be a silent no-op; reject + test { + sql """ALTER JOB ${jobName} + FROM MYSQL ( + "snapshot_split_size" = "2048" + ) + TO DATABASE ${currentDb} + """ + exception "The snapshot_split_size property cannot be modified in ALTER JOB" + } + + // table.<table_name>.exclude_columns is cached in DebeziumJsonDeserializer; reject + test { + sql """ALTER JOB ${jobName} + FROM MYSQL ( + "table.${table1}.exclude_columns" = "age" + ) + TO DATABASE ${currentDb} + """ + exception "table.${table1}.exclude_columns property cannot be modified in ALTER JOB" + } + + // table.<table_name>.target_table is cached in DebeziumJsonDeserializer; reject + test { + sql """ALTER JOB ${jobName} + FROM MYSQL ( + "table.${table1}.target_table" = "renamed_target" + ) + TO DATABASE ${currentDb} + """ + exception "table.${table1}.target_table property cannot be modified in ALTER JOB" + } + // unexcept properties test { sql """ALTER JOB ${jobName}