
Commit

Add tests for spill map and schema cache, and minor refactor (#12430)
jonvex authored Dec 23, 2024
1 parent ca058f3 commit f95bb6a
Showing 10 changed files with 241 additions and 77 deletions.
@@ -249,6 +249,11 @@ public void validateRecordsInFileGroup(String tablePath, List<ArrayWritable> act
}
}

@Override
public void assertRecordsEqual(Schema schema, ArrayWritable expected, ArrayWritable actual) {
ArrayWritableTestUtil.assertArrayWritableEqual(schema, expected, actual, false);
}

private static boolean isLogFileRec(HoodieReaderContext<ArrayWritable> readerContext, Schema schema, ArrayWritable record) {
return !readerContext.getValue(record, schema, HoodieRecord.FILENAME_METADATA_FIELD).toString().contains(".parquet");
}
@@ -96,7 +96,7 @@ public HoodieRecord<InternalRow> constructHoodieRecord(Option<InternalRow> rowOp
HoodieRecord.HoodieRecordType.SPARK);
}

Schema schema = getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
Schema schema = getSchemaFromMetadata(metadataMap);
InternalRow row = rowOption.get();
return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
}
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.AvroSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.storage.HoodieStorage;
@@ -33,6 +34,9 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -52,7 +56,7 @@
 * @param <T> The type of engine-specific record representation, e.g., {@code InternalRow} in Spark
* and {@code RowData} in Flink.
*/
public abstract class HoodieReaderContext<T> {
public abstract class HoodieReaderContext<T> implements Closeable {

private HoodieFileGroupReaderSchemaHandler<T> schemaHandler = null;
private String tablePath = null;
@@ -63,6 +67,9 @@ public abstract class HoodieReaderContext<T> {
private Boolean needsBootstrapMerge = null;
private Boolean shouldMergeUseRecordPosition = null;

// for encoding and decoding schemas of records written to and read from the spillable map
private final AvroSchemaCache avroSchemaCache = AvroSchemaCache.getInstance();

// Getter and Setter for schemaHandler
public HoodieFileGroupReaderSchemaHandler<T> getSchemaHandler() {
return schemaHandler;
@@ -295,10 +302,20 @@ public Map<String, Object> generateMetadataForRecord(
public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
Map<String, Object> meta = new HashMap<>();
meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
meta.put(INTERNAL_META_SCHEMA_ID, this.schemaHandler.encodeAvroSchema(schema));
meta.put(INTERNAL_META_SCHEMA_ID, encodeAvroSchema(schema));
return meta;
}

/**
 * Gets the schema encoded in the metadata map.
 *
 * @param infoMap The record metadata
 * @return the Avro schema if it is encoded in the metadata map, else null
*/
public Schema getSchemaFromMetadata(Map<String, Object> infoMap) {
return decodeAvroSchema(infoMap.get(INTERNAL_META_SCHEMA_ID));
}

/**
 * Updates the schema and resets the ordering value in the existing metadata mapping of a record.
*
@@ -309,7 +326,7 @@ public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
public Map<String, Object> updateSchemaAndResetOrderingValInMetadata(Map<String, Object> meta,
Schema schema) {
meta.remove(INTERNAL_META_ORDERING_FIELD);
meta.put(INTERNAL_META_SCHEMA_ID, this.schemaHandler.encodeAvroSchema(schema));
meta.put(INTERNAL_META_SCHEMA_ID, encodeAvroSchema(schema));
return meta;
}

@@ -364,4 +381,26 @@ public long extractRecordPosition(T record, Schema schema, String fieldName, lon
public boolean supportsParquetRowIndex() {
return false;
}

/**
* Encodes the given avro schema for efficient serialization.
*/
private Integer encodeAvroSchema(Schema schema) {
return this.avroSchemaCache.cacheSchema(schema);
}

/**
* Decodes the avro schema with given version ID.
*/
@Nullable
private Schema decodeAvroSchema(Object versionId) {
return this.avroSchemaCache.getSchema((Integer) versionId).orElse(null);
}

@Override
public void close() {
if (this.avroSchemaCache != null) {
this.avroSchemaCache.close();
}
}
}
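
With this hunk, schema encoding and decoding for spilled records lives on the reader context itself, backed by an AvroSchemaCache instance that is released in close(). Below is a minimal sketch of the round trip, assuming Avro and hudi-common are on the classpath; only APIs visible in this diff are used, and the local INTERNAL_META_SCHEMA_ID key value is a placeholder for the constant defined on HoodieReaderContext.

// Minimal sketch (not part of the commit) of the schema round trip for records held in the
// spillable map: the Avro schema is swapped for a small integer ID on the way in and
// resolved from AvroSchemaCache on the way out.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.common.util.AvroSchemaCache;

import java.util.HashMap;
import java.util.Map;

public class SchemaCacheRoundTripSketch {
  // Placeholder key for illustration; the real constant lives on HoodieReaderContext.
  private static final String INTERNAL_META_SCHEMA_ID = "_hoodie_internal_schema_id";

  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("rec").fields()
        .requiredString("key")
        .requiredLong("ts")
        .endRecord();

    AvroSchemaCache cache = AvroSchemaCache.getInstance();
    try {
      // Spilling a record: store only the integer ID in its metadata map.
      Map<String, Object> meta = new HashMap<>();
      meta.put(INTERNAL_META_SCHEMA_ID, cache.cacheSchema(schema));

      // Reading the record back: resolve the ID to the full schema.
      Schema resolved = cache.getSchema((Integer) meta.get(INTERNAL_META_SCHEMA_ID)).orElse(null);
      System.out.println(schema.equals(resolved)); // expected: true
    } finally {
      cache.close();
    }
  }
}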
@@ -73,7 +73,6 @@
import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
import static org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA_ID;
import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;

@@ -180,7 +179,6 @@ public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator() {
@Override
public void close() {
records.clear();
readerContext.getSchemaHandler().close();
}

/**
@@ -205,10 +203,10 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
// the `older` in the merge API
Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().partialMerge(
readerContext.constructHoodieRecord(Option.of(record), metadata),
readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)),
readerContext.getSchemaFromMetadata(metadata),
readerContext.constructHoodieRecord(
existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
readerContext.getSchemaHandler().decodeAvroSchema(existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA_ID)),
readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
readerSchema,
props);
if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -238,7 +236,7 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
Comparable incomingOrderingValue = readerContext.getOrderingValue(
Option.of(record), metadata, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
return Option.of(Pair.of(isDeleteRecord(Option.of(record), readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)))
return Option.of(Pair.of(isDeleteRecord(Option.of(record), readerContext.getSchemaFromMetadata(metadata))
? Option.empty() : Option.of(record), metadata));
}
return Option.empty();
@@ -262,10 +260,10 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
} else {
Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().merge(
readerContext.constructHoodieRecord(Option.of(record), metadata),
readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)),
readerContext.getSchemaFromMetadata(metadata),
readerContext.constructHoodieRecord(
existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
readerContext.getSchemaHandler().decodeAvroSchema(existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA_ID)),
readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
props);

if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -388,15 +386,15 @@ protected Option<Pair<Function<T, T>, Schema>> composeEvolvedSchemaTransformer(
protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
Option<T> newer, Map<String, Object> newerInfoMap) throws IOException {
if (!older.isPresent()) {
return isDeleteRecord(newer, readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : newer;
return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
}

if (enablePartialMerging) {
// TODO(HUDI-7843): decouple the merging logic from the merger
// and use the record merge mode to control how to merge partial updates
Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().partialMerge(
readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID)),
readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)),
readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap),
readerSchema, props);

if (mergedRecord.isPresent()
Expand All @@ -410,7 +408,7 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
} else {
switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
return isDeleteRecord(newer, readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : newer;
return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
case EVENT_TIME_ORDERING:
Comparable newOrderingValue = readerContext.getOrderingValue(
newer, newerInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
@@ -421,9 +419,9 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
older, olderInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
if (!isDeleteRecordWithNaturalOrder(older, oldOrderingValue)
&& oldOrderingValue.compareTo(newOrderingValue) > 0) {
return isDeleteRecord(older, readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : older;
return isDeleteRecord(older, readerContext.getSchemaFromMetadata(olderInfoMap)) ? Option.empty() : older;
}
return isDeleteRecord(newer, readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : newer;
return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
case CUSTOM:
default:
if (payloadClass.isPresent()) {
@@ -443,8 +441,8 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
return Option.empty();
} else {
Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID)),
readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)), props);
readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap), props);
if (mergedRecord.isPresent()
&& !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
if (!mergedRecord.get().getRight().equals(readerSchema)) {
@@ -482,7 +480,7 @@ private HoodieRecord constructHoodieAvroRecord(HoodieReaderContext<T> readerCont
Schema recordSchema = readerSchema;
GenericRecord record = null;
if (recordOption.isPresent()) {
recordSchema = readerContext.getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
recordSchema = readerContext.getSchemaFromMetadata(metadataMap);
record = readerContext.convertToAvroRecord(recordOption.get(), recordSchema);
}
HoodieKey hoodieKey = new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
@@ -495,7 +493,7 @@ private Schema getSchemaForAvroPayloadMerge(HoodieRecord record, Map<String, Obj
if (record.isDelete(readerSchema, props)) {
return readerSchema;
}
return readerContext.getSchemaHandler().decodeAvroSchema(infoMap.get(INTERNAL_META_SCHEMA_ID));
return readerContext.getSchemaFromMetadata(infoMap);
}

protected boolean hasNextBaseRecord(T baseRecord, Pair<Option<T>, Map<String, Object>> logRecordInfo) throws IOException {
@@ -297,6 +297,9 @@ public void close() throws IOException {
if (recordBuffer != null) {
recordBuffer.close();
}
if (readerContext != null) {
readerContext.close();
}
}

public HoodieFileGroupReaderIterator<T> getClosableIterator() {
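
Since HoodieReaderContext is now Closeable, the file group reader's close() releases the record buffer and then the context, which in turn releases the schema cache shown above. A generic illustration, not Hudi API, of the null-guarded close chain this hunk follows:

import java.io.Closeable;
import java.io.IOException;

// Hypothetical owner of two optional resources, mirroring the pattern in the hunk above:
// each child is closed only if it was actually created, buffer before context.
class OwnsOptionalResources implements Closeable {
  private final Closeable recordBuffer;   // may be null if never initialized
  private final Closeable readerContext;  // may be null if never initialized

  OwnsOptionalResources(Closeable recordBuffer, Closeable readerContext) {
    this.recordBuffer = recordBuffer;
    this.readerContext = readerContext;
  }

  @Override
  public void close() throws IOException {
    if (recordBuffer != null) {
      recordBuffer.close();
    }
    if (readerContext != null) {
      readerContext.close();
    }
  }
}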
@@ -35,9 +35,6 @@

import org.apache.avro.Schema;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -55,7 +52,7 @@
/**
* This class is responsible for handling the schema for the file group reader.
*/
public class HoodieFileGroupReaderSchemaHandler<T> implements Closeable {
public class HoodieFileGroupReaderSchemaHandler<T> {

protected final Schema dataSchema;

@@ -244,24 +241,4 @@ public Schema createSchemaFromFields(List<Schema.Field> fields) {
}
return createNewSchemaFromFieldsWithReference(dataSchema, fields);
}

/**
* Encodes the given avro schema for efficient serialization.
*/
public Integer encodeAvroSchema(Schema schema) {
return this.avroSchemaCache.cacheSchema(schema);
}

/**
* Decodes the avro schema with given version ID.
*/
@Nullable
public Schema decodeAvroSchema(Object versionId) {
return this.avroSchemaCache.getSchema((Integer) versionId).orElse(null);
}

@Override
public void close() {
this.avroSchemaCache.close();
}
}