From 8cbe2d71dc1063888ffff667aed617a068598d39 Mon Sep 17 00:00:00 2001 From: Kanishk Karanawat Date: Fri, 1 Dec 2023 10:31:29 -0800 Subject: [PATCH] fix flink-release 1.17 CoderTypeSerializer related bug --- .../translation/types/CoderTypeSerializer.java | 15 ++++++++++++++- .../types/CoderTypeSerializerTest.java | 7 ++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index f092b72efe1bd..d7d92fdf1e8bb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -36,6 +36,8 @@ import java.io.EOFException; import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; /** * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for Beam {@link @@ -230,7 +232,11 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSeria return TypeSerializerSchemaCompatibility.incompatible(); } - if (!this.serializer.pipelineOptions.toString().equals(coderTypeSerializer.pipelineOptions.toString())) { + if (this.serializer.coder.getCoderArguments().size() != coderTypeSerializer.coder.getCoderArguments().size()) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + + if (!getCoderArguments(this.serializer.coder.getCoderArguments()).equals(getCoderArguments(coderTypeSerializer.coder.getCoderArguments()))) { return TypeSerializerSchemaCompatibility.incompatible(); } @@ -238,6 +244,13 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSeria } } + private static List getCoderArguments(List> coderArgList) { + return coderArgList.stream() + .map(coder -> coder.getEncodedTypeDescriptor().getType().getTypeName()) + .sorted() + .collect(Collectors.toList()); + } + @Override public String toString() { return "CoderTypeSerializer{" + "coder=" + coder + '}'; diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java index 1e3ef97c775a7..81048a4ea5405 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java @@ -17,13 +17,11 @@ */ package org.apache.beam.runners.flink.translation.types; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; + import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; @@ -33,6 +31,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.junit.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + /** Tests {@link CoderTypeSerializer}. */ @SuppressWarnings({ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)