diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java index bfa887fa3..c7c4dc2ad 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java @@ -424,21 +424,8 @@ private void validateTableStructure(TableSchema flinkSchema) { } // validate primary keys List primayKeys = new ArrayList<>(); - for (int i = 0; i < rows.size(); i++) { - String keysType = rows.get(i).get("COLUMN_KEY").toString(); - if (!"PRI".equals(keysType)) { - continue; - } - primayKeys.add(rows.get(i).get("COLUMN_NAME").toString().toLowerCase()); - } + flinkSchema.getPrimaryKey().ifPresent(c -> c.getColumns().forEach(colName -> primayKeys.add(colName.toLowerCase()))); if (!primayKeys.isEmpty()) { - if (!constraint.isPresent()) { - throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`."); - } - if (constraint.get().getColumns().size() != primayKeys.size() || - !constraint.get().getColumns().stream().allMatch(col -> primayKeys.contains(col.toLowerCase()))) { - throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table."); - } sinkOptions.enableUpsertDelete(); } diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java index 1064952ee..f2040b849 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkTable.java @@ -102,21 +102,8 @@ public void validateTableStructure(StarRocksSinkOptions sinkOptions, TableSchema } // validate primary keys List primaryKeys = new ArrayList<>(); - for (Map row : rows) { - String keysType = row.get("COLUMN_KEY").toString(); - if (!"PRI".equals(keysType)) { - continue; - } - primaryKeys.add(row.get("COLUMN_NAME").toString().toLowerCase()); - } + flinkSchema.getPrimaryKey().ifPresent(c -> c.getColumns().forEach(colName -> primaryKeys.add(colName.toLowerCase()))); if (!primaryKeys.isEmpty()) { - if (!constraint.isPresent()) { - throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`."); - } - if (constraint.get().getColumns().size() != primaryKeys.size() || - !constraint.get().getColumns().stream().allMatch(col -> primaryKeys.contains(col.toLowerCase()))) { - throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table."); - } sinkOptions.enableUpsertDelete(); }