Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8551] Validate MOR Without Precombine Set and Fix Related Bugs #12317

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,11 @@ public HoodieSparkRecord copy() {

@Override
public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
String orderingField = ConfigUtils.getOrderingField(props);
if (StringUtils.isNullOrEmpty(orderingField)) {
return 0;
}
StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
scala.Option<NestedFieldPath> cachedNestedFieldPath =
HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
if (cachedNestedFieldPath.isDefined()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ private static boolean isProjectionOfInternal(Schema sourceSchema,
}

public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) {
return findNestedFieldType(schema, fieldName, true);
}

public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName, boolean throwOnNotFound) {
if (StringUtils.isNullOrEmpty(fieldName)) {
return Option.empty();
}
Expand All @@ -216,7 +220,11 @@ public static Option<Schema.Type> findNestedFieldType(Schema schema, String fiel
for (String part : parts) {
Schema.Field foundField = resolveNullableSchema(schema).getField(part);
if (foundField == null) {
throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
if (throwOnNotFound) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whey do we trigger lookup using findNestedFieldType in the first place if ordering field is not set.
why can't we deduce it from table config that ordering field is not set bypass calling findNestedFieldType.

in other words, why do we need throwOnNotFound argument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ordering field is set. It is just set to a nonexistent field. https://issues.apache.org/jira/browse/HUDI-8574 we have not been validating that the precombine actually exists in the table schema, so the current behavior of the filegroup reader will always throw an exception when trying to read a filegroup with log files. We don't want to completely break reading for those users. We could do a LOG.warn though?

throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
} else {
return Option.empty();
}
}
schema = foundField.schema();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ default String[] getMandatoryFieldsForMerging(Schema dataSchema, HoodieTableConf

String preCombine = cfg.getPreCombineField();
if (!StringUtils.isNullOrEmpty(preCombine)) {

requiredFields.add(preCombine);
}
return requiredFields.toArray(new String[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
this.payloadClass = Option.empty();
}
this.orderingFieldName = Option.ofNullable(ConfigUtils.getOrderingField(props)).orElseGet(() -> hoodieTableMetaClient.getTableConfig().getPreCombineField());
this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName);

// Don't throw exception due to [HUDI-8574]
this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we check for isEmptyOrNull on orderingFieldName and then avoid calling findNestedFieldType.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above comment, the ordering field might not actually exist in the schema

this.orderingFieldDefault = orderingFieldTypeOpt.map(type -> readerContext.castValue(0, type)).orElse(0);
this.props = props;
this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,16 @@ private Schema generateRequiredSchema() {
List<Schema.Field> addedFields = new ArrayList<>();
for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the impl of getMandatoryFieldsForMerging. It does not return precombine if its not set. So, can you help me understand why do we need below changes ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above comments

if (!findNestedField(requestedSchema, field).isPresent()) {
Option<Schema.Field> foundFieldOpt = findNestedField(dataSchema, field);
Option<Schema.Field> foundFieldOpt = findNestedField(dataSchema, field);
if (!foundFieldOpt.isPresent()) {
throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
//see [HUDI-8574]
if (!field.equals(hoodieTableConfig.getPreCombineField())) {
throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
}
} else {
Schema.Field foundField = foundFieldOpt.get();
addedFields.add(foundField);
}
Schema.Field foundField = foundFieldOpt.get();
addedFields.add(foundField);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ private static InternalSchema addPositionalMergeCol(InternalSchema internalSchem
@Override
public Pair<List<Schema.Field>,List<Schema.Field>> getBootstrapRequiredFields() {
Pair<List<Schema.Field>,List<Schema.Field>> dataAndMetaCols = super.getBootstrapRequiredFields();
if (readerContext.supportsParquetRowIndex()) {
if (!dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty()) {
if (readerContext.supportsParquetRowIndex() && (this.needsBootstrapMerge || this.needsMORMerge)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we expected there to be a precombine if it was mor. So If you tried to just read meta cols, you would also still need to read the precombine so left and right would both have values. If you tried to only read data cols, you would also still read hoodie_record_key for mor merging in case we have to fall back to key based merging so left and right would also still both have values.

Now that we don't require precombine field, If you only read the meta cols then you don't need to read any data cols. But we still want to do positional merging for mor, so we need to add the positional merge field

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we make this simpler.

  • poll table config and find if precombine field is set or not.
  • if yes, do changes to existing code block here.
  • if not set, remove precombine field from both dataAndMetaCols.getLeft() and dataAndMetaCols.getRight()

if (!dataAndMetaCols.getLeft().isEmpty()) {
dataAndMetaCols.getLeft().add(getPositionalMergeField());
}
if (!dataAndMetaCols.getRight().isEmpty()) {
dataAndMetaCols.getRight().add(getPositionalMergeField());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
Expand Down Expand Up @@ -101,9 +102,8 @@ public HoodieRecord<ArrayWritable> newInstance(HoodieKey key) {
@Override
public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
String orderingField = ConfigUtils.getOrderingField(props);
if (orderingField == null) {
if (StringUtils.isNullOrEmpty(orderingField)) {
return 0;
//throw new IllegalArgumentException("Ordering Field is not set. Precombine must be set. (If you are using a custom record merger it might be something else)");
}
return (Comparable<?>) getValue(ConfigUtils.getOrderingField(props));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,12 @@ object HoodieCreateRecordUtils {
avroRecWithoutMeta
}

val hoodieRecord = if (shouldCombine && !precombineEmpty) {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine,
false, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
//TODO [HUDI-8574] we can throw exception if field doesn't exist
// lazy so that we don't evaluate if we short circuit the boolean expression in the if below
lazy val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine,
true, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need lazy here. whats the issue w/ previous code here? may be I am missing something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to add validation that the precombine existed but decided against it for this pr due to backwards compatibility and scope creep of this pr. So this made sense when that other code was here. But now we can just pull it into the if

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I still think it should be like this. We need to be fault tolerant for the case where the precombine field does not exist.


val hoodieRecord = if (shouldCombine && !precombineEmpty && orderingVal != null) {
DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, hoodieKey,
config.getPayloadClass, recordLocation)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.commit.{DatasetBulkInsertCommitActionExecutor, DatasetBulkInsertOverwriteCommitActionExecutor, DatasetBulkInsertOverwriteTableCommitActionExecutor}
Expand All @@ -50,7 +50,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION}
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
import org.apache.hudi.exception.{HoodieAvroSchemaException, HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
Expand Down Expand Up @@ -746,7 +746,7 @@ class HoodieSparkSqlWriterInternal {
String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
))

HoodieTableMetaClient.newTableBuilder()
val metaClient = HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.valueOf(tableType))
.setTableName(tableName)
.setRecordKeyFields(recordKeyFields)
Expand All @@ -755,7 +755,9 @@ class HoodieSparkSqlWriterInternal {
.setPayloadClassName(payloadClass)
.setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
.setRecordMergeStrategyId(recordMergerStrategy)
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
// we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is what we do for non-bootstrap, so I added this for consistency

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we just remove default value for the precombine field only?

and then chase the test failures.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am very much in favor of doing that. But users that rely on ts as the precombine without setting it will see a behavior change. In the 6 to 8 upgrade flow, we could check the table schema and if precombine is not set and the schema includes a field "ts" then we could add it to the hoodie.properties. @yihua didn't think we should make that change though.

Copy link
Contributor

@yihua yihua Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach of this PR is much more complicated than required. Previously, the precombine field is set as ts if the user does not configure it. We can still keep that. If the ts field does not exist, we use the natural ordering, i.e., setting 0 as the ordering value (in else branch which creates the record; second DefaultHoodieRecordPayload constructor fills the 0 as the ordering value).

val hoodieRecord = if (shouldCombine && !precombineEmpty) {
              val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine,
                false, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
              DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, hoodieKey,
                config.getPayloadClass, recordLocation)
            } else {
              DataSourceUtils.createHoodieRecord(processedRecord, hoodieKey,
                config.getPayloadClass, recordLocation)
            }
  public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public DefaultHoodieRecordPayload(Option<GenericRecord> record) {
    this(record.isPresent() ? record.get() : null, 0); // natural order
  }

Note that if ts or another configured ordering field exists in the schema, nothing should break, which is the behavior.

// but we are interested in what user has set, hence fetching from optParams.
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
.setBootstrapIndexClass(bootstrapIndexClass)
.setBaseFileFormat(baseFileFormat)
.setBootstrapBasePath(bootstrapBasePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
Expand All @@ -50,6 +51,8 @@
import java.io.IOException;
import java.util.List;

import static org.apache.hudi.config.HoodieWriteConfig.PRECOMBINE_FIELD_NAME;

public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {

private final transient SparkSession sparkSession;
Expand All @@ -72,20 +75,31 @@ public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourc
HoodieRecordType recordType = config.getRecordMerger().getRecordType();
Dataset inputDataset = sparkSession.read().format(getFormat()).option("basePath", sourceBasePath).load(filePaths);
KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
String precombineKey = props.getString("hoodie.datasource.write.precombine.field");
String precombineKey = ConfigUtils.getStringWithAltKeys(props, PRECOMBINE_FIELD_NAME);
boolean hasPrecombine = !StringUtils.isNullOrEmpty(precombineKey);
String structName = tableName + "_record";
String namespace = "hoodie." + tableName;
if (recordType == HoodieRecordType.AVRO) {
RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
Option.empty());
return genericRecords.toJavaRDD().map(gr -> {
String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
gr, precombineKey, false, props.getBoolean(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
String orderingVal = null;
if (hasPrecombine) {
//TODO [HUDI-8574] we can throw exception if field doesn't exist
// lazy so that we don't evaluate if we short circuit the boolean expression in the if below
orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
gr, precombineKey, true, props.getBoolean(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
}
try {
return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
ConfigUtils.getPayloadClass(props), scala.Option.apply(null));
if (hasPrecombine && !StringUtils.isNullOrEmpty(orderingVal)) {
return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
ConfigUtils.getPayloadClass(props), scala.Option.apply(null));
} else {
return DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr),
ConfigUtils.getPayloadClass(props), scala.Option.apply(null));
}
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.functional;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;

import org.apache.spark.sql.Dataset;
Expand Down Expand Up @@ -47,33 +48,47 @@ private static Stream<Arguments> testArgs() {
Boolean[] dashPartitions = {true,false};
HoodieTableType[] tableType = {COPY_ON_WRITE, MERGE_ON_READ};
Integer[] nPartitions = {0, 1, 2};
HoodieRecord.HoodieRecordType[] recordTypes = {HoodieRecord.HoodieRecordType.AVRO};
for (HoodieTableType tt : tableType) {
for (Boolean dash : dashPartitions) {
for (String bt : bootstrapType) {
for (Integer n : nPartitions) {
// can't be mixed bootstrap if it's nonpartitioned
// don't need to test slash partitions if it's nonpartitioned
if ((!bt.equals("mixed") && dash) || n > 0) {
b.add(Arguments.of(bt, dash, tt, n));
for (HoodieRecord.HoodieRecordType rt : recordTypes) {
// can't be mixed bootstrap if it's nonpartitioned
// don't need to test slash partitions if it's nonpartitioned
if ((!bt.equals("mixed") && dash) || n > 0) {
b.add(Arguments.of(bt, dash, tt, n, rt, true));
if (tt.equals(MERGE_ON_READ)) {
b.add(Arguments.of(bt, dash, tt, n, rt, false));
}
}
}
}
}
}
}
} else {
b.add(Arguments.of("metadata", true, COPY_ON_WRITE, 0));
b.add(Arguments.of("mixed", false, MERGE_ON_READ, 2));
b.add(Arguments.of("metadata", true, COPY_ON_WRITE, 0, HoodieRecord.HoodieRecordType.AVRO, true));
b.add(Arguments.of("mixed", false, MERGE_ON_READ, 1, HoodieRecord.HoodieRecordType.AVRO, false));
b.add(Arguments.of("mixed", false, MERGE_ON_READ, 2, HoodieRecord.HoodieRecordType.AVRO, true));
}
return b.build();
}

@ParameterizedTest
@MethodSource("testArgs")
public void testBootstrapFunctional(String bootstrapType, Boolean dashPartitions, HoodieTableType tableType, Integer nPartitions) {
public void testBootstrapFunctional(String bootstrapType,
Boolean dashPartitions,
HoodieTableType tableType,
Integer nPartitions,
HoodieRecord.HoodieRecordType recordType,
Boolean hasPrecombine) {
this.bootstrapType = bootstrapType;
this.dashPartitions = dashPartitions;
this.tableType = tableType;
this.nPartitions = nPartitions;
this.recordType = recordType;
this.hasPrecombine = hasPrecombine;
setupDirs();

// do bootstrap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
package org.apache.hudi.functional;

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.DefaultSparkRecordMerger;
import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector;
import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.DecodedBootstrapPartitionPathTranslator;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieBootstrapConfig;
Expand Down Expand Up @@ -70,6 +73,8 @@ public abstract class TestBootstrapReadBase extends HoodieSparkClientTestBase {
protected Boolean dashPartitions;
protected HoodieTableType tableType;
protected Integer nPartitions;
protected Boolean hasPrecombine = true;
protected HoodieRecord.HoodieRecordType recordType = HoodieRecord.HoodieRecordType.AVRO;

protected String[] partitionCols;
protected static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name", "city_to_state"};
Expand Down Expand Up @@ -104,7 +109,13 @@ protected Map<String, String> basicOptions() {
options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexKeyGenerator.class.getName());
}
}
options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
if (hasPrecombine) {
options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
}
if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
options.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), DefaultSparkRecordMerger.class.getName());
options.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
}
if (tableType.equals(MERGE_ON_READ)) {
options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
}
Expand Down
Loading
Loading