From 76fa927be2db76bbdace1281ee6ce2f8afc8070f Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Fri, 22 Nov 2024 12:57:11 -0500 Subject: [PATCH 1/8] fix no precombine set for mor --- .../hudi/common/model/HoodieSparkRecord.java | 5 +- .../apache/hudi/hadoop/HoodieHiveRecord.java | 4 +- .../apache/hudi/HoodieCreateRecordUtils.scala | 8 ++- .../hudi/functional/TestMORDataSource.scala | 63 ++++++++++++++++--- .../dml/TestPartialUpdateForMergeInto.scala | 27 ++++++-- 5 files changed, 86 insertions(+), 21 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java index ab7ec4ac557f2..025839d4c5456 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java @@ -313,8 +313,11 @@ public HoodieSparkRecord copy() { @Override public Comparable getOrderingValue(Schema recordSchema, Properties props) { - StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); String orderingField = ConfigUtils.getOrderingField(props); + if (StringUtils.isNullOrEmpty(orderingField)) { + return 0; + } + StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema); scala.Option cachedNestedFieldPath = HoodieInternalRowUtils.getCachedPosList(structType, orderingField); if (cachedNestedFieldPath.isDefined()) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java index a2fb08fd6146d..422e6438d4f5d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.MetadataValues; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils; import org.apache.hudi.hadoop.utils.ObjectInspectorCache; @@ -101,9 +102,8 @@ public HoodieRecord newInstance(HoodieKey key) { @Override public Comparable getOrderingValue(Schema recordSchema, Properties props) { String orderingField = ConfigUtils.getOrderingField(props); - if (orderingField == null) { + if (StringUtils.isNullOrEmpty(orderingField)) { return 0; - //throw new IllegalArgumentException("Ordering Field is not set. Precombine must be set. 
(If you are using a custom record merger it might be something else)"); } return (Comparable) getValue(ConfigUtils.getOrderingField(props)); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala index 048f45bba9f2f..907856389ce18 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala @@ -143,9 +143,11 @@ object HoodieCreateRecordUtils { avroRecWithoutMeta } - val hoodieRecord = if (shouldCombine && !precombineEmpty) { - val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine, - false, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]] + // lazy so that we don't evaluate if we short circuit the boolean expression in the if below + lazy val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine, + true, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]] + + val hoodieRecord = if (shouldCombine && !precombineEmpty && orderingVal != null) { DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, hoodieKey, config.getPayloadClass, recordLocation) } else { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 9e3c09edb764c..263436c075915 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -29,7 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.util.Option -import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.storage.StoragePath @@ -1026,24 +1026,35 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin .save(basePath) } - @ParameterizedTest - @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) - def testClusteringSamePrecombine(recordType: HoodieRecordType): Unit = { + def testClusteringCompactionHelper(recordType: HoodieRecordType, + hasPrecombine: Boolean, + isClustering: Boolean, + withRowWriter: Boolean = false): Unit = { var writeOpts = Map( "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", - DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.TABLE_TYPE.key()-> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, - "hoodie.clustering.inline"-> "true", - "hoodie.clustering.inline.max.commits" -> "2", - 
"hoodie.clustering.plan.strategy.sort.columns" -> "_row_key", "hoodie.metadata.enable" -> "false", - "hoodie.datasource.write.row.writer.enable" -> "false" + "hoodie.datasource.write.row.writer.enable" -> withRowWriter.toString ) + + if (hasPrecombine) { + writeOpts = writeOpts ++ Map(DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp") + } + + if (isClustering) { + writeOpts = writeOpts ++ Map(HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true", + HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2", + HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() -> "_row_key") + } else { + writeOpts = writeOpts ++ Map(HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2") + } + if (recordType.equals(HoodieRecordType.SPARK)) { writeOpts = Map(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key -> classOf[DefaultSparkRecordMerger].getName, HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet") ++ writeOpts @@ -1068,6 +1079,40 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin .except(inputDF2.select("_row_key", "partition", "rider")).count()) } + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testClusteringSamePrecombine(recordType: HoodieRecordType): Unit = { + testClusteringCompactionHelper(recordType, hasPrecombine = true, isClustering = true) + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testClusteringNoPrecombine(recordType: HoodieRecordType): Unit = { + testClusteringCompactionHelper(recordType, hasPrecombine = false, isClustering = true) + } + + @Test + def testClusteringSamePrecombineWithRowWriter(): Unit = { + testClusteringCompactionHelper(HoodieRecordType.SPARK, hasPrecombine = true, isClustering = true, withRowWriter = true) + } + + @Test + def testClusteringNoPrecombineWithRowWriter(): Unit = { + testClusteringCompactionHelper(HoodieRecordType.SPARK, hasPrecombine = false, isClustering = true, withRowWriter = true) + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testCompactionSamePrecombine(recordType: HoodieRecordType): Unit = { + testClusteringCompactionHelper(recordType, hasPrecombine = true, isClustering = false) + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testCompactionNoPrecombine(recordType: HoodieRecordType): Unit = { + testClusteringCompactionHelper(recordType, hasPrecombine = false, isClustering = false) + } + @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testClusteringSamePrecombineWithDelete(recordType: HoodieRecordType): Unit = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala index 82b6e0d23a922..323c378a36013 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala @@ -43,15 +43,23 @@ import scala.collection.JavaConverters._ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { test("Test partial update with COW and Avro log 
format") { - testPartialUpdate("cow", "avro") + testPartialUpdate("cow", "avro", true) } test("Test partial update with MOR and Avro log format") { - testPartialUpdate("mor", "avro") + testPartialUpdate("mor", "avro", true) } test("Test partial update with MOR and Parquet log format") { - testPartialUpdate("mor", "parquet") + testPartialUpdate("mor", "parquet", true) + } + + test("Test partial update with MOR and Avro log format no precombine") { + testPartialUpdate("mor", "avro", false) + } + + test("Test partial update with MOR and Parquet log format no precombine") { + testPartialUpdate("mor", "parquet", false) } test("Test partial update and insert with COW and Avro log format") { @@ -171,7 +179,8 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { } def testPartialUpdate(tableType: String, - logDataBlockFormat: String): Unit = { + logDataBlockFormat: String, + hasPrecombine: Boolean): Unit = { withTempDir { tmp => val tableName = generateTableName val basePath = tmp.getCanonicalPath + "/" + tableName @@ -180,6 +189,12 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat") spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = true") + val preCombineOpt = if (hasPrecombine) { + "preCombineField = '_ts'," + } else { + "" + } + // Create a table with five data fields spark.sql( s""" @@ -192,8 +207,8 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { |) using hudi |tblproperties( | type ='$tableType', - | primaryKey = 'id', - | preCombineField = '_ts' + | $preCombineOpt + | primaryKey = 'id' |) |location '$basePath' """.stripMargin) From 9155119c7ce87cb2d4e80c0a6452eb4e06246a95 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Sun, 24 Nov 2024 17:01:04 -0500 Subject: [PATCH 2/8] add more testing and fix more cases --- .../org/apache/hudi/avro/AvroSchemaUtils.java | 10 +- .../hudi/common/model/HoodieRecordMerger.java | 1 + .../read/HoodieBaseFileGroupRecordBuffer.java | 4 +- .../HoodieFileGroupReaderSchemaHandler.java | 12 +- .../HoodiePositionBasedSchemaHandler.java | 6 +- .../apache/hudi/HoodieCreateRecordUtils.scala | 1 + .../apache/hudi/HoodieSparkSqlWriter.scala | 10 +- .../SparkFullBootstrapDataProviderBase.java | 28 +- .../hudi/functional/TestBootstrapRead.java | 29 +- .../functional/TestBootstrapReadBase.java | 13 +- .../hudi/functional/TestMORDataSource.scala | 133 +++++- .../spark/sql/hudi/ddl/TestCreateTable.scala | 105 ++++- .../sql/hudi/dml/TestCDCForSparkSQL.scala | 398 +++++++++--------- 13 files changed, 525 insertions(+), 225 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index 18fdedca50f13..e0c47e3e09352 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -208,6 +208,10 @@ private static boolean isProjectionOfInternal(Schema sourceSchema, } public static Option findNestedFieldType(Schema schema, String fieldName) { + return findNestedFieldType(schema, fieldName, true); + } + + public static Option findNestedFieldType(Schema schema, String fieldName, boolean throwOnNotFound) { if (StringUtils.isNullOrEmpty(fieldName)) { return Option.empty(); } @@ -216,7 +220,11 @@ public static Option findNestedFieldType(Schema schema, String fiel for (String part : parts) { Schema.Field foundField 
= resolveNullableSchema(schema).getField(part); if (foundField == null) { - throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema); + if (throwOnNotFound) { + throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema); + } else { + return Option.empty(); + } } schema = foundField.schema(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java index 9628e9ced2401..d54a28c1760ba 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java @@ -167,6 +167,7 @@ default String[] getMandatoryFieldsForMerging(Schema dataSchema, HoodieTableConf String preCombine = cfg.getPreCombineField(); if (!StringUtils.isNullOrEmpty(preCombine)) { + requiredFields.add(preCombine); } return requiredFields.toArray(new String[0]); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java index e001446091673..eb4e91d515637 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java @@ -112,7 +112,9 @@ public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext readerContext, this.payloadClass = Option.empty(); } this.orderingFieldName = Option.ofNullable(ConfigUtils.getOrderingField(props)).orElseGet(() -> hoodieTableMetaClient.getTableConfig().getPreCombineField()); - this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName); + + // Don't throw exception due to [HUDI-8574] + this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? 
Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName, false); this.orderingFieldDefault = orderingFieldTypeOpt.map(type -> readerContext.castValue(0, type)).orElse(0); this.props = props; this.internalSchema = readerContext.getSchemaHandler().getInternalSchema(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java index de2cd920289a2..2e27ebe10a658 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java @@ -160,12 +160,16 @@ private Schema generateRequiredSchema() { List addedFields = new ArrayList<>(); for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) { if (!findNestedField(requestedSchema, field).isPresent()) { - Option foundFieldOpt = findNestedField(dataSchema, field); + Option foundFieldOpt = findNestedField(dataSchema, field); if (!foundFieldOpt.isPresent()) { - throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema"); + //see [HUDI-8574] + if (!field.equals(hoodieTableConfig.getPreCombineField())) { + throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema"); + } + } else { + Schema.Field foundField = foundFieldOpt.get(); + addedFields.add(foundField); } - Schema.Field foundField = foundFieldOpt.get(); - addedFields.add(foundField); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java index 71722f438a771..cafcceb8beff1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java @@ -82,9 +82,11 @@ private static InternalSchema addPositionalMergeCol(InternalSchema internalSchem @Override public Pair,List> getBootstrapRequiredFields() { Pair,List> dataAndMetaCols = super.getBootstrapRequiredFields(); - if (readerContext.supportsParquetRowIndex()) { - if (!dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty()) { + if (readerContext.supportsParquetRowIndex() && (this.needsBootstrapMerge || this.needsMORMerge)) { + if (!dataAndMetaCols.getLeft().isEmpty()) { dataAndMetaCols.getLeft().add(getPositionalMergeField()); + } + if (!dataAndMetaCols.getRight().isEmpty()) { dataAndMetaCols.getRight().add(getPositionalMergeField()); } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala index 907856389ce18..2a1922f385a4a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala @@ -143,6 +143,7 @@ object HoodieCreateRecordUtils { avroRecWithoutMeta } + //TODO [HUDI-8574] we can throw exception if field doesn't exist // lazy so that we don't evaluate if we short circuit the boolean expression in the if below lazy val orderingVal = 
HoodieAvroUtils.getNestedFieldVal(avroRec, precombine, true, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]] diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 8398157810240..754bce99a62f8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -32,7 +32,7 @@ import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams import org.apache.hudi.HoodieSparkUtils.sparkAdapter import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema -import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} import org.apache.hudi.commit.{DatasetBulkInsertCommitActionExecutor, DatasetBulkInsertOverwriteCommitActionExecutor, DatasetBulkInsertOverwriteTableCommitActionExecutor} @@ -50,7 +50,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION} import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig} -import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException} +import org.apache.hudi.exception.{HoodieAvroSchemaException, HoodieException, HoodieRecordCreationException, HoodieWriteConflictException} import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool} @@ -746,7 +746,7 @@ class HoodieSparkSqlWriterInternal { String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()) )) - HoodieTableMetaClient.newTableBuilder() + val metaClient = HoodieTableMetaClient.newTableBuilder() .setTableType(HoodieTableType.valueOf(tableType)) .setTableName(tableName) .setRecordKeyFields(recordKeyFields) @@ -755,7 +755,9 @@ class HoodieSparkSqlWriterInternal { .setPayloadClassName(payloadClass) .setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE))) .setRecordMergeStrategyId(recordMergerStrategy) - .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null)) + // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value, + // but we are interested in what user has set, hence fetching from optParams. 
+ .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null)) .setBootstrapIndexClass(bootstrapIndexClass) .setBaseFileFormat(baseFileFormat) .setBootstrapBasePath(bootstrapBasePath) diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java index 65af0c3543409..fba18f9f5be30 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieSparkRecord; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; @@ -50,6 +51,8 @@ import java.io.IOException; import java.util.List; +import static org.apache.hudi.config.HoodieWriteConfig.PRECOMBINE_FIELD_NAME; + public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBootstrapDataProvider> { private final transient SparkSession sparkSession; @@ -72,20 +75,31 @@ public JavaRDD generateInputRecords(String tableName, String sourc HoodieRecordType recordType = config.getRecordMerger().getRecordType(); Dataset inputDataset = sparkSession.read().format(getFormat()).option("basePath", sourceBasePath).load(filePaths); KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); - String precombineKey = props.getString("hoodie.datasource.write.precombine.field"); + String precombineKey = ConfigUtils.getStringWithAltKeys(props, PRECOMBINE_FIELD_NAME); + boolean hasPrecombine = !StringUtils.isNullOrEmpty(precombineKey); String structName = tableName + "_record"; String namespace = "hoodie." 
+ tableName; if (recordType == HoodieRecordType.AVRO) { RDD genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false, Option.empty()); return genericRecords.toJavaRDD().map(gr -> { - String orderingVal = HoodieAvroUtils.getNestedFieldValAsString( - gr, precombineKey, false, props.getBoolean( - KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), - Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))); + String orderingVal = null; + if (hasPrecombine) { + //TODO [HUDI-8574] we can throw exception if field doesn't exist + // lazy so that we don't evaluate if we short circuit the boolean expression in the if below + orderingVal = HoodieAvroUtils.getNestedFieldValAsString( + gr, precombineKey, true, props.getBoolean( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))); + } try { - return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), - ConfigUtils.getPayloadClass(props), scala.Option.apply(null)); + if (hasPrecombine && !StringUtils.isNullOrEmpty(orderingVal)) { + return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), + ConfigUtils.getPayloadClass(props), scala.Option.apply(null)); + } else { + return DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), + ConfigUtils.getPayloadClass(props), scala.Option.apply(null)); + } } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java index 1e36f491b3f61..a6ebfa42fd214 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java @@ -18,6 +18,7 @@ package org.apache.hudi.functional; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.spark.sql.Dataset; @@ -47,33 +48,47 @@ private static Stream testArgs() { Boolean[] dashPartitions = {true,false}; HoodieTableType[] tableType = {COPY_ON_WRITE, MERGE_ON_READ}; Integer[] nPartitions = {0, 1, 2}; + HoodieRecord.HoodieRecordType[] recordTypes = {HoodieRecord.HoodieRecordType.AVRO}; for (HoodieTableType tt : tableType) { for (Boolean dash : dashPartitions) { for (String bt : bootstrapType) { for (Integer n : nPartitions) { - // can't be mixed bootstrap if it's nonpartitioned - // don't need to test slash partitions if it's nonpartitioned - if ((!bt.equals("mixed") && dash) || n > 0) { - b.add(Arguments.of(bt, dash, tt, n)); + for (HoodieRecord.HoodieRecordType rt : recordTypes) { + // can't be mixed bootstrap if it's nonpartitioned + // don't need to test slash partitions if it's nonpartitioned + if ((!bt.equals("mixed") && dash) || n > 0) { + b.add(Arguments.of(bt, dash, tt, n, rt, true)); + if (tt.equals(MERGE_ON_READ)) { + b.add(Arguments.of(bt, dash, tt, n, rt, false)); + } + } } } } } } } else { - b.add(Arguments.of("metadata", true, COPY_ON_WRITE, 0)); - b.add(Arguments.of("mixed", false, MERGE_ON_READ, 2)); + b.add(Arguments.of("metadata", true, COPY_ON_WRITE, 0, HoodieRecord.HoodieRecordType.AVRO, true)); + b.add(Arguments.of("mixed", false, 
MERGE_ON_READ, 1, HoodieRecord.HoodieRecordType.AVRO, false)); + b.add(Arguments.of("mixed", false, MERGE_ON_READ, 2, HoodieRecord.HoodieRecordType.AVRO, true)); } return b.build(); } @ParameterizedTest @MethodSource("testArgs") - public void testBootstrapFunctional(String bootstrapType, Boolean dashPartitions, HoodieTableType tableType, Integer nPartitions) { + public void testBootstrapFunctional(String bootstrapType, + Boolean dashPartitions, + HoodieTableType tableType, + Integer nPartitions, + HoodieRecord.HoodieRecordType recordType, + Boolean hasPrecombine) { this.bootstrapType = bootstrapType; this.dashPartitions = dashPartitions; this.tableType = tableType; this.nPartitions = nPartitions; + this.recordType = recordType; + this.hasPrecombine = hasPrecombine; setupDirs(); // do bootstrap diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java index 011d67d5658f8..5c8f43397da13 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapReadBase.java @@ -19,10 +19,13 @@ package org.apache.hudi.functional; import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.DefaultSparkRecordMerger; import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector; import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector; import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; import org.apache.hudi.client.bootstrap.translator.DecodedBootstrapPartitionPathTranslator; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.config.HoodieBootstrapConfig; @@ -70,6 +73,8 @@ public abstract class TestBootstrapReadBase extends HoodieSparkClientTestBase { protected Boolean dashPartitions; protected HoodieTableType tableType; protected Integer nPartitions; + protected Boolean hasPrecombine = true; + protected HoodieRecord.HoodieRecordType recordType = HoodieRecord.HoodieRecordType.AVRO; protected String[] partitionCols; protected static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name", "city_to_state"}; @@ -104,7 +109,13 @@ protected Map basicOptions() { options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexKeyGenerator.class.getName()); } } - options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp"); + if (hasPrecombine) { + options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp"); + } + if (recordType == HoodieRecord.HoodieRecordType.SPARK) { + options.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), DefaultSparkRecordMerger.class.getName()); + options.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"); + } if (tableType.equals(MERGE_ON_READ)) { options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 263436c075915..06ea06caa8df5 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -26,13 +26,14 @@ import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPU import org.apache.hudi.common.model._ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.util.Option import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.storage.StoragePath +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration import org.apache.hudi.table.action.compact.CompactionTriggerStrategy import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase} import org.apache.hudi.util.JFunction @@ -1113,6 +1114,136 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin testClusteringCompactionHelper(recordType, hasPrecombine = false, isClustering = false) } + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testPrecombineBehavior(recordType: HoodieRecordType): Unit = { + Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType => + val basicOpts = if (recordType == HoodieRecordType.SPARK) { + Map(HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key() -> tableType, + DataSourceWriteOptions.OPERATION.key() -> UPSERT_OPERATION_OPT_VAL, + DataSourceWriteOptions.RECORDKEY_FIELD.key()-> "id") + } else { + sparkOpts ++ Map(HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + DataSourceWriteOptions.TABLE_TYPE.key() -> tableType, + DataSourceWriteOptions.OPERATION.key() -> UPSERT_OPERATION_OPT_VAL, + DataSourceWriteOptions.RECORDKEY_FIELD.key()-> "id") + } + + val _spark = spark + import _spark.implicits._ + + // precombine is ts, table schema has ts + // result: tableconfig precombine is ts + val table1Path = basePath + "/table1" + val df1: Dataset[Row] = Seq((1, "a1", 10, 1000)).toDF("id", "name", "price", "ts") + df1.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts") + .mode(SaveMode.Overwrite) + .save(table1Path) + spark.read.format("hudi").load(table1Path).show(100, false) + df1.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts") + .mode(SaveMode.Append) + .save(table1Path) + spark.read.format("hudi").load(table1Path).show(100, false) + assertPrecombine("ts", table1Path) + + // precombine is not set, table schema has ts + // result: ts is used as precombine but not set in tableconfig + val table2Path = basePath + "/table2" + val df2: Dataset[Row] = Seq((1, "a1", 10, 1000)).toDF("id", "name", "price", "ts") + df2.write.format("org.apache.hudi") + .options(basicOpts) + .mode(SaveMode.Overwrite) + .save(table2Path) + spark.read.format("hudi").load(table2Path).show(100, false) + df2.write.format("org.apache.hudi") + .options(basicOpts) + .mode(SaveMode.Append) + .save(table2Path) + 
spark.read.format("hudi").load(table2Path).show(100, false) + assertPrecombine(null, table2Path) + + // precombine is price, table schema has price + // result: tableconfig precombine is price + val table3Path = basePath + "/table3" + val df3: Dataset[Row] = Seq((1, "a1", 10, 1000)).toDF("id", "name", "price", "ts") + df3.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "price") + .mode(SaveMode.Overwrite) + .save(table3Path) + spark.read.format("hudi").load(table3Path).show(100, false) + df3.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "price") + .mode(SaveMode.Append) + .save(table3Path) + spark.read.format("hudi").load(table3Path).show(100, false) + assertPrecombine("price", table3Path) + + // precombine is not notexist, table schema does not have notexist + // TODO [HUDI-8574] this should fail + // result: exception + val table4Path = basePath + "/table4" + val df4: Dataset[Row] = Seq((1, "a1", 10, 1000)).toDF("id", "name", "price", "ts") + df4.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "noexist") + .mode(SaveMode.Overwrite) + .save(table4Path) + spark.read.format("hudi").load(table4Path).show(100, false) + df4.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "noexist") + .mode(SaveMode.Append) + .save(table4Path) + spark.read.format("hudi").load(table4Path).show(100, false) + + // precombine is not set, table schema does not have ts + // result: precombine is not used and tableconfig does not have precombine set + val table5Path = basePath + "/table5" + val df5: Dataset[Row] = Seq((1, "a1", 10)).toDF("id", "name", "price") + df5.write.format("org.apache.hudi") + .options(basicOpts) + .mode(SaveMode.Overwrite) + .save(table5Path) + spark.read.format("hudi").load(table5Path).show(100, false) + df5.write.format("org.apache.hudi") + .options(basicOpts) + .mode(SaveMode.Append) + .save(table5Path) + spark.read.format("hudi").load(table5Path).show(100, false) + assertPrecombine(null, table5Path) + + // precombine is ts, table schema does not have ts + // TODO [HUDI-8574] this should fail + val table6Path = basePath + "/table6" + val df6: Dataset[Row] = Seq((1, "a1", 10)).toDF("id", "name", "price") + df6.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts") + .mode(SaveMode.Overwrite) + .save(table6Path) + spark.read.format("hudi").load(table6Path).show(100, false) + df6.write.format("org.apache.hudi") + .options(basicOpts) + .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "ts") + .mode(SaveMode.Append) + .save(table6Path) + spark.read.format("hudi").load(table6Path).show(100, false) + } + } + + def assertPrecombine(expected: String, tableBasePath: String): Unit = { + assertEquals(expected, HoodieTestUtils + .createMetaClient(new HadoopStorageConfiguration(spark.sessionState.newHadoopConf), tableBasePath) + .getTableConfig.getPreCombineField) + } + @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testClusteringSamePrecombineWithDelete(recordType: HoodieRecordType): Unit = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala index 
12c0d64833803..c65a54cd45190 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCommitMetadata import org.apache.spark.sql.types._ -import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import scala.collection.JavaConverters._ @@ -1555,4 +1555,107 @@ class TestCreateTable extends HoodieSparkSqlTestBase { )("Failed to create catalog table in metastore") } } + + test("Test Precombine Behavior With Create Table As Select") { + Seq("cow", "mor").foreach { tableType => + withTempDir { tmp => + + // precombine is ts, table schema has ts + // result: tableconfig precombine is ts + val tableName1 = generateTableName + spark.sql( + s""" + | create table $tableName1 using hudi + | tblproperties( + | type = '$tableType', + | preCombineField = 'ts' + | ) + | location '${tmp.getCanonicalPath}/$tableName1' + | AS + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts + """.stripMargin) + assertEquals("ts", createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName1").getTableConfig.getPreCombineField) + + // precombine is not set, table schema has ts + // result: ts is used as precombine but not set in tableconfig + val tableName2 = generateTableName + spark.sql( + s""" + | create table $tableName2 using hudi + | tblproperties( + | type = '$tableType' + | ) + | location '${tmp.getCanonicalPath}/$tableName2' + | AS + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts + """.stripMargin) + assertEquals(null, createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName2").getTableConfig.getPreCombineField) + + // precombine is price, table schema has price + // result: tableconfig precombine is price + val tableName3 = generateTableName + spark.sql( + s""" + | create table $tableName3 using hudi + | tblproperties( + | type = '$tableType', + | preCombineField = 'price' + | ) + | location '${tmp.getCanonicalPath}/$tableName3' + | AS + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts + """.stripMargin) + assertEquals("price", createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName3").getTableConfig.getPreCombineField) + + // precombine is not notexist, table schema does not have notexist + // result: exception + val tableName4 = generateTableName + assertThrows[IllegalArgumentException] { + spark.sql( + s""" + | create table $tableName4 using hudi + | tblproperties( + | type = '$tableType', + | preCombineField = 'notexist' + | ) + | location '${tmp.getCanonicalPath}/$tableName4' + | AS + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts + """.stripMargin) + } + + // precombine is not set, table schema does not have ts + // result: precombine is not used and tableconfig does not have precombine set + val tableName5 = generateTableName + spark.sql( + s""" + | create table $tableName5 using hudi + | tblproperties( + | type = '$tableType' + | ) + | location '${tmp.getCanonicalPath}/$tableName5' + | AS + | select 1 as id, 'a1' as name, 10 as price + """.stripMargin) + assertEquals(null, createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName5").getTableConfig.getPreCombineField) + + // precombine is ts, table schema does not 
have ts + // result: exception + val tableName6 = generateTableName + assertThrows[IllegalArgumentException] { + spark.sql( + s""" + | create table $tableName6 using hudi + | tblproperties( + | type = '$tableType', + | preCombineField = 'ts' + | ) + | location '${tmp.getCanonicalPath}/$tableName6' + | AS + | select 1 as id, 'a1' as name, 10 as price + """.stripMargin) + } + } + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala index 5535400cb667d..3ff760c698b9d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala @@ -49,169 +49,173 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { test("Test delete all records in filegroup") { Seq("cow", "mor").foreach { tableType => - withTempDir { tmp => - val databaseName = "hudi_database" - spark.sql(s"create database if not exists $databaseName") - spark.sql(s"use $databaseName") - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - | create table $tableName ( - | id int, - | name string, - | price double, - | ts long - | ) using hudi - | partitioned by (name) - | tblproperties ( - | 'primaryKey' = 'id', - | 'preCombineField' = 'ts', - | 'hoodie.table.cdc.enabled' = 'true', - | 'hoodie.table.cdc.supplemental.logging.mode' = '$DATA_BEFORE_AFTER', - | type = '$tableType' - | ) - | location '$basePath' - """.stripMargin) - val metaClient = createMetaClient(spark, basePath) - spark.sql(s"insert into $tableName values (1, 11, 1000, 'a1'), (2, 12, 1000, 'a2')") - assert(spark.sql(s"select _hoodie_file_name from $tableName").distinct().count() == 2) - val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName where id=1").head().get(0) - val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1) - cdcDataOnly1.show(false) - assertCDCOpCnt(cdcDataOnly1, 2, 0, 0) - - spark.sql(s"delete from $tableName where id = 1") - val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong) - assertCDCOpCnt(cdcDataOnly2, 0, 0, 1) - assert(spark.sql(s"select _hoodie_file_name from $tableName").distinct().count() == 1) - assert(!spark.sql(s"select _hoodie_file_name from $tableName").head().get(0).equals(fgForID1)) - } - } - } - - /** - * Test CDC in cases that it's a COW/MOR non--partitioned table and `cdcSupplementalLoggingMode` is true or not. 
- */ - test("Test Non-Partitioned Hoodie Table") { - val databaseName = "hudi_database" - spark.sql(s"create database if not exists $databaseName") - spark.sql(s"use $databaseName") - - Seq("cow", "mor").foreach { tableType => - Seq(OP_KEY_ONLY, DATA_BEFORE, DATA_BEFORE_AFTER).foreach { loggingMode => + Seq("'preCombineField' = '_ts',", "").foreach { preCombineLine => withTempDir { tmp => + val databaseName = "hudi_database" + spark.sql(s"create database if not exists $databaseName") + spark.sql(s"use $databaseName") val tableName = generateTableName val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert") - val otherTableProperties = if (tableType == "mor") { - "'hoodie.compact.inline'='true', 'hoodie.compact.inline.max.delta.commits'='2'," - } else { - "" - } spark.sql( s""" | create table $tableName ( | id int, | name string, | price double, - | ts long + | _ts long | ) using hudi + | partitioned by (name) | tblproperties ( | 'primaryKey' = 'id', - | 'preCombineField' = 'ts', + | $preCombineLine | 'hoodie.table.cdc.enabled' = 'true', - | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', - | $otherTableProperties + | 'hoodie.table.cdc.supplemental.logging.mode' = '$DATA_BEFORE_AFTER', | type = '$tableType' | ) | location '$basePath' - """.stripMargin) - + """.stripMargin) val metaClient = createMetaClient(spark, basePath) - - spark.sql(s"insert into $tableName values (1, 'a1', 11, 1000), (2, 'a2', 12, 1000), (3, 'a3', 13, 1000)") + spark.sql(s"insert into $tableName values (1, 11, 1000, 'a1'), (2, 12, 1000, 'a2')") + assert(spark.sql(s"select _hoodie_file_name from $tableName").distinct().count() == 2) + val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName where id=1").head().get(0) val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1) cdcDataOnly1.show(false) - assertCDCOpCnt(cdcDataOnly1, 3, 0, 0) + assertCDCOpCnt(cdcDataOnly1, 2, 0, 0) - spark.sql(s"insert into $tableName values (1, 'a1_v2', 11, 1100)") - val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - // here we use `commitTime1` to query the change data in commit 2. - // because `commitTime2` is maybe the ts of the compaction operation, not the write operation. + spark.sql(s"delete from $tableName where id = 1") val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong) - cdcDataOnly2.show(false) - assertCDCOpCnt(cdcDataOnly2, 0, 1, 0) + assertCDCOpCnt(cdcDataOnly2, 0, 0, 1) + assert(spark.sql(s"select _hoodie_file_name from $tableName").distinct().count() == 1) + assert(!spark.sql(s"select _hoodie_file_name from $tableName").head().get(0).equals(fgForID1)) + } + } + } + } - // Check the details - val originSchema = spark.read.format("hudi").load(basePath).schema - val change2 = cdcDataOnly2.select( - col("op"), - from_json(col("before"), originSchema).as("before"), - from_json(col("after"), originSchema).as("after") - ).select( - col("op"), - col("after.id"), - col("before.name"), - col("before.price"), - col("after.name"), - col("after.price") - ).collect() - checkAnswer(change2)(Seq("u", 1, "a1", 11, "a1_v2", 11)) + /** + * Test CDC in cases that it's a COW/MOR non--partitioned table and `cdcSupplementalLoggingMode` is true or not. 
+ */ + test("Test Non-Partitioned Hoodie Table") { + val databaseName = "hudi_database" + spark.sql(s"create database if not exists $databaseName") + spark.sql(s"use $databaseName") - spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id = 2") - val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2.toLong) - cdcDataOnly3.show(false) - assertCDCOpCnt(cdcDataOnly3, 0, 1, 0) + Seq("cow", "mor").foreach { tableType => + Seq("'preCombineField' = '_ts',", "").foreach { preCombineLine => + Seq(OP_KEY_ONLY, DATA_BEFORE, DATA_BEFORE_AFTER).foreach { loggingMode => + withTempDir { tmp => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert") + val otherTableProperties = if (tableType == "mor") { + "'hoodie.compact.inline'='true', 'hoodie.compact.inline.max.delta.commits'='2'," + } else { + "" + } + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | _ts long + | ) using hudi + | tblproperties ( + | 'primaryKey' = 'id', + | $preCombineLine + | 'hoodie.table.cdc.enabled' = 'true', + | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', + | $otherTableProperties + | type = '$tableType' + | ) + | location '$basePath' + """.stripMargin) - spark.sql(s"delete from $tableName where id = 3") - val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3.toLong) - cdcDataOnly4.show(false) - assertCDCOpCnt(cdcDataOnly4, 0, 0, 1) + val metaClient = createMetaClient(spark, basePath) - spark.sql( - s""" - | merge into $tableName - | using ( - | select * from ( - | select 1 as id, 'a1_v3' as name, cast(11 as double) as price, cast(1300 as long) as ts - | union all - | select 4 as id, 'a4' as name, cast(14 as double) as price, cast(1300 as long) as ts - | ) - | ) s0 - | on s0.id = $tableName.id - | when matched then update set id = s0.id, name = s0.name, price = s0.price, ts = s0.ts - | when not matched then insert * + spark.sql(s"insert into $tableName values (1, 'a1', 11, 1000), (2, 'a2', 12, 1000), (3, 'a3', 13, 1000)") + val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1) + cdcDataOnly1.show(false) + assertCDCOpCnt(cdcDataOnly1, 3, 0, 0) + + spark.sql(s"insert into $tableName values (1, 'a1_v2', 11, 1100)") + val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + // here we use `commitTime1` to query the change data in commit 2. + // because `commitTime2` is maybe the ts of the compaction operation, not the write operation. 
+ val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong) + cdcDataOnly2.show(false) + assertCDCOpCnt(cdcDataOnly2, 0, 1, 0) + + // Check the details + val originSchema = spark.read.format("hudi").load(basePath).schema + val change2 = cdcDataOnly2.select( + col("op"), + from_json(col("before"), originSchema).as("before"), + from_json(col("after"), originSchema).as("after") + ).select( + col("op"), + col("after.id"), + col("before.name"), + col("before.price"), + col("after.name"), + col("after.price") + ).collect() + checkAnswer(change2)(Seq("u", 1, "a1", 11, "a1_v2", 11)) + + spark.sql(s"update $tableName set name = 'a2_v2', _ts = 1200 where id = 2") + val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2.toLong) + cdcDataOnly3.show(false) + assertCDCOpCnt(cdcDataOnly3, 0, 1, 0) + + spark.sql(s"delete from $tableName where id = 3") + val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3.toLong) + cdcDataOnly4.show(false) + assertCDCOpCnt(cdcDataOnly4, 0, 0, 1) + + spark.sql( + s""" + | merge into $tableName + | using ( + | select * from ( + | select 1 as id, 'a1_v3' as name, cast(11 as double) as price, cast(1300 as long) as _ts + | union all + | select 4 as id, 'a4' as name, cast(14 as double) as price, cast(1300 as long) as _ts + | ) + | ) s0 + | on s0.id = $tableName.id + | when matched then update set id = s0.id, name = s0.name, price = s0.price, _ts = s0._ts + | when not matched then insert * """.stripMargin) - val commitTime5 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4.toLong) - cdcDataOnly5.show(false) - assertCDCOpCnt(cdcDataOnly5, 1, 1, 0) + val commitTime5 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4.toLong) + cdcDataOnly5.show(false) + assertCDCOpCnt(cdcDataOnly5, 1, 1, 0) - // Check the details - val change5 = cdcDataOnly5.select( - col("op"), - from_json(col("before"), originSchema).as("before"), - from_json(col("after"), originSchema).as("after") - ).select( - col("op"), - col("after.id"), - col("before.name"), - col("before.price"), - col("after.name"), - col("after.price") - ).collect() - checkAnswer(change5.sortBy(_.getInt(1)))( - Seq("u", 1, "a1_v2", 11, "a1_v3", 11), - Seq("i", 4, null, null, "a4", 14) - ) + // Check the details + val change5 = cdcDataOnly5.select( + col("op"), + from_json(col("before"), originSchema).as("before"), + from_json(col("after"), originSchema).as("after") + ).select( + col("op"), + col("after.id"), + col("before.name"), + col("before.price"), + col("after.name"), + col("after.price") + ).collect() + checkAnswer(change5.sortBy(_.getInt(1)))( + Seq("u", 1, "a1_v2", 11, "a1_v3", 11), + Seq("i", 4, null, null, "a4", 14) + ) - val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1) - assertCDCOpCnt(totalCdcData, 4, 3, 1) + val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1) + assertCDCOpCnt(totalCdcData, 4, 3, 1) + } } } } @@ -227,77 +231,79 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase { spark.sql(s"use $databaseName") Seq("cow", "mor").foreach { tableType => - Seq(OP_KEY_ONLY, DATA_BEFORE).foreach { loggingMode => - withTempDir { tmp => - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - | create table $tableName ( - | 
id int, - | name string, - | price double, - | ts long, - | pt string - | ) using hudi - | partitioned by (pt) - | tblproperties ( - | 'primaryKey' = 'id', - | 'preCombineField' = 'ts', - | 'hoodie.table.cdc.enabled' = 'true', - | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', - | 'type' = '$tableType' - | ) - | location '$basePath' + Seq("'preCombineField' = '_ts',", "").foreach { preCombineLine => + Seq(OP_KEY_ONLY, DATA_BEFORE).foreach { loggingMode => + withTempDir { tmp => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | _ts long, + | pt string + | ) using hudi + | partitioned by (pt) + | tblproperties ( + | 'primaryKey' = 'id', + | $preCombineLine + | 'hoodie.table.cdc.enabled' = 'true', + | 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}', + | 'type' = '$tableType' + | ) + | location '$basePath' """.stripMargin) - val metaClient = createMetaClient(spark, basePath) + val metaClient = createMetaClient(spark, basePath) - spark.sql( - s""" - | insert into $tableName values - | (1, 'a1', 11, 1000, '2021'), - | (2, 'a2', 12, 1000, '2022'), - | (3, 'a3', 13, 1000, '2022') + spark.sql( + s""" + | insert into $tableName values + | (1, 'a1', 11, 1000, '2021'), + | (2, 'a2', 12, 1000, '2022'), + | (3, 'a3', 13, 1000, '2022') """.stripMargin) - val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1) - cdcDataOnly1.show(false) - assertCDCOpCnt(cdcDataOnly1, 3, 0, 0) + val commitTime1 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1) + cdcDataOnly1.show(false) + assertCDCOpCnt(cdcDataOnly1, 3, 0, 0) - spark.sql(s"insert overwrite table $tableName partition (pt = '2021') values (1, 'a1_v2', 11, 1100)") - val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1) - cdcDataOnly2.show(false) - assertCDCOpCnt(cdcDataOnly2, 1, 0, 1) + spark.sql(s"insert overwrite table $tableName partition (pt = '2021') values (1, 'a1_v2', 11, 1100)") + val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1) + cdcDataOnly2.show(false) + assertCDCOpCnt(cdcDataOnly2, 1, 0, 1) - spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id = 2") - val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1) - cdcDataOnly3.show(false) - assertCDCOpCnt(cdcDataOnly3, 0, 1, 0) + spark.sql(s"update $tableName set name = 'a2_v2', _ts = 1200 where id = 2") + val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1) + cdcDataOnly3.show(false) + assertCDCOpCnt(cdcDataOnly3, 0, 1, 0) - spark.sql( - s""" - | merge into $tableName - | using ( - | select * from ( - | select 1 as id, 'a1_v3' as name, cast(11 as double) as price, cast(1300 as long) as ts, "2021" as pt - | union all - | select 4 as id, 'a4' as name, cast(14 as double) as price, cast(1300 as long) as ts, "2022" as pt - | ) - | ) s0 - | on s0.id = $tableName.id - | when matched then update set id = s0.id, name = s0.name, price = 
s0.price, ts = s0.ts, pt = s0.pt - | when not matched then insert * + spark.sql( + s""" + | merge into $tableName + | using ( + | select * from ( + | select 1 as id, 'a1_v3' as name, cast(11 as double) as price, cast(1300 as long) as _ts, "2021" as pt + | union all + | select 4 as id, 'a4' as name, cast(14 as double) as price, cast(1300 as long) as _ts, "2022" as pt + | ) + | ) s0 + | on s0.id = $tableName.id + | when matched then update set id = s0.id, name = s0.name, price = s0.price, _ts = s0._ts, pt = s0.pt + | when not matched then insert * """.stripMargin) - val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime - val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1) - cdcDataOnly4.show(false) - assertCDCOpCnt(cdcDataOnly4, 1, 1, 0) + val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().requestedTime + val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1) + cdcDataOnly4.show(false) + assertCDCOpCnt(cdcDataOnly4, 1, 1, 0) - val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1) - assertCDCOpCnt(totalCdcData, 5, 2, 1) + val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1) + assertCDCOpCnt(totalCdcData, 5, 2, 1) + } } } } From c95f9fb21c345ca925f5b666f50795a563ed32df Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Sun, 24 Nov 2024 17:17:28 -0500 Subject: [PATCH 3/8] clean up style --- .../java/org/apache/hudi/common/model/HoodieRecordMerger.java | 1 - .../hudi/bootstrap/SparkFullBootstrapDataProviderBase.java | 1 - 2 files changed, 2 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java index d54a28c1760ba..9628e9ced2401 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java @@ -167,7 +167,6 @@ default String[] getMandatoryFieldsForMerging(Schema dataSchema, HoodieTableConf String preCombine = cfg.getPreCombineField(); if (!StringUtils.isNullOrEmpty(preCombine)) { - requiredFields.add(preCombine); } return requiredFields.toArray(new String[0]); diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java index fba18f9f5be30..b7bbae1799b9b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java @@ -86,7 +86,6 @@ public JavaRDD generateInputRecords(String tableName, String sourc String orderingVal = null; if (hasPrecombine) { //TODO [HUDI-8574] we can throw exception if field doesn't exist - // lazy so that we don't evaluate if we short circuit the boolean expression in the if below orderingVal = HoodieAvroUtils.getNestedFieldValAsString( gr, precombineKey, true, props.getBoolean( KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), From 77f2b837af4769509682fc84cb652faeab476083 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Mon, 25 Nov 2024 11:39:03 -0500 Subject: [PATCH 4/8] update sql test to update records --- .../spark/sql/hudi/ddl/TestCreateTable.scala | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala index c65a54cd45190..a60fcfd228cf8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala @@ -1574,7 +1574,15 @@ class TestCreateTable extends HoodieSparkSqlTestBase { | AS | select 1 as id, 'a1' as name, 10 as price, 1000 as ts """.stripMargin) + checkAnswer(s"select id, name, price, ts from $tableName1")( + Seq(1, "a1", 10, 1000) + ) assertEquals("ts", createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName1").getTableConfig.getPreCombineField) + spark.sql(s"update $tableName1 set name = 'a2', ts = 1001 where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName1")( + Seq(1, "a2", 10, 1001) + ) + // precombine is not set, table schema has ts // result: ts is used as precombine but not set in tableconfig @@ -1589,7 +1597,14 @@ class TestCreateTable extends HoodieSparkSqlTestBase { | AS | select 1 as id, 'a1' as name, 10 as price, 1000 as ts """.stripMargin) + checkAnswer(s"select id, name, price, ts from $tableName2")( + Seq(1, "a1", 10, 1000) + ) assertEquals(null, createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName2").getTableConfig.getPreCombineField) + spark.sql(s"update $tableName2 set name = 'a2', ts = 1001 where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName2")( + Seq(1, "a2", 10, 1001) + ) // precombine is price, table schema has price // result: tableconfig precombine is price @@ -1605,7 +1620,14 @@ class TestCreateTable extends HoodieSparkSqlTestBase { | AS | select 1 as id, 'a1' as name, 10 as price, 1000 as ts """.stripMargin) + checkAnswer(s"select id, name, price, ts from $tableName3")( + Seq(1, "a1", 10, 1000) + ) assertEquals("price", createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName3").getTableConfig.getPreCombineField) + spark.sql(s"update $tableName3 set name = 'a2', price = 11 where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName3")( + Seq(1, "a2", 11, 1000) + ) // precombine is not notexist, table schema does not have notexist // result: exception @@ -1637,8 +1659,14 @@ class TestCreateTable extends HoodieSparkSqlTestBase { | AS | select 1 as id, 'a1' as name, 10 as price """.stripMargin) + checkAnswer(s"select id, name, price from $tableName5")( + Seq(1, "a1", 10) + ) assertEquals(null, createMetaClient(spark, s"${tmp.getCanonicalPath}/$tableName5").getTableConfig.getPreCombineField) - + spark.sql(s"update $tableName5 set name = 'a2', price = 11 where id = 1") + checkAnswer(s"select id, name, price from $tableName5")( + Seq(1, "a2", 11) + ) // precombine is ts, table schema does not have ts // result: exception val tableName6 = generateTableName From 7643abc1500f7e19ffa682a7856d8d63c5f7cd02 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Mon, 25 Nov 2024 17:50:52 -0500 Subject: [PATCH 5/8] make some changes to schema handler and add tests. 
Still need mor bootstrap tests --- ...rkFileFormatInternalRowReaderContext.scala | 2 +- .../common/engine/HoodieReaderContext.java | 2 + .../table/read/HoodieFileGroupReader.java | 14 +- .../HoodieFileGroupReaderSchemaHandler.java | 24 +- .../HoodiePositionBasedSchemaHandler.java | 21 +- .../common/table/read/TestSchemaHandler.java | 320 ++++++++++++++++++ 6 files changed, 353 insertions(+), 30 deletions(-) create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala index 00e14e06bf6bf..c5b19ddf50502 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala @@ -167,7 +167,7 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea HoodieAvroUtils.removeFields(skeletonRequiredSchema, rowIndexColumn)) //If we need to do position based merging with log files we will leave the row index column at the end - val dataProjection = if (getHasLogFiles && getShouldMergeUseRecordPosition) { + val dataProjection = if (getShouldMergeUseRecordPosition) { getIdentityProjection } else { projectRecord(dataRequiredSchema, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java index a0d779cd40e0b..6479dcee6df86 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java @@ -61,6 +61,8 @@ public abstract class HoodieReaderContext { private Boolean hasLogFiles = null; private Boolean hasBootstrapBaseFile = null; private Boolean needsBootstrapMerge = null; + + // should we do position based merging for mor private Boolean shouldMergeUseRecordPosition = null; // Getter and Setter for schemaHandler diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java index ac34efb0ab2da..1df2d95a74c0b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java @@ -96,7 +96,12 @@ public HoodieFileGroupReader(HoodieReaderContext readerContext, this.readerContext = readerContext; this.storage = storage; this.hoodieBaseFileOption = fileSlice.getBaseFile(); + readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()); this.logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + readerContext.setHasLogFiles(!this.logFiles.isEmpty()); + if (readerContext.getHasLogFiles() && start != 0) { + throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file"); + } this.props = props; this.start = start; this.length = length; @@ -109,17 +114,12 @@ public HoodieFileGroupReader(HoodieReaderContext readerContext, readerContext.setTablePath(tablePath); 
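// Illustrative sketch only, not part of the patch: the hunk above now records the
// file-slice facts on the reader context before any merge behaviour is derived from
// them, and rejects a partial base-file read whenever log files must be merged.
// The standalone helper below is hypothetical and simply restates that ordering;
// the setter/getter calls are the ones introduced in this diff.
private static void initMergeFlagsSketch(HoodieReaderContext readerContext,
                                         boolean bootstrapBaseFilePresent,
                                         List<HoodieLogFile> logFiles,
                                         long start,
                                         boolean shouldUseRecordPosition,
                                         boolean isSkipMerge) {
  // facts about the file slice are set first ...
  readerContext.setHasBootstrapBaseFile(bootstrapBaseFilePresent);
  readerContext.setHasLogFiles(!logFiles.isEmpty());
  if (readerContext.getHasLogFiles() && start != 0) {
    // log-file merging requires scanning the base file from the beginning
    throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file");
  }
  // ... so that derived behaviour can consult them: position-based merging is only
  // meaningful when there are log files to merge and merging is not skipped
  readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge && readerContext.getHasLogFiles());
}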
readerContext.setLatestCommitTime(latestCommitTime); boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, HoodieReaderConfig.MERGE_TYPE, true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE); - readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge); - readerContext.setHasLogFiles(!this.logFiles.isEmpty()); - if (readerContext.getHasLogFiles() && start != 0) { - throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file"); - } - readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()); + readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge && readerContext.getHasLogFiles()); readerContext.setSchemaHandler(readerContext.supportsParquetRowIndex() ? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props) : new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)); this.outputConverter = readerContext.getSchemaHandler().getOutputConverter(); - this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, tableConfig.getRecordMergeMode(), props, this.logFiles.isEmpty(), isSkipMerge, shouldUseRecordPosition); + this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, tableConfig.getRecordMergeMode(), props, !readerContext.getHasLogFiles(), isSkipMerge, shouldUseRecordPosition); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java index 2e27ebe10a658..35701ba0a5454 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java @@ -70,14 +70,7 @@ public class HoodieFileGroupReaderSchemaHandler { protected final HoodieReaderContext readerContext; protected final TypedProperties properties; - - protected final Option recordMerger; - - protected final boolean hasBootstrapBaseFile; - protected boolean needsBootstrapMerge; - - protected final boolean needsMORMerge; - + public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext readerContext, Schema dataSchema, Schema requestedSchema, @@ -86,16 +79,12 @@ public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext readerContext, TypedProperties properties) { this.properties = properties; this.readerContext = readerContext; - this.hasBootstrapBaseFile = readerContext.getHasBootstrapBaseFile(); - this.needsMORMerge = readerContext.getHasLogFiles(); - this.recordMerger = readerContext.getRecordMerger(); this.dataSchema = dataSchema; this.requestedSchema = requestedSchema; this.hoodieTableConfig = hoodieTableConfig; this.requiredSchema = prepareRequiredSchema(); this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt); this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt); - readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge); } public Schema getDataSchema() { @@ -147,18 +136,18 @@ protected InternalSchema doPruneInternalSchema(Schema requiredSchema, InternalSc private Schema generateRequiredSchema() { //might need to change this if other queries than mor have mandatory fields - if (!needsMORMerge) { + if 
(!readerContext.getHasLogFiles()) { return requestedSchema; } if (hoodieTableConfig.getRecordMergeMode() == RecordMergeMode.CUSTOM) { - if (!recordMerger.get().isProjectionCompatible()) { + if (!readerContext.getRecordMerger().get().isProjectionCompatible()) { return dataSchema; } } List addedFields = new ArrayList<>(); - for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) { + for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, readerContext.getRecordMerger())) { if (!findNestedField(requestedSchema, field).isPresent()) { Option foundFieldOpt = findNestedField(dataSchema, field); if (!foundFieldOpt.isPresent()) { @@ -209,8 +198,9 @@ private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg, Type protected Schema prepareRequiredSchema() { Schema preReorderRequiredSchema = generateRequiredSchema(); Pair, List> requiredFields = getDataAndMetaCols(preReorderRequiredSchema); - this.needsBootstrapMerge = hasBootstrapBaseFile && !requiredFields.getLeft().isEmpty() && !requiredFields.getRight().isEmpty(); - return needsBootstrapMerge + readerContext.setNeedsBootstrapMerge(readerContext.getHasBootstrapBaseFile() + && !requiredFields.getLeft().isEmpty() && !requiredFields.getRight().isEmpty()); + return readerContext.getNeedsBootstrapMerge() ? createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), requiredFields.getRight().stream()).collect(Collectors.toList())) : preReorderRequiredSchema; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java index cafcceb8beff1..c7373b87975f6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.engine.HoodieReaderContext; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.Types; @@ -50,10 +51,18 @@ public HoodiePositionBasedSchemaHandler(HoodieReaderContext readerContext, super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig, properties); } + private boolean morMergeNeedsPositionCol() { + return readerContext.supportsParquetRowIndex() && readerContext.getShouldMergeUseRecordPosition(); + } + + private boolean bootstrapMergeNeedsPositionCol() { + return readerContext.supportsParquetRowIndex() && readerContext.getNeedsBootstrapMerge(); + } + @Override protected Schema prepareRequiredSchema() { Schema preMergeSchema = super.prepareRequiredSchema(); - return readerContext.getShouldMergeUseRecordPosition() && readerContext.getHasLogFiles() + return morMergeNeedsPositionCol() ? 
addPositionalMergeCol(preMergeSchema) : preMergeSchema; } @@ -65,7 +74,7 @@ protected Option getInternalSchemaOpt(Option int @Override protected InternalSchema doPruneInternalSchema(Schema requiredSchema, InternalSchema internalSchema) { - if (!(readerContext.getShouldMergeUseRecordPosition() && readerContext.getHasLogFiles())) { + if (!morMergeNeedsPositionCol()) { return super.doPruneInternalSchema(requiredSchema, internalSchema); } @@ -82,7 +91,7 @@ private static InternalSchema addPositionalMergeCol(InternalSchema internalSchem @Override public Pair,List> getBootstrapRequiredFields() { Pair,List> dataAndMetaCols = super.getBootstrapRequiredFields(); - if (readerContext.supportsParquetRowIndex() && (this.needsBootstrapMerge || this.needsMORMerge)) { + if (bootstrapMergeNeedsPositionCol() || morMergeNeedsPositionCol()) { if (!dataAndMetaCols.getLeft().isEmpty()) { dataAndMetaCols.getLeft().add(getPositionalMergeField()); } @@ -93,11 +102,13 @@ public Pair,List> getBootstrapRequiredFields() return dataAndMetaCols; } - private static Schema addPositionalMergeCol(Schema input) { + @VisibleForTesting + static Schema addPositionalMergeCol(Schema input) { return appendFieldsToSchemaDedupNested(input, Collections.singletonList(getPositionalMergeField())); } - private static Schema.Field getPositionalMergeField() { + @VisibleForTesting + static Schema.Field getPositionalMergeField() { return new Schema.Field(ROW_INDEX_TEMPORARY_COLUMN_NAME, Schema.create(Schema.Type.LONG), "", -1L); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java new file mode 100644 index 0000000000000..6aab5203b805f --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.config.RecordMergeMode; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.UnaryOperator; +import java.util.stream.Stream; + +import static org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING; +import static org.apache.hudi.common.config.RecordMergeMode.CUSTOM; +import static org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING; +import static org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler.addPositionalMergeCol; +import static org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler.getPositionalMergeField; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestSchemaHandler { + + protected static final Schema DATA_SCHEMA = HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA); + protected static final Schema DATA_COLS_ONLY_SCHEMA = generateProjectionSchema("begin_lat", "tip_history", "rider"); + protected static final Schema META_COLS_ONLY_SCHEMA = generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key"); + + @Test + public void testCow() { + HoodieReaderContext readerContext = new MockReaderContext(false); + readerContext.setHasLogFiles(false); + readerContext.setHasBootstrapBaseFile(false); + readerContext.setShouldMergeUseRecordPosition(false); + HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); + Schema requestedSchema = DATA_SCHEMA; + HoodieFileGroupReaderSchemaHandler schemaHandler = new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, + requestedSchema, Option.empty(), hoodieTableConfig, new TypedProperties()); + assertEquals(requestedSchema, schemaHandler.getRequiredSchema()); + + //read subset of columns + requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "rider"); + schemaHandler = + new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()); + assertEquals(requestedSchema, schemaHandler.getRequiredSchema()); + } + + @Test + public void testCowBootstrap() { + HoodieReaderContext readerContext = new MockReaderContext(false); + readerContext.setHasLogFiles(false); + readerContext.setHasBootstrapBaseFile(true); + 
readerContext.setShouldMergeUseRecordPosition(false); + HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); + Schema requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider"); + HoodieFileGroupReaderSchemaHandler schemaHandler = + new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()); + //meta cols must go first in the required schema + Schema expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider"); + assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); + Pair, List> bootstrapFields = schemaHandler.getBootstrapRequiredFields(); + assertEquals(Collections.singletonList(getField("_hoodie_record_key")), bootstrapFields.getLeft()); + assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider")), bootstrapFields.getRight()); + } + + @Test + public void testCowBootstrapWithPositionMerge() { + HoodieReaderContext readerContext = new MockReaderContext(true); + readerContext.setHasLogFiles(false); + readerContext.setHasBootstrapBaseFile(true); + readerContext.setShouldMergeUseRecordPosition(false); + HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); + Schema requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider"); + HoodieFileGroupReaderSchemaHandler schemaHandler = + new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()); + //meta cols must go first in the required schema + Schema expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider"); + assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); + Pair, List> bootstrapFields = schemaHandler.getBootstrapRequiredFields(); + assertEquals(Arrays.asList(getField("_hoodie_record_key"), getPositionalMergeField()), bootstrapFields.getLeft()); + assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider"), getPositionalMergeField()), bootstrapFields.getRight()); + + schemaHandler = new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, DATA_COLS_ONLY_SCHEMA, + Option.empty(), hoodieTableConfig, new TypedProperties()); + assertEquals(DATA_COLS_ONLY_SCHEMA, schemaHandler.getRequiredSchema()); + bootstrapFields = schemaHandler.getBootstrapRequiredFields(); + assertTrue(bootstrapFields.getLeft().isEmpty()); + assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider")), bootstrapFields.getRight()); + + schemaHandler = new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, META_COLS_ONLY_SCHEMA, + Option.empty(), hoodieTableConfig, new TypedProperties()); + assertEquals(META_COLS_ONLY_SCHEMA, schemaHandler.getRequiredSchema()); + bootstrapFields = schemaHandler.getBootstrapRequiredFields(); + assertEquals(Arrays.asList(getField("_hoodie_commit_seqno"), getField("_hoodie_record_key")), bootstrapFields.getLeft()); + assertTrue(bootstrapFields.getRight().isEmpty()); + } + + private static Stream testMorParams() { + Stream.Builder b = Stream.builder(); + for (boolean mergeUseRecordPosition : new boolean[] {true, false}) { + for (boolean supportsParquetRowIndex : new boolean[] {true, false}) { + b.add(Arguments.of(EVENT_TIME_ORDERING, true, false, mergeUseRecordPosition, supportsParquetRowIndex)); + 
b.add(Arguments.of(EVENT_TIME_ORDERING, false, false, mergeUseRecordPosition, supportsParquetRowIndex)); + b.add(Arguments.of(COMMIT_TIME_ORDERING, false, false, mergeUseRecordPosition, supportsParquetRowIndex)); + b.add(Arguments.of(CUSTOM, false, true, mergeUseRecordPosition, supportsParquetRowIndex)); + b.add(Arguments.of(CUSTOM, false, false, mergeUseRecordPosition, supportsParquetRowIndex)); + } + } + return b.build(); + } + + @ParameterizedTest + @MethodSource("testMorParams") + public void testMor(RecordMergeMode mergeMode, + boolean hasPrecombine, + boolean isProjectionCompatible, + boolean mergeUseRecordPosition, + boolean supportsParquetRowIndex) { + HoodieReaderContext readerContext = new MockReaderContext(supportsParquetRowIndex); + readerContext.setHasLogFiles(true); + readerContext.setHasBootstrapBaseFile(false); + //has no effect on schema unless we support position based merging + readerContext.setShouldMergeUseRecordPosition(mergeUseRecordPosition); + HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); + when(hoodieTableConfig.populateMetaFields()).thenReturn(Boolean.TRUE); + when(hoodieTableConfig.getRecordMergeMode()).thenReturn(mergeMode); + if (hasPrecombine) { + when(hoodieTableConfig.getPreCombineField()).thenReturn("timestamp"); + } else { + when(hoodieTableConfig.getPreCombineField()).thenReturn(null); + } + if (mergeMode == CUSTOM) { + Option merger = Option.of(new MockMerger(isProjectionCompatible, new String[]{"begin_lat", "begin_lon", "_hoodie_record_key", "timestamp"})); + readerContext.setRecordMerger(merger); + } + Schema requestedSchema = DATA_SCHEMA; + HoodieFileGroupReaderSchemaHandler schemaHandler = supportsParquetRowIndex + ? new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()) + : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()); + Schema expectedRequiredFullSchema = supportsParquetRowIndex && mergeUseRecordPosition + ? HoodiePositionBasedSchemaHandler.addPositionalMergeCol(requestedSchema) + : requestedSchema; + assertEquals(expectedRequiredFullSchema, schemaHandler.getRequiredSchema()); + + //read subset of columns + requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "rider"); + schemaHandler = supportsParquetRowIndex + ? 
new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()) + : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()); + Schema expectedRequiredSchema; + if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { + expectedRequiredSchema = generateProjectionSchema("begin_lat", "tip_history", "rider", "_hoodie_record_key", "timestamp"); + } else if (mergeMode == EVENT_TIME_ORDERING || mergeMode == COMMIT_TIME_ORDERING) { + expectedRequiredSchema = generateProjectionSchema("begin_lat", "tip_history", "rider", "_hoodie_record_key"); + } else if (mergeMode == CUSTOM && isProjectionCompatible) { + expectedRequiredSchema = generateProjectionSchema("begin_lat", "tip_history", "rider", "begin_lon", "_hoodie_record_key", "timestamp"); + } else { + expectedRequiredSchema = DATA_SCHEMA; + } + if (supportsParquetRowIndex && mergeUseRecordPosition) { + expectedRequiredSchema = addPositionalMergeCol(expectedRequiredSchema); + } + assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); + } + + private static Schema generateProjectionSchema(String... fields) { + return HoodieAvroUtils.generateProjectionSchema(DATA_SCHEMA, Arrays.asList(fields)); + } + + private Schema.Field getField(String fieldName) { + return DATA_SCHEMA.getField(fieldName); + } + + static class MockMerger implements HoodieRecordMerger { + + private final boolean isProjectionCompatible; + private final String[] mandatoryMergeFields; + + MockMerger(boolean isProjectionCompatible, String[] mandatoryMergeFields) { + this.isProjectionCompatible = isProjectionCompatible; + this.mandatoryMergeFields = mandatoryMergeFields; + } + + @Override + public boolean isProjectionCompatible() { + return this.isProjectionCompatible; + } + + @Override + public String[] getMandatoryFieldsForMerging(Schema dataSchema, HoodieTableConfig cfg, TypedProperties properties) { + return this.mandatoryMergeFields; + } + + @Override + public Option> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, + Schema newSchema, TypedProperties props) throws IOException { + return null; + } + + @Override + public HoodieRecord.HoodieRecordType getRecordType() { + return null; + } + + @Override + public String getMergingStrategy() { + return ""; + } + } + + static class MockReaderContext extends HoodieReaderContext { + private final boolean supportsParquetRowIndex; + + MockReaderContext(boolean supportsParquetRowIndex) { + this.supportsParquetRowIndex = supportsParquetRowIndex; + } + + @Override + public boolean supportsParquetRowIndex() { + return this.supportsParquetRowIndex; + } + + @Override + public ClosableIterator getFileRecordIterator(StoragePath filePath, long start, long length, Schema dataSchema, Schema requiredSchema, HoodieStorage storage) throws IOException { + return null; + } + + @Override + public String convertAvroRecord(IndexedRecord avroRecord) { + return ""; + } + + @Override + public GenericRecord convertToAvroRecord(String record, Schema schema) { + return null; + } + + @Override + public Option getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) { + return null; + } + + @Override + public Object getValue(String record, Schema schema, String fieldName) { + return null; + } + + @Override + public HoodieRecord constructHoodieRecord(Option recordOption, Map metadataMap) { + return null; + } + + @Override + public String 
seal(String record) { + return ""; + } + + @Override + public ClosableIterator mergeBootstrapReaders(ClosableIterator skeletonFileIterator, Schema skeletonRequiredSchema, ClosableIterator dataFileIterator, + Schema dataRequiredSchema) { + return null; + } + + @Override + public UnaryOperator projectRecord(Schema from, Schema to, Map renamedColumns) { + return null; + } + + @Override + public Comparable castValue(Comparable value, Schema.Type newType) { + return null; + } + } +} From e4efb8f049efdd106655bd6e7f3d83fec6770c55 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Tue, 26 Nov 2024 12:33:27 -0500 Subject: [PATCH 6/8] add schema handler test for mor bootstrap --- .../common/table/read/TestSchemaHandler.java | 146 +++++++++++++++++- 1 file changed, 145 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java index 6aab5203b805f..312dbe1c52782 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java @@ -187,7 +187,7 @@ public void testMor(RecordMergeMode mergeMode, assertEquals(expectedRequiredFullSchema, schemaHandler.getRequiredSchema()); //read subset of columns - requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "rider"); + requestedSchema = DATA_COLS_ONLY_SCHEMA; schemaHandler = supportsParquetRowIndex ? new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, Option.empty(), hoodieTableConfig, new TypedProperties()) @@ -209,6 +209,150 @@ public void testMor(RecordMergeMode mergeMode, assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); } + @ParameterizedTest + @MethodSource("testMorParams") + public void testMorBootstrap(RecordMergeMode mergeMode, + boolean hasPrecombine, + boolean isProjectionCompatible, + boolean mergeUseRecordPosition, + boolean supportsParquetRowIndex) { + HoodieReaderContext readerContext = new MockReaderContext(supportsParquetRowIndex); + readerContext.setHasLogFiles(true); + readerContext.setHasBootstrapBaseFile(true); + readerContext.setShouldMergeUseRecordPosition(mergeUseRecordPosition); + HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); + when(hoodieTableConfig.populateMetaFields()).thenReturn(Boolean.TRUE); + when(hoodieTableConfig.getRecordMergeMode()).thenReturn(mergeMode); + if (hasPrecombine) { + when(hoodieTableConfig.getPreCombineField()).thenReturn("timestamp"); + } else { + when(hoodieTableConfig.getPreCombineField()).thenReturn(null); + } + if (mergeMode == CUSTOM) { + // NOTE: in this test custom doesn't have any meta cols because it is more interesting of a test case + Option merger = Option.of(new MockMerger(isProjectionCompatible, new String[]{"begin_lat", "begin_lon", "timestamp"})); + readerContext.setRecordMerger(merger); + } + Schema requestedSchema = DATA_SCHEMA; + HoodieFileGroupReaderSchemaHandler schemaHandler = supportsParquetRowIndex + ? new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()) + : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()); + Schema expectedRequiredFullSchema = supportsParquetRowIndex && mergeUseRecordPosition + ? 
HoodiePositionBasedSchemaHandler.addPositionalMergeCol(requestedSchema) + : requestedSchema; + assertEquals(expectedRequiredFullSchema, schemaHandler.getRequiredSchema()); + Pair, List> bootstrapFields = schemaHandler.getBootstrapRequiredFields(); + Pair, List> expectedBootstrapFields = HoodieFileGroupReaderSchemaHandler.getDataAndMetaCols(expectedRequiredFullSchema); + if (supportsParquetRowIndex) { + expectedBootstrapFields.getLeft().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); + expectedBootstrapFields.getRight().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); + } + assertEquals(expectedBootstrapFields.getLeft(), bootstrapFields.getLeft()); + assertEquals(expectedBootstrapFields.getRight(), bootstrapFields.getRight()); + + //read subset of columns + requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider"); + schemaHandler = supportsParquetRowIndex + ? new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()) + : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()); + Schema expectedRequiredSchema; + if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { + expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider", "timestamp"); + } else if (mergeMode == EVENT_TIME_ORDERING || mergeMode == COMMIT_TIME_ORDERING) { + expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider"); + } else if (mergeMode == CUSTOM && isProjectionCompatible) { + expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider", "begin_lon", "timestamp"); + } else { + expectedRequiredSchema = DATA_SCHEMA; + } + if (supportsParquetRowIndex && mergeUseRecordPosition) { + expectedRequiredSchema = addPositionalMergeCol(expectedRequiredSchema); + } + assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); + bootstrapFields = schemaHandler.getBootstrapRequiredFields(); + expectedBootstrapFields = HoodieFileGroupReaderSchemaHandler.getDataAndMetaCols(expectedRequiredSchema); + if (supportsParquetRowIndex) { + expectedBootstrapFields.getLeft().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); + expectedBootstrapFields.getRight().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); + } + assertEquals(expectedBootstrapFields.getLeft(), bootstrapFields.getLeft()); + assertEquals(expectedBootstrapFields.getRight(), bootstrapFields.getRight()); + + // request only data cols + requestedSchema = DATA_COLS_ONLY_SCHEMA; + schemaHandler = supportsParquetRowIndex + ? 
new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()) + : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()); + if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { + expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider", "timestamp"); + } else if (mergeMode == EVENT_TIME_ORDERING || mergeMode == COMMIT_TIME_ORDERING) { + expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider"); + } else if (mergeMode == CUSTOM && isProjectionCompatible) { + expectedRequiredSchema = generateProjectionSchema("begin_lat", "tip_history", "rider", "begin_lon", "timestamp"); + } else { + expectedRequiredSchema = DATA_SCHEMA; + } + if (supportsParquetRowIndex && mergeUseRecordPosition) { + expectedRequiredSchema = addPositionalMergeCol(expectedRequiredSchema); + } + assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); + bootstrapFields = schemaHandler.getBootstrapRequiredFields(); + expectedBootstrapFields = HoodieFileGroupReaderSchemaHandler.getDataAndMetaCols(expectedRequiredSchema); + if (supportsParquetRowIndex) { + if (mergeMode == CUSTOM && isProjectionCompatible) { + if (mergeUseRecordPosition) { + expectedBootstrapFields.getRight().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); + } + } else { + expectedBootstrapFields.getLeft().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); + expectedBootstrapFields.getRight().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); + } + } + assertEquals(expectedBootstrapFields.getLeft(), bootstrapFields.getLeft()); + assertEquals(expectedBootstrapFields.getRight(), bootstrapFields.getRight()); + + // request only meta cols + requestedSchema = META_COLS_ONLY_SCHEMA; + schemaHandler = supportsParquetRowIndex + ? 
new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()) + : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, + Option.empty(), hoodieTableConfig, new TypedProperties()); + if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { + expectedRequiredSchema = generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key", "timestamp"); + } else if (mergeMode == EVENT_TIME_ORDERING || mergeMode == COMMIT_TIME_ORDERING) { + expectedRequiredSchema = generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key"); + } else if (mergeMode == CUSTOM && isProjectionCompatible) { + expectedRequiredSchema = generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key", "begin_lat", "begin_lon", "timestamp"); + } else { + expectedRequiredSchema = DATA_SCHEMA; + } + if (supportsParquetRowIndex && mergeUseRecordPosition) { + expectedRequiredSchema = addPositionalMergeCol(expectedRequiredSchema); + } + assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); + bootstrapFields = schemaHandler.getBootstrapRequiredFields(); + expectedBootstrapFields = HoodieFileGroupReaderSchemaHandler.getDataAndMetaCols(expectedRequiredSchema); + if (supportsParquetRowIndex) { + if ((mergeMode == EVENT_TIME_ORDERING && !hasPrecombine) || mergeMode == COMMIT_TIME_ORDERING) { + if (mergeUseRecordPosition) { + expectedBootstrapFields.getLeft().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); + } + } else { + expectedBootstrapFields.getLeft().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); + expectedBootstrapFields.getRight().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); + } + } + assertEquals(expectedBootstrapFields.getLeft(), bootstrapFields.getLeft()); + assertEquals(expectedBootstrapFields.getRight(), bootstrapFields.getRight()); + } + private static Schema generateProjectionSchema(String... 
fields) { return HoodieAvroUtils.generateProjectionSchema(DATA_SCHEMA, Arrays.asList(fields)); } From 03b2522ddbfbce2b014abed526b6a18217dae93c Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Tue, 26 Nov 2024 16:16:35 -0500 Subject: [PATCH 7/8] move fg reader changes to different pr --- ...rkFileFormatInternalRowReaderContext.scala | 2 +- .../common/engine/HoodieReaderContext.java | 2 - .../table/read/HoodieFileGroupReader.java | 14 +- .../HoodieFileGroupReaderSchemaHandler.java | 24 +- .../HoodiePositionBasedSchemaHandler.java | 14 +- .../common/table/read/TestSchemaHandler.java | 464 ------------------ 6 files changed, 28 insertions(+), 492 deletions(-) delete mode 100644 hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala index c5b19ddf50502..00e14e06bf6bf 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala @@ -167,7 +167,7 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea HoodieAvroUtils.removeFields(skeletonRequiredSchema, rowIndexColumn)) //If we need to do position based merging with log files we will leave the row index column at the end - val dataProjection = if (getShouldMergeUseRecordPosition) { + val dataProjection = if (getHasLogFiles && getShouldMergeUseRecordPosition) { getIdentityProjection } else { projectRecord(dataRequiredSchema, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java index 6479dcee6df86..a0d779cd40e0b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java @@ -61,8 +61,6 @@ public abstract class HoodieReaderContext { private Boolean hasLogFiles = null; private Boolean hasBootstrapBaseFile = null; private Boolean needsBootstrapMerge = null; - - // should we do position based merging for mor private Boolean shouldMergeUseRecordPosition = null; // Getter and Setter for schemaHandler diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java index 1df2d95a74c0b..ac34efb0ab2da 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java @@ -96,12 +96,7 @@ public HoodieFileGroupReader(HoodieReaderContext readerContext, this.readerContext = readerContext; this.storage = storage; this.hoodieBaseFileOption = fileSlice.getBaseFile(); - readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()); this.logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); - readerContext.setHasLogFiles(!this.logFiles.isEmpty()); - if (readerContext.getHasLogFiles() && start != 0) { - throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from 
the start of the base file"); - } this.props = props; this.start = start; this.length = length; @@ -114,12 +109,17 @@ public HoodieFileGroupReader(HoodieReaderContext readerContext, readerContext.setTablePath(tablePath); readerContext.setLatestCommitTime(latestCommitTime); boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, HoodieReaderConfig.MERGE_TYPE, true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE); - readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge && readerContext.getHasLogFiles()); + readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge); + readerContext.setHasLogFiles(!this.logFiles.isEmpty()); + if (readerContext.getHasLogFiles() && start != 0) { + throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file"); + } + readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()); readerContext.setSchemaHandler(readerContext.supportsParquetRowIndex() ? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props) : new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)); this.outputConverter = readerContext.getSchemaHandler().getOutputConverter(); - this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, tableConfig.getRecordMergeMode(), props, !readerContext.getHasLogFiles(), isSkipMerge, shouldUseRecordPosition); + this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, tableConfig.getRecordMergeMode(), props, this.logFiles.isEmpty(), isSkipMerge, shouldUseRecordPosition); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java index 35701ba0a5454..2e27ebe10a658 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java @@ -70,7 +70,14 @@ public class HoodieFileGroupReaderSchemaHandler { protected final HoodieReaderContext readerContext; protected final TypedProperties properties; - + + protected final Option recordMerger; + + protected final boolean hasBootstrapBaseFile; + protected boolean needsBootstrapMerge; + + protected final boolean needsMORMerge; + public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext readerContext, Schema dataSchema, Schema requestedSchema, @@ -79,12 +86,16 @@ public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext readerContext, TypedProperties properties) { this.properties = properties; this.readerContext = readerContext; + this.hasBootstrapBaseFile = readerContext.getHasBootstrapBaseFile(); + this.needsMORMerge = readerContext.getHasLogFiles(); + this.recordMerger = readerContext.getRecordMerger(); this.dataSchema = dataSchema; this.requestedSchema = requestedSchema; this.hoodieTableConfig = hoodieTableConfig; this.requiredSchema = prepareRequiredSchema(); this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt); this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt); + readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge); } public Schema getDataSchema() { @@ -136,18 +147,18 @@ protected 
InternalSchema doPruneInternalSchema(Schema requiredSchema, InternalSc private Schema generateRequiredSchema() { //might need to change this if other queries than mor have mandatory fields - if (!readerContext.getHasLogFiles()) { + if (!needsMORMerge) { return requestedSchema; } if (hoodieTableConfig.getRecordMergeMode() == RecordMergeMode.CUSTOM) { - if (!readerContext.getRecordMerger().get().isProjectionCompatible()) { + if (!recordMerger.get().isProjectionCompatible()) { return dataSchema; } } List addedFields = new ArrayList<>(); - for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, readerContext.getRecordMerger())) { + for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) { if (!findNestedField(requestedSchema, field).isPresent()) { Option foundFieldOpt = findNestedField(dataSchema, field); if (!foundFieldOpt.isPresent()) { @@ -198,9 +209,8 @@ private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg, Type protected Schema prepareRequiredSchema() { Schema preReorderRequiredSchema = generateRequiredSchema(); Pair, List> requiredFields = getDataAndMetaCols(preReorderRequiredSchema); - readerContext.setNeedsBootstrapMerge(readerContext.getHasBootstrapBaseFile() - && !requiredFields.getLeft().isEmpty() && !requiredFields.getRight().isEmpty()); - return readerContext.getNeedsBootstrapMerge() + this.needsBootstrapMerge = hasBootstrapBaseFile && !requiredFields.getLeft().isEmpty() && !requiredFields.getRight().isEmpty(); + return needsBootstrapMerge ? createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), requiredFields.getRight().stream()).collect(Collectors.toList())) : preReorderRequiredSchema; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java index c7373b87975f6..9bcf420769532 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java @@ -51,18 +51,10 @@ public HoodiePositionBasedSchemaHandler(HoodieReaderContext readerContext, super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig, properties); } - private boolean morMergeNeedsPositionCol() { - return readerContext.supportsParquetRowIndex() && readerContext.getShouldMergeUseRecordPosition(); - } - - private boolean bootstrapMergeNeedsPositionCol() { - return readerContext.supportsParquetRowIndex() && readerContext.getNeedsBootstrapMerge(); - } - @Override protected Schema prepareRequiredSchema() { Schema preMergeSchema = super.prepareRequiredSchema(); - return morMergeNeedsPositionCol() + return readerContext.getShouldMergeUseRecordPosition() && readerContext.getHasLogFiles() ? 
addPositionalMergeCol(preMergeSchema) : preMergeSchema; } @@ -74,7 +66,7 @@ protected Option getInternalSchemaOpt(Option int @Override protected InternalSchema doPruneInternalSchema(Schema requiredSchema, InternalSchema internalSchema) { - if (!morMergeNeedsPositionCol()) { + if (!(readerContext.getShouldMergeUseRecordPosition() && readerContext.getHasLogFiles())) { return super.doPruneInternalSchema(requiredSchema, internalSchema); } @@ -91,7 +83,7 @@ private static InternalSchema addPositionalMergeCol(InternalSchema internalSchem @Override public Pair,List> getBootstrapRequiredFields() { Pair,List> dataAndMetaCols = super.getBootstrapRequiredFields(); - if (bootstrapMergeNeedsPositionCol() || morMergeNeedsPositionCol()) { + if (readerContext.supportsParquetRowIndex() && (this.needsBootstrapMerge || this.needsMORMerge)) { if (!dataAndMetaCols.getLeft().isEmpty()) { dataAndMetaCols.getLeft().add(getPositionalMergeField()); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java deleted file mode 100644 index 312dbe1c52782..0000000000000 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestSchemaHandler.java +++ /dev/null @@ -1,464 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.hudi.common.table.read; - -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.config.RecordMergeMode; -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.engine.HoodieReaderContext; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordMerger; -import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.ClosableIterator; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.storage.HoodieStorage; -import org.apache.hudi.storage.StoragePath; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.function.UnaryOperator; -import java.util.stream.Stream; - -import static org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING; -import static org.apache.hudi.common.config.RecordMergeMode.CUSTOM; -import static org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING; -import static org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler.addPositionalMergeCol; -import static org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler.getPositionalMergeField; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestSchemaHandler { - - protected static final Schema DATA_SCHEMA = HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA); - protected static final Schema DATA_COLS_ONLY_SCHEMA = generateProjectionSchema("begin_lat", "tip_history", "rider"); - protected static final Schema META_COLS_ONLY_SCHEMA = generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key"); - - @Test - public void testCow() { - HoodieReaderContext readerContext = new MockReaderContext(false); - readerContext.setHasLogFiles(false); - readerContext.setHasBootstrapBaseFile(false); - readerContext.setShouldMergeUseRecordPosition(false); - HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); - Schema requestedSchema = DATA_SCHEMA; - HoodieFileGroupReaderSchemaHandler schemaHandler = new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, - requestedSchema, Option.empty(), hoodieTableConfig, new TypedProperties()); - assertEquals(requestedSchema, schemaHandler.getRequiredSchema()); - - //read subset of columns - requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "rider"); - schemaHandler = - new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); - assertEquals(requestedSchema, schemaHandler.getRequiredSchema()); - } - - @Test - public void testCowBootstrap() { - HoodieReaderContext readerContext = new MockReaderContext(false); - readerContext.setHasLogFiles(false); - readerContext.setHasBootstrapBaseFile(true); - 
readerContext.setShouldMergeUseRecordPosition(false); - HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); - Schema requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider"); - HoodieFileGroupReaderSchemaHandler schemaHandler = - new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); - //meta cols must go first in the required schema - Schema expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider"); - assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); - Pair, List> bootstrapFields = schemaHandler.getBootstrapRequiredFields(); - assertEquals(Collections.singletonList(getField("_hoodie_record_key")), bootstrapFields.getLeft()); - assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider")), bootstrapFields.getRight()); - } - - @Test - public void testCowBootstrapWithPositionMerge() { - HoodieReaderContext readerContext = new MockReaderContext(true); - readerContext.setHasLogFiles(false); - readerContext.setHasBootstrapBaseFile(true); - readerContext.setShouldMergeUseRecordPosition(false); - HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); - Schema requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider"); - HoodieFileGroupReaderSchemaHandler schemaHandler = - new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); - //meta cols must go first in the required schema - Schema expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider"); - assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); - Pair, List> bootstrapFields = schemaHandler.getBootstrapRequiredFields(); - assertEquals(Arrays.asList(getField("_hoodie_record_key"), getPositionalMergeField()), bootstrapFields.getLeft()); - assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider"), getPositionalMergeField()), bootstrapFields.getRight()); - - schemaHandler = new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, DATA_COLS_ONLY_SCHEMA, - Option.empty(), hoodieTableConfig, new TypedProperties()); - assertEquals(DATA_COLS_ONLY_SCHEMA, schemaHandler.getRequiredSchema()); - bootstrapFields = schemaHandler.getBootstrapRequiredFields(); - assertTrue(bootstrapFields.getLeft().isEmpty()); - assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider")), bootstrapFields.getRight()); - - schemaHandler = new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, META_COLS_ONLY_SCHEMA, - Option.empty(), hoodieTableConfig, new TypedProperties()); - assertEquals(META_COLS_ONLY_SCHEMA, schemaHandler.getRequiredSchema()); - bootstrapFields = schemaHandler.getBootstrapRequiredFields(); - assertEquals(Arrays.asList(getField("_hoodie_commit_seqno"), getField("_hoodie_record_key")), bootstrapFields.getLeft()); - assertTrue(bootstrapFields.getRight().isEmpty()); - } - - private static Stream testMorParams() { - Stream.Builder b = Stream.builder(); - for (boolean mergeUseRecordPosition : new boolean[] {true, false}) { - for (boolean supportsParquetRowIndex : new boolean[] {true, false}) { - b.add(Arguments.of(EVENT_TIME_ORDERING, true, false, mergeUseRecordPosition, supportsParquetRowIndex)); - 
b.add(Arguments.of(EVENT_TIME_ORDERING, false, false, mergeUseRecordPosition, supportsParquetRowIndex)); - b.add(Arguments.of(COMMIT_TIME_ORDERING, false, false, mergeUseRecordPosition, supportsParquetRowIndex)); - b.add(Arguments.of(CUSTOM, false, true, mergeUseRecordPosition, supportsParquetRowIndex)); - b.add(Arguments.of(CUSTOM, false, false, mergeUseRecordPosition, supportsParquetRowIndex)); - } - } - return b.build(); - } - - @ParameterizedTest - @MethodSource("testMorParams") - public void testMor(RecordMergeMode mergeMode, - boolean hasPrecombine, - boolean isProjectionCompatible, - boolean mergeUseRecordPosition, - boolean supportsParquetRowIndex) { - HoodieReaderContext readerContext = new MockReaderContext(supportsParquetRowIndex); - readerContext.setHasLogFiles(true); - readerContext.setHasBootstrapBaseFile(false); - //has no effect on schema unless we support position based merging - readerContext.setShouldMergeUseRecordPosition(mergeUseRecordPosition); - HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); - when(hoodieTableConfig.populateMetaFields()).thenReturn(Boolean.TRUE); - when(hoodieTableConfig.getRecordMergeMode()).thenReturn(mergeMode); - if (hasPrecombine) { - when(hoodieTableConfig.getPreCombineField()).thenReturn("timestamp"); - } else { - when(hoodieTableConfig.getPreCombineField()).thenReturn(null); - } - if (mergeMode == CUSTOM) { - Option merger = Option.of(new MockMerger(isProjectionCompatible, new String[]{"begin_lat", "begin_lon", "_hoodie_record_key", "timestamp"})); - readerContext.setRecordMerger(merger); - } - Schema requestedSchema = DATA_SCHEMA; - HoodieFileGroupReaderSchemaHandler schemaHandler = supportsParquetRowIndex - ? new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()) - : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); - Schema expectedRequiredFullSchema = supportsParquetRowIndex && mergeUseRecordPosition - ? HoodiePositionBasedSchemaHandler.addPositionalMergeCol(requestedSchema) - : requestedSchema; - assertEquals(expectedRequiredFullSchema, schemaHandler.getRequiredSchema()); - - //read subset of columns - requestedSchema = DATA_COLS_ONLY_SCHEMA; - schemaHandler = supportsParquetRowIndex - ? 
new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()) - : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); - Schema expectedRequiredSchema; - if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { - expectedRequiredSchema = generateProjectionSchema("begin_lat", "tip_history", "rider", "_hoodie_record_key", "timestamp"); - } else if (mergeMode == EVENT_TIME_ORDERING || mergeMode == COMMIT_TIME_ORDERING) { - expectedRequiredSchema = generateProjectionSchema("begin_lat", "tip_history", "rider", "_hoodie_record_key"); - } else if (mergeMode == CUSTOM && isProjectionCompatible) { - expectedRequiredSchema = generateProjectionSchema("begin_lat", "tip_history", "rider", "begin_lon", "_hoodie_record_key", "timestamp"); - } else { - expectedRequiredSchema = DATA_SCHEMA; - } - if (supportsParquetRowIndex && mergeUseRecordPosition) { - expectedRequiredSchema = addPositionalMergeCol(expectedRequiredSchema); - } - assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); - } - - @ParameterizedTest - @MethodSource("testMorParams") - public void testMorBootstrap(RecordMergeMode mergeMode, - boolean hasPrecombine, - boolean isProjectionCompatible, - boolean mergeUseRecordPosition, - boolean supportsParquetRowIndex) { - HoodieReaderContext readerContext = new MockReaderContext(supportsParquetRowIndex); - readerContext.setHasLogFiles(true); - readerContext.setHasBootstrapBaseFile(true); - readerContext.setShouldMergeUseRecordPosition(mergeUseRecordPosition); - HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class); - when(hoodieTableConfig.populateMetaFields()).thenReturn(Boolean.TRUE); - when(hoodieTableConfig.getRecordMergeMode()).thenReturn(mergeMode); - if (hasPrecombine) { - when(hoodieTableConfig.getPreCombineField()).thenReturn("timestamp"); - } else { - when(hoodieTableConfig.getPreCombineField()).thenReturn(null); - } - if (mergeMode == CUSTOM) { - // NOTE: in this test custom doesn't have any meta cols because it is more interesting of a test case - Option merger = Option.of(new MockMerger(isProjectionCompatible, new String[]{"begin_lat", "begin_lon", "timestamp"})); - readerContext.setRecordMerger(merger); - } - Schema requestedSchema = DATA_SCHEMA; - HoodieFileGroupReaderSchemaHandler schemaHandler = supportsParquetRowIndex - ? new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()) - : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); - Schema expectedRequiredFullSchema = supportsParquetRowIndex && mergeUseRecordPosition - ? 
HoodiePositionBasedSchemaHandler.addPositionalMergeCol(requestedSchema) - : requestedSchema; - assertEquals(expectedRequiredFullSchema, schemaHandler.getRequiredSchema()); - Pair, List> bootstrapFields = schemaHandler.getBootstrapRequiredFields(); - Pair, List> expectedBootstrapFields = HoodieFileGroupReaderSchemaHandler.getDataAndMetaCols(expectedRequiredFullSchema); - if (supportsParquetRowIndex) { - expectedBootstrapFields.getLeft().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); - expectedBootstrapFields.getRight().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); - } - assertEquals(expectedBootstrapFields.getLeft(), bootstrapFields.getLeft()); - assertEquals(expectedBootstrapFields.getRight(), bootstrapFields.getRight()); - - //read subset of columns - requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider"); - schemaHandler = supportsParquetRowIndex - ? new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()) - : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); - Schema expectedRequiredSchema; - if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { - expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider", "timestamp"); - } else if (mergeMode == EVENT_TIME_ORDERING || mergeMode == COMMIT_TIME_ORDERING) { - expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider"); - } else if (mergeMode == CUSTOM && isProjectionCompatible) { - expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider", "begin_lon", "timestamp"); - } else { - expectedRequiredSchema = DATA_SCHEMA; - } - if (supportsParquetRowIndex && mergeUseRecordPosition) { - expectedRequiredSchema = addPositionalMergeCol(expectedRequiredSchema); - } - assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); - bootstrapFields = schemaHandler.getBootstrapRequiredFields(); - expectedBootstrapFields = HoodieFileGroupReaderSchemaHandler.getDataAndMetaCols(expectedRequiredSchema); - if (supportsParquetRowIndex) { - expectedBootstrapFields.getLeft().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); - expectedBootstrapFields.getRight().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); - } - assertEquals(expectedBootstrapFields.getLeft(), bootstrapFields.getLeft()); - assertEquals(expectedBootstrapFields.getRight(), bootstrapFields.getRight()); - - // request only data cols - requestedSchema = DATA_COLS_ONLY_SCHEMA; - schemaHandler = supportsParquetRowIndex - ? 
new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()) - : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); - if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { - expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider", "timestamp"); - } else if (mergeMode == EVENT_TIME_ORDERING || mergeMode == COMMIT_TIME_ORDERING) { - expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider"); - } else if (mergeMode == CUSTOM && isProjectionCompatible) { - expectedRequiredSchema = generateProjectionSchema("begin_lat", "tip_history", "rider", "begin_lon", "timestamp"); - } else { - expectedRequiredSchema = DATA_SCHEMA; - } - if (supportsParquetRowIndex && mergeUseRecordPosition) { - expectedRequiredSchema = addPositionalMergeCol(expectedRequiredSchema); - } - assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); - bootstrapFields = schemaHandler.getBootstrapRequiredFields(); - expectedBootstrapFields = HoodieFileGroupReaderSchemaHandler.getDataAndMetaCols(expectedRequiredSchema); - if (supportsParquetRowIndex) { - if (mergeMode == CUSTOM && isProjectionCompatible) { - if (mergeUseRecordPosition) { - expectedBootstrapFields.getRight().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); - } - } else { - expectedBootstrapFields.getLeft().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); - expectedBootstrapFields.getRight().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); - } - } - assertEquals(expectedBootstrapFields.getLeft(), bootstrapFields.getLeft()); - assertEquals(expectedBootstrapFields.getRight(), bootstrapFields.getRight()); - - // request only meta cols - requestedSchema = META_COLS_ONLY_SCHEMA; - schemaHandler = supportsParquetRowIndex - ? 
new HoodiePositionBasedSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()) - : new HoodieFileGroupReaderSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, - Option.empty(), hoodieTableConfig, new TypedProperties()); - if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) { - expectedRequiredSchema = generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key", "timestamp"); - } else if (mergeMode == EVENT_TIME_ORDERING || mergeMode == COMMIT_TIME_ORDERING) { - expectedRequiredSchema = generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key"); - } else if (mergeMode == CUSTOM && isProjectionCompatible) { - expectedRequiredSchema = generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key", "begin_lat", "begin_lon", "timestamp"); - } else { - expectedRequiredSchema = DATA_SCHEMA; - } - if (supportsParquetRowIndex && mergeUseRecordPosition) { - expectedRequiredSchema = addPositionalMergeCol(expectedRequiredSchema); - } - assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema()); - bootstrapFields = schemaHandler.getBootstrapRequiredFields(); - expectedBootstrapFields = HoodieFileGroupReaderSchemaHandler.getDataAndMetaCols(expectedRequiredSchema); - if (supportsParquetRowIndex) { - if ((mergeMode == EVENT_TIME_ORDERING && !hasPrecombine) || mergeMode == COMMIT_TIME_ORDERING) { - if (mergeUseRecordPosition) { - expectedBootstrapFields.getLeft().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); - } - } else { - expectedBootstrapFields.getLeft().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); - expectedBootstrapFields.getRight().add(HoodiePositionBasedSchemaHandler.getPositionalMergeField()); - } - } - assertEquals(expectedBootstrapFields.getLeft(), bootstrapFields.getLeft()); - assertEquals(expectedBootstrapFields.getRight(), bootstrapFields.getRight()); - } - - private static Schema generateProjectionSchema(String... 
fields) { - return HoodieAvroUtils.generateProjectionSchema(DATA_SCHEMA, Arrays.asList(fields)); - } - - private Schema.Field getField(String fieldName) { - return DATA_SCHEMA.getField(fieldName); - } - - static class MockMerger implements HoodieRecordMerger { - - private final boolean isProjectionCompatible; - private final String[] mandatoryMergeFields; - - MockMerger(boolean isProjectionCompatible, String[] mandatoryMergeFields) { - this.isProjectionCompatible = isProjectionCompatible; - this.mandatoryMergeFields = mandatoryMergeFields; - } - - @Override - public boolean isProjectionCompatible() { - return this.isProjectionCompatible; - } - - @Override - public String[] getMandatoryFieldsForMerging(Schema dataSchema, HoodieTableConfig cfg, TypedProperties properties) { - return this.mandatoryMergeFields; - } - - @Override - public Option> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, - Schema newSchema, TypedProperties props) throws IOException { - return null; - } - - @Override - public HoodieRecord.HoodieRecordType getRecordType() { - return null; - } - - @Override - public String getMergingStrategy() { - return ""; - } - } - - static class MockReaderContext extends HoodieReaderContext { - private final boolean supportsParquetRowIndex; - - MockReaderContext(boolean supportsParquetRowIndex) { - this.supportsParquetRowIndex = supportsParquetRowIndex; - } - - @Override - public boolean supportsParquetRowIndex() { - return this.supportsParquetRowIndex; - } - - @Override - public ClosableIterator getFileRecordIterator(StoragePath filePath, long start, long length, Schema dataSchema, Schema requiredSchema, HoodieStorage storage) throws IOException { - return null; - } - - @Override - public String convertAvroRecord(IndexedRecord avroRecord) { - return ""; - } - - @Override - public GenericRecord convertToAvroRecord(String record, Schema schema) { - return null; - } - - @Override - public Option getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) { - return null; - } - - @Override - public Object getValue(String record, Schema schema, String fieldName) { - return null; - } - - @Override - public HoodieRecord constructHoodieRecord(Option recordOption, Map metadataMap) { - return null; - } - - @Override - public String seal(String record) { - return ""; - } - - @Override - public ClosableIterator mergeBootstrapReaders(ClosableIterator skeletonFileIterator, Schema skeletonRequiredSchema, ClosableIterator dataFileIterator, - Schema dataRequiredSchema) { - return null; - } - - @Override - public UnaryOperator projectRecord(Schema from, Schema to, Map renamedColumns) { - return null; - } - - @Override - public Comparable castValue(Comparable value, Schema.Type newType) { - return null; - } - } -} From 03dd57a1abc269019f1bd76437739bdab1061dff Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Tue, 26 Nov 2024 16:28:39 -0500 Subject: [PATCH 8/8] revert another thing --- .../table/read/HoodiePositionBasedSchemaHandler.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java index 9bcf420769532..cafcceb8beff1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java @@ 
-23,7 +23,6 @@ import org.apache.hudi.common.engine.HoodieReaderContext; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.Types; @@ -94,13 +93,11 @@ public Pair<List<Schema.Field>,List<Schema.Field>> getBootstrapRequiredFields() return dataAndMetaCols; } - @VisibleForTesting - static Schema addPositionalMergeCol(Schema input) { + private static Schema addPositionalMergeCol(Schema input) { return appendFieldsToSchemaDedupNested(input, Collections.singletonList(getPositionalMergeField())); } - @VisibleForTesting - static Schema.Field getPositionalMergeField() { + private static Schema.Field getPositionalMergeField() { return new Schema.Field(ROW_INDEX_TEMPORARY_COLUMN_NAME, Schema.create(Schema.Type.LONG), "", -1L); }
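
Reviewer note on patch 8/8: with TestSchemaHandler.java deleted, addPositionalMergeCol and getPositionalMergeField drop @VisibleForTesting and go back to being private, presumably because the removed test was their only outside caller. For readers unfamiliar with the positional-merge column, below is a minimal, self-contained sketch of the idea behind these helpers: append a temporary LONG column to an Avro record schema so the reader can carry each record's row position in the base file through the merge. This is not Hudi's implementation; the class name, column name, and helper method are illustrative assumptions only (Hudi's real constant is ROW_INDEX_TEMPORARY_COLUMN_NAME, and its helper also dedupes nested fields via appendFieldsToSchemaDedupNested).

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

import java.util.ArrayList;
import java.util.List;

public class PositionalMergeColSketch {
  // Illustrative column name, not Hudi's actual constant.
  private static final String ROW_INDEX_COL = "_tmp_row_index_col";

  // Returns a copy of the input record schema with one extra LONG field appended,
  // defaulted to -1 like the field created in the hunk above.
  static Schema appendRowIndexColumn(Schema input) {
    List<Schema.Field> fields = new ArrayList<>();
    for (Schema.Field f : input.getFields()) {
      // Avro Field objects cannot be reused across schemas, so copy each one.
      fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
    }
    fields.add(new Schema.Field(ROW_INDEX_COL, Schema.create(Schema.Type.LONG), "row position in the base file", -1L));
    return Schema.createRecord(input.getName(), input.getDoc(), input.getNamespace(), input.isError(), fields);
  }

  public static void main(String[] args) {
    Schema base = SchemaBuilder.record("trip").fields()
        .requiredString("_hoodie_record_key")
        .requiredDouble("begin_lat")
        .endRecord();
    // Prints the widened schema; the row-index column lands after the data columns.
    System.out.println(appendRowIndexColumn(base).toString(true));
  }
}

Making the helpers private again keeps the temporary column an internal detail of the position-based schema handler, which matches how the production code in this hunk uses them.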