From dac9cd3752a62d1a89247a2abb1a0d77c0f36a41 Mon Sep 17 00:00:00 2001 From: fcfangcc Date: Thu, 9 Oct 2025 16:16:05 +0800 Subject: [PATCH 1/3] fixed iceberg compaction --- .../connectors/iceberg/sink/IcebergDataSinkFactory.java | 1 + .../flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java | 6 +++++- .../cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java | 8 ++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java index 722f5f7bc98..d13f8f070d4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java @@ -122,6 +122,7 @@ public Set> optionalOptions() { options.add(IcebergDataSinkOptions.PARTITION_KEY); options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED); options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL); + options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM); return options; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java index 02d693aba3b..3b3019c5fba 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java @@ -121,6 +121,10 @@ public SimpleVersionedSerializer getWriteResultSerializer() return new WriteResultWrapperSerializer(); } + public int partitionCustom(int bucket, int numPartitions) { + return Math.floorMod(bucket, numPartitions); + } + @Override public void addPostCommitTopology( DataStream> committableMessageDataStream) { @@ -136,7 +140,7 @@ public void addPostCommitTopology( // Shuffle by different table id. DataStream> keyedStream = committableMessageDataStream.partitionCustom( - (bucket, numPartitions) -> bucket % numPartitions, + this::partitionCustom, (committableMessage) -> { if (committableMessage instanceof CommittableWithLineage) { WriteResultWrapper multiTableCommittable = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java index cb8daccbae9..b7075f4f325 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java @@ -192,4 +192,12 @@ private List fetchTableContent(Catalog catalog, TableId tableId) { } return results; } + + @Test + public void testPartitionCustom() { + IcebergSink icebergSink = + new IcebergSink(new HashMap<>(), null, null, CompactionOptions.builder().build()); + Assertions.assertThat(icebergSink.partitionCustom(-5, 4)).isEqualTo(3); + Assertions.assertThat(icebergSink.partitionCustom(5, 4)).isEqualTo(1); + } } From f6748691a85e33828bdc52cdf3333a5e1307ec79 Mon Sep 17 00:00:00 2001 From: fcfangcc Date: Tue, 14 Oct 2025 09:53:38 +0800 Subject: [PATCH 2/3] mod test and comment --- .../flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java | 6 +----- .../iceberg/sink/v2/CompactionOperatorTest.java | 3 ++- .../cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java | 8 -------- 3 files changed, 3 insertions(+), 14 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java index 3b3019c5fba..2d3269f55d5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java @@ -121,10 +121,6 @@ public SimpleVersionedSerializer getWriteResultSerializer() return new WriteResultWrapperSerializer(); } - public int partitionCustom(int bucket, int numPartitions) { - return Math.floorMod(bucket, numPartitions); - } - @Override public void addPostCommitTopology( DataStream> committableMessageDataStream) { @@ -140,7 +136,7 @@ public void addPostCommitTopology( // Shuffle by different table id. DataStream> keyedStream = committableMessageDataStream.partitionCustom( - this::partitionCustom, + Math::floorMod, (committableMessage) -> { if (committableMessage instanceof CommittableWithLineage) { WriteResultWrapper multiTableCommittable = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java index a150e7188fa..f73a8c9217f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java @@ -150,7 +150,8 @@ public void testCompationOperator() throws IOException, InterruptedException { } CompactionOperator compactionOperator = new CompactionOperator( - catalogOptions, CompactionOptions.builder().commitInterval(1).build()); + catalogOptions, + CompactionOptions.builder().commitInterval(1).parallelism(4).build()); compactionOperator.processElement( new StreamRecord<>( new CommittableWithLineage<>( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java index b7075f4f325..cb8daccbae9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java @@ -192,12 +192,4 @@ private List fetchTableContent(Catalog catalog, TableId tableId) { } return results; } - - @Test - public void testPartitionCustom() { - IcebergSink icebergSink = - new IcebergSink(new HashMap<>(), null, null, CompactionOptions.builder().build()); - Assertions.assertThat(icebergSink.partitionCustom(-5, 4)).isEqualTo(3); - Assertions.assertThat(icebergSink.partitionCustom(5, 4)).isEqualTo(1); - } } From e085505c4c4237b3ef1a0cd7427b40a5f05cd2aa Mon Sep 17 00:00:00 2001 From: fcfangcc Date: Tue, 14 Oct 2025 09:57:11 +0800 Subject: [PATCH 3/3] add test --- .../cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java index 8a5e5e4d0f0..126491b73f0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java @@ -40,6 +40,7 @@ void testCreateDataSink() { Configuration conf = Configuration.fromMap(ImmutableMap.builder().build()); conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse"); + conf.set(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM, 4); DataSink dataSink = sinkFactory.createDataSink( new FactoryHelper.DefaultContext(