Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
import org.apache.doris.job.offset.jdbc.JdbcTvfSourceOffsetProvider;
import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
Expand Down Expand Up @@ -165,6 +164,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@Getter
@SerializedName("tprops")
private Map<String, String> targetProperties;
// Converted form of sourceProperties; must be refreshed whenever sourceProperties changes.
private transient Map<String, String> convertedSourceProperties;

// Error-rate sampling is evaluated per fixed window of recent rows.
// If the error rate exceeds `max_filter_ratio` within the window, the sampling fails.
Expand Down Expand Up @@ -238,7 +239,7 @@ private void initSourceJob() {
StreamingJobUtils.resolveAndValidateSource(
dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls);
this.offsetProvider = new JdbcSourceOffsetProvider(getJobId(), dataSourceType,
StreamingJobUtils.convertCertFile(getDbId(), sourceProperties));
getConvertedSourceProperties());
JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
rdsOffsetProvider.splitChunks(createTbls);
} catch (Exception ex) {
Expand Down Expand Up @@ -418,7 +419,14 @@ public void alterJob(AlterJobCommand alterJobCommand) throws AnalysisException,

// update source properties
if (!alterJobCommand.getSourceProperties().isEmpty()) {
this.sourceProperties.putAll(alterJobCommand.getSourceProperties());
// Convert on a merged copy first; validateSource() only checks ssl_rootcert is
// non-empty, so cert-file lookup can still fail here. Commit to fields only on success.
Map<String, String> mergedSourceProperties = new HashMap<>(this.sourceProperties);
mergedSourceProperties.putAll(alterJobCommand.getSourceProperties());
Map<String, String> newConvertedSourceProperties =
StreamingJobUtils.convertCertFile(getDbId(), mergedSourceProperties);
this.sourceProperties = mergedSourceProperties;
this.convertedSourceProperties = newConvertedSourceProperties;
logParts.add("source properties: " + alterJobCommand.getSourceProperties());
}

Expand Down Expand Up @@ -525,18 +533,33 @@ protected AbstractStreamingTask createStreamingTask() throws JobException {
* @return
*/
private AbstractStreamingTask createStreamingMultiTblTask() throws JobException {
Map<String, String> convertSourceProps = StreamingJobUtils.convertCertFile(getDbId(), sourceProperties);
return new StreamingMultiTblTask(getJobId(), Env.getCurrentEnv().getNextId(), dataSourceType,
offsetProvider, convertSourceProps, targetDb, targetProperties, jobProperties, getCreateUser());
offsetProvider, getConvertedSourceProperties(), targetDb, targetProperties, jobProperties,
getCreateUser());
}

protected AbstractStreamingTask createStreamingInsertTask() {
/**
 * Returns the cert-file-resolved form of {@code sourceProperties}, computing and
 * caching it on first access. The cache is invalidated (set to null) wherever
 * {@code sourceProperties} is replaced, e.g. on ALTER JOB and editlog replay.
 *
 * @throws JobException if cert-file conversion fails
 */
private Map<String, String> getConvertedSourceProperties() throws JobException {
    Map<String, String> cached = convertedSourceProperties;
    if (cached == null) {
        cached = StreamingJobUtils.convertCertFile(getDbId(), sourceProperties);
        this.convertedSourceProperties = cached;
    }
    return cached;
}

/**
 * Returns the TVF properties parsed from the current statement, lazily re-deriving
 * them when the cached field is null (e.g. after FE restart, per the note in
 * fetchMeta()).
 */
private Map<String, String> getOriginTvfProps() {
    if (originTvfProps != null) {
        return originTvfProps;
    }
    this.originTvfProps = getCurrentTvf().getProperties().getMap();
    return originTvfProps;
}

/**
 * Properties handed to the offset provider: TVF-based jobs use the TVF's own
 * properties, otherwise the cert-file-converted source properties are used.
 *
 * @throws JobException if cert-file conversion of the source properties fails
 */
private Map<String, String> getProviderProps() throws JobException {
    if (tvfType != null) {
        return getOriginTvfProps();
    }
    return getConvertedSourceProperties();
}

protected AbstractStreamingTask createStreamingInsertTask() {
return new StreamingInsertTask(getJobId(), Env.getCurrentEnv().getNextId(),
getExecuteSql(),
offsetProvider, getCurrentDbName(), jobProperties, originTvfProps, getCreateUser());
offsetProvider, getCurrentDbName(), jobProperties, getOriginTvfProps(), getCreateUser());
}

public void recordTasks(AbstractStreamingTask task) {
Expand Down Expand Up @@ -567,16 +590,10 @@ public List<AbstractStreamingTask> queryAllStreamTasks() {
protected void fetchMeta() throws JobException {
long start = System.currentTimeMillis();
try {
if (tvfType != null) {
if (originTvfProps == null) {
this.originTvfProps = getCurrentTvf().getProperties().getMap();
}
// when fe restart, offsetProvider.jobId may be null
offsetProvider.ensureInitialized(getJobId(), originTvfProps);
offsetProvider.fetchRemoteMeta(originTvfProps);
} else {
offsetProvider.fetchRemoteMeta(new HashMap<>());
}
// when fe restart, offsetProvider.jobId may be null
Map<String, String> props = getProviderProps();
offsetProvider.ensureInitialized(getJobId(), props);
offsetProvider.fetchRemoteMeta(props);
} catch (Exception ex) {
log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
if (this.getFailureReason() == null
Expand Down Expand Up @@ -793,7 +810,18 @@ public void replayOnUpdated(StreamingInsertJob replayJob) {
if (replayJob.getNonTxnJobStatistic() != null) {
setNonTxnJobStatistic(replayJob.getNonTxnJobStatistic());
}
if (replayJob.getSourceProperties() != null) {
this.sourceProperties = replayJob.getSourceProperties();
// Drop caches a former-master role may have populated; lazy re-init on next access.
this.convertedSourceProperties = null;
}
if (replayJob.getTargetProperties() != null) {
this.targetProperties = replayJob.getTargetProperties();
}
setExecuteSql(replayJob.getExecuteSql());
// SQL-derived caches must be invalidated together so the next parse uses the replayed SQL.
this.baseCommand = null;
this.originTvfProps = null;
setSucceedTaskCount(replayJob.getSucceedTaskCount());
setFailedTaskCount(replayJob.getFailedTaskCount());
setCanceledTaskCount(replayJob.getCanceledTaskCount());
Expand Down Expand Up @@ -1360,16 +1388,13 @@ public void cleanup() throws JobException {
}
}

// For TVF path, provider fields may be null after FE restart
if (this.offsetProvider instanceof JdbcTvfSourceOffsetProvider) {
if (originTvfProps == null) {
this.originTvfProps = getCurrentTvf().getProperties().getMap();
}
offsetProvider.ensureInitialized(getJobId(), originTvfProps);
}

if (this.offsetProvider instanceof JdbcSourceOffsetProvider) {
// jdbc clean chunk meta table
// Best-effort refresh; don't block DROP JOB if SSL cert resolution fails.
try {
offsetProvider.ensureInitialized(getJobId(), getProviderProps());
} catch (JobException ex) {
log.warn("refresh provider props before cleanMeta failed, job id: {}", getJobId(), ex);
}
((JdbcSourceOffsetProvider) this.offsetProvider).cleanMeta(getJobId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType, Map<Strin
DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
}

/**
 * Re-seed provider state from the job's current source properties.
 *
 * <p>Called before each use (not only at construction) so values changed via
 * ALTER JOB — e.g. credentials — take effect without an FE restart. Refreshes
 * {@code sourceProperties} and {@code snapshotParallelism}.
 *
 * @param jobId    owning job id; not stored by this override's visible body —
 *                 NOTE(review): confirm whether the interface expects it to be kept
 * @param newProps current source properties, already cert-file converted by the caller
 */
@Override
public void ensureInitialized(Long jobId, Map<String, String> newProps) throws JobException {
    this.sourceProperties = newProps;
    // NOTE(review): parseInt throws an unchecked NumberFormatException on a malformed
    // value — presumably validated at CREATE/ALTER time; confirm.
    this.snapshotParallelism = Integer.parseInt(
            newProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
                    DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
}

@Override
public String getSourceType() {
return "jdbc";
Expand Down Expand Up @@ -237,11 +246,12 @@ public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
new TypeReference<ResponseBody<Map<String, String>>>() {
}
);
if (endBinlogOffset != null
&& !endBinlogOffset.equals(responseObj.getData())) {
Map<String, String> newEndOffset = responseObj.getData();
// null→value also counts as a change: upstream may have advanced while fetch was blocked.
if (endBinlogOffset == null || !endBinlogOffset.equals(newEndOffset)) {
hasMoreData = true;
}
endBinlogOffset = responseObj.getData();
endBinlogOffset = newEndOffset;
} catch (JsonProcessingException e) {
log.warn("Failed to parse end offset response: {}", response);
throw new JobException(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,36 @@ private void checkUnmodifiableSourceProperties(Map<String, String> originSourceP
+ "Use PROPERTIES('offset'='{...}') to alter offset");
}

// Reject keys that the runtime reads only at first initialize and never refreshes,
// so ALTER would be a silent no-op. See JdbcSourceOffsetProvider / DebeziumJsonDeserializer.
if (sourceProperties.containsKey(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) {
Preconditions.checkArgument(Objects.equals(
originSourceProperties.get(DataSourceConfigKeys.SNAPSHOT_PARALLELISM),
sourceProperties.get(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)),
"The " + DataSourceConfigKeys.SNAPSHOT_PARALLELISM
+ " property cannot be modified in ALTER JOB");
}
if (sourceProperties.containsKey(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)) {
Preconditions.checkArgument(Objects.equals(
originSourceProperties.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE),
sourceProperties.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)),
"The " + DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE
+ " property cannot be modified in ALTER JOB");
}
String tablePrefix = DataSourceConfigKeys.TABLE + ".";
for (String key : sourceProperties.keySet()) {
if (!key.startsWith(tablePrefix)) {
continue;
}
if (key.endsWith("." + DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX)
|| key.endsWith("." + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX)) {
Preconditions.checkArgument(Objects.equals(
originSourceProperties.get(key),
sourceProperties.get(key)),
"The " + key + " property cannot be modified in ALTER JOB");
}
}

// slot_name / publication_name decide Doris-vs-user ownership at create time; flipping
// them afterwards would orphan Doris-created resources or let Doris drop user-owned ones.
if (sourceProperties.containsKey(DataSourceConfigKeys.SLOT_NAME)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !final_data --
A1 1
B1 2
C1 3

Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

// Verify ALTER JOB on source credentials reaches JdbcSourceOffsetProvider at runtime
// (no FE restart). Without the fix, provider keeps a stale copy seeded at init time
// and would continue connecting upstream with the original password.
suite("test_streaming_mysql_job_alter_cred",
        "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
    def jobName = "test_streaming_mysql_job_alter_cred"
    def currentDb = (sql "select database()")[0][0]
    def table1 = "alter_cred_user_info"
    def mysqlDb = "test_cdc_db"

    // Clean slate: remove leftovers from any previous run.
    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
    sql """drop table if exists ${currentDb}.${table1} force"""

    String enabled = context.config.otherConfigs.get("enableJdbcTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
        String s3_endpoint = getS3Endpoint()
        String bucket = getS3BucketName()
        String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"

        // Seed the upstream MySQL table with two rows for the initial snapshot.
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
            sql """CREATE TABLE ${mysqlDb}.${table1} (
                `name` varchar(200) NOT NULL,
                `age` int DEFAULT NULL,
                PRIMARY KEY (`name`)
            ) ENGINE=InnoDB"""
            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 1)"""
            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 2)"""
        }

        // Create with correct credentials.
        sql """CREATE JOB ${jobName}
            ON STREAMING
            FROM MYSQL (
                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
                "driver_url" = "${driver_url}",
                "driver_class" = "com.mysql.cj.jdbc.Driver",
                "user" = "root",
                "password" = "123456",
                "database" = "${mysqlDb}",
                "include_tables" = "${table1}",
                "offset" = "initial"
            )
            TO DATABASE ${currentDb} (
                "table.create.properties.replication_num" = "1"
            )
        """

        // Wait for the initial snapshot (2 rows) to land in Doris.
        try {
            Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({
                def count = sql """SELECT count(1) FROM ${currentDb}.${table1}"""
                log.info("initial row count: " + count)
                (count.get(0).get(0) as int) == 2
            })
        } catch (Exception ex) {
            log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'"""))
            log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'"""))
            throw ex
        }

        // ALTER JOB only applies to a paused job.
        sql """PAUSE JOB where jobname = '${jobName}'"""
        Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({
            def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
            s.size() == 1 && s.get(0).get(0) == "PAUSED"
        })

        // ALTER to a wrong password. If the provider still holds the original credentials
        // (Bug A), resume will succeed; with the fix, provider picks up the bad password
        // and fetchMeta / task RPC fail, auto-pausing the job.
        sql """ALTER JOB ${jobName}
            FROM MYSQL (
                "password" = "wrong_password_for_test"
            )
            TO DATABASE ${currentDb}
        """

        sql """RESUME JOB where jobname = '${jobName}'"""

        // Auto-resume cycles PAUSED→PENDING→RUNNING→PAUSED, so the PAUSED state is transient.
        // FailureReason is only cleared by onStreamTaskSuccess, so ErrorMsg is sticky across cycles.
        try {
            Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
                def r = sql """select ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
                String msg = r.get(0).get(0)
                log.info("ErrorMsg after wrong cred resume: " + msg)
                return msg != null && !msg.isEmpty()
            })
        } catch (Exception ex) {
            log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'"""))
            throw ex
        }

        // ALTER back to the correct password and verify recovery.
        sql """ALTER JOB ${jobName}
            FROM MYSQL (
                "password" = "123456"
            )
            TO DATABASE ${currentDb}
        """

        sql """RESUME JOB where jobname = '${jobName}'"""

        try {
            Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until({
                def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
                log.info("status after correct cred resume: " + s)
                s.size() == 1 && s.get(0).get(0) == "RUNNING"
            })
        } catch (Exception ex) {
            log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'"""))
            throw ex
        }

        // New upstream row must flow through with the restored credentials.
        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('C1', 3)"""
        }

        try {
            Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({
                def count = sql """SELECT count(1) FROM ${currentDb}.${table1}"""
                log.info("row count after recovery: " + count)
                (count.get(0).get(0) as int) == 3
            })
        } catch (Exception ex) {
            log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'"""))
            log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'"""))
            throw ex
        }

        // Compared against the checked-in .out file (A1/B1/C1).
        qt_final_data """SELECT * FROM ${currentDb}.${table1} ORDER BY name"""

        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
        sql """drop table if exists ${currentDb}.${table1} force"""
    }
}
Loading
Loading