Skip to content

[FLINK-32695] [Tests] Migrate ProcessingTimeServiceSource from Source Function to Source V2 API in StreamSourceOperatorLatencyMetricsTest #26813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,44 @@
package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;

Expand All @@ -47,7 +65,9 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.streaming.api.operators.StreamOperatorUtils.setProcessingTimeService;
import static org.apache.flink.streaming.api.operators.StreamOperatorUtils.setupStreamOperator;
Expand Down Expand Up @@ -163,7 +183,8 @@ void testLatencyMarkEmissionDisabledOverrideViaExecutionConfig() throws Exceptio

private interface OperatorSetupOperation {
void setupSourceOperator(
StreamSource<Long, ?> operator, TestProcessingTimeService testProcessingTimeService)
SourceOperator<Long, SourceSplit> operator,
TestProcessingTimeService testProcessingTimeService)
throws Exception;
}

Expand All @@ -175,26 +196,52 @@ private void testLatencyMarkEmission(
testProcessingTimeService.setCurrentTime(0L);
final List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 21L, maxProcessingTime);

// regular stream source operator
final StreamSource<Long, ProcessingTimeServiceSource> operator =
new StreamSource<>(
new ProcessingTimeServiceSource(
testProcessingTimeService, processingTimes));
ProcessingTimeServiceSource source =
new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes);

Environment env = MockEnvironment.builder().build();
final SourceOperator<Long, SourceSplit> operator =
createTestLatencySourceOperator(env, source, testProcessingTimeService);

operatorSetup.setupSourceOperator(operator, testProcessingTimeService);

// run and wait to be stopped
OperatorChain<?, ?> operatorChain =
new RegularOperatorChain<>(
operator.getContainingTask(),
StreamTask.createRecordWriterDelegate(
operator.getOperatorConfig(),
new MockEnvironmentBuilder().build()));
try {
operator.run(new Object(), new CollectorOutput<>(output), operatorChain);
operator.finish();
} finally {
operatorChain.close();
AbstractStateBackend abstractStateBackend = new HashMapStateBackend();
Environment stateEnv = MockEnvironment.builder().build();
CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
final OperatorStateStore operatorStateStore =
abstractStateBackend.createOperatorStateBackend(
new OperatorStateBackendParametersImpl(
stateEnv,
"test-source-operator",
Collections.emptyList(),
cancelStreamRegistry));

final StateInitializationContext stateContext =
new StateInitializationContextImpl(null, operatorStateStore, null, null, null);

operator.initializeState(stateContext);
operator.open();
} catch (Exception e) {
throw new RuntimeException("Failed to initialize operator", e);
}

// Use CollectingDataOutput to collect the results
CollectingDataOutput<Long> dataOutput = new CollectingDataOutput<>();

// Run operator until completion with timeout protection
long deadline = System.currentTimeMillis() + 5000;
while (operator.emitNext(dataOutput) != DataInputStatus.END_OF_DATA) {
assertThat(System.currentTimeMillis())
.as("Test timed out waiting for END_OF_DATA")
.isLessThan(deadline);
}

// Extract stream elements from the output
for (Object event : dataOutput.getEvents()) {
if (event instanceof StreamElement) {
output.add((StreamElement) event);
}
}

assertThat(output).hasSize(numberLatencyMarkers);
Expand Down Expand Up @@ -223,11 +270,42 @@ private void testLatencyMarkEmission(
}
}

private static SourceOperator<Long, SourceSplit> createTestLatencySourceOperator(
Environment env,
ProcessingTimeServiceSource source,
TestProcessingTimeService testProcessingTimeService)
throws Exception {

MockStreamTask mockTask = new MockStreamTaskBuilder(env).build();
final SourceOperator<Long, SourceSplit> operator =
new SourceOperator<>(
new StreamOperatorParameters<>(
mockTask,
new MockStreamConfig(new Configuration(), 1),
new MockOutput<>(new ArrayList<>()),
TestProcessingTimeService::new,
null,
null),
(context) -> source.createReader(context),
new MockOperatorEventGateway(),
source.getSplitSerializer(),
WatermarkStrategy.noWatermarks(),
testProcessingTimeService,
new Configuration(),
"localhost",
false,
() -> false,
Collections.emptyMap());

// The operator will be initialized via the setupSourceOperator method
return operator;
}

// ------------------------------------------------------------------------

@SuppressWarnings("unchecked")
private static <T> void setupSourceOperator(
StreamSource<T, ?> operator,
SourceOperator<T, SourceSplit> operator,
ExecutionConfig executionConfig,
Environment env,
TimerService timerService)
Expand All @@ -254,33 +332,144 @@ private static <T> void setupSourceOperator(

// ------------------------------------------------------------------------

private static final class ProcessingTimeServiceSource implements SourceFunction<Long> {
/**
* A test source used solely to advance processing time during test execution. This source does
* not emit any records; it only manipulates the processing time service to trigger latency
* marker emissions at specific time intervals.
*/
private static final class ProcessingTimeServiceSource
implements Source<Long, SourceSplit, Void> {

private final TestProcessingTimeService processingTimeService;
private final List<Long> processingTimes;

private boolean cancelled = false;

private ProcessingTimeServiceSource(
TestProcessingTimeService processingTimeService, List<Long> processingTimes) {
this.processingTimeService = processingTimeService;
this.processingTimes = processingTimes;
}

@Override
public void run(SourceContext<Long> ctx) throws Exception {
for (Long processingTime : processingTimes) {
if (cancelled) {
break;
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
}

@Override
public SourceReader<Long, SourceSplit> createReader(SourceReaderContext readerContext) {
return new SourceReader<>() {
private int currentIndex = 0;
private final CompletableFuture<Void> available =
CompletableFuture.completedFuture(null);

@Override
public void start() {}

@Override
public InputStatus pollNext(ReaderOutput<Long> output) throws Exception {
if (currentIndex >= processingTimes.size()) {
return InputStatus.END_OF_INPUT;
}

// Simulates time progression to trigger latency marker timers.
// Does not emit real records or require splits.
processingTimeService.setCurrentTime(processingTimes.get(currentIndex++));

return currentIndex < processingTimes.size()
? InputStatus.MORE_AVAILABLE
: InputStatus.END_OF_INPUT;
}

processingTimeService.setCurrentTime(processingTime);
}
@Override
public List<SourceSplit> snapshotState(long checkpointId) {
return Collections.emptyList();
}

@Override
public CompletableFuture<Void> isAvailable() {
return available;
}

@Override
public void addSplits(List<SourceSplit> splits) {}

@Override
public void notifyNoMoreSplits() {}

@Override
public void close() {}
};
}

@Override
public void cancel() {
cancelled = true;
public SplitEnumerator<SourceSplit, Void> createEnumerator(
SplitEnumeratorContext<SourceSplit> enumContext) {
return new SplitEnumerator<SourceSplit, Void>() {
@Override
public void start() {}

@Override
public void handleSplitRequest(int subtaskId, String requesterHostname) {}

@Override
public void addSplitsBack(List<SourceSplit> splits, int subtaskId) {}

@Override
public void addReader(int subtaskId) {}

@Override
public Void snapshotState(long checkpointId) {
return null;
}

@Override
public void close() {}
};
}

@Override
public SplitEnumerator<SourceSplit, Void> restoreEnumerator(
SplitEnumeratorContext<SourceSplit> enumContext, Void checkpoint) {
return createEnumerator(enumContext);
}

@Override
public SimpleVersionedSerializer<SourceSplit> getSplitSerializer() {
return new SimpleVersionedSerializer<SourceSplit>() {
@Override
public int getVersion() {
return 0;
}

@Override
public byte[] serialize(SourceSplit obj) {
return new byte[0];
}

@Override
public SourceSplit deserialize(int version, byte[] serialized) {
return null;
}
};
}

@Override
public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() {
return new SimpleVersionedSerializer<Void>() {
@Override
public int getVersion() {
return 0;
}

@Override
public byte[] serialize(Void obj) {
return new byte[0];
}

@Override
public Void deserialize(int version, byte[] serialized) {
return null;
}
};
}
}
}