Add Integration Test Cases #2

Open
wants to merge 6 commits into feature/CDAP-20176-atleast-once-processing
Changes from all commits
65 changes: 45 additions & 20 deletions confluent-kafka-plugins/pom.xml
@@ -69,7 +69,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka10.version}</version>
<exclusions>
<exclusion>
@@ -84,16 +84,16 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark2.version}</version>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark3.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<artifactId>kafka_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_2.11</artifactId>
<artifactId>spark-tags_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
@@ -103,8 +103,8 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark2.version}</version>
<artifactId>spark-mllib_2.12</artifactId>
<version>${spark3.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
@@ -115,14 +115,14 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark2.version}</version>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark3.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark2.version}</version>
<artifactId>spark-core_2.12</artifactId>
<version>${spark3.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
@@ -173,19 +173,19 @@
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-spark-core2_2.11</artifactId>
<artifactId>cdap-spark-core3_2.12</artifactId>
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-data-pipeline2_2.11</artifactId>
<artifactId>cdap-data-pipeline3_2.12</artifactId>
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-data-streams2_2.11</artifactId>
<artifactId>cdap-data-streams3_2.12</artifactId>
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
@@ -221,19 +221,44 @@

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.1</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>3.3.0</version>
<version>3.5.1</version>
<configuration>
<instructions>
<_exportcontents>
io.cdap.plugin.confluent.*;
org.apache.spark.streaming.kafka010.*;
org.apache.kafka.common.*;
org.apache.kafka.common.serialization.*;
org.apache.kafka.*;
io.confluent.kafka.serializers.*;
org.apache.kafka.clients.*;
</_exportcontents>
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
@@ -255,8 +280,8 @@
<version>1.1.0</version>
<configuration>
<cdapArtifacts>
<parent>system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-pipeline[6.8.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-streams[6.8.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
</cdapArtifacts>
</configuration>
<executions>
ConfluentStreamingSource.java
@@ -16,6 +16,7 @@

package io.cdap.plugin.confluent.streaming.source;

import com.google.gson.Gson;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
@@ -27,25 +27,47 @@
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
import io.cdap.cdap.etl.api.streaming.StreamingStateHandler;
import io.cdap.plugin.batch.source.KafkaPartitionOffsets;
import io.cdap.plugin.common.Constants;
import io.cdap.plugin.confluent.source.ConfluentDStream;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Confluent Kafka Streaming source.
*/
@Plugin(type = StreamingSource.PLUGIN_TYPE)
@Name(ConfluentStreamingSource.PLUGIN_NAME)
@Description("Confluent Kafka streaming source.")
public class ConfluentStreamingSource extends StreamingSource<StructuredRecord> {
public class ConfluentStreamingSource extends StreamingSource<StructuredRecord> implements StreamingStateHandler {

private static final Logger LOG = LoggerFactory.getLogger(ConfluentStreamingSource.class);
private static final Gson gson = new Gson();
public static final String PLUGIN_NAME = "Confluent";

private final ConfluentStreamingSourceConfig conf;
@@ -79,7 +102,32 @@ public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws
collector.getOrThrowException();

context.registerLineage(conf.referenceName);
return ConfluentStreamingSourceUtil.getStructuredRecordJavaDStream(context, conf, outputSchema, collector);
JavaInputDStream<ConsumerRecord<Object, Object>> javaInputDStream = ConfluentStreamingSourceUtil
.getConsumerRecordJavaDStream(context, conf, outputSchema, collector, getStateSupplier(context));

JavaDStream<StructuredRecord> javaDStream;
javaDStream = ConfluentStreamingSourceUtil.getStructuredRecordJavaDStream(javaInputDStream,
new ConfluentStreamingSourceUtil.RecordTransform(conf, outputSchema));

if (conf.getSchemaRegistryUrl() != null) {
ConfluentStreamingSourceUtil.AvroRecordTransform transform =
new ConfluentStreamingSourceUtil.AvroRecordTransform(conf, outputSchema);
javaDStream = ConfluentStreamingSourceUtil.getStructuredRecordJavaDStream(javaInputDStream, transform);
}

if (!context.isStateStoreEnabled()) {
// Return the serializable DStream in case checkpointing is enabled instead of the native state store.
return javaDStream;
}

// Use the state-aware DStream so offsets can be persisted after each batch

ConfluentDStream confluentDStream = new ConfluentDStream(context.getSparkStreamingContext().ssc(),
javaInputDStream.inputDStream(),
ConfluentStreamingSourceUtil
.getRecordTransformFunction(conf, outputSchema),
getStateConsumer(context));
return confluentDStream.convertToJavaDStream();
}

private Schema getOutputSchema(FailureCollector failureCollector) {
@@ -138,4 +186,55 @@ private Schema fetchSchema(CachedSchemaRegistryClient schemaRegistryClient, Stri
}
return Schema.parseJson(schemaMetadata.getSchema());
}

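// Callback passed to ConfluentDStream for persisting the latest per-partition offsets after a batch;
// failures to save are logged as warnings instead of failing the pipeline.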
private VoidFunction<OffsetRange[]> getStateConsumer(StreamingContext context) {
return offsetRanges -> {
try {
saveState(context, offsetRanges);
} catch (IOException e) {
LOG.warn("Exception in saving state.", e);
}
};
}

private void saveState(StreamingContext context, OffsetRange[] offsetRanges) throws IOException {
if (offsetRanges.length > 0) {
Map<Integer, Long> partitionOffsetMap = Arrays.stream(offsetRanges)
.collect(Collectors.toMap(OffsetRange::partition, OffsetRange::untilOffset));
byte[] state = gson.toJson(new KafkaPartitionOffsets(partitionOffsetMap)).getBytes(StandardCharsets.UTF_8);
context.saveState(conf.getTopic(), state);
}
}

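// Supplies the offsets to resume reading from, wrapping getSavedState so its checked IOException
// surfaces as a RuntimeException inside the supplier.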
private Supplier<Map<TopicPartition, Long>> getStateSupplier(StreamingContext context) {
return () -> {
try {
return getSavedState(context);
} catch (IOException e) {
throw new RuntimeException("Exception in fetching state.", e);
}
};
}

private Map<TopicPartition, Long> getSavedState(StreamingContext context) throws IOException {
// State store is not enabled, do not read state
if (!context.isStateStoreEnabled()) {
return Collections.emptyMap();
}

// If state is not present, use configured offsets or defaults
Optional<byte[]> state = context.getState(conf.getTopic());
if (!state.isPresent()) {
return Collections.emptyMap();
}

byte[] bytes = state.get();
try (Reader reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) {
KafkaPartitionOffsets partitionOffsets = gson.fromJson(reader, KafkaPartitionOffsets.class);
return partitionOffsets.getPartitionOffsets().entrySet()
.stream()
.collect(Collectors.toMap(partitionOffset -> new TopicPartition(conf.getTopic(), partitionOffset.getKey()),
Map.Entry::getValue));
}
}
}
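
For reviewers, here is a minimal, self-contained sketch of the offset round trip that saveState and getSavedState implement above: the per-partition until-offsets of a finished batch are serialized as a KafkaPartitionOffsets JSON payload, then read back into the TopicPartition-to-offset map used to resume the consumer. The topic name, sample offsets, and the OffsetStateRoundTrip class are illustrative placeholders, not part of the plugin.

import com.google.gson.Gson;
import io.cdap.plugin.batch.source.KafkaPartitionOffsets;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class OffsetStateRoundTrip {
  public static void main(String[] args) {
    Gson gson = new Gson();
    String topic = "users";  // illustrative topic name

    // What saveState() persists: the until-offset of each partition from the last batch.
    OffsetRange[] ranges = new OffsetRange[] {
        OffsetRange.create(topic, 0, 0L, 120L),
        OffsetRange.create(topic, 1, 0L, 87L)
    };
    Map<Integer, Long> partitionOffsets = Arrays.stream(ranges)
        .collect(Collectors.toMap(OffsetRange::partition, OffsetRange::untilOffset));
    byte[] state = gson.toJson(new KafkaPartitionOffsets(partitionOffsets))
        .getBytes(StandardCharsets.UTF_8);

    // What getSavedState() reconstructs: the map handed back to the Kafka consumer on restart.
    KafkaPartitionOffsets restored =
        gson.fromJson(new String(state, StandardCharsets.UTF_8), KafkaPartitionOffsets.class);
    Map<TopicPartition, Long> resumeFrom = restored.getPartitionOffsets().entrySet().stream()
        .collect(Collectors.toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue));

    System.out.println(resumeFrom);  // e.g. {users-0=120, users-1=87}
  }
}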