diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java index 8fd1d2b468241..7cedadbe1d462 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java @@ -19,26 +19,44 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.SourceOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.source.CollectingDataOutput; +import org.apache.flink.streaming.runtime.io.DataInputStatus; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorChain; -import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain; -import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.TimerService; -import org.apache.flink.streaming.util.CollectorOutput; +import org.apache.flink.streaming.util.MockOutput; +import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.flink.streaming.util.MockStreamTask; import org.apache.flink.streaming.util.MockStreamTaskBuilder; @@ -47,7 +65,9 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import static org.apache.flink.streaming.api.operators.StreamOperatorUtils.setProcessingTimeService; import static org.apache.flink.streaming.api.operators.StreamOperatorUtils.setupStreamOperator; @@ -163,7 +183,8 @@ void testLatencyMarkEmissionDisabledOverrideViaExecutionConfig() throws Exceptio private interface OperatorSetupOperation { void setupSourceOperator( - StreamSource operator, TestProcessingTimeService testProcessingTimeService) + SourceOperator operator, + TestProcessingTimeService testProcessingTimeService) throws Exception; } @@ -175,26 +196,52 @@ private void testLatencyMarkEmission( testProcessingTimeService.setCurrentTime(0L); final List processingTimes = Arrays.asList(1L, 10L, 11L, 21L, maxProcessingTime); - // regular stream source operator - final StreamSource operator = - new StreamSource<>( - new ProcessingTimeServiceSource( - testProcessingTimeService, processingTimes)); + ProcessingTimeServiceSource source = + new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes); + + Environment env = MockEnvironment.builder().build(); + final SourceOperator operator = + createTestLatencySourceOperator(env, source, testProcessingTimeService); operatorSetup.setupSourceOperator(operator, testProcessingTimeService); - // run and wait to be stopped - OperatorChain operatorChain = - new RegularOperatorChain<>( - operator.getContainingTask(), - StreamTask.createRecordWriterDelegate( - operator.getOperatorConfig(), - new MockEnvironmentBuilder().build())); try { - operator.run(new Object(), new CollectorOutput<>(output), operatorChain); - operator.finish(); - } finally { - operatorChain.close(); + AbstractStateBackend abstractStateBackend = new HashMapStateBackend(); + Environment stateEnv = MockEnvironment.builder().build(); + CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); + final OperatorStateStore operatorStateStore = + abstractStateBackend.createOperatorStateBackend( + new OperatorStateBackendParametersImpl( + stateEnv, + "test-source-operator", + Collections.emptyList(), + cancelStreamRegistry)); + + final StateInitializationContext stateContext = + new StateInitializationContextImpl(null, operatorStateStore, null, null, null); + + operator.initializeState(stateContext); + operator.open(); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize operator", e); + } + + // Use CollectingDataOutput to collect the results + CollectingDataOutput dataOutput = new CollectingDataOutput<>(); + + // Run operator until completion with timeout protection + long deadline = System.currentTimeMillis() + 5000; + while (operator.emitNext(dataOutput) != DataInputStatus.END_OF_DATA) { + assertThat(System.currentTimeMillis()) + .as("Test timed out waiting for END_OF_DATA") + .isLessThan(deadline); + } + + // Extract stream elements from the output + for (Object event : dataOutput.getEvents()) { + if (event instanceof StreamElement) { + output.add((StreamElement) event); + } } assertThat(output).hasSize(numberLatencyMarkers); @@ -223,11 +270,42 @@ private void testLatencyMarkEmission( } } + private static SourceOperator createTestLatencySourceOperator( + Environment env, + ProcessingTimeServiceSource source, + TestProcessingTimeService testProcessingTimeService) + throws Exception { + + MockStreamTask mockTask = new MockStreamTaskBuilder(env).build(); + final SourceOperator operator = + new SourceOperator<>( + new StreamOperatorParameters<>( + mockTask, + new MockStreamConfig(new Configuration(), 1), + new MockOutput<>(new ArrayList<>()), + TestProcessingTimeService::new, + null, + null), + (context) -> source.createReader(context), + new MockOperatorEventGateway(), + source.getSplitSerializer(), + WatermarkStrategy.noWatermarks(), + testProcessingTimeService, + new Configuration(), + "localhost", + false, + () -> false, + Collections.emptyMap()); + + // The operator will be initialized via the setupSourceOperator method + return operator; + } + // ------------------------------------------------------------------------ @SuppressWarnings("unchecked") private static void setupSourceOperator( - StreamSource operator, + SourceOperator operator, ExecutionConfig executionConfig, Environment env, TimerService timerService) @@ -254,13 +332,17 @@ private static void setupSourceOperator( // ------------------------------------------------------------------------ - private static final class ProcessingTimeServiceSource implements SourceFunction { + /** + * A test source used solely to advance processing time during test execution. This source does + * not emit any records; it only manipulates the processing time service to trigger latency + * marker emissions at specific time intervals. + */ + private static final class ProcessingTimeServiceSource + implements Source { private final TestProcessingTimeService processingTimeService; private final List processingTimes; - private boolean cancelled = false; - private ProcessingTimeServiceSource( TestProcessingTimeService processingTimeService, List processingTimes) { this.processingTimeService = processingTimeService; @@ -268,19 +350,126 @@ private ProcessingTimeServiceSource( } @Override - public void run(SourceContext ctx) throws Exception { - for (Long processingTime : processingTimes) { - if (cancelled) { - break; + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) { + return new SourceReader<>() { + private int currentIndex = 0; + private final CompletableFuture available = + CompletableFuture.completedFuture(null); + + @Override + public void start() {} + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + if (currentIndex >= processingTimes.size()) { + return InputStatus.END_OF_INPUT; + } + + // Simulates time progression to trigger latency marker timers. + // Does not emit real records or require splits. + processingTimeService.setCurrentTime(processingTimes.get(currentIndex++)); + + return currentIndex < processingTimes.size() + ? InputStatus.MORE_AVAILABLE + : InputStatus.END_OF_INPUT; } - processingTimeService.setCurrentTime(processingTime); - } + @Override + public List snapshotState(long checkpointId) { + return Collections.emptyList(); + } + + @Override + public CompletableFuture isAvailable() { + return available; + } + + @Override + public void addSplits(List splits) {} + + @Override + public void notifyNoMoreSplits() {} + + @Override + public void close() {} + }; } @Override - public void cancel() { - cancelled = true; + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) { + return new SplitEnumerator() { + @Override + public void start() {} + + @Override + public void handleSplitRequest(int subtaskId, String requesterHostname) {} + + @Override + public void addSplitsBack(List splits, int subtaskId) {} + + @Override + public void addReader(int subtaskId) {} + + @Override + public Void snapshotState(long checkpointId) { + return null; + } + + @Override + public void close() {} + }; + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, Void checkpoint) { + return createEnumerator(enumContext); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new SimpleVersionedSerializer() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(SourceSplit obj) { + return new byte[0]; + } + + @Override + public SourceSplit deserialize(int version, byte[] serialized) { + return null; + } + }; + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return new SimpleVersionedSerializer() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(Void obj) { + return new byte[0]; + } + + @Override + public Void deserialize(int version, byte[] serialized) { + return null; + } + }; } } }