Skip to content

Commit

Permalink
fix flink-release 1.17 CoderTypeSerializer related bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Kanishk Karanawat committed Dec 1, 2023
1 parent 63fe1ba commit 8cbe2d7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

import java.io.EOFException;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

/**
* Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for Beam {@link
Expand Down Expand Up @@ -230,14 +232,25 @@ public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSeria
return TypeSerializerSchemaCompatibility.incompatible();
}

if (!this.serializer.pipelineOptions.toString().equals(coderTypeSerializer.pipelineOptions.toString())) {
if (this.serializer.coder.getCoderArguments().size() != coderTypeSerializer.coder.getCoderArguments().size()) {
return TypeSerializerSchemaCompatibility.incompatible();
}

if (!getCoderArguments(this.serializer.coder.getCoderArguments()).equals(getCoderArguments(coderTypeSerializer.coder.getCoderArguments()))) {
return TypeSerializerSchemaCompatibility.incompatible();
}

return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}

private static List<String> getCoderArguments(List<? extends Coder<?>> coderArgList) {
return coderArgList.stream()
.map(coder -> coder.getEncodedTypeDescriptor().getType().getTypeName())
.sorted()
.collect(Collectors.toList());
}

@Override
public String toString() {
return "CoderTypeSerializer{" + "coder=" + coder + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
*/
package org.apache.beam.runners.flink.translation.types;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;

import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
Expand All @@ -33,6 +31,9 @@
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

/** Tests {@link CoderTypeSerializer}. */
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
Expand Down

0 comments on commit 8cbe2d7

Please sign in to comment.