diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java index ab7ec4ac557f2..025839d4c5456 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java @@ -313,8 +313,11 @@ public HoodieSparkRecord copy() { @Override public Comparable getOrderingValue(Schema recordSchema, Properties props) { - StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); String orderingField = ConfigUtils.getOrderingField(props); + if (StringUtils.isNullOrEmpty(orderingField)) { + return 0; + } + StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); scala.Option cachedNestedFieldPath = HoodieInternalRowUtils.getCachedPosList(structType, orderingField); if (cachedNestedFieldPath.isDefined()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index 18fdedca50f13..e0c47e3e09352 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -208,6 +208,10 @@ private static boolean isProjectionOfInternal(Schema sourceSchema, } public static Option findNestedFieldType(Schema schema, String fieldName) { + return findNestedFieldType(schema, fieldName, true); + } + + public static Option findNestedFieldType(Schema schema, String fieldName, boolean throwOnNotFound) { if (StringUtils.isNullOrEmpty(fieldName)) { return Option.empty(); } @@ -216,7 +220,11 @@ public static Option findNestedFieldType(Schema schema, String fiel for (String part : parts) { Schema.Field foundField = resolveNullableSchema(schema).getField(part); if (foundField == null) { - throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema); + if (throwOnNotFound) { + throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema); + } else { + return Option.empty(); + } } schema = foundField.schema(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java index e001446091673..eb4e91d515637 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java @@ -112,7 +112,9 @@ public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext readerContext, this.payloadClass = Option.empty(); } this.orderingFieldName = Option.ofNullable(ConfigUtils.getOrderingField(props)).orElseGet(() -> hoodieTableMetaClient.getTableConfig().getPreCombineField()); - this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName); + + // Don't throw exception due to [HUDI-8574] + this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? 
Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName, false); this.orderingFieldDefault = orderingFieldTypeOpt.map(type -> readerContext.castValue(0, type)).orElse(0); this.props = props; this.internalSchema = readerContext.getSchemaHandler().getInternalSchema(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java index de2cd920289a2..2e27ebe10a658 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java @@ -160,12 +160,16 @@ private Schema generateRequiredSchema() { List addedFields = new ArrayList<>(); for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) { if (!findNestedField(requestedSchema, field).isPresent()) { - Option foundFieldOpt = findNestedField(dataSchema, field); + Option foundFieldOpt = findNestedField(dataSchema, field); if (!foundFieldOpt.isPresent()) { - throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema"); + //see [HUDI-8574] + if (!field.equals(hoodieTableConfig.getPreCombineField())) { + throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema"); + } + } else { + Schema.Field foundField = foundFieldOpt.get(); + addedFields.add(foundField); } - Schema.Field foundField = foundFieldOpt.get(); - addedFields.add(foundField); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java index 71722f438a771..cafcceb8beff1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java @@ -82,9 +82,11 @@ private static InternalSchema addPositionalMergeCol(InternalSchema internalSchem @Override public Pair,List> getBootstrapRequiredFields() { Pair,List> dataAndMetaCols = super.getBootstrapRequiredFields(); - if (readerContext.supportsParquetRowIndex()) { - if (!dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty()) { + if (readerContext.supportsParquetRowIndex() && (this.needsBootstrapMerge || this.needsMORMerge)) { + if (!dataAndMetaCols.getLeft().isEmpty()) { dataAndMetaCols.getLeft().add(getPositionalMergeField()); + } + if (!dataAndMetaCols.getRight().isEmpty()) { dataAndMetaCols.getRight().add(getPositionalMergeField()); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java index a2fb08fd6146d..422e6438d4f5d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.MetadataValues; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils; import org.apache.hudi.hadoop.utils.ObjectInspectorCache; @@ 
-101,9 +102,8 @@ public HoodieRecord newInstance(HoodieKey key) { @Override public Comparable getOrderingValue(Schema recordSchema, Properties props) { String orderingField = ConfigUtils.getOrderingField(props); - if (orderingField == null) { + if (StringUtils.isNullOrEmpty(orderingField)) { return 0; - //throw new IllegalArgumentException("Ordering Field is not set. Precombine must be set. (If you are using a custom record merger it might be something else)"); } return (Comparable) getValue(ConfigUtils.getOrderingField(props)); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala index 048f45bba9f2f..2a1922f385a4a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala @@ -143,9 +143,12 @@ object HoodieCreateRecordUtils { avroRecWithoutMeta } - val hoodieRecord = if (shouldCombine && !precombineEmpty) { - val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine, - false, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]] + //TODO [HUDI-8574] we can throw exception if field doesn't exist + // lazy so that we don't evaluate if we short circuit the boolean expression in the if below + lazy val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine, + true, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]] + + val hoodieRecord = if (shouldCombine && !precombineEmpty && orderingVal != null) { DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, hoodieKey, config.getPayloadClass, recordLocation) } else { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 8398157810240..754bce99a62f8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -32,7 +32,7 @@ import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams import org.apache.hudi.HoodieSparkUtils.sparkAdapter import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema -import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} import org.apache.hudi.commit.{DatasetBulkInsertCommitActionExecutor, DatasetBulkInsertOverwriteCommitActionExecutor, DatasetBulkInsertOverwriteTableCommitActionExecutor} @@ -50,7 +50,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION} import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig} -import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException} +import org.apache.hudi.exception.{HoodieAvroSchemaException, HoodieException, HoodieRecordCreationException, 
HoodieWriteConflictException} import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool} @@ -746,7 +746,7 @@ class HoodieSparkSqlWriterInternal { String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()) )) - HoodieTableMetaClient.newTableBuilder() + val metaClient = HoodieTableMetaClient.newTableBuilder() .setTableType(HoodieTableType.valueOf(tableType)) .setTableName(tableName) .setRecordKeyFields(recordKeyFields) @@ -755,7 +755,9 @@ class HoodieSparkSqlWriterInternal { .setPayloadClassName(payloadClass) .setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE))) .setRecordMergeStrategyId(recordMergerStrategy) - .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null)) + // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value, + // but we are interested in what user has set, hence fetching from optParams. + .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null)) .setBootstrapIndexClass(bootstrapIndexClass) .setBaseFileFormat(baseFileFormat) .setBootstrapBasePath(bootstrapBasePath) diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java index 65af0c3543409..b7bbae1799b9b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieSparkRecord; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; @@ -50,6 +51,8 @@ import java.io.IOException; import java.util.List; +import static org.apache.hudi.config.HoodieWriteConfig.PRECOMBINE_FIELD_NAME; + public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBootstrapDataProvider> { private final transient SparkSession sparkSession; @@ -72,20 +75,30 @@ public JavaRDD generateInputRecords(String tableName, String sourc HoodieRecordType recordType = config.getRecordMerger().getRecordType(); Dataset inputDataset = sparkSession.read().format(getFormat()).option("basePath", sourceBasePath).load(filePaths); KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); - String precombineKey = props.getString("hoodie.datasource.write.precombine.field"); + String precombineKey = ConfigUtils.getStringWithAltKeys(props, PRECOMBINE_FIELD_NAME); + boolean hasPrecombine = !StringUtils.isNullOrEmpty(precombineKey); String structName = tableName + "_record"; String namespace = "hoodie." 
+ tableName; if (recordType == HoodieRecordType.AVRO) { RDD genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false, Option.empty()); return genericRecords.toJavaRDD().map(gr -> { - String orderingVal = HoodieAvroUtils.getNestedFieldValAsString( - gr, precombineKey, false, props.getBoolean( - KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), - Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))); + String orderingVal = null; + if (hasPrecombine) { + //TODO [HUDI-8574] we can throw exception if field doesn't exist + orderingVal = HoodieAvroUtils.getNestedFieldValAsString( + gr, precombineKey, true, props.getBoolean( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))); + } try { - return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), - ConfigUtils.getPayloadClass(props), scala.Option.apply(null)); + if (hasPrecombine && !StringUtils.isNullOrEmpty(orderingVal)) { + return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), + ConfigUtils.getPayloadClass(props), scala.Option.apply(null)); + } else { + return DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), + ConfigUtils.getPayloadClass(props), scala.Option.apply(null)); + } } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java index 1e36f491b3f61..a6ebfa42fd214 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java @@ -18,6 +18,7 @@ package org.apache.hudi.functional; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.spark.sql.Dataset; @@ -47,33 +48,47 @@ private static Stream testArgs() { Boolean[] dashPartitions = {true,false}; HoodieTableType[] tableType = {COPY_ON_WRITE, MERGE_ON_READ}; Integer[] nPartitions = {0, 1, 2}; + HoodieRecord.HoodieRecordType[] recordTypes = {HoodieRecord.HoodieRecordType.AVRO}; for (HoodieTableType tt : tableType) { for (Boolean dash : dashPartitions) { for (String bt : bootstrapType) { for (Integer n : nPartitions) { - // can't be mixed bootstrap if it's nonpartitioned - // don't need to test slash partitions if it's nonpartitioned - if ((!bt.equals("mixed") && dash) || n > 0) { - b.add(Arguments.of(bt, dash, tt, n)); + for (HoodieRecord.HoodieRecordType rt : recordTypes) { + // can't be mixed bootstrap if it's nonpartitioned + // don't need to test slash partitions if it's nonpartitioned + if ((!bt.equals("mixed") && dash) || n > 0) { + b.add(Arguments.of(bt, dash, tt, n, rt, true)); + if (tt.equals(MERGE_ON_READ)) { + b.add(Arguments.of(bt, dash, tt, n, rt, false)); + } + } } } } } } } else { - b.add(Arguments.of("metadata", true, COPY_ON_WRITE, 0)); - b.add(Arguments.of("mixed", false, MERGE_ON_READ, 2)); + b.add(Arguments.of("metadata", true, COPY_ON_WRITE, 0, HoodieRecord.HoodieRecordType.AVRO, true)); + b.add(Arguments.of("mixed", false, MERGE_ON_READ, 1, HoodieRecord.HoodieRecordType.AVRO, false)); + b.add(Arguments.of("mixed", 
false, MERGE_ON_READ, 2, HoodieRecord.HoodieRecordType.AVRO, true)); } return b.build(); } @ParameterizedTest @MethodSource("testArgs") - public void testBootstrapFunctional(String bootstrapType, Boolean dashPartitions, HoodieTableType tableType, Integer nPartitions) { + public void testBootstrapFunctional(String bootstrapType, + Boolean dashPartitions, + HoodieTableType tableType, + Integer nPartitions, + HoodieRecord.HoodieRecordType recordType, + Boolean hasPrecombine) { this.bootstrapType = bootstrapType; this.dashPartitions = dashPartitions; this.tableType = tableType; this.nPartitions = nPartitions; + this.recordType = recordType; + this.hasPrecombine = hasPrecombine; setupDirs(); // do bootstrap diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java index 011d67d5658f8..5c8f43397da13 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java @@ -19,10 +19,13 @@ package org.apache.hudi.functional; import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.DefaultSparkRecordMerger; import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector; import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector; import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; import org.apache.hudi.client.bootstrap.translator.DecodedBootstrapPartitionPathTranslator; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.config.HoodieBootstrapConfig; @@ -70,6 +73,8 @@ public abstract class TestBootstrapReadBase extends HoodieSparkClientTestBase { protected Boolean dashPartitions; protected HoodieTableType tableType; protected Integer nPartitions; + protected Boolean hasPrecombine = true; + protected HoodieRecord.HoodieRecordType recordType = HoodieRecord.HoodieRecordType.AVRO; protected String[] partitionCols; protected static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name", "city_to_state"}; @@ -104,7 +109,13 @@ protected Map basicOptions() { options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexKeyGenerator.class.getName()); } } - options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp"); + if (hasPrecombine) { + options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp"); + } + if (recordType == HoodieRecord.HoodieRecordType.SPARK) { + options.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), DefaultSparkRecordMerger.class.getName()); + options.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"); + } if (tableType.equals(MERGE_ON_READ)) { options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 9e3c09edb764c..06ea06caa8df5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -26,13 +26,14 @@ import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPU import org.apache.hudi.common.model._ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.util.Option -import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.storage.StoragePath +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration import org.apache.hudi.table.action.compact.CompactionTriggerStrategy import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase} import org.apache.hudi.util.JFunction @@ -1026,24 +1027,35 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin .save(basePath) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testClusteringSamePrecombine(recordType: HoodieRecordType): Unit = { + def testClusteringCompactionHelper(recordType: HoodieRecordType, + hasPrecombine: Boolean, + isClustering: Boolean, + withRowWriter: Boolean = false): Unit = { var writeOpts = Map( "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", - DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.TABLE_TYPE.key()-> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, - "hoodie.clustering.inline"-> "true", - "hoodie.clustering.inline.max.commits" -> "2", - "hoodie.clustering.plan.strategy.sort.columns" -> "_row_key", "hoodie.metadata.enable" -> "false", - "hoodie.datasource.write.row.writer.enable" -> "false" + "hoodie.datasource.write.row.writer.enable" -> withRowWriter.toString ) + + if (hasPrecombine) { + writeOpts = writeOpts ++ Map(DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp") + } + + if (isClustering) { + writeOpts = writeOpts ++ Map(HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true", + HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2", + HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() -> "_row_key") + } else { + writeOpts = writeOpts ++ Map(HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2") + } + if (recordType.equals(HoodieRecordType.SPARK)) { writeOpts = Map(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key -> classOf[DefaultSparkRecordMerger].getName, HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet") ++ writeOpts @@ -1068,6 +1080,170 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin .except(inputDF2.select("_row_key", "partition", "rider")).count()) } + @ParameterizedTest 
+ @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testClusteringSamePrecombine(recordType: HoodieRecordType): Unit = { + testClusteringCompactionHelper(recordType, hasPrecombine = true, isClustering = true) + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testClusteringNoPrecombine(recordType: HoodieRecordType): Unit = { + testClusteringCompactionHelper(recordType, hasPrecombine = false, isClustering = true) + } + + @Test + def testClusteringSamePrecombineWithRowWriter(): Unit = { + testClusteringCompactionHelper(HoodieRecordType.SPARK, hasPrecombine = true, isClustering = true, withRowWriter = true) + } + + @Test + def testClusteringNoPrecombineWithRowWriter(): Unit = { + testClusteringCompactionHelper(HoodieRecordType.SPARK, hasPrecombine = false, isClustering = true, withRowWriter = true) + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testCompactionSamePrecombine(recordType: HoodieRecordType): Unit = { + testClusteringCompactionHelper(recordType, hasPrecombine = true, isClustering = false) + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testCompactionNoPrecombine(recordType: HoodieRecordType): Unit = { + testClusteringCompactionHelper(recordType, hasPrecombine = false, isClustering = false) + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testPrecombineBehavior(recordType: HoodieRecordType): Unit = { + Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType => + val basicOpts = if (recordType == HoodieRecordType.SPARK) { + Map(HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key() -> tableType, + DataSourceWriteOptions.OPERATION.key() -> UPSERT_OPERATION_OPT_VAL, + DataSourceWriteOptions.RECORDKEY_FIELD.key()-> "id") + } else { + sparkOpts ++ Map(HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key() -> tableType, + DataSourceWriteOptions.OPERATION.key() -> UPSERT_OPERATION_OPT_VAL, + DataSourceWriteOptions.RECORDKEY_FIELD.key()-> "id") + } + + val _spark = spark + import _spark.implicits._ + + // precombine is ts, table schema has ts + // result: tableconfig precombine is ts + val table1Path = basePath + "/table1" + val df1: Dataset[Row] = Seq((1, "a1", 10, 1000)).toDF("id", "name", "price", "ts") + df1.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts") + .mode(SaveMode.Overwrite) + .save(table1Path) + spark.read.format("hudi").load(table1Path).show(100, false) + df1.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts") + .mode(SaveMode.Append) + .save(table1Path) + spark.read.format("hudi").load(table1Path).show(100, false) + assertPrecombine("ts", table1Path) + + // precombine is not set, table schema has ts + // result: ts is used as precombine but not set in tableconfig + val table2Path = basePath + "/table2" + val df2: Dataset[Row] = Seq((1, "a1", 10, 1000)).toDF("id", "name", "price", "ts") + df2.write.format("org.apache.hudi") + .options(basicOpts) + .mode(SaveMode.Overwrite) + .save(table2Path) + spark.read.format("hudi").load(table2Path).show(100, false) + df2.write.format("org.apache.hudi") + .options(basicOpts) + .mode(SaveMode.Append) + .save(table2Path) + 
spark.read.format("hudi").load(table2Path).show(100, false) + assertPrecombine(null, table2Path) + + // precombine is price, table schema has price + // result: tableconfig precombine is price + val table3Path = basePath + "/table3" + val df3: Dataset[Row] = Seq((1, "a1", 10, 1000)).toDF("id", "name", "price", "ts") + df3.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "price") + .mode(SaveMode.Overwrite) + .save(table3Path) + spark.read.format("hudi").load(table3Path).show(100, false) + df3.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "price") + .mode(SaveMode.Append) + .save(table3Path) + spark.read.format("hudi").load(table3Path).show(100, false) + assertPrecombine("price", table3Path) + + // precombine is noexist, table schema does not have noexist + // TODO [HUDI-8574] this should fail + // result: currently succeeds without an exception + val table4Path = basePath + "/table4" + val df4: Dataset[Row] = Seq((1, "a1", 10, 1000)).toDF("id", "name", "price", "ts") + df4.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "noexist") + .mode(SaveMode.Overwrite) + .save(table4Path) + spark.read.format("hudi").load(table4Path).show(100, false) + df4.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "noexist") + .mode(SaveMode.Append) + .save(table4Path) + spark.read.format("hudi").load(table4Path).show(100, false) + + // precombine is not set, table schema does not have ts + // result: precombine is not used and tableconfig does not have precombine set + val table5Path = basePath + "/table5" + val df5: Dataset[Row] = Seq((1, "a1", 10)).toDF("id", "name", "price") + df5.write.format("org.apache.hudi") + .options(basicOpts) + .mode(SaveMode.Overwrite) + .save(table5Path) + spark.read.format("hudi").load(table5Path).show(100, false) + df5.write.format("org.apache.hudi") + .options(basicOpts) + .mode(SaveMode.Append) + .save(table5Path) + spark.read.format("hudi").load(table5Path).show(100, false) + assertPrecombine(null, table5Path) + + // precombine is ts, table schema does not have ts + // TODO [HUDI-8574] this should fail + val table6Path = basePath + "/table6" + val df6: Dataset[Row] = Seq((1, "a1", 10)).toDF("id", "name", "price") + df6.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts") + .mode(SaveMode.Overwrite) + .save(table6Path) + spark.read.format("hudi").load(table6Path).show(100, false) + df6.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts") + .mode(SaveMode.Append) + .save(table6Path) + spark.read.format("hudi").load(table6Path).show(100, false) + } + } + + def assertPrecombine(expected: String, tableBasePath: String): Unit = { + assertEquals(expected, HoodieTestUtils + .createMetaClient(new HadoopStorageConfiguration(spark.sessionState.newHadoopConf), tableBasePath) + .getTableConfig.getPreCombineField) + } + @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testClusteringSamePrecombineWithDelete(recordType: HoodieRecordType): Unit = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala index 
12c0d64833803..a60fcfd228cf8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCommitMetadata import org.apache.spark.sql.types._ -import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import scala.collection.JavaConverters._ @@ -1555,4 +1555,135 @@ class TestCreateTable extends HoodieSparkSqlTestBase { )("Failed to create catalog table in metastore") } } + + test("Test Precombine Behavior With Create Table As Select") { + Seq("cow", "mor").foreach { tableType => + withTempDir { tmp => + + // precombine is ts, table schema has ts + // result: tableconfig precombine is ts + val tableName1 = generateTableName + spark.sql( + s""" + | create table $tableName1 using hudi + | tblproperties( + | type = '$tableType', + | preCombineField = 'ts' + | ) + | location '${tmp.getCanonicalPath}/$tableName1' + | AS + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts + """.stripMargin) + checkAnswer(s"select id, name, price, ts from $tableName1")( + Seq(1, "a1", 10, 1000) + ) + assertEquals("ts", createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName1").getTableConfig.getPreCombineField) + spark.sql(s"update $tableName1 set name = 'a2', ts = 1001 where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName1")( + Seq(1, "a2", 10, 1001) + ) + + + // precombine is not set, table schema has ts + // result: ts is used as precombine but not set in tableconfig + val tableName2 = generateTableName + spark.sql( + s""" + | create table $tableName2 using hudi + | tblproperties( + | type = '$tableType' + | ) + | location '${tmp.getCanonicalPath}/$tableName2' + | AS + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts + """.stripMargin) + checkAnswer(s"select id, name, price, ts from $tableName2")( + Seq(1, "a1", 10, 1000) + ) + assertEquals(null, createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName2").getTableConfig.getPreCombineField) + spark.sql(s"update $tableName2 set name = 'a2', ts = 1001 where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName2")( + Seq(1, "a2", 10, 1001) + ) + + // precombine is price, table schema has price + // result: tableconfig precombine is price + val tableName3 = generateTableName + spark.sql( + s""" + | create table $tableName3 using hudi + | tblproperties( + | type = '$tableType', + | preCombineField = 'price' + | ) + | location '${tmp.getCanonicalPath}/$tableName3' + | AS + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts + """.stripMargin) + checkAnswer(s"select id, name, price, ts from $tableName3")( + Seq(1, "a1", 10, 1000) + ) + assertEquals("price", createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName3").getTableConfig.getPreCombineField) + spark.sql(s"update $tableName3 set name = 'a2', price = 11 where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName3")( + Seq(1, "a2", 11, 1000) + ) + + // precombine is notexist, table schema does not have notexist + // result: exception + val tableName4 = generateTableName + assertThrows[IllegalArgumentException] { + spark.sql( + s""" + | create table $tableName4 using hudi + | tblproperties( 
+ | type = '$tableType', + | preCombineField = 'notexist' + | ) + | location '${tmp.getCanonicalPath}/$tableName4' + | AS + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts + """.stripMargin) + } + + // precombine is not set, table schema does not have ts + // result: precombine is not used and tableconfig does not have precombine set + val tableName5 = generateTableName + spark.sql( + s""" + | create table $tableName5 using hudi + | tblproperties( + | type = '$tableType' + | ) + | location '${tmp.getCanonicalPath}/$tableName5' + | AS + | select 1 as id, 'a1' as name, 10 as price + """.stripMargin) + checkAnswer(s"select id, name, price from $tableName5")( + Seq(1, "a1", 10) + ) + assertEquals(null, createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName5").getTableConfig.getPreCombineField) + spark.sql(s"update $tableName5 set name = 'a2', price = 11 where id = 1") + checkAnswer(s"select id, name, price from $tableName5")( + Seq(1, "a2", 11) + ) + // precombine is ts, table schema does not have ts + // result: exception + val tableName6 = generateTableName + assertThrows[IllegalArgumentException] { + spark.sql( + s""" + | create table $tableName6 using hudi + | tblproperties( + | type = '$tableType', + | preCombineField = 'ts' + | ) + | location '${tmp.getCanonicalPath}/$tableName6' + | AS + | select 1 as id, 'a1' as name, 10 as price + """.stripMargin) + } + } + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala index 5535400cb667d..3ff760c698b9d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala @@ -49,169 +49,173 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { test("Test delete all records in filegroup") { Seq("cow", "mor").foreach { tableType => - withTempDir { tmp => - val databaseName = "hudi_database" - spark.sql(s"create database if not exists $databaseName") - spark.sql(s"use $databaseName") - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - | create table $tableName ( - | id int, - | name string, - | price double, - | ts long - | ) using hudi - | partitioned by (name) - | tblproperties ( - | 'primaryKey' = 'id', - | 'preCombineField' = 'ts', - | 'hoodie.table.cdc.enabled' = 'true', - | 'hoodie.table.cdc.supplemental.logging.mode' = '$DATA_BEFORE_AFTER', - | type = '$tableType' - | ) - | location '$basePath' - """.stripMargin) - val metaClient = createMetaClient(spark, basePath) - spark.sql(s"insert into $tableName values (1, 11, 1000, 'a1'), (2, 12, 1000, 'a2')") - assert(spark.sql(s"select _hoodie_file_name from $tableName").distinct().count() == 2) - val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName where id=1").head().get(0) - val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1) - cdcDataOnly1.show(false) - assertCDCOpCnt(cdcDataOnly1, 2, 0, 0) - - spark.sql(s"delete from $tableName where id = 1") - val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong) - assertCDCOpCnt(cdcDataOnly2, 0, 0, 1) - assert(spark.sql(s"select _hoodie_file_name from $tableName").distinct().count() == 1) - assert(!spark.sql(s"select 
_hoodie_file_name from $tableName").head().get(0).equals(fgForID1)) - } - } - } - - /** - * Test CDC in cases that it's a COW/MOR non--partitioned table and `cdcSupplementalLoggingMode` is true or not. - */ - test("Test Non-Partitioned Hoodie Table") { - val databaseName = "hudi_database" - spark.sql(s"create database if not exists $databaseName") - spark.sql(s"use $databaseName") - - Seq("cow", "mor").foreach { tableType => - Seq(OP_KEY_ONLY, DATA_BEFORE, DATA_BEFORE_AFTER).foreach { loggingMode => + Seq("'preCombineField' = '_ts',", "").foreach { preCombineLine => withTempDir { tmp => + val databaseName = "hudi_database" + spark.sql(s"create database if not exists $databaseName") + spark.sql(s"use $databaseName") val tableName = generateTableName val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert") - val otherTableProperties = if (tableType == "mor") { - "'hoodie.compact.inline'='true', 'hoodie.compact.inline.max.delta.commits'='2'," - } else { - "" - } spark.sql( s""" | create table $tableName ( | id int, | name string, | price double, - | ts long + | _ts long | ) using hudi + | partitioned by (name) | tblproperties ( | 'primaryKey' = 'id', - | 'preCombineField' = 'ts', + | $preCombineLine | 'hoodie.table.cdc.enabled' = 'true', - | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', - | $otherTableProperties + | 'hoodie.table.cdc.supplemental.logging.mode' = '$DATA_BEFORE_AFTER', | type = '$tableType' | ) | location '$basePath' - """.stripMargin) - + """.stripMargin) val metaClient = createMetaClient(spark, basePath) - - spark.sql(s"insert into $tableName values (1, 'a1', 11, 1000), (2, 'a2', 12, 1000), (3, 'a3', 13, 1000)") + spark.sql(s"insert into $tableName values (1, 11, 1000, 'a1'), (2, 12, 1000, 'a2')") + assert(spark.sql(s"select _hoodie_file_name from $tableName").distinct().count() == 2) + val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName where id=1").head().get(0) val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1) cdcDataOnly1.show(false) - assertCDCOpCnt(cdcDataOnly1, 3, 0, 0) + assertCDCOpCnt(cdcDataOnly1, 2, 0, 0) - spark.sql(s"insert into $tableName values (1, 'a1_v2', 11, 1100)") - val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - // here we use `commitTime1` to query the change data in commit 2. - // because `commitTime2` is maybe the ts of the compaction operation, not the write operation. 
+ spark.sql(s"delete from $tableName where id = 1") val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong) - cdcDataOnly2.show(false) - assertCDCOpCnt(cdcDataOnly2, 0, 1, 0) + assertCDCOpCnt(cdcDataOnly2, 0, 0, 1) + assert(spark.sql(s"select _hoodie_file_name from $tableName").distinct().count() == 1) + assert(!spark.sql(s"select _hoodie_file_name from $tableName").head().get(0).equals(fgForID1)) + } + } + } + } - // Check the details - val originSchema = spark.read.format("hudi").load(basePath).schema - val change2 = cdcDataOnly2.select( - col("op"), - from_json(col("before"), originSchema).as("before"), - from_json(col("after"), originSchema).as("after") - ).select( - col("op"), - col("after.id"), - col("before.name"), - col("before.price"), - col("after.name"), - col("after.price") - ).collect() - checkAnswer(change2)(Seq("u", 1, "a1", 11, "a1_v2", 11)) + /** + * Test CDC in cases that it's a COW/MOR non--partitioned table and `cdcSupplementalLoggingMode` is true or not. + */ + test("Test Non-Partitioned Hoodie Table") { + val databaseName = "hudi_database" + spark.sql(s"create database if not exists $databaseName") + spark.sql(s"use $databaseName") - spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id = 2") - val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2.toLong) - cdcDataOnly3.show(false) - assertCDCOpCnt(cdcDataOnly3, 0, 1, 0) + Seq("cow", "mor").foreach { tableType => + Seq("'preCombineField' = '_ts',", "").foreach { preCombineLine => + Seq(OP_KEY_ONLY, DATA_BEFORE, DATA_BEFORE_AFTER).foreach { loggingMode => + withTempDir { tmp => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert") + val otherTableProperties = if (tableType == "mor") { + "'hoodie.compact.inline'='true', 'hoodie.compact.inline.max.delta.commits'='2'," + } else { + "" + } + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | _ts long + | ) using hudi + | tblproperties ( + | 'primaryKey' = 'id', + | $preCombineLine + | 'hoodie.table.cdc.enabled' = 'true', + | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', + | $otherTableProperties + | type = '$tableType' + | ) + | location '$basePath' + """.stripMargin) - spark.sql(s"delete from $tableName where id = 3") - val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3.toLong) - cdcDataOnly4.show(false) - assertCDCOpCnt(cdcDataOnly4, 0, 0, 1) + val metaClient = createMetaClient(spark, basePath) - spark.sql( - s""" - | merge into $tableName - | using ( - | select * from ( - | select 1 as id, 'a1_v3' as name, cast(11 as double) as price, cast(1300 as long) as ts - | union all - | select 4 as id, 'a4' as name, cast(14 as double) as price, cast(1300 as long) as ts - | ) - | ) s0 - | on s0.id = $tableName.id - | when matched then update set id = s0.id, name = s0.name, price = s0.price, ts = s0.ts - | when not matched then insert * + spark.sql(s"insert into $tableName values (1, 'a1', 11, 1000), (2, 'a2', 12, 1000), (3, 'a3', 13, 1000)") + val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1) + cdcDataOnly1.show(false) + assertCDCOpCnt(cdcDataOnly1, 3, 0, 0) + + spark.sql(s"insert into $tableName values (1, 
'a1_v2', 11, 1100)") + val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + // here we use `commitTime1` to query the change data in commit 2. + // because `commitTime2` is maybe the ts of the compaction operation, not the write operation. + val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong) + cdcDataOnly2.show(false) + assertCDCOpCnt(cdcDataOnly2, 0, 1, 0) + + // Check the details + val originSchema = spark.read.format("hudi").load(basePath).schema + val change2 = cdcDataOnly2.select( + col("op"), + from_json(col("before"), originSchema).as("before"), + from_json(col("after"), originSchema).as("after") + ).select( + col("op"), + col("after.id"), + col("before.name"), + col("before.price"), + col("after.name"), + col("after.price") + ).collect() + checkAnswer(change2)(Seq("u", 1, "a1", 11, "a1_v2", 11)) + + spark.sql(s"update $tableName set name = 'a2_v2', _ts = 1200 where id = 2") + val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2.toLong) + cdcDataOnly3.show(false) + assertCDCOpCnt(cdcDataOnly3, 0, 1, 0) + + spark.sql(s"delete from $tableName where id = 3") + val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3.toLong) + cdcDataOnly4.show(false) + assertCDCOpCnt(cdcDataOnly4, 0, 0, 1) + + spark.sql( + s""" + | merge into $tableName + | using ( + | select * from ( + | select 1 as id, 'a1_v3' as name, cast(11 as double) as price, cast(1300 as long) as _ts + | union all + | select 4 as id, 'a4' as name, cast(14 as double) as price, cast(1300 as long) as _ts + | ) + | ) s0 + | on s0.id = $tableName.id + | when matched then update set id = s0.id, name = s0.name, price = s0.price, _ts = s0._ts + | when not matched then insert * """.stripMargin) - val commitTime5 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4.toLong) - cdcDataOnly5.show(false) - assertCDCOpCnt(cdcDataOnly5, 1, 1, 0) + val commitTime5 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4.toLong) + cdcDataOnly5.show(false) + assertCDCOpCnt(cdcDataOnly5, 1, 1, 0) - // Check the details - val change5 = cdcDataOnly5.select( - col("op"), - from_json(col("before"), originSchema).as("before"), - from_json(col("after"), originSchema).as("after") - ).select( - col("op"), - col("after.id"), - col("before.name"), - col("before.price"), - col("after.name"), - col("after.price") - ).collect() - checkAnswer(change5.sortBy(_.getInt(1)))( - Seq("u", 1, "a1_v2", 11, "a1_v3", 11), - Seq("i", 4, null, null, "a4", 14) - ) + // Check the details + val change5 = cdcDataOnly5.select( + col("op"), + from_json(col("before"), originSchema).as("before"), + from_json(col("after"), originSchema).as("after") + ).select( + col("op"), + col("after.id"), + col("before.name"), + col("before.price"), + col("after.name"), + col("after.price") + ).collect() + checkAnswer(change5.sortBy(_.getInt(1)))( + Seq("u", 1, "a1_v2", 11, "a1_v3", 11), + Seq("i", 4, null, null, "a4", 14) + ) - val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1) - assertCDCOpCnt(totalCdcData, 4, 3, 1) + val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1) + assertCDCOpCnt(totalCdcData, 4, 3, 1) + } } } } @@ -227,77 +231,79 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { spark.sql(s"use 
$databaseName") Seq("cow", "mor").foreach { tableType => - Seq(OP_KEY_ONLY, DATA_BEFORE).foreach { loggingMode => - withTempDir { tmp => - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - | create table $tableName ( - | id int, - | name string, - | price double, - | ts long, - | pt string - | ) using hudi - | partitioned by (pt) - | tblproperties ( - | 'primaryKey' = 'id', - | 'preCombineField' = 'ts', - | 'hoodie.table.cdc.enabled' = 'true', - | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', - | 'type' = '$tableType' - | ) - | location '$basePath' + Seq("'preCombineField' = '_ts',", "").foreach { preCombineLine => + Seq(OP_KEY_ONLY, DATA_BEFORE).foreach { loggingMode => + withTempDir { tmp => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | _ts long, + | pt string + | ) using hudi + | partitioned by (pt) + | tblproperties ( + | 'primaryKey' = 'id', + | $preCombineLine + | 'hoodie.table.cdc.enabled' = 'true', + | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', + | 'type' = '$tableType' + | ) + | location '$basePath' """.stripMargin) - val metaClient = createMetaClient(spark, basePath) + val metaClient = createMetaClient(spark, basePath) - spark.sql( - s""" - | insert into $tableName values - | (1, 'a1', 11, 1000, '2021'), - | (2, 'a2', 12, 1000, '2022'), - | (3, 'a3', 13, 1000, '2022') + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1', 11, 1000, '2021'), + | (2, 'a2', 12, 1000, '2022'), + | (3, 'a3', 13, 1000, '2022') """.stripMargin) - val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1) - cdcDataOnly1.show(false) - assertCDCOpCnt(cdcDataOnly1, 3, 0, 0) + val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1) + cdcDataOnly1.show(false) + assertCDCOpCnt(cdcDataOnly1, 3, 0, 0) - spark.sql(s"insert overwrite table $tableName partition (pt = '2021') values (1, 'a1_v2', 11, 1100)") - val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1) - cdcDataOnly2.show(false) - assertCDCOpCnt(cdcDataOnly2, 1, 0, 1) + spark.sql(s"insert overwrite table $tableName partition (pt = '2021') values (1, 'a1_v2', 11, 1100)") + val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1) + cdcDataOnly2.show(false) + assertCDCOpCnt(cdcDataOnly2, 1, 0, 1) - spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id = 2") - val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1) - cdcDataOnly3.show(false) - assertCDCOpCnt(cdcDataOnly3, 0, 1, 0) + spark.sql(s"update $tableName set name = 'a2_v2', _ts = 1200 where id = 2") + val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1) + cdcDataOnly3.show(false) + assertCDCOpCnt(cdcDataOnly3, 0, 1, 0) - spark.sql( - s""" - | merge into $tableName - | using ( - | select * from ( - | select 1 as id, 'a1_v3' as name, cast(11 as double) 
as price, cast(1300 as long) as ts, "2021" as pt - | union all - | select 4 as id, 'a4' as name, cast(14 as double) as price, cast(1300 as long) as ts, "2022" as pt - | ) - | ) s0 - | on s0.id = $tableName.id - | when matched then update set id = s0.id, name = s0.name, price = s0.price, ts = s0.ts, pt = s0.pt - | when not matched then insert * + spark.sql( + s""" + | merge into $tableName + | using ( + | select * from ( + | select 1 as id, 'a1_v3' as name, cast(11 as double) as price, cast(1300 as long) as _ts, "2021" as pt + | union all + | select 4 as id, 'a4' as name, cast(14 as double) as price, cast(1300 as long) as _ts, "2022" as pt + | ) + | ) s0 + | on s0.id = $tableName.id + | when matched then update set id = s0.id, name = s0.name, price = s0.price, _ts = s0._ts, pt = s0.pt + | when not matched then insert * """.stripMargin) - val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1) - cdcDataOnly4.show(false) - assertCDCOpCnt(cdcDataOnly4, 1, 1, 0) + val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1) + cdcDataOnly4.show(false) + assertCDCOpCnt(cdcDataOnly4, 1, 1, 0) - val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1) - assertCDCOpCnt(totalCdcData, 5, 2, 1) + val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1) + assertCDCOpCnt(totalCdcData, 5, 2, 1) + } } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala index 82b6e0d23a922..323c378a36013 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala @@ -43,15 +43,23 @@ import scala.collection.JavaConverters._ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { test("Test partial update with COW and Avro log format") { - testPartialUpdate("cow", "avro") + testPartialUpdate("cow", "avro", true) } test("Test partial update with MOR and Avro log format") { - testPartialUpdate("mor", "avro") + testPartialUpdate("mor", "avro", true) } test("Test partial update with MOR and Parquet log format") { - testPartialUpdate("mor", "parquet") + testPartialUpdate("mor", "parquet", true) + } + + test("Test partial update with MOR and Avro log format no precombine") { + testPartialUpdate("mor", "avro", false) + } + + test("Test partial update with MOR and Parquet log format no precombine") { + testPartialUpdate("mor", "parquet", false) } test("Test partial update and insert with COW and Avro log format") { @@ -171,7 +179,8 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { } def testPartialUpdate(tableType: String, - logDataBlockFormat: String): Unit = { + logDataBlockFormat: String, + hasPrecombine: Boolean): Unit = { withTempDir { tmp => val tableName = generateTableName val basePath = tmp.getCanonicalPath + "/" + tableName @@ -180,6 +189,12 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat") spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = true") + val preCombineOpt = if 
(hasPrecombine) { + "preCombineField = '_ts'," + } else { + "" + } + // Create a table with five data fields spark.sql( s""" @@ -192,8 +207,8 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { |) using hudi |tblproperties( | type ='$tableType', - | primaryKey = 'id', - | preCombineField = '_ts' + | $preCombineOpt + | primaryKey = 'id' |) |location '$basePath' """.stripMargin)
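
Note (illustrative, not part of the patch): the three-argument AvroSchemaUtils.findNestedFieldType overload added above can be exercised as in the following Scala sketch. Only the overload's signature and its return-empty-instead-of-throw behavior come from the diff; the record schema is made up, and the element type carried by the returned Option is deliberately left unstated here.

import org.apache.avro.SchemaBuilder
import org.apache.hudi.avro.AvroSchemaUtils

// Illustrative schema with an ordering-candidate field "ts"
val schema = SchemaBuilder.record("rec").fields()
  .requiredInt("id")
  .requiredLong("ts")
  .endRecord()

AvroSchemaUtils.findNestedFieldType(schema, "ts", false)      // existing field: non-empty Option
AvroSchemaUtils.findNestedFieldType(schema, "missing", false) // missing field: Option.empty() instead of an exception
// With throwOnNotFound = true the previous behavior is kept: a HoodieAvroSchemaException is thrown.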
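Usage note (illustrative, not part of the patch): the end-to-end behavior these changes enable is writing a table without ever configuring hoodie.datasource.write.precombine.field, which is the scenario the new tests above exercise. The sketch below assumes a running SparkSession named spark; the path, table name, and columns are invented.

import org.apache.spark.sql.SaveMode
import spark.implicits._

val basePath = "/tmp/hudi_no_precombine"
val df = Seq((1, "a1", 10, "p1")).toDF("id", "name", "price", "partition")

df.write.format("hudi")
  .option("hoodie.table.name", "hudi_no_precombine")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.partitionpath.field", "partition")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  // no precombine/ordering field is set on purpose
  .mode(SaveMode.Overwrite)
  .save(basePath)

// Afterwards the table config should carry no precombine field, mirroring
// assertPrecombine(null, ...) in the TestMORDataSource changes above.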