Skip to content

Commit

Permalink
flink-release 1.17
Browse files Browse the repository at this point in the history
(cherry picked from commit b006a7c)
  • Loading branch information
Kanishk Karanawat committed Oct 5, 2023
1 parent ec204be commit 9c3c918
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 19 deletions.
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ tasks.register("javaPreCommit") {
dependsOn(":runners:flink:1.15:job-server:build")
dependsOn(":runners:flink:1.16:build")
dependsOn(":runners:flink:1.16:job-server:build")
dependsOn(":runners:flink:1.17:build")
dependsOn(":runners:flink:1.17:job-server:build")
dependsOn(":runners:google-cloud-dataflow-java:build")
dependsOn(":runners:google-cloud-dataflow-java:examples-streaming:build")
dependsOn(":runners:google-cloud-dataflow-java:examples:build")
Expand Down Expand Up @@ -342,6 +344,7 @@ tasks.register("javaPostCommitSickbay") {
dependsOn(":runners:flink:1.14:validatesRunnerSickbay")
dependsOn(":runners:flink:1.15:validatesRunnerSickbay")
dependsOn(":runners:flink:1.16:validatesRunnerSickbay")
dependsOn(":runners:flink:1.17:validatesRunnerSickbay")
dependsOn(":runners:spark:3:job-server:validatesRunnerSickbay")
dependsOn(":runners:direct-java:validatesRunnerSickbay")
dependsOn(":runners:portability:java:validatesRunnerSickbay")
Expand Down
6 changes: 4 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,7 @@ javaVersion=1.8
docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_

flink_versions=1.12,1.13,1.14,1.15,1.16

# supported flink versions
flink_versions=1.12,1.13,1.14,1.15,1.16,1.17
# supported python versions
python_versions=3.7,3.8,3.9,3.10,3.11
34 changes: 34 additions & 0 deletions runners/flink/1.17/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '..'

/* All properties required for loading the Flink build script */
project.ext {
// Set the version of all Flink-related dependencies here.
flink_version = '1.17.0'
// Version specific code overrides.
main_source_overrides = ["${basePath}/1.12/src/main/java", "${basePath}/1.13/src/main/java", "${basePath}/1.14/src/main/java", "${basePath}/1.15/src/main/java", "${basePath}/1.16/src/main/java", './src/main/java']
test_source_overrides = ["${basePath}/1.12/src/test/java", "${basePath}/1.13/src/test/java", "${basePath}/1.14/src/test/java", "${basePath}/1.15/src/test/java", "${basePath}/1.16/src/test/java", './src/test/java']
main_resources_overrides = []
test_resources_overrides = []
archives_base_name = 'beam-runners-flink-1.17'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_runner.gradle"
26 changes: 26 additions & 0 deletions runners/flink/1.17/job-server-container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server-container'

project.ext {
resource_path = basePath
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server_container.gradle"
31 changes: 31 additions & 0 deletions runners/flink/1.17/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server'

project.ext {
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-flink-1.17-job-server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server.gradle"
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.flink.translation.types;

import java.io.EOFException;
import java.io.IOException;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
Expand All @@ -28,14 +26,17 @@
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.io.EOFException;
import java.io.IOException;

/**
* Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for Beam {@link
* org.apache.beam.sdk.coders.Coder Coders}.
Expand Down Expand Up @@ -158,26 +159,81 @@ public TypeSerializerSnapshot<T> snapshotConfiguration() {
return new LegacySnapshot<>(this);
}

/** A legacy snapshot which does not care about schema compatibility. */
public static class LegacySnapshot<T> extends TypeSerializerConfigSnapshot<T> {
public static class LegacySnapshot<T> implements TypeSerializerSnapshot<T> {

int CURRENT_VERSION = 2;
CoderTypeSerializer<T> serializer;

/** Needs to be public to work with {@link VersionedIOReadableWritable}. */
public LegacySnapshot() {}
public LegacySnapshot() {
}

public LegacySnapshot(CoderTypeSerializer<T> serializer) {
setPriorSerializer(serializer);
this.serializer = serializer;
}

@Override
public int getVersion() {
// We always return the same version
return 1;
public int getCurrentVersion() {
return CURRENT_VERSION;
}

@Override
public void writeSnapshot(DataOutputView out) throws IOException {
ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos();
InstantiationUtil.serializeObject(streamWithPos, this.serializer);
out.writeInt(streamWithPos.getPosition());
out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
}

@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
switch (readVersion) {
case 1:
throw new UnsupportedOperationException(
String.format("No longer supported version [%d].", readVersion));
case 2:
try {
int serializerBytes = in.readInt();
byte[] buffer = new byte[serializerBytes];
in.readFully(buffer);
this.serializer = InstantiationUtil.deserializeObject(buffer, userCodeClassLoader);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
break;
default:
throw new IllegalArgumentException("Unrecognized version: " + readVersion);
}
}

@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer) {
// We assume compatibility because we don't have a way of checking schema compatibility
public TypeSerializer<T> restoreSerializer() {
if (serializer == null) {
throw new IllegalStateException(
"Trying to restore the prior serializer but the prior serializer has not been set.");
}
return this.serializer;
}

@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
if (newSerializer.getClass() != this.getClass().getDeclaringClass()) {
return TypeSerializerSchemaCompatibility.incompatible();
}

CoderTypeSerializer<T> coderTypeSerializer = (CoderTypeSerializer<T>) newSerializer;

if (this.serializer == null) {
return TypeSerializerSchemaCompatibility.incompatible();
}

if (!this.serializer.coder.getEncodedTypeDescriptor().getType().getTypeName().equals(coderTypeSerializer.coder.getEncodedTypeDescriptor().getType().getTypeName())) {
return TypeSerializerSchemaCompatibility.incompatible();
}

if (!this.serializer.pipelineOptions.toString().equals(coderTypeSerializer.pipelineOptions.toString())) {
return TypeSerializerSchemaCompatibility.incompatible();
}

return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ private void testWriteAndReadConfigSnapshot(Coder<String> coder) throws IOExcept
TypeSerializerSnapshot readSnapshot = new CoderTypeSerializer.LegacySnapshot();
readSnapshot.readSnapshot(
writtenSnapshot.getCurrentVersion(), outView.getInputView(), getClass().getClassLoader());
CoderTypeSerializer<String> restoreSerializer = (CoderTypeSerializer<String>) readSnapshot.restoreSerializer();

assertThat(readSnapshot.restoreSerializer(), is(serializer));
assertThat(restoreSerializer, is(serializer));
assertThat("TypeSerializerSchemaCompatibility should be compatible", writtenSnapshot.resolveSchemaCompatibility(restoreSerializer).isCompatibleAsIs() == true);
}
}
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,7 @@ def _add_argparse_args(cls, parser):
class FlinkRunnerOptions(PipelineOptions):

# These should stay in sync with gradle.properties.
PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15', '1.16']
PUBLISHED_FLINK_VERSIONS = ['1.12', '1.13', '1.14', '1.15', '1.16', '1.17']

@classmethod
def _add_argparse_args(cls, parser):
Expand Down
4 changes: 4 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ include(":runners:flink:1.15:job-server-container")
include(":runners:flink:1.16")
include(":runners:flink:1.16:job-server")
include(":runners:flink:1.16:job-server-container")
// Flink 1.17
include(":runners:flink:1.17")
include(":runners:flink:1.17:job-server")
include(":runners:flink:1.17:job-server-container")
/* End Flink Runner related settings */
include(":runners:twister2")
include(":runners:google-cloud-dataflow-java")
Expand Down

0 comments on commit 9c3c918

Please sign in to comment.