[FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource (apache#23777)
afedulov authored Apr 19, 2024
1 parent 3977982 commit 7c4dec6
Showing 29 changed files with 768 additions and 166 deletions.


16 changes: 16 additions & 0 deletions flink-connectors/flink-connector-datagen/pom.xml
@@ -42,4 +42,20 @@
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>test-jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
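
The new build section above publishes the module's test classes as a test-jar. As a hedged sketch (not part of this commit), a downstream module could consume those test classes roughly like this; the version property is an assumption:

<!-- Hypothetical consumer module: pull in the datagen test classes. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-datagen</artifactId>
    <version>${project.version}</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
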
176 changes: 176 additions & 0 deletions flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/functions/IndexLookupGeneratorFunction.java
@@ -0,0 +1,176 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.datagen.functions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

/**
 * A stream generator function that returns elements from the collection based on their index.
 *
 * <p>This generator function serializes the elements using Flink's type information. That way, any
 * object transport using Java serialization will not be affected by the serializability of the
 * elements.
 *
 * @param <OUT> The type of elements returned by this function.
 */
@Internal
public class IndexLookupGeneratorFunction<OUT> implements GeneratorFunction<Long, OUT> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(IndexLookupGeneratorFunction.class);

    /** The (de)serializer to be used for the data elements. */
    private final TypeSerializer<OUT> serializer;

    /** The actual data elements, in serialized form. */
    private byte[] elementsSerialized;

    private int numElements;

    private transient DataInputView input;

    private transient Map<Long, OUT> lookupMap;

    public IndexLookupGeneratorFunction(TypeInformation<OUT> typeInfo, Iterable<OUT> elements) {
        this(typeInfo, new ExecutionConfig(), elements);
    }

    public IndexLookupGeneratorFunction(
            TypeInformation<OUT> typeInfo, ExecutionConfig config, Iterable<OUT> elements) {
        // The collection must not contain null elements or elements of mixed types.
        checkIterable(elements, typeInfo.getTypeClass());
        this.serializer = typeInfo.createSerializer(config);
        trySerialize(elements);
    }

    @VisibleForTesting
    @Nullable
    public TypeSerializer<OUT> getSerializer() {
        return serializer;
    }

    @Override
    public void open(SourceReaderContext readerContext) throws Exception {
        ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
        this.input = new DataInputViewStreamWrapper(bais);
        lookupMap = new HashMap<>();
        buildLookup();
    }

    @Override
    public OUT map(Long index) throws Exception {
        return lookupMap.get(index);
    }

    /**
     * Verifies that all elements in the iterable are non-null and of the given class, or a
     * subclass thereof.
     *
     * @param elements The iterable to check.
     * @param viewedAs The class to which the elements must be assignable.
     */
    private void checkIterable(Iterable<OUT> elements, Class<?> viewedAs) {
        for (OUT elem : elements) {
            numElements++;
            if (elem == null) {
                throw new IllegalArgumentException("The collection contains a null element");
            }

            if (!viewedAs.isAssignableFrom(elem.getClass())) {
                throw new IllegalArgumentException(
                        "The elements in the collection are not all subclasses of "
                                + viewedAs.getCanonicalName());
            }
        }
    }

    private void serializeElements(Iterable<OUT> elements) throws IOException {
        Preconditions.checkState(serializer != null, "serializer not set");
        LOG.info("Serializing elements using {}", serializer);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);

        try {
            for (OUT element : elements) {
                serializer.serialize(element, wrapper);
            }
        } catch (Exception e) {
            throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
        }
        this.elementsSerialized = baos.toByteArray();
    }

    private OUT tryDeserialize() throws IOException {
        try {
            return serializer.deserialize(input);
        } catch (EOFException eof) {
            throw new NoSuchElementException(
                    "Reached the end of the collection. This could be caused by issues with the "
                            + "serializer or by calling the map() function more times than there "
                            + "are elements in the collection. Make sure that you set the number "
                            + "of records to be produced by the DataGeneratorSource equal to the "
                            + "number of elements in the collection.");
        } catch (Exception e) {
            throw new IOException(
                    "Failed to deserialize an element from the source. "
                            + "If you are using user-defined serialization (Value and Writable "
                            + "types), check the serialization functions.\nSerializer is "
                            + serializer,
                    e);
        }
    }

    private void buildLookup() throws IOException {
        for (long i = 0; i < numElements; i++) {
            lookupMap.put(i, tryDeserialize());
        }
    }

    private void trySerialize(Iterable<OUT> elements) {
        try {
            serializeElements(elements);
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
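
For orientation, a minimal usage sketch follows. It is not part of this commit: the wiring relies on the public DataGeneratorSource(generatorFunction, count, typeInfo) constructor, and the class and pipeline names are illustrative assumptions. The function maps each generated index to the element at that position, so the configured record count must equal the collection size.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.datagen.functions.IndexLookupGeneratorFunction;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class IndexLookupExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<String> elements = Arrays.asList("a", "b", "c");

        // Each generated index 0..count-1 is looked up in the serialized collection.
        IndexLookupGeneratorFunction<String> generator =
                new IndexLookupGeneratorFunction<>(Types.STRING, elements);

        // Per the class's own error message, the record count should equal the
        // number of elements in the collection.
        DataGeneratorSource<String> source =
                new DataGeneratorSource<>(generator, elements.size(), Types.STRING);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "finite-elements").print();
        env.execute();
    }
}
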
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
@@ -142,7 +142,7 @@ public DataGeneratorSource(
                rateLimiterStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
    }

-    private DataGeneratorSource(
+    DataGeneratorSource(
            SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
            GeneratorFunction<Long, OUT> generatorFunction,
            long count,
158 changes: 158 additions & 0 deletions flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DoubleEmittingSourceReaderWithCheckpointsInBetween.java
@@ -0,0 +1,158 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.datagen.source;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.FlinkRuntimeException;

import javax.annotation.Nullable;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A {@link SourceReader} that synchronizes the emission of N elements with the arrival of
 * checkpoint barriers. It 1) emits a list of elements without checkpoints in between, 2) then
 * waits for two checkpoints to complete, 3) then re-emits the same elements, 4) waits for another
 * two checkpoints, and 5) exits.
 *
 * <p>This lockstep execution is possible because {@code pollNext} and {@code snapshotState} are
 * executed in the same thread and because {@code pollNext} can emit N elements at once. This
 * reader is meant to be used solely for testing purposes, as a substitution for the {@code
 * FiniteTestSource} which implements the deprecated {@code SourceFunction} API.
 */
@Experimental
public class DoubleEmittingSourceReaderWithCheckpointsInBetween<
                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {

    private final GeneratorFunction<E, O> generatorFunction;

    private BooleanSupplier allowedToExit;
    private int snapshotsCompleted;
    private int snapshotsToWaitFor;
    private boolean done;

    public DoubleEmittingSourceReaderWithCheckpointsInBetween(
            SourceReaderContext context,
            GeneratorFunction<E, O> generatorFunction,
            @Nullable BooleanSupplier allowedToExit) {
        super(context);
        this.generatorFunction = checkNotNull(generatorFunction);
        this.allowedToExit = allowedToExit;
    }

    public DoubleEmittingSourceReaderWithCheckpointsInBetween(
            SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) {
        super(context);
        this.generatorFunction = checkNotNull(generatorFunction);
    }

    // ------------------------------------------------------------------------

    @Override
    public void start(SourceReaderContext context) {
        try {
            generatorFunction.open(context);
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to open the GeneratorFunction", e);
        }
    }

    @Override
    public InputStatus pollNext(ReaderOutput<O> output) {
        // This is the termination path after the test data has been emitted twice.
        if (done) {
            if (allowedToExit != null) { // Termination is controlled externally.
                return allowedToExit.getAsBoolean()
                        ? InputStatus.END_OF_INPUT
                        : InputStatus.NOTHING_AVAILABLE;
            } else {
                return InputStatus.END_OF_INPUT;
            }
        }
        // This is the initial path.
        if (currentSplit == null) {
            InputStatus inputStatus = tryMoveToNextSplit();
            switch (inputStatus) {
                case MORE_AVAILABLE:
                    emitElements(output);
                    break;
                case END_OF_INPUT:
                    // This can happen if the source parallelism is larger than the number of
                    // available splits.
                    return inputStatus;
            }
        } else {
            // This is the path that emits the same split the second time.
            emitElements(output);
            done = true;
        }
        availability = new CompletableFuture<>();
        return InputStatus.NOTHING_AVAILABLE;
    }

    private void emitElements(ReaderOutput<O> output) {
        iterator = currentSplit.getIterator();
        while (iterator.hasNext()) {
            E next = iterator.next();
            O converted = convert(next);
            output.collect(converted);
        }
        // Always wait for two snapshots after emitting the elements.
        snapshotsToWaitFor = 2;
        snapshotsCompleted = 0;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        snapshotsCompleted++;
        if (snapshotsCompleted >= snapshotsToWaitFor) {
            availability.complete(null);
        }

        if (allowedToExit != null) {
            if (allowedToExit.getAsBoolean()) {
                availability.complete(null);
            }
        }
    }

    @Override
    protected O convert(E value) {
        try {
            return generatorFunction.map(value);
        } catch (Exception e) {
            String message =
                    String.format(
                            "A user-provided generator function threw an exception on this input: %s",
                            value.toString());
            throw new FlinkRuntimeException(message, e);
        }
    }
}
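
For orientation, a minimal wiring sketch follows. It is not from this commit: the factory lambda, the explicit type arguments, and the class name are assumptions. A factory like this can only be handed to DataGeneratorSource through its now package-private constructor, which is presumably why the commit relaxes that constructor's visibility.

import org.apache.flink.api.connector.source.SourceReaderFactory;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
import org.apache.flink.connector.datagen.source.DoubleEmittingSourceReaderWithCheckpointsInBetween;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.util.NumberSequenceIterator;

public class DoubleEmittingWiringSketch {
    public static void main(String[] args) {
        // Maps each generated index to a test record.
        GeneratorFunction<Long, String> generator = index -> "element-" + index;

        // Each reader created by this factory emits its split twice, pausing for
        // two completed checkpoints after each emission before finishing.
        SourceReaderFactory<String, NumberSequenceSplit> readerFactory =
                readerContext ->
                        new DoubleEmittingSourceReaderWithCheckpointsInBetween<
                                Long, String, NumberSequenceIterator, NumberSequenceSplit>(
                                readerContext, generator);
    }
}
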
