Skip to content

Commit

Permalink
[BugFix] In the 3.1 separated storage and computation version, the us…
Browse files Browse the repository at this point in the history
…e of the "insert into" statement in Flink SQL will lose the data with Delete semantics.

Signed-off-by: andystenhe <[email protected]>
  • Loading branch information
hexiufeng authored and andystenhe committed May 30, 2024
1 parent cc8689d commit a61f923
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,21 +424,8 @@ private void validateTableStructure(TableSchema flinkSchema) {
}
// validate primary keys
List<String> primayKeys = new ArrayList<>();
for (int i = 0; i < rows.size(); i++) {
String keysType = rows.get(i).get("COLUMN_KEY").toString();
if (!"PRI".equals(keysType)) {
continue;
}
primayKeys.add(rows.get(i).get("COLUMN_NAME").toString().toLowerCase());
}
flinkSchema.getPrimaryKey().ifPresent(c -> c.getColumns().forEach(colName -> primayKeys.add(colName.toLowerCase())));
if (!primayKeys.isEmpty()) {
if (!constraint.isPresent()) {
throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`.");
}
if (constraint.get().getColumns().size() != primayKeys.size() ||
!constraint.get().getColumns().stream().allMatch(col -> primayKeys.contains(col.toLowerCase()))) {
throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table.");
}
sinkOptions.enableUpsertDelete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,8 @@ public void validateTableStructure(StarRocksSinkOptions sinkOptions, TableSchema
}
// validate primary keys
List<String> primaryKeys = new ArrayList<>();
for (Map<String, Object> row : rows) {
String keysType = row.get("COLUMN_KEY").toString();
if (!"PRI".equals(keysType)) {
continue;
}
primaryKeys.add(row.get("COLUMN_NAME").toString().toLowerCase());
}
flinkSchema.getPrimaryKey().ifPresent(c -> c.getColumns().forEach(colName -> primaryKeys.add(colName.toLowerCase())));
if (!primaryKeys.isEmpty()) {
if (!constraint.isPresent()) {
throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`.");
}
if (constraint.get().getColumns().size() != primaryKeys.size() ||
!constraint.get().getColumns().stream().allMatch(col -> primaryKeys.contains(col.toLowerCase()))) {
throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table.");
}
sinkOptions.enableUpsertDelete();
}

Expand Down

0 comments on commit a61f923

Please sign in to comment.