diff --git a/confluent-kafka-plugins/pom.xml b/confluent-kafka-plugins/pom.xml index 8fd5d47..683513f 100644 --- a/confluent-kafka-plugins/pom.xml +++ b/confluent-kafka-plugins/pom.xml @@ -56,6 +56,13 @@ org.apache.avro avro ${avro.version} + + + + com.thoughtworks.paranamer + paranamer + + io.confluent @@ -69,7 +76,7 @@ org.apache.kafka - kafka_2.11 + kafka_2.12 ${kafka10.version} @@ -84,16 +91,16 @@ org.apache.spark - spark-streaming-kafka-0-10_2.11 - ${spark2.version} + spark-streaming-kafka-0-10_2.12 + ${spark3.version} org.apache.kafka - kafka_2.11 + kafka_2.12 org.apache.spark - spark-tags_2.11 + spark-tags_2.12 net.jpountz.lz4 @@ -103,8 +110,8 @@ org.apache.spark - spark-mllib_2.11 - ${spark2.version} + spark-mllib_2.12 + ${spark3.version} provided @@ -115,14 +122,14 @@ org.apache.spark - spark-streaming_2.11 - ${spark2.version} + spark-streaming_2.12 + ${spark3.version} provided org.apache.spark - spark-core_2.11 - ${spark2.version} + spark-core_2.12 + ${spark3.version} provided @@ -173,19 +180,19 @@ io.cdap.cdap - cdap-spark-core2_2.11 + cdap-spark-core3_2.12 ${cdap.version} test io.cdap.cdap - cdap-data-pipeline2_2.11 + cdap-data-pipeline3_2.12 ${cdap.version} test io.cdap.cdap - cdap-data-streams2_2.11 + cdap-data-streams3_2.12 ${cdap.version} test @@ -221,19 +228,44 @@ + + net.alchim31.maven + scala-maven-plugin + 3.3.1 + + + compile + + compile + + compile + + + test-compile + + testCompile + + test-compile + + + process-resources + + compile + + + + org.apache.felix maven-bundle-plugin - 3.3.0 + 3.5.1 <_exportcontents> io.cdap.plugin.confluent.*; org.apache.spark.streaming.kafka010.*; - org.apache.kafka.common.*; - org.apache.kafka.common.serialization.*; + org.apache.kafka.*; io.confluent.kafka.serializers.*; - org.apache.kafka.clients.*; *;inline=false;scope=compile true @@ -255,8 +287,8 @@ 1.1.0 - system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT) - system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT) + system:cdap-data-pipeline[6.8.0,7.0.0-SNAPSHOT) + system:cdap-data-streams[6.8.0,7.0.0-SNAPSHOT) diff --git a/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSource.java b/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSource.java index 8f4aed6..86068ec 100644 --- a/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSource.java +++ b/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSource.java @@ -27,11 +27,14 @@ import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.streaming.StreamingContext; import io.cdap.cdap.etl.api.streaming.StreamingSource; +import io.cdap.cdap.etl.api.streaming.StreamingStateHandler; import io.cdap.plugin.common.Constants; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import org.apache.spark.streaming.api.java.JavaDStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -45,7 +48,9 @@ @Plugin(type = StreamingSource.PLUGIN_TYPE) @Name(ConfluentStreamingSource.PLUGIN_NAME) @Description("Confluent Kafka streaming source.") -public class ConfluentStreamingSource extends StreamingSource { +public class ConfluentStreamingSource extends StreamingSource implements StreamingStateHandler { + + 
private static final Logger LOG = LoggerFactory.getLogger(ConfluentStreamingSource.class); public static final String PLUGIN_NAME = "Confluent"; private final ConfluentStreamingSourceConfig conf; @@ -79,6 +84,7 @@ public JavaDStream getStream(StreamingContext context) throws collector.getOrThrowException(); context.registerLineage(conf.referenceName); + return ConfluentStreamingSourceUtil.getStructuredRecordJavaDStream(context, conf, outputSchema, collector); } diff --git a/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSourceUtil.java b/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSourceUtil.java index ccfcd9e..0f97063 100644 --- a/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSourceUtil.java +++ b/confluent-kafka-plugins/src/main/java/io/cdap/plugin/confluent/streaming/source/ConfluentStreamingSourceUtil.java @@ -20,6 +20,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.gson.Gson; import io.cdap.cdap.api.data.format.FormatSpecification; import io.cdap.cdap.api.data.format.RecordFormat; import io.cdap.cdap.api.data.format.StructuredRecord; @@ -28,7 +29,9 @@ import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.streaming.StreamingContext; import io.cdap.cdap.format.RecordFormats; +import io.cdap.plugin.batch.source.KafkaPartitionOffsets; import io.cdap.plugin.confluent.common.KafkaHelpers; +import io.cdap.plugin.confluent.source.ConfluentDStream; import io.cdap.plugin.format.avro.AvroToStructuredTransformer; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import org.apache.avro.generic.GenericRecord; @@ -46,24 +49,35 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; +import org.apache.spark.streaming.kafka010.OffsetRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; import javax.annotation.Nonnull; /** @@ -74,48 +88,124 @@ */ final class ConfluentStreamingSourceUtil { private static final Logger LOG = LoggerFactory.getLogger(ConfluentStreamingSourceUtil.class); + private static final Gson gson = new Gson(); private ConfluentStreamingSourceUtil() { // no-op } /** - * Returns {@link JavaDStream} for {@link ConfluentStreamingSource}. - * @param context streaming context + * Returns {@link JavaInputDStream} for {@link ConfluentStreamingSource}. 
+ * + * @param context streaming context * @param conf kafka conf - * @param outputSchema source output schema * @param collector failure collector + * @param stateSupplier state supplier */ - static JavaDStream getStructuredRecordJavaDStream( - StreamingContext context, ConfluentStreamingSourceConfig conf, Schema outputSchema, FailureCollector collector) { + static JavaInputDStream> getConsumerRecordJavaDStream( + StreamingContext context, ConfluentStreamingSourceConfig conf, FailureCollector collector, + Supplier> stateSupplier) { String pipelineName = context.getPipelineName(); Map kafkaParams = getConsumerParams(conf, pipelineName); Properties properties = new Properties(); properties.putAll(kafkaParams); try (Consumer consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { - Map offsets = getOffsets(conf, collector, consumer); + Map offsets = getOffsets(conf, collector, consumer, stateSupplier); LOG.info("Using initial offsets {}", offsets); + return KafkaUtils.createDirectStream( + context.getSparkStreamingContext(), LocationStrategies.PreferConsistent(), + ConsumerStrategies.Subscribe(Collections.singleton(conf.getTopic()), kafkaParams, offsets) + ); + } + } + + /** + * Returns {@link JavaDStream} for {@link ConfluentStreamingSource}. + * + * @param context streaming context + * @param conf kafka conf + * @param outputSchema output schema + * @param collector failure collector + */ + static JavaDStream getStructuredRecordJavaDStream(StreamingContext context, + ConfluentStreamingSourceConfig conf, Schema outputSchema, FailureCollector collector) { + JavaInputDStream> javaInputDStream = getConsumerRecordJavaDStream(context, conf, + collector, getStateSupplier(context, conf)); + + if (!context.isStateStoreEnabled()) { + // Return the serializable DStream in case checkpointing is enabled. 
if (conf.getSchemaRegistryUrl() != null) { - AvroRecordTransform transform = new AvroRecordTransform(conf, outputSchema); - return createKafkaDirectStream(context, conf, kafkaParams, offsets, transform); + return javaInputDStream.transform(new AvroRecordTransform(conf, outputSchema)); + } else { + return javaInputDStream.transform(new RecordTransform(conf, outputSchema)); + } + } + + // Use the DStream that is state aware + + ConfluentDStream confluentDStream = new ConfluentDStream(context.getSparkStreamingContext().ssc(), + javaInputDStream.inputDStream(), + getTransformFunction(conf, outputSchema), + getStateConsumer(context, conf)); + return confluentDStream.convertToJavaDStream(); + + } + + private static VoidFunction getStateConsumer(StreamingContext context, + ConfluentStreamingSourceConfig conf) { + return offsetRanges -> { + try { + saveState(context, offsetRanges, conf); + } catch (IOException e) { + LOG.warn("Exception in saving state.", e); } - return createKafkaDirectStream(context, conf, kafkaParams, offsets, new RecordTransform(conf, outputSchema)); + }; + } + + private static void saveState(StreamingContext context, OffsetRange[] offsetRanges, + ConfluentStreamingSourceConfig conf) throws IOException { + if (offsetRanges.length > 0) { + Map partitionOffsetMap = Arrays.stream(offsetRanges) + .collect(Collectors.toMap(OffsetRange::partition, OffsetRange::untilOffset)); + byte[] state = gson.toJson(new KafkaPartitionOffsets(partitionOffsetMap)).getBytes(StandardCharsets.UTF_8); + context.saveState(conf.getTopic(), state); } } - private static JavaDStream createKafkaDirectStream( - StreamingContext context, - ConfluentStreamingSourceConfig conf, - Map kafkaParams, - Map offsets, - Function2>, Time, JavaRDD> transform - ) { - return KafkaUtils.createDirectStream( - context.getSparkStreamingContext(), LocationStrategies.PreferConsistent(), - ConsumerStrategies.Subscribe(Collections.singleton(conf.getTopic()), kafkaParams, offsets) - ).transform(transform); + private static Supplier> getStateSupplier(StreamingContext context, + ConfluentStreamingSourceConfig conf) { + return () -> { + try { + return getSavedState(context, conf); + } catch (IOException e) { + throw new RuntimeException("Exception in fetching state.", e); + } + }; + } + + private static Map getSavedState(StreamingContext context, ConfluentStreamingSourceConfig conf) + throws IOException { + //State store is not enabled, do not read state + if (!context.isStateStoreEnabled()) { + return Collections.emptyMap(); + } + + //If state is not present, use configured offsets or defaults + Optional state = context.getState(conf.getTopic()); + if (!state.isPresent()) { + return Collections.emptyMap(); + } + + byte[] bytes = state.get(); + try (Reader reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) { + KafkaPartitionOffsets partitionOffsets = gson.fromJson(reader, KafkaPartitionOffsets.class); + return partitionOffsets.getPartitionOffsets().entrySet() + .stream() + .collect(Collectors.toMap(partitionOffset -> new TopicPartition(conf.getTopic(), partitionOffset.getKey()), + Map.Entry::getValue)); + } } @Nonnull @@ -166,10 +256,10 @@ private static Map getConsumerParams(ConfluentStreamingSourceCon @Nonnull private static Map getOffsets(ConfluentStreamingSourceConfig conf, FailureCollector collector, - Consumer consumer) { - Map offsets = conf.getInitialPartitionOffsets( - getPartitions(consumer, conf, collector), collector); - collector.getOrThrowException(); + Consumer consumer, + 
Supplier> stateSupplier) { + + Map offsets = getInitialPartitionOffsets(conf, stateSupplier, consumer, collector); // KafkaUtils doesn't understand -1 and -2 as smallest offset and latest offset. // so we have to replace them with the actual smallest and latest @@ -202,6 +292,23 @@ private static Map getOffsets(ConfluentStreamingSourceConf return offsets; } + static Map getInitialPartitionOffsets(ConfluentStreamingSourceConfig conf, + Supplier> stateSupplier, + Consumer consumer, + FailureCollector collector) { + Map savedPartitions = stateSupplier.get(); + if (!savedPartitions.isEmpty()) { + LOG.info("Saved partitions found {}. ", savedPartitions); + return savedPartitions; + } + + LOG.info("No saved partitions found."); + Map offsets = conf.getInitialPartitionOffsets( + getPartitions(consumer, conf, collector), collector); + collector.getOrThrowException(); + return offsets; + } + private static Set getPartitions(Consumer consumer, ConfluentStreamingSourceConfig conf, FailureCollector collector) { Set partitions = conf.getPartitions(collector); @@ -218,10 +325,19 @@ private static Set getPartitions(Consumer consumer, Con return partitions; } + private static Function2, Time, StructuredRecord> + getTransformFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) { + if (conf.getSchemaRegistryUrl() != null) { + return new AvroFunction(conf, outputSchema); + } else { + return conf.getFormat() == null ? new BytesFunction(conf, outputSchema) : new FormatFunction(conf, outputSchema); + } + } + /** * Applies the format function to each rdd. */ - private static class AvroRecordTransform + static class AvroRecordTransform implements Function2>, Time, JavaRDD> { private final ConfluentStreamingSourceConfig conf; @@ -234,15 +350,19 @@ private static class AvroRecordTransform @Override public JavaRDD call(JavaRDD> input, Time batchTime) { - return input.map(new AvroFunction(batchTime.milliseconds(), conf, outputSchema)); + Function2, Time, StructuredRecord> recordFunction = + new AvroFunction(conf, outputSchema); + + return input.map((Function, StructuredRecord>) consumerRecord -> + recordFunction.call(consumerRecord, batchTime)); } } /** * Applies the format function to each rdd. */ - private static class RecordTransform - implements Function2>, Time, JavaRDD> { + static class RecordTransform + implements Function2>, Time, JavaRDD> { private final ConfluentStreamingSourceConfig conf; private final Schema outputSchema; @@ -253,11 +373,12 @@ private static class RecordTransform } @Override - public JavaRDD call(JavaRDD> input, Time batchTime) { - Function, StructuredRecord> recordFunction = conf.getFormat() == null ? - new BytesFunction(batchTime.milliseconds(), conf, outputSchema) : - new FormatFunction(batchTime.milliseconds(), conf, outputSchema); - return input.map(recordFunction); + public JavaRDD call(JavaRDD> input, Time batchTime) { + Function2, Time, StructuredRecord> recordFunction = conf.getFormat() == null ? + new BytesFunction(conf, outputSchema) : + new FormatFunction(conf, outputSchema); + return input.map((Function, StructuredRecord>) consumerRecord -> + recordFunction.call(consumerRecord, batchTime)); } } @@ -265,26 +386,24 @@ public JavaRDD call(JavaRDD> in * Common logic for transforming kafka key, message, partition, and offset into a structured record. * Everything here should be serializable, as Spark Streaming will serialize all functions. 
*/ - private abstract static class BaseFunction implements Function, StructuredRecord> { + private abstract static class BaseFunction implements Function2, Time, StructuredRecord> { protected final ConfluentStreamingSourceConfig conf; - private final long ts; private final Schema outputSchema; - BaseFunction(long ts, ConfluentStreamingSourceConfig conf, Schema outputSchema) { - this.ts = ts; + BaseFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) { this.conf = conf; this.outputSchema = outputSchema; } @Override - public StructuredRecord call(ConsumerRecord in) throws Exception { + public StructuredRecord call(ConsumerRecord in, Time batchTime) throws Exception { String timeField = conf.getTimeField(); String keyField = conf.getKeyField(); String partitionField = conf.getPartitionField(); String offsetField = conf.getOffsetField(); StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema); if (timeField != null) { - builder.set(timeField, ts); + builder.set(timeField, batchTime.milliseconds()); } if (keyField != null) { builder.set(keyField, convertKey(in.key())); @@ -304,20 +423,20 @@ public StructuredRecord call(ConsumerRecord in) throws Exception { protected abstract void addMessage(StructuredRecord.Builder builder, V message) throws Exception; } - private abstract static class BinaryBaseFunction extends BaseFunction { - BinaryBaseFunction(long ts, ConfluentStreamingSourceConfig conf, Schema outputSchema) { - super(ts, conf, outputSchema); + private abstract static class BinaryBaseFunction extends BaseFunction { + BinaryBaseFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) { + super(conf, outputSchema); } @Override - protected Object convertKey(byte[] key) { + protected Object convertKey(Object key) { if (key == null) { return null; } Schema keySchemaNullable = conf.getSchema().getField(conf.getKeyField()).getSchema(); Schema keySchema = keySchemaNullable.isNullable() ? 
       keySchemaNullable.getNonNullable() : keySchemaNullable;
       if (keySchema.getType() == Schema.Type.STRING) {
-        return new String(key, StandardCharsets.UTF_8);
+        return new String((byte[]) key, StandardCharsets.UTF_8);
       }
       if (keySchema.getType() == Schema.Type.BYTES) {
         return key;
@@ -333,12 +452,12 @@ protected Object convertKey(byte[] key) {
   private static class BytesFunction extends BinaryBaseFunction {
     private transient String messageField;

-    BytesFunction(long ts, ConfluentStreamingSourceConfig conf, Schema outputSchema) {
-      super(ts, conf, outputSchema);
+    BytesFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) {
+      super(conf, outputSchema);
     }

     @Override
-    protected void addMessage(StructuredRecord.Builder builder, byte[] message) {
+    protected void addMessage(StructuredRecord.Builder builder, Object message) {
       builder.set(getMessageField(), message);
     }

@@ -367,12 +486,12 @@ private String getMessageField() {
   private static class FormatFunction extends BinaryBaseFunction {
     private transient RecordFormat recordFormat;

-    FormatFunction(long ts, ConfluentStreamingSourceConfig conf, Schema outputSchema) {
-      super(ts, conf, outputSchema);
+    FormatFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) {
+      super(conf, outputSchema);
     }

     @Override
-    protected void addMessage(StructuredRecord.Builder builder, byte[] message) throws Exception {
+    protected void addMessage(StructuredRecord.Builder builder, Object message) throws Exception {
       // first time this was called, initialize record format
       if (recordFormat == null) {
         Schema messageSchema = conf.getMessageSchema();
@@ -380,7 +499,7 @@ protected void addMessage(StructuredRecord.Builder builder, byte[] message) thro
         recordFormat = RecordFormats.createInitializedFormat(spec);
       }

-      StructuredRecord messageRecord = recordFormat.read(ByteBuffer.wrap(message));
+      StructuredRecord messageRecord = recordFormat.read(ByteBuffer.wrap((byte[]) message));
       for (Schema.Field field : messageRecord.getSchema().getFields()) {
         String fieldName = field.getName();
         builder.set(fieldName, messageRecord.get(fieldName));
@@ -391,8 +510,8 @@ protected void addMessage(StructuredRecord.Builder builder, byte[] message) thro
   private static class AvroFunction extends BaseFunction {
     private transient AvroToStructuredTransformer transformer;

-    AvroFunction(long ts, ConfluentStreamingSourceConfig conf, Schema outputSchema) {
-      super(ts, conf, outputSchema);
+    AvroFunction(ConfluentStreamingSourceConfig conf, Schema outputSchema) {
+      super(conf, outputSchema);
     }

     @Override
diff --git a/confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala b/confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala
new file mode 100644
index 0000000..6fdfded
--- /dev/null
+++ b/confluent-kafka-plugins/src/main/scala/io/cdap/plugin/confluent/source/ConfluentDStream.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2022 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.confluent.source
+
+import io.cdap.cdap.api.data.format.StructuredRecord
+import io.cdap.cdap.etl.api.streaming.StreamingEventHandler
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.spark.api.java.function.{Function2, VoidFunction}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.api.java.JavaDStream
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
+import org.apache.spark.streaming.{Duration, StreamingContext, Time}
+
+/**
+ * DStream that implements {@link StreamingEventHandler}.
+ * This DStream will keep the Confluent offsets for each batch RDD before applying the _transformFunction.
+ * On calling onBatchCompleted, the _stateConsumer will be provided with these offsets.
+ *
+ * @param _ssc Spark streaming context
+ * @param _kafkaDStream DStream created through KafkaUtil.createDirectStream
+ * @param _transformFunction Function for transforming consumer record into structured record
+ * @param _stateConsumer Consumer function for the state produced
+ */
+class ConfluentDStream(_ssc: StreamingContext,
+                       _kafkaDStream: InputDStream[ConsumerRecord[Object, Object]],
+                       _transformFunction: Function2[ConsumerRecord[Object, Object], Time, StructuredRecord],
+                       _stateConsumer: VoidFunction[Array[OffsetRange]])
+  extends DStream[StructuredRecord](_ssc) with StreamingEventHandler {
+
+  // For keeping the offsets in each batch
+  private var offsetRanges: Array[OffsetRange] = Array[OffsetRange]()
+
+  override def slideDuration: Duration = _kafkaDStream.slideDuration
+
+  override def dependencies: List[DStream[_]] = List(_kafkaDStream)
+
+  override def compute(validTime: Time): Option[RDD[StructuredRecord]] = {
+    val rddOption = _kafkaDStream.compute(validTime)
+    val transformFn = _transformFunction
+    // If there is a RDD produced, cache the offsetRanges for the batch and then transform to RDD[StructuredRecord]
+    rddOption.map(rdd => {
+      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      rdd.map(record => transformFn.call(record, validTime))
+    })
+  }
+
+  override def onBatchCompleted(context: io.cdap.cdap.etl.api.streaming.StreamingContext): Unit = {
+    _stateConsumer.call(offsetRanges)
+  }
+
+  /**
+   * Convert this to a {@link JavaDStream}
+   *
+   * @return JavaDStream
+   */
+  def convertToJavaDStream(): JavaDStream[StructuredRecord] = {
+    JavaDStream.fromDStream(this)
+  }
+}
diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java
index 8bb7408..8fd0790 100644
--- a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java
+++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/sink/ConfluentStreamingSinkTest.java
@@ -279,7 +279,7 @@ private List> waitForRecordsInKafka(Consumer c
     throws Exception {
     List> result = new ArrayList<>();
     Stopwatch stopwatch = new Stopwatch();
-    while (result.size() < expectedMessages && stopwatch.elapsed(TimeUnit.SECONDS) < 10) {
+    while (result.size() < expectedMessages && stopwatch.elapsedTime(TimeUnit.SECONDS) < 10) {
       ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
       for (ConsumerRecord record : records) {
         result.add(record);
diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreTest.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreTest.java
new file mode 100644
index 0000000..1c27b52
--- /dev/null
+++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceStateStoreTest.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright © 2022 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.confluent.integration.streaming.source;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import io.cdap.cdap.api.app.AppStateStore;
+import io.cdap.cdap.api.artifact.ArtifactRange;
+import io.cdap.cdap.api.artifact.ArtifactSummary;
+import io.cdap.cdap.api.artifact.ArtifactVersion;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.common.utils.Tasks;
+import io.cdap.cdap.datastreams.DataStreamsApp;
+import io.cdap.cdap.datastreams.DataStreamsSparkLauncher;
+import io.cdap.cdap.etl.api.streaming.StreamingSource;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.mock.test.HydratorTestBase;
+import io.cdap.cdap.etl.proto.v2.DataStreamsConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.ProgramRunStatus;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.ArtifactId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.cdap.test.SparkManager;
+import io.cdap.cdap.test.TestBase;
+import io.cdap.cdap.test.TestConfiguration;
+import io.cdap.plugin.batch.source.KafkaPartitionOffsets;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.common.http.HTTPPollConfig;
+import io.cdap.plugin.confluent.integration.KafkaTestUtils;
+import io.cdap.plugin.confluent.source.ConfluentDStream;
+import io.cdap.plugin.confluent.streaming.source.ConfluentStreamingSource;
+import io.cdap.plugin.confluent.streaming.source.ConfluentStreamingSourceConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetRequest;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.spark.streaming.kafka010.KafkaUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+/**
+ * Tests for Confluent streaming source with state store
+ */
+public class ConfluentStreamingSourceStateStoreTest extends HydratorTestBase {
+
+  // Turn on state tracking.
+  @ClassRule
+  public static final TestConfiguration CONFIG =
+    new TestConfiguration("explore.enabled", false,
+                          "feature.streaming.pipeline.native.state.tracking.enabled", "true");
+  private static final Gson GSON = new Gson();
+  private static final ArtifactId DATASTREAMS_ARTIFACT_ID =
+    NamespaceId.DEFAULT.artifact("data-streams", "6.8.0");
+  private static final ArtifactSummary DATASTREAMS_ARTIFACT =
+    new ArtifactSummary("data-streams", "6.8.0");
+  private static final String SRC_STAGE_NAME = "source";
+  private static final String TOPIC_NAME = "users";
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+  private static KafkaProducer kafkaProducer;
+  private static KafkaProducer kafkaAvroProducer;
+
+  @BeforeClass
+  public static void setupTest() throws Exception {
+    setupStreamingArtifacts(DATASTREAMS_ARTIFACT_ID, DataStreamsApp.class);
+
+    // add artifact for Confluent plugins
+    Set parents = ImmutableSet.of(
+      new ArtifactRange(NamespaceId.DEFAULT.getNamespace(), DATASTREAMS_ARTIFACT_ID.getArtifact(),
+                        new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true,
+                        new ArtifactVersion(DATASTREAMS_ARTIFACT_ID.getVersion()), true)
+    );
+    addPluginArtifact(NamespaceId.DEFAULT.artifact("kafka-plugins", "1.0.0"), parents,
+                      ConfluentStreamingSource.class, KafkaUtils.class, Deserializer.class, ByteArrayDeserializer.class,
+                      TopicPartition.class, HTTPPollConfig.class, ConfluentDStream.class);
+    // Initialize kafka server
+    kafkaProducer = KafkaTestUtils.createProducer();
+    kafkaAvroProducer = KafkaTestUtils.createProducerForSchemaRegistry();
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    if (kafkaProducer != null) {
+      kafkaProducer.close();
+    }
+    if (kafkaAvroProducer != null) {
+      kafkaAvroProducer.close();
+    }
+  }
+
+  @Test
+  public void testConfluentStreamingSource() throws Exception {
+    KafkaTestUtils.deleteTopic(TOPIC_NAME);
+    KafkaTestUtils.createTopic(TOPIC_NAME, 2, 3);
+    Schema schema = Schema.recordOf(
+      "user",
+      Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
+      Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("last", Schema.of(Schema.Type.STRING)));
+
+    Map properties = getConfigProperties(schema);
+    properties.put(ConfluentStreamingSourceConfig.NAME_FORMAT, "csv");
+
+    ETLStage source = new ETLStage(SRC_STAGE_NAME, new ETLPlugin(ConfluentStreamingSource.PLUGIN_NAME,
+                                                                 StreamingSource.PLUGIN_TYPE, properties, null));
+
+    DataStreamsConfig etlConfig = DataStreamsConfig.builder()
+      .addStage(source)
+      .addStage(new ETLStage("sink", MockSink.getPlugin("kafkaOutput")))
+      .addConnection(SRC_STAGE_NAME, "sink")
+      .setBatchInterval("1s")
+      .setStopGracefully(true)
+      .build();
+
+    AppRequest appRequest = new AppRequest<>(DATASTREAMS_ARTIFACT, etlConfig);
+    ApplicationId appId = NamespaceId.DEFAULT.app("KafkaSourceApp");
+    ApplicationManager appManager = deployApplication(appId, appRequest);
+
+    // write some messages to kafka
+    Map messages = new HashMap<>();
+    messages.put("a", "1,samuel,jackson");
+    messages.put("b", "2,dwayne,johnson");
+    messages.put("c", "3,christopher,walken");
+    for (Map.Entry entry : messages.entrySet()) {
+      sendKafkaMessage(TOPIC_NAME, 0, entry.getKey(), entry.getValue());
+    }
+
+    // Save an entry for offset 1 (second in the data) in state store with reference name.
+    AppStateStore appStateStore = TestBase.getAppStateStore(appId.getNamespace(), appId.getApplication());
+    appStateStore.saveState(SRC_STAGE_NAME + "." + TOPIC_NAME,
+                            GSON.toJson(new KafkaPartitionOffsets(Collections.singletonMap(0, 1L)))
+                              .getBytes(StandardCharsets.UTF_8));
+
+    // Launch the program and wait for results
+    SparkManager sparkManager = appManager.getSparkManager(DataStreamsSparkLauncher.NAME);
+    sparkManager.start();
+    sparkManager.waitForRun(ProgramRunStatus.RUNNING, 2, TimeUnit.MINUTES);
+
+    final DataSetManager outputManager = getDataset("kafkaOutput");
+    Tasks.waitFor(
+      ImmutableMap.of(2L, "dwayne johnson", 3L, "christopher walken"),
+      () -> {
+        outputManager.flush();
+        Map actual = new HashMap<>();
+        for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
+          actual.put(outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last"));
+        }
+        return actual;
+      }, 2, TimeUnit.MINUTES);
+
+    // Verify that state is saved with the next offset to start from.
+    Tasks.waitFor(3L, () -> {
+      Optional savedState = appStateStore.getState(SRC_STAGE_NAME + "." + TOPIC_NAME);
+      try (Reader reader = new InputStreamReader(new ByteArrayInputStream(savedState.get()),
+                                                 StandardCharsets.UTF_8)) {
+        KafkaPartitionOffsets partitionOffsets = GSON.fromJson(reader, KafkaPartitionOffsets.class);
+        Long savedOffset = partitionOffsets.getPartitionOffsets().get(0);
+        return savedOffset.longValue();
+      }
+    }, 2, TimeUnit.MINUTES);
+
+    // stop the run
+    sparkManager.stop();
+    sparkManager.waitForRun(ProgramRunStatus.KILLED, 2, TimeUnit.MINUTES);
+  }
+
+  private Map getConfigProperties(Schema schema) {
+    Map properties = new HashMap<>();
+    properties.put(Constants.Reference.REFERENCE_NAME, "confluent");
+    properties.put(ConfluentStreamingSourceConfig.NAME_BROKERS, KafkaTestUtils.KAFKA_SERVER);
+    properties.put(ConfluentStreamingSourceConfig.NAME_TOPIC, TOPIC_NAME);
+    properties.put(ConfluentStreamingSourceConfig.NAME_DEFAULT_INITIAL_OFFSET,
+                   String.valueOf(ListOffsetRequest.EARLIEST_TIMESTAMP));
+    properties.put(ConfluentStreamingSourceConfig.NAME_CLUSTER_API_KEY, KafkaTestUtils.CLUSTER_API_KEY);
+    properties.put(ConfluentStreamingSourceConfig.NAME_CLUSTER_API_SECRET, KafkaTestUtils.CLUSTER_API_SECRET);
+    properties.put(ConfluentStreamingSourceConfig.NAME_SCHEMA, schema.toString());
+    properties.put(ConfluentStreamingSourceConfig.NAME_MAX_RATE, "1000");
+    return properties;
+  }
+
+  private void sendKafkaMessage(String topic, @Nullable Integer partition, @Nullable String key, String value) {
+    byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+    byte[] keyBytes = key != null ?
key.getBytes(StandardCharsets.UTF_8) : null; + try { + kafkaProducer.send(new ProducerRecord<>(topic, partition, keyBytes, valueBytes)).get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java index 9dca9e5..61c9353 100644 --- a/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java +++ b/confluent-kafka-plugins/src/test/java/io/cdap/plugin/confluent/integration/streaming/source/ConfluentStreamingSourceTest.java @@ -25,6 +25,7 @@ import io.cdap.cdap.proto.ProgramRunStatus; import io.cdap.cdap.test.DataSetManager; import io.cdap.cdap.test.SparkManager; +import io.cdap.cdap.test.TestConfiguration; import io.cdap.plugin.common.Constants; import io.cdap.plugin.confluent.integration.KafkaTestUtils; import io.cdap.plugin.confluent.integration.streaming.ConfluentStreamingTestBase; @@ -41,6 +42,7 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -64,6 +66,13 @@ */ public class ConfluentStreamingSourceTest extends ConfluentStreamingTestBase { + // Explicitly turn off state tracking to ensure checkpointing is on. + // This test needs a fix to work with checkpointing disabled. See PLUGIN-1414 + @ClassRule + public static final TestConfiguration CONFIG = + new TestConfiguration("explore.enabled", false, + "feature.streaming.pipeline.native.state.tracking.enabled", "false"); + private static KafkaProducer kafkaProducer; private static KafkaProducer kafkaAvroProducer; diff --git a/kafka-plugins-common/pom.xml b/kafka-plugins-common/pom.xml index 6084048..0345e7d 100644 --- a/kafka-plugins-common/pom.xml +++ b/kafka-plugins-common/pom.xml @@ -30,7 +30,7 @@ io.cdap.cdap - cdap-data-pipeline2_2.11 + cdap-data-pipeline3_2.12 ${cdap.version} test diff --git a/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaPartitionOffsets.java b/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaPartitionOffsets.java index 6c2353f..e0be8dd 100644 --- a/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaPartitionOffsets.java +++ b/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaPartitionOffsets.java @@ -53,6 +53,10 @@ public void setPartitionOffset(int partition, long offset) { public long getPartitionOffset(int partition, long defaultValue) { return partitionOffsets.getOrDefault(partition, defaultValue); } + + public Map getPartitionOffsets() { + return Collections.unmodifiableMap(partitionOffsets); + } /** * Loads the {@link KafkaPartitionOffsets} from the given input file. diff --git a/pom.xml b/pom.xml index 521b259..c308c88 100644 --- a/pom.xml +++ b/pom.xml @@ -87,15 +87,15 @@ UTF-8 - 6.1.1 - 2.3.5 + 6.8.0 + 2.10.0 1.6.1 - 2.3.0 + 3.1.2 widgets docs 0.8.2.2 0.10.2.0 - 2.3.0 + 2.10.2 4.1.16.Final 1.3.0 1.8.2 @@ -150,6 +150,10 @@ org.apache.spark spark-core_2.10 + + io.cdap.cdap + cdap-spark-core2_2.11 + @@ -277,7 +281,7 @@ org.apache.commons commons-lang3 - 3.0 + 3.9 com.google.guava
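
Note on the state format introduced in this change: ConfluentStreamingSourceUtil.saveState() persists each batch's Kafka offsets as a Gson-serialized KafkaPartitionOffsets keyed by the topic name, and getSavedState() reads that same JSON back into the TopicPartition-to-offset map handed to KafkaUtils.createDirectStream on the next run. The following standalone sketch of that round trip is not part of the patch; the class name OffsetStateRoundTripSketch, the topic name and the offset values are illustrative only, and it assumes KafkaPartitionOffsets from kafka-plugins-common and spark-streaming-kafka-0-10 are on the classpath.

import com.google.gson.Gson;
import io.cdap.plugin.batch.source.KafkaPartitionOffsets;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class OffsetStateRoundTripSketch {
  private static final Gson GSON = new Gson();

  public static void main(String[] args) {
    String topic = "users";  // illustrative topic name, matching the test above

    // What ConfluentDStream hands to the state consumer after a batch: one OffsetRange per partition.
    OffsetRange[] offsetRanges = new OffsetRange[] {
      OffsetRange.create(topic, 0, 0L, 3L),
      OffsetRange.create(topic, 1, 0L, 5L)
    };

    // saveState(): keep only partition -> untilOffset and serialize it as JSON bytes under the topic key.
    Map<Integer, Long> partitionOffsets = Arrays.stream(offsetRanges)
      .collect(Collectors.toMap(OffsetRange::partition, OffsetRange::untilOffset));
    byte[] state = GSON.toJson(new KafkaPartitionOffsets(partitionOffsets)).getBytes(StandardCharsets.UTF_8);
    // roughly: {"partitionOffsets":{"0":3,"1":5}}

    // getSavedState(): parse the JSON back and rebuild the TopicPartition -> offset map that is
    // passed to KafkaUtils.createDirectStream as the starting offsets for the next run.
    KafkaPartitionOffsets restored =
      GSON.fromJson(new String(state, StandardCharsets.UTF_8), KafkaPartitionOffsets.class);
    Map<TopicPartition, Long> startOffsets = restored.getPartitionOffsets().entrySet().stream()
      .collect(Collectors.toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue));

    System.out.println(startOffsets);  // e.g. {users-0=3, users-1=5}
  }
}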