diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 02d4a895cddcb..68c50960794a7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -299,6 +299,13 @@ public interface FlinkPipelineOptions void setFlinkConfDir(String confDir); + @Description("Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining," + + "the RequiresStableInput contract might be violated if there any processing related failures in the DoFn operator.") + @Default.Boolean(false) + Boolean getEnableStableInputDrain(); + + void setEnableStableInputDrain(Boolean enableStableInputDrain); + static FlinkPipelineOptions defaults() { return PipelineOptionsFactory.as(FlinkPipelineOptions.class); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8fe3afbe5bde4..394aaf50c06d9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -199,6 +199,8 @@ public class DoFnOperator /** If true, we must process elements only after a checkpoint is finished. */ final boolean requiresStableInput; + final boolean enableStableInputDrain; + final int numConcurrentCheckpoints; private final boolean usesOnWindowExpiration; @@ -323,6 +325,8 @@ public DoFnOperator( + Math.max(0, flinkOptions.getMinPauseBetweenCheckpoints())); } + this.enableStableInputDrain = flinkOptions.getEnableStableInputDrain(); + this.numConcurrentCheckpoints = flinkOptions.getNumConcurrentCheckpoints(); this.finishBundleBeforeCheckpointing = flinkOptions.getFinishBundleBeforeCheckpointing(); @@ -626,7 +630,7 @@ void flushData() throws Exception { while (bundleStarted) { invokeFinishBundle(); } - if (requiresStableInput) { + if (requiresStableInput && enableStableInputDrain) { // Flush any buffered events here before draining the pipeline. Note that this is best-effort // and requiresStableInput contract might be violated in cases where buffer processing fails. bufferingDoFnRunner.checkpointCompleted(Long.MAX_VALUE); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index b0365db9adbf7..ee5bc90093e8d 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -2020,6 +2020,7 @@ public void testExactlyOnceBufferingFlushDuringDrain() throws Exception { FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); options.setMaxBundleSize(2L); options.setCheckpointingInterval(1L); + options.setEnableStableInputDrain(true); TupleTag outputTag = new TupleTag<>("main-output"); WindowedValue.ValueOnlyWindowedValueCoder windowedValueCoder = diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html index ba8b597aaeeb0..87d69ee60fe32 100644 --- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html @@ -52,6 +52,11 @@ Disable Beam metrics in Flink Runner Default: false + + enableStableInputDrain + Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining,the RequiresStableInput contract might be violated if there any processing related failures in the DoFn operator. + Default: false + executionModeForBatch Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672 diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html index 5293f35e6a1e3..27ae27ad05a3e 100644 --- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html @@ -52,6 +52,11 @@ Disable Beam metrics in Flink Runner Default: false + + enable_stable_input_drain + Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining,the RequiresStableInput contract might be violated if there any processing related failures in the DoFn operator. + Default: false + execution_mode_for_batch Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672