diff --git a/build.gradle.kts b/build.gradle.kts index 7512ee2560447..b1d672986f6d3 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -244,6 +244,8 @@ tasks.register("javaPreCommit") { dependsOn(":runners:flink:1.15:job-server:build") dependsOn(":runners:flink:1.16:build") dependsOn(":runners:flink:1.16:job-server:build") + dependsOn(":runners:flink:1.17:build") + dependsOn(":runners:flink:1.17:job-server:build") dependsOn(":runners:google-cloud-dataflow-java:build") dependsOn(":runners:google-cloud-dataflow-java:examples-streaming:build") dependsOn(":runners:google-cloud-dataflow-java:examples:build") @@ -342,6 +344,7 @@ tasks.register("javaPostCommitSickbay") { dependsOn(":runners:flink:1.14:validatesRunnerSickbay") dependsOn(":runners:flink:1.15:validatesRunnerSickbay") dependsOn(":runners:flink:1.16:validatesRunnerSickbay") + dependsOn(":runners:flink:1.17:validatesRunnerSickbay") dependsOn(":runners:spark:3:job-server:validatesRunnerSickbay") dependsOn(":runners:direct-java:validatesRunnerSickbay") dependsOn(":runners:portability:java:validatesRunnerSickbay") diff --git a/gradle.properties b/gradle.properties index 8b039cd5bde12..fafb95d6ec798 100644 --- a/gradle.properties +++ b/gradle.properties @@ -37,5 +37,7 @@ javaVersion=1.8 docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ -flink_versions=1.12,1.13,1.14,1.15,1.16 - +# supported flink versions +flink_versions=1.12,1.13,1.14,1.15,1.16,1.17 +# supported python versions +python_versions=3.7,3.8,3.9,3.10,3.11 \ No newline at end of file diff --git a/runners/flink/1.17/build.gradle b/runners/flink/1.17/build.gradle new file mode 100644 index 0000000000000..2dea7b056053a --- /dev/null +++ b/runners/flink/1.17/build.gradle @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '..' + +/* All properties required for loading the Flink build script */ +project.ext { + // Set the version of all Flink-related dependencies here. + flink_version = '1.17.0' + // Version specific code overrides. + main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", "${basePath}/1.15/src/main/java", "${basePath}/1.16/src/main/java", './src/main/java'] + test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", "${basePath}/1.15/src/test/java", "${basePath}/1.16/src/test/java", './src/test/java'] + main_resources_overrides = [] + test_resources_overrides = [] + archives_base_name = 'beam-runners-flink-1.17' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_runner.gradle" diff --git a/runners/flink/1.17/job-server-container/build.gradle b/runners/flink/1.17/job-server-container/build.gradle new file mode 100644 index 0000000000000..afdb68a0fc91c --- /dev/null +++ b/runners/flink/1.17/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/1.17/job-server/build.gradle b/runners/flink/1.17/job-server/build.gradle new file mode 100644 index 0000000000000..89915349ae9a8 --- /dev/null +++ b/runners/flink/1.17/job-server/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-1.17-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index de460ea7c2153..f092b72efe1bd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.flink.translation.types; -import java.io.EOFException; -import java.io.IOException; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; @@ -28,14 +26,17 @@ import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; import org.checkerframework.checker.nullness.qual.Nullable; +import java.io.EOFException; +import java.io.IOException; + /** * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for Beam {@link * org.apache.beam.sdk.coders.Coder Coders}. @@ -158,26 +159,81 @@ public TypeSerializerSnapshot snapshotConfiguration() { return new LegacySnapshot<>(this); } - /** A legacy snapshot which does not care about schema compatibility. */ - public static class LegacySnapshot extends TypeSerializerConfigSnapshot { + public static class LegacySnapshot implements TypeSerializerSnapshot { + + int CURRENT_VERSION = 2; + CoderTypeSerializer serializer; - /** Needs to be public to work with {@link VersionedIOReadableWritable}. */ - public LegacySnapshot() {} + public LegacySnapshot() { + } public LegacySnapshot(CoderTypeSerializer serializer) { - setPriorSerializer(serializer); + this.serializer = serializer; } @Override - public int getVersion() { - // We always return the same version - return 1; + public int getCurrentVersion() { + return CURRENT_VERSION; + } + + @Override + public void writeSnapshot(DataOutputView out) throws IOException { + ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos(); + InstantiationUtil.serializeObject(streamWithPos, this.serializer); + out.writeInt(streamWithPos.getPosition()); + out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition()); + } + + @Override + public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { + switch (readVersion) { + case 1: + throw new UnsupportedOperationException( + String.format("No longer supported version [%d].", readVersion)); + case 2: + try { + int serializerBytes = in.readInt(); + byte[] buffer = new byte[serializerBytes]; + in.readFully(buffer); + this.serializer = InstantiationUtil.deserializeObject(buffer, userCodeClassLoader); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + break; + default: + throw new IllegalArgumentException("Unrecognized version: " + readVersion); + } } @Override - public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( - TypeSerializer newSerializer) { - // We assume compatibility because we don't have a way of checking schema compatibility + public TypeSerializer restoreSerializer() { + if (serializer == null) { + throw new IllegalStateException( + "Trying to restore the prior serializer but the prior serializer has not been set."); + } + return this.serializer; + } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + if (newSerializer.getClass() != this.getClass().getDeclaringClass()) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + + CoderTypeSerializer coderTypeSerializer = (CoderTypeSerializer) newSerializer; + + if (this.serializer == null) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + + if (!this.serializer.coder.getEncodedTypeDescriptor().getType().getTypeName().equals(coderTypeSerializer.coder.getEncodedTypeDescriptor().getType().getTypeName())) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + + if (!this.serializer.pipelineOptions.toString().equals(coderTypeSerializer.pipelineOptions.toString())) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + return TypeSerializerSchemaCompatibility.compatibleAsIs(); } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java index 3c4e43bd339f8..1e3ef97c775a7 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializerTest.java @@ -74,7 +74,9 @@ private void testWriteAndReadConfigSnapshot(Coder coder) throws IOExcept TypeSerializerSnapshot readSnapshot = new CoderTypeSerializer.LegacySnapshot(); readSnapshot.readSnapshot( writtenSnapshot.getCurrentVersion(), outView.getInputView(), getClass().getClassLoader()); + CoderTypeSerializer restoreSerializer = (CoderTypeSerializer) readSnapshot.restoreSerializer(); - assertThat(readSnapshot.restoreSerializer(), is(serializer)); + assertThat(restoreSerializer, is(serializer)); + assertThat("TypeSerializerSchemaCompatibility should be compatible", writtenSnapshot.resolveSchemaCompatibility(restoreSerializer).isCompatibleAsIs() == true); } } diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index ee0b1095fa28c..1ec448e0d6f3e 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1463,7 +1463,7 @@ def _add_argparse_args(cls, parser): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15', '1.16'] + PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15', '1.16', '1.17'] @classmethod def _add_argparse_args(cls, parser): diff --git a/settings.gradle.kts b/settings.gradle.kts index bed62bc0a223f..7b5ae1114a46d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -113,6 +113,10 @@ include(":runners:flink:1.15:job-server-container") include(":runners:flink:1.16") include(":runners:flink:1.16:job-server") include(":runners:flink:1.16:job-server-container") +// Flink 1.17 +include(":runners:flink:1.17") +include(":runners:flink:1.17:job-server") +include(":runners:flink:1.17:job-server-container") /* End Flink Runner related settings */ include(":runners:twister2") include(":runners:google-cloud-dataflow-java")