
Commit 9d2174a

Revert "[HUDI-7416] Add interface for StreamProfile to be used in StreamSync for reading and writing data (#10687)" (#10734)
This reverts commit 93cd25f.
1 parent 215fd8a commit 9d2174a
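
In scope (summarizing the file diffs below): this revert strips out the StreamContext / SourceProfile plumbing added by #10687. It removes the StreamContext-based createSource overload from UtilHelpers, the StreamContext constructors from Source and the Avro/Json/Proto Kafka sources, the profile-driven branch in KafkaSource.fetchNewData, the (numEvents, minPartitions) overload of KafkaOffsetGen.getNextOffsetRanges, and the DefaultStreamContext class itself, restoring the SchemaProvider-based constructors everywhere.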

17 files changed: +60 −370 lines changed

hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java

Lines changed: 0 additions & 19 deletions

@@ -66,7 +66,6 @@
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
-import org.apache.hudi.utilities.streamer.StreamContext;
 import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;

@@ -157,24 +156,6 @@ public static Source createSource(String sourceClass, TypedProperties cfg, JavaS
     }
   }
 
-  public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
-                                    SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext)
-      throws IOException {
-    try {
-      try {
-        return (Source) ReflectionUtils.loadClass(sourceClass,
-            new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
-                SparkSession.class,
-                HoodieIngestionMetrics.class, streamContext.getClass()},
-            cfg, jssc, sparkSession, metrics, streamContext);
-      } catch (HoodieException e) {
-        return createSource(sourceClass, cfg, jssc, sparkSession, streamContext.getSchemaProvider(), metrics);
-      }
-    } catch (Throwable e) {
-      throw new IOException("Could not load source class " + sourceClass, e);
-    }
-  }
-
   public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassNames, TypedProperties props) throws IOException {
     if (StringUtils.isNullOrEmpty(postProcessorClassNames)) {
       return null;
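
With the reflective StreamContext overload gone, callers go back through the surviving SchemaProvider-based overload. A minimal sketch of the call shape, assuming the parameter order inferred from the fallback call in the removed code (the source class name here is only an example):

// Hypothetical call site; parameter order inferred from the removed fallback above.
Source source = UtilHelpers.createSource(
    "org.apache.hudi.utilities.sources.JsonKafkaSource",  // example source class
    cfg, jssc, sparkSession, schemaProvider, metrics);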

hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java

Lines changed: 1 addition & 3 deletions

@@ -22,9 +22,7 @@
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 import org.apache.hudi.utilities.streamer.StreamSync;

@@ -51,6 +49,6 @@ public DeltaSync(HoodieStreamer.Config cfg, SparkSession sparkSession, SchemaPro
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                    TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
                    Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
-    super(cfg, sparkSession, props, hoodieSparkContext, fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
+    super(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, onInitializingHoodieWriteClient);
   }
 }

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java

Lines changed: 3 additions & 7 deletions

@@ -27,8 +27,6 @@
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;

@@ -71,11 +69,9 @@ public class AvroKafkaSource extends KafkaSource<GenericRecord> {
 
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext), Option.empty()));
-  }
-
-  public AvroKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.AVRO, metrics, streamContext);
+    super(props, sparkContext, sparkSession,
+        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, props, sparkContext),
+        SourceType.AVRO, metrics);
     this.originalSchemaProvider = schemaProvider;
 
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java

Lines changed: 4 additions & 9 deletions

@@ -19,7 +19,6 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;

@@ -28,8 +27,6 @@
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;

@@ -47,10 +44,10 @@
 import java.util.List;
 
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_TIMESTAMP_COLUMN;
+import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
 
 /**
  * Read json kafka data.

@@ -59,11 +56,9 @@ public class JsonKafkaSource extends KafkaSource<String> {
 
   public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    this(properties, sparkContext, sparkSession, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext), Option.empty()));
-  }
-
-  public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.JSON, metrics, streamContext);
+    super(properties, sparkContext, sparkSession,
+        UtilHelpers.getSchemaProviderForKafkaSource(schemaProvider, properties, sparkContext),
+        SourceType.JSON, metrics);
     properties.put("key.deserializer", StringDeserializer.class.getName());
     properties.put("value.deserializer", StringDeserializer.class.getName());
     this.offsetGen = new KafkaOffsetGen(props);

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java

Lines changed: 12 additions & 27 deletions

@@ -26,8 +26,6 @@
 import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.SourceProfile;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;

@@ -52,44 +50,31 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
   protected final boolean shouldAddOffsets;
 
   protected KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-                        SourceType sourceType, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(props, sparkContext, sparkSession, sourceType, streamContext);
-    this.schemaProvider = streamContext.getSchemaProvider();
+                        SchemaProvider schemaProvider, SourceType sourceType, HoodieIngestionMetrics metrics) {
+    super(props, sparkContext, sparkSession, schemaProvider, sourceType);
+    this.schemaProvider = schemaProvider;
     this.metrics = metrics;
     this.shouldAddOffsets = KafkaOffsetPostProcessor.Config.shouldAddOffsets(props);
   }
 
   @Override
   protected InputBatch<JavaRDD<T>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     try {
-      OffsetRange[] offsetRanges;
-      if (sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null) {
-        SourceProfile<Long> kafkaSourceProfile = sourceProfileSupplier.get().getSourceProfile();
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getSourcePartitions(), metrics);
-        LOG.info("About to read numEvents {} of size {} bytes in {} partitions from Kafka for topic {} with offsetRanges {}",
-            kafkaSourceProfile.getSourceSpecificContext(), kafkaSourceProfile.getMaxSourceBytes(),
-            kafkaSourceProfile.getSourcePartitions(), offsetGen.getTopicName(), offsetRanges);
-      } else {
-        offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
+      OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
+      long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
+      LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+      if (totalNewMsgs <= 0) {
+        metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
+        return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
       }
-      return toInputBatch(offsetRanges);
+      metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
+      JavaRDD<T> newDataRDD = toRDD(offsetRanges);
+      return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
     } catch (org.apache.kafka.common.errors.TimeoutException e) {
      throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
     }
   }
 
-  private InputBatch<JavaRDD<T>> toInputBatch(OffsetRange[] offsetRanges) {
-    long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
-    LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
-    if (totalNewMsgs <= 0) {
-      metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 0);
-      return new InputBatch<>(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
-    }
-    metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, totalNewMsgs);
-    JavaRDD<T> newDataRDD = toRDD(offsetRanges);
-    return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
-  }
-
   abstract JavaRDD<T> toRDD(OffsetRange[] offsetRanges);
 
   @Override
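
For readers without the deleted files at hand: the sourceProfileSupplier branch removed above consumed a SourceProfile. Reconstructed solely from the call sites in this hunk (the method names come from the diff; generics, primitive types, and the supplier shape are assumptions), the removed interfaces looked roughly like:

// Approximate shape only; the authoritative files are deleted by this revert.
public interface SourceProfile<T> {
  T getSourceSpecificContext();  // for Kafka: the event budget passed as numEvents above
  long getSourcePartitions();    // passed as minPartitions above (exact primitive type assumed)
  long getMaxSourceBytes();      // byte budget, only logged in the removed branch
}

public interface SourceProfileSupplier {
  SourceProfile<Long> getSourceProfile();  // Long parameterization as used in KafkaSource above
}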

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java

Lines changed: 3 additions & 10 deletions

@@ -19,15 +19,12 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import com.google.protobuf.Message;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;

@@ -54,13 +51,9 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
 
   private final String className;
 
-  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-                          SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
-    this(props, sparkContext, sparkSession, metrics, new DefaultStreamContext(schemaProvider, Option.empty()));
-  }
-
-  public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
-    super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics, streamContext);
+  public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext,
+                          SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
+    super(props, sparkContext, sparkSession, schemaProvider, SourceType.PROTO, metrics);
     checkRequiredConfigProperties(props, Collections.singletonList(
         ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java

Lines changed: 1 addition & 10 deletions

@@ -25,9 +25,6 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.callback.SourceCommitCallback;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.streamer.DefaultStreamContext;
-import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
-import org.apache.hudi.utilities.streamer.StreamContext;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;

@@ -47,7 +44,6 @@ public enum SourceType {
   protected transient TypedProperties props;
   protected transient JavaSparkContext sparkContext;
   protected transient SparkSession sparkSession;
-  protected transient Option<SourceProfileSupplier> sourceProfileSupplier;
   private transient SchemaProvider overriddenSchemaProvider;
 
   private final SourceType sourceType;

@@ -59,16 +55,11 @@ protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSess
 
   protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
                    SchemaProvider schemaProvider, SourceType sourceType) {
-    this(props, sparkContext, sparkSession, sourceType, new DefaultStreamContext(schemaProvider, Option.empty()));
-  }
-
-  protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SourceType sourceType, StreamContext streamContext) {
    this.props = props;
     this.sparkContext = sparkContext;
     this.sparkSession = sparkSession;
-    this.overriddenSchemaProvider = streamContext.getSchemaProvider();
+    this.overriddenSchemaProvider = schemaProvider;
     this.sourceType = sourceType;
-    this.sourceProfileSupplier = streamContext.getSourceProfileSupplier();
   }
 
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
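
Similarly, the StreamContext passed to the removed constructor is easy to reconstruct from its two call sites above (getSchemaProvider, getSourceProfileSupplier) and from the new DefaultStreamContext(schemaProvider, Option.empty()) calls elsewhere in this commit. A minimal sketch, with the caveat that the deleted originals may have carried extra members:

public interface StreamContext {
  SchemaProvider getSchemaProvider();
  Option<SourceProfileSupplier> getSourceProfileSupplier();
}

public class DefaultStreamContext implements StreamContext {
  private final SchemaProvider schemaProvider;
  private final Option<SourceProfileSupplier> sourceProfileSupplier;

  // Matches the constructor calls in this commit: new DefaultStreamContext(schemaProvider, Option.empty())
  public DefaultStreamContext(SchemaProvider schemaProvider, Option<SourceProfileSupplier> sourceProfileSupplier) {
    this.schemaProvider = schemaProvider;
    this.sourceProfileSupplier = sourceProfileSupplier;
  }

  @Override
  public SchemaProvider getSchemaProvider() {
    return schemaProvider;
  }

  @Override
  public Option<SourceProfileSupplier> getSourceProfileSupplier() {
    return sourceProfileSupplier;
  }
}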

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java

Lines changed: 21 additions & 18 deletions

@@ -241,24 +241,7 @@ public KafkaOffsetGen(TypedProperties props) {
   }
 
   public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieIngestionMetrics metrics) {
-    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
-    long maxEventsToReadFromKafka = getLongWithAltKeys(props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
 
-    long numEvents;
-    if (sourceLimit == Long.MAX_VALUE) {
-      numEvents = maxEventsToReadFromKafka;
-      LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
-    } else {
-      numEvents = sourceLimit;
-    }
-
-    long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
-    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
-
-    return getNextOffsetRanges(lastCheckpointStr, numEvents, minPartitions, metrics);
-  }
-
-  public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long numEvents, long minPartitions, HoodieIngestionMetrics metrics) {
     // Obtain current metadata for the topic
     Map<TopicPartition, Long> fromOffsets;
     Map<TopicPartition, Long> toOffsets;

@@ -296,9 +279,29 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
       // Obtain the latest offsets.
       toOffsets = consumer.endOffsets(topicPartitions);
     }
+
+    // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
+    long maxEventsToReadFromKafka = getLongWithAltKeys(props, KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE);
+
+    long numEvents;
+    if (sourceLimit == Long.MAX_VALUE) {
+      numEvents = maxEventsToReadFromKafka;
+      LOG.info("SourceLimit not configured, set numEvents to default value : " + maxEventsToReadFromKafka);
+    } else {
+      numEvents = sourceLimit;
+    }
+
+    // TODO(HUDI-4625) remove
+    if (numEvents < toOffsets.size()) {
+      throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
+    }
+
+    long minPartitions = getLongWithAltKeys(props, KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS);
+    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
 
     return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions);
   }
 
   /**
    * Fetch partition infos for given topic.
    *
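
Net effect of this file: the event-limit and min-partitions computation is re-inlined into the single public method, the explicit (numEvents, minPartitions) overload that the SourceProfile path called is gone, and the TODO(HUDI-4625) guard rejecting a sourceLimit smaller than the partition count is restored. Callers are back to the single entry point shown in the restored signature above:

// Only remaining call shape after the revert (signature restored above):
OffsetRange[] ranges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);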

hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java

Lines changed: 0 additions & 48 deletions
This file was deleted.
