Skip to content

Commit

Permalink
add flag in FlinkPipelineOptions to allow draining for pipelines with…
Browse files Browse the repository at this point in the history
… RequiresStableInput
  • Loading branch information
Kanishk Karanawat committed Oct 18, 2023
1 parent 65b2dcf commit 4ca8d3a
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,13 @@ public interface FlinkPipelineOptions

void setFlinkConfDir(String confDir);

@Description("Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining," +
"the RequiresStableInput contract might be violated if there any processing related failures in the DoFn operator.")
@Default.Boolean(false)
Boolean getEnableStableInputDrain();

void setEnableStableInputDrain(Boolean enableStableInputDrain);

static FlinkPipelineOptions defaults() {
return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ public class DoFnOperator<InputT, OutputT>
/** If true, we must process elements only after a checkpoint is finished. */
final boolean requiresStableInput;

final boolean enableStableInputDrain;

final int numConcurrentCheckpoints;

private final boolean usesOnWindowExpiration;
Expand Down Expand Up @@ -323,6 +325,8 @@ public DoFnOperator(
+ Math.max(0, flinkOptions.getMinPauseBetweenCheckpoints()));
}

this.enableStableInputDrain = flinkOptions.getEnableStableInputDrain();

this.numConcurrentCheckpoints = flinkOptions.getNumConcurrentCheckpoints();

this.finishBundleBeforeCheckpointing = flinkOptions.getFinishBundleBeforeCheckpointing();
Expand Down Expand Up @@ -626,7 +630,7 @@ void flushData() throws Exception {
while (bundleStarted) {
invokeFinishBundle();
}
if (requiresStableInput) {
if (requiresStableInput && enableStableInputDrain) {
// Flush any buffered events here before draining the pipeline. Note that this is best-effort
// and requiresStableInput contract might be violated in cases where buffer processing fails.
bufferingDoFnRunner.checkpointCompleted(Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2020,6 +2020,7 @@ public void testExactlyOnceBufferingFlushDuringDrain() throws Exception {
FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
options.setMaxBundleSize(2L);
options.setCheckpointingInterval(1L);
options.setEnableStableInputDrain(true);

TupleTag<String> outputTag = new TupleTag<>("main-output");
WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<td>Disable Beam metrics in Flink Runner</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>enableStableInputDrain</code></td>
<td>Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining,the RequiresStableInput contract might be violated if there any processing related failures in the DoFn operator.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>executionModeForBatch</code></td>
<td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<td>Disable Beam metrics in Flink Runner</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>enable_stable_input_drain</code></td>
<td>Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining,the RequiresStableInput contract might be violated if there any processing related failures in the DoFn operator.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>execution_mode_for_batch</code></td>
<td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td>
Expand Down

0 comments on commit 4ca8d3a

Please sign in to comment.