Skip to content

Commit e0e0931

Browse files
huyuanfeng2018huyuanfeng
andauthored
[FLINK-37217][mysql] Fix MySqlErrorHandler TableNotFoundException Unable to obtain table correctly (#3892)
Co-authored-by: huyuanfeng <[email protected]>
1 parent 2850603 commit e0e0931

File tree

1 file changed

+13
-9
lines changed

1 file changed

+13
-9
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434

35+
import java.util.Optional;
3536
import java.util.regex.Matcher;
3637
import java.util.regex.Pattern;
3738

@@ -60,12 +61,9 @@ public MySqlErrorHandler(
6061

6162
@Override
6263
public void setProducerThrowable(Throwable producerThrowable) {
63-
if (isTableNotFoundException(producerThrowable)) {
64-
Matcher matcher =
65-
NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage());
66-
String databaseName = matcher.group(1);
67-
String tableName = matcher.group(2);
68-
TableId tableId = new TableId(databaseName, null, tableName);
64+
Optional<TableId> notFoundTable = extractNotFoundTableId(producerThrowable);
65+
if (notFoundTable.isPresent()) {
66+
TableId tableId = notFoundTable.get();
6967
if (context.getSchema().schemaFor(tableId) == null) {
7068
LOG.warn("Schema for table " + tableId + " is null");
7169
return;
@@ -86,14 +84,20 @@ public void setProducerThrowable(Throwable producerThrowable) {
8684
super.setProducerThrowable(producerThrowable);
8785
}
8886

89-
private boolean isTableNotFoundException(Throwable t) {
87+
private Optional<TableId> extractNotFoundTableId(Throwable t) {
9088
if (!(t.getCause() instanceof DebeziumException)) {
91-
return false;
89+
return Optional.empty();
9290
}
9391
DebeziumException e = (DebeziumException) t.getCause();
9492
String detailMessage = e.getMessage();
9593
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
96-
return matcher.find();
94+
if (matcher.find()) {
95+
String databaseName = matcher.group(1);
96+
String tableName = matcher.group(2);
97+
return Optional.of(new TableId(databaseName, null, tableName));
98+
} else {
99+
return Optional.empty();
100+
}
97101
}
98102

99103
private boolean isSchemaOutOfSyncException(Throwable t) {

0 commit comments

Comments
 (0)