[HUDI-8518] Fix secondary index updates for event_time and custom merge modes #12507

Open · wants to merge 2 commits into base: master
@@ -1181,14 +1181,20 @@ private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata c
long targetPartitionSize = 100 * 1024 * 1024;
int parallelism = (int) Math.max(1, (totalWriteBytesForSecondaryIndex + targetPartitionSize - 1) / targetPartitionSize);

// Load the file system view for only the affected partitions on the driver.
// By loading it once on the driver, we avoid loading the same metadata multiple times on the executors.
HoodieMetadataFileSystemView fsView = getMetadataView();
fsView.loadPartitions(partitionFilePairs.stream().map(Pair::getKey).collect(Collectors.toList()));

return readSecondaryKeysFromBaseFiles(
engineContext,
partitionFilePairs,
parallelism,
this.getClass().getSimpleName(),
dataMetaClient,
getEngineType(),
indexDefinition)
indexDefinition,
fsView)
.union(deletedRecords)
.distinctWithKey(HoodieRecord::getKey, parallelism);
}
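
As a side note on the parallelism calculation above: it is a ceiling division of the total secondary-index write bytes by a 100 MB target partition size, floored at 1. A minimal standalone sketch, using hypothetical byte counts rather than values from this PR:

// Illustration only: the ceiling division used to size the parallelism.
long targetPartitionSize = 100 * 1024 * 1024;                 // 100 MB per partition/task
long totalWriteBytesForSecondaryIndex = 250L * 1024 * 1024;   // assume a commit wrote ~250 MB
int parallelism = (int) Math.max(1, (totalWriteBytesForSecondaryIndex + targetPartitionSize - 1) / targetPartitionSize);
// (250 MB + 100 MB - 1) / 100 MB truncates to 3; an empty commit still yields parallelism = 1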
@@ -2212,7 +2212,8 @@ public static HoodieData<HoodieRecord> readSecondaryKeysFromBaseFiles(HoodieEngi
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
int secondaryIndexMaxParallelism,
String activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
HoodieIndexDefinition indexDefinition) {
HoodieIndexDefinition indexDefinition,
HoodieMetadataFileSystemView fsView) {
if (partitionFiles.isEmpty()) {
return engineContext.emptyHoodieData();
}
@@ -2228,9 +2229,17 @@ public static HoodieData<HoodieRecord> readSecondaryKeysFromBaseFiles(HoodieEngi
engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFiles.size() + " partitions");
return engineContext.parallelize(partitionFiles, parallelism).flatMap(partitionWithBaseAndLogFiles -> {
final String partition = partitionWithBaseAndLogFiles.getKey();
// get the log files for the partition and group them by fileId
Map<String, List<HoodieLogFile>> logFilesByFileId = getPartitionLatestFileSlicesIncludingInflight(metaClient, Option.of(fsView), partition).stream()
    .map(fs -> Pair.of(fs.getFileId(), fs.getLogFiles().collect(toList()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue));

Contributor: Typically our FSV may not have inflight files, so how do we know we are getting the inflight files here? Can you shed some light, please?

Contributor: Actually, let's sync up on this. I have a slightly different approach to make this more optimized.
final Pair<String, List<String>> baseAndLogFiles = partitionWithBaseAndLogFiles.getValue();
List<String> logFilePaths = new ArrayList<>();
baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
Set<String> logFilePaths = new HashSet<>();
baseAndLogFiles.getValue().forEach(logFile -> {
// add all log files for the fileId
logFilesByFileId.get(FSUtils.getFileIdFromFileName(logFile)).stream()
.map(HoodieLogFile::getPath).map(StoragePath::toString).forEach(logFilePaths::add);
logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile);
});
String baseFilePath = baseAndLogFiles.getKey();
Option<StoragePath> dataFilePath = baseFilePath.isEmpty() ? Option.empty() : Option.of(FSUtils.constructAbsolutePath(basePath, baseFilePath));
Schema readerSchema;
Expand All @@ -2241,7 +2250,7 @@ public static HoodieData<HoodieRecord> readSecondaryKeysFromBaseFiles(HoodieEngi
} else {
readerSchema = tableSchema;
}
return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, readerSchema, partition, dataFilePath, indexDefinition,
return createSecondaryIndexGenerator(metaClient, engineType, new ArrayList<>(logFilePaths), readerSchema, partition, dataFilePath, indexDefinition,
metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(""));
});
}
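
To make the intent of the new lookup easier to follow: the latest file slices of the partition (including inflight ones) are grouped by file ID, and for each log file named in the commit metadata every log file of that file group is added alongside it, with a HashSet absorbing duplicates between the two sources. A minimal, self-contained sketch of that grouping-and-dedup pattern, using plain strings and a hard-coded file ID in place of HoodieLogFile/FileSlice and the file-ID parsing helper, not the actual Hudi code:

import java.util.*;

public class LogFileGroupingSketch {
  public static void main(String[] args) {
    // Assumed input: log files of the latest file slices, grouped by file ID.
    Map<String, List<String>> logFilesByFileId = new HashMap<>();
    logFilesByFileId.put("fg-1", Arrays.asList("p1/.fg-1_001.log.1", "p1/.fg-1_001.log.2"));

    // Assumed input: log files recorded in the commit metadata for the same file group.
    List<String> logFilesFromCommit = Collections.singletonList("p1/.fg-1_001.log.2");

    Set<String> logFilePaths = new HashSet<>();
    for (String logFile : logFilesFromCommit) {
      String fileId = "fg-1"; // stand-in for deriving the file ID from the log file name
      logFilePaths.addAll(logFilesByFileId.getOrDefault(fileId, Collections.emptyList()));
      logFilePaths.add(logFile); // the commit's own log file; the set drops the duplicate
    }
    // Both log files of the file group end up in the set exactly once.
    System.out.println(logFilePaths.size()); // prints 2
  }
}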
@@ -20,8 +20,7 @@
package org.apache.spark.sql.hudi.command

import org.apache.hudi.HoodieSparkIndexClient
import org.apache.hudi.common.config.RecordMergeMode
import org.apache.hudi.common.model.{HoodieIndexDefinition, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.model.HoodieIndexDefinition
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
import org.apache.hudi.exception.HoodieIndexException
@@ -74,12 +73,6 @@ case class CreateIndexCommand(table: CatalogTable,
} else {
HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX
}
if (derivedIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX)) {
if ((metaClient.getTableConfig.getPayloadClass != null && !(metaClient.getTableConfig.getPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getCanonicalName)))
|| (metaClient.getTableConfig.getRecordMergeMode ne RecordMergeMode.COMMIT_TIME_ORDERING)) {
throw new HoodieIndexException("Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or " + "Merge mode set to OVERWRITE_WITH_LATEST")
}
}
new HoodieSparkIndexClient(sparkSession).create(metaClient, indexName, derivedIndexType, columnsMap, options.asJava, table.properties.asJava)
} else {
throw new HoodieIndexException(String.format("%s is not supported", indexType))
@@ -52,6 +52,7 @@ import org.scalatest.Assertions.{assertResult, assertThrows}

import java.util.concurrent.Executors
import scala.collection.JavaConverters
import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

@@ -706,6 +707,156 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
)
}

/**
* Test case that writes updates and validates the secondary index with the EVENT_TIME_ORDERING merge mode.
*/
@Test
def testSecondaryIndexWithEventTimeOrderingMerge(): Unit = {
val tableName = "test_secondary_index_with_event_time_ordering_merge"

// Create the Hudi table
spark.sql(
s"""
|CREATE TABLE $tableName (
| ts BIGINT,
| record_key_col STRING,
| not_record_key_col STRING,
| partition_key_col STRING
|) USING hudi
| OPTIONS (
| primaryKey = 'record_key_col',
| type = 'mor',
| preCombineField = 'ts',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'record_key_col',
| hoodie.enable.data.skipping = 'true'
| )
| PARTITIONED BY (partition_key_col)
| LOCATION '$basePath'
""".stripMargin)
// by setting the small file limit to 0, each insert will create a new file
// need to generate more files for the non-partitioned table to test data skipping,
// as the partitioned table will have only one file per partition
spark.sql("set hoodie.parquet.small.file.limit=0")
// Insert some data
spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
// create secondary index
spark.sql(s"create index idx_not_record_key_col on $tableName (not_record_key_col)")

// Update the secondary key column with a higher ts value
spark.sql(s"update $tableName set not_record_key_col = 'xyz', ts = 3 where record_key_col = 'row1'")
// validate data and SI
checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where record_key_col = 'row1'")(
Seq(3, "row1", "xyz", "p1")
)
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
)

// update the secondary key column with a higher ts value, back to the original secondary key value 'abc'
spark.sql(s"update $tableName set not_record_key_col = 'abc', ts = 4 where record_key_col = 'row1'")
// validate data and SI
checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where record_key_col = 'row1'")(
Seq(4, "row1", "abc", "p1")
)
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
)

// update the secondary key column with a lower ts value; this update should be ignored
spark.sql(s"update $tableName set not_record_key_col = 'xyz', ts = 0 where record_key_col = 'row1'")
// validate data and SI
checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where record_key_col = 'row1'")(
Seq(4, "row1", "abc", "p1")
)
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),
Seq(s"abc${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)
)
}

/**
* Test case that writes updates and validates the secondary index with the CUSTOM merge mode using a CDC payload.
*/
@Test
def testSecondaryIndexWithCustomMergeMode(): Unit = {
val tableName = "test_secondary_index_with_custom_merge"

// Create the Hudi table
spark.sql(
s"""
|CREATE TABLE $tableName (
| record_key_col BIGINT,
| Op STRING,
| replicadmstimestamp STRING,
| not_record_key_col STRING
|) USING hudi
| OPTIONS (
| primaryKey = 'record_key_col',
| type = 'mor',
| preCombineField = 'replicadmstimestamp',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'record_key_col',
| hoodie.enable.data.skipping = 'true',
| hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.AWSDmsAvroPayload',
| hoodie.datasource.write.keygenerator.class = 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
| hoodie.write.record.merge.mode = 'CUSTOM',
| hoodie.table.cdc.enabled = 'true',
| hoodie.table.cdc.supplemental.logging.mode = 'data_before_after'
| )
| LOCATION '$basePath'
""".stripMargin)
if (HoodieSparkUtils.gteqSpark3_4) {
spark.sql("set spark.sql.defaultColumn.enabled=false")
}
// by setting the small file limit to 0, each insert will create a new file
// need to generate more files for the non-partitioned table to test data skipping,
// as the partitioned table will have only one file per partition
spark.sql("set hoodie.parquet.small.file.limit=0")
// Insert some data
spark.sql(
s"""|INSERT INTO $tableName(record_key_col, Op, replicadmstimestamp, not_record_key_col) VALUES
| (1, 'I', '2023-06-14 15:46:06.953746', 'A'),
| (2, 'I', '2023-06-14 15:46:07.953746', 'B'),
| (3, 'I', '2023-06-14 15:46:08.953746', 'C');
| """.stripMargin)
// create secondary index
spark.sql(s"create index idx_not_record_key_col on $tableName (not_record_key_col)")

// validate data and SI
checkAnswer(s"select record_key_col, Op, replicadmstimestamp, not_record_key_col from $tableName")(
Seq(1, "I", "2023-06-14 15:46:06.953746", "A"),
Seq(2, "I", "2023-06-14 15:46:07.953746", "B"),
Seq(3, "I", "2023-06-14 15:46:08.953746", "C")
)
checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
Seq(s"A${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}1", false),
Seq(s"B${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}2", false),
Seq(s"C${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}3", false)
)

// Update the Op value to 'D' (delete) for record_key_col = 3
spark.sql(s"update $tableName set Op = 'D' where record_key_col = 3")

// validate data and SI
checkAnswer(s"select record_key_col, Op, replicadmstimestamp, not_record_key_col from $tableName")(
Seq(1, "I", "2023-06-14 15:46:06.953746", "A"),
Seq(2, "I", "2023-06-14 15:46:07.953746", "B")
)
checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
Seq(s"A${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}1", false),
Seq(s"B${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}2", false)
)
}

@Test
def testSecondaryIndexWithMultipleUpdatesForSameRecord(): Unit = {
var hudiOpts = commonOpts
@@ -231,62 +231,6 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
}
}

test("Test Secondary Index Creation Failure For Unsupported payloads") {
withTempDir {
tmp => {
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"

spark.sql(
s"""
|create table $tableName (
| ts bigint,
| id string,
| rider string,
| driver string,
| fare int,
| city string,
| state string
|) using hudi
| options (
| primaryKey ='id',
| type = 'mor',
| preCombineField = 'ts',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.metadata.index.secondary.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'id',
| hoodie.datasource.write.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
| )
| partitioned by(state)
| location '$basePath'
""".stripMargin)
spark.sql(
s"""
| insert into $tableName
| values
| (1695159649087, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19, 'san_francisco', 'california'),
| (1695091554787, 'e96c4396-3fad-413a-a942-4cb36106d720', 'rider-B', 'driver-M', 27, 'austin', 'texas')
| """.stripMargin
)

// validate record_index created successfully
val metadataDF = spark.sql(s"select key from hudi_metadata('$basePath') where type=5")
assert(metadataDF.count() == 2)

val metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
assert(metaClient.getTableConfig.getMetadataPartitions.contains("record_index"))
// create secondary index throws error when trying to create on multiple fields at a time
checkException(sql = s"create index idx_city on $tableName (city)")(
"Secondary Index can only be enabled on table with OverwriteWithLatestAvroPayload payload class or Merge mode set to OVERWRITE_WITH_LATEST"
)
}
}
}

test("Test Secondary Index With Updates Compaction Clustering Deletes") {
withTempDir { tmp =>
val tableName = generateTableName