CDAP-20176: Save offset in state store for at-least-once processing
sgarg-CS committed Feb 7, 2023
1 parent 9ef5836 commit 6fdde59
Showing 10 changed files with 559 additions and 81 deletions.
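For context on the change itself: to get at-least-once semantics, the source records, after every processed micro-batch, the highest offset it has consumed per partition, so a restarted pipeline can resume from the last recorded position instead of relying on Spark checkpoints. The sketch below is a minimal illustration of that idea, not the commit's actual code; it assumes the byte[]-based saveState/getState methods that CDAP 6.8 exposes on StreamingContext, and the class name, key scheme, and JSON layout are all hypothetical.

```java
// Minimal sketch (not the commit's code): persist per-partition Kafka offsets
// into the CDAP state store after each micro-batch. Assumes the
// StreamingContext#saveState(String, byte[]) API available since CDAP 6.8;
// the key scheme and JSON layout here are illustrative only.
import com.google.gson.Gson;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public final class OffsetStateSketch {
  private static final Gson GSON = new Gson();

  /** Records the end offset of each partition in the just-processed batch. */
  public static void saveOffsets(StreamingContext context, String topic,
                                 OffsetRange[] ranges) throws IOException {
    // Ranges would typically come from ((HasOffsetRanges) rdd.rdd()).offsetRanges().
    Map<Integer, Long> partitionOffsets = new HashMap<>();
    for (OffsetRange range : ranges) {
      // untilOffset() is exclusive, i.e. the next offset to read on restart.
      partitionOffsets.put(range.partition(), range.untilOffset());
    }
    byte[] state = GSON.toJson(partitionOffsets).getBytes(StandardCharsets.UTF_8);
    context.saveState(topic, state); // one state entry per topic (illustrative)
  }
}
```

Because the offsets are saved only after the batch has been processed, a crash between processing and saving replays that batch on restart: duplicates are possible, loss is not, which is precisely at-least-once.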
72 changes: 52 additions & 20 deletions confluent-kafka-plugins/pom.xml
@@ -56,6 +56,13 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
+ <exclusions>
+ <!-- The one from spark should be used -->
+ <exclusion>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
@@ -69,7 +76,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_2.12</artifactId>
<version>${kafka10.version}</version>
<exclusions>
<exclusion>
@@ -84,16 +91,16 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
- <version>${spark2.version}</version>
+ <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
+ <version>${spark3.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-tags_2.11</artifactId>
+ <artifactId>spark-tags_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
@@ -103,8 +110,8 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib_2.11</artifactId>
- <version>${spark2.version}</version>
+ <artifactId>spark-mllib_2.12</artifactId>
+ <version>${spark3.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
@@ -115,14 +122,14 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.11</artifactId>
- <version>${spark2.version}</version>
+ <artifactId>spark-streaming_2.12</artifactId>
+ <version>${spark3.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
- <version>${spark2.version}</version>
+ <artifactId>spark-core_2.12</artifactId>
+ <version>${spark3.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
@@ -173,19 +180,19 @@
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
- <artifactId>cdap-spark-core2_2.11</artifactId>
+ <artifactId>cdap-spark-core3_2.12</artifactId>
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
- <artifactId>cdap-data-pipeline2_2.11</artifactId>
+ <artifactId>cdap-data-pipeline3_2.12</artifactId>
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
- <artifactId>cdap-data-streams2_2.11</artifactId>
+ <artifactId>cdap-data-streams3_2.12</artifactId>
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
@@ -221,19 +228,44 @@

<build>
<plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.3.1</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>test-compile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
- <version>3.3.0</version>
+ <version>3.5.1</version>
<configuration>
<instructions>
<_exportcontents>
io.cdap.plugin.confluent.*;
org.apache.spark.streaming.kafka010.*;
- org.apache.kafka.common.*;
- org.apache.kafka.common.serialization.*;
+ org.apache.kafka.*;
io.confluent.kafka.serializers.*;
+ org.apache.kafka.clients.*;
</_exportcontents>
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
@@ -255,8 +287,8 @@
<version>1.1.0</version>
<configuration>
<cdapArtifacts>
- <parent>system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
- <parent>system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
+ <parent>system:cdap-data-pipeline[6.8.0,7.0.0-SNAPSHOT)</parent>
+ <parent>system:cdap-data-streams[6.8.0,7.0.0-SNAPSHOT)</parent>
</cdapArtifacts>
</configuration>
<executions>
ConfluentStreamingSource.java
@@ -27,11 +27,14 @@
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
+ import io.cdap.cdap.etl.api.streaming.StreamingStateHandler;
import io.cdap.plugin.common.Constants;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.spark.streaming.api.java.JavaDStream;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
@@ -45,7 +48,9 @@
@Plugin(type = StreamingSource.PLUGIN_TYPE)
@Name(ConfluentStreamingSource.PLUGIN_NAME)
@Description("Confluent Kafka streaming source.")
- public class ConfluentStreamingSource extends StreamingSource<StructuredRecord> {
+ public class ConfluentStreamingSource extends StreamingSource<StructuredRecord> implements StreamingStateHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConfluentStreamingSource.class);
public static final String PLUGIN_NAME = "Confluent";

private final ConfluentStreamingSourceConfig conf;
@@ -79,6 +84,7 @@ public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws
collector.getOrThrowException();

context.registerLineage(conf.referenceName);

return ConfluentStreamingSourceUtil.getStructuredRecordJavaDStream(context, conf, outputSchema, collector);
}

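Implementing StreamingStateHandler is what signals to the CDAP runtime that this source manages its own state; no methods are overridden in the visible diff, so it presumably acts as a marker interface that enables native state tracking in place of Spark checkpointing. On startup the source would then read the saved offsets back and hand them to the Kafka consumer as starting positions. Again a hedged sketch, not the commit's code: it mirrors the hypothetical JSON layout above and assumes StreamingContext#getState(String) returning Optional<byte[]> as in CDAP 6.8.

```java
// Minimal sketch (not the commit's code): restore saved offsets at startup and
// shape them as the fromOffsets map accepted by the kafka010 ConsumerStrategies.
// Assumes StreamingContext#getState(String) -> Optional<byte[]> from CDAP 6.8.
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public final class OffsetRestoreSketch {
  private static final Gson GSON = new Gson();

  /** Returns partition start offsets for the topic; an empty map means "start fresh". */
  public static Map<TopicPartition, Long> restoreOffsets(StreamingContext context,
                                                         String topic) throws IOException {
    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
    Optional<byte[]> state = context.getState(topic);
    if (state.isPresent()) {
      // Decode the per-partition offsets written by the save-side sketch.
      Map<Integer, Long> saved = GSON.fromJson(
          new String(state.get(), StandardCharsets.UTF_8),
          new TypeToken<Map<Integer, Long>>() { }.getType());
      saved.forEach((partition, offset) ->
          fromOffsets.put(new TopicPartition(topic, partition), offset));
    }
    return fromOffsets;
  }
}
```

A map built this way could be passed to ConsumerStrategies.Subscribe as its fromOffsets argument when creating the direct stream, so the first batch after a restart begins exactly at the recorded positions.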