[HUDI-8551] Validate MOR Without Precombine Set and Fix Related Bugs #12317

Open · wants to merge 8 commits into master
Changes from 5 commits
@@ -313,8 +313,11 @@ public HoodieSparkRecord copy() {

   @Override
   public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
-    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     String orderingField = ConfigUtils.getOrderingField(props);
+    if (StringUtils.isNullOrEmpty(orderingField)) {
+      return 0;
+    }
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     scala.Option<NestedFieldPath> cachedNestedFieldPath =
         HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
     if (cachedNestedFieldPath.isDefined()) {
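The guard above gives tables without a configured precombine/ordering field a constant default ordering value instead of failing the schema lookup. A minimal standalone sketch of the pattern in plain Java, not Hudi's actual classes; the config key and helper names here are illustrative:

```java
import java.util.Properties;

// Standalone sketch of the guard above: when no ordering field is configured,
// return a constant default instead of attempting a schema lookup.
public class OrderingValueGuardSketch {

  // Stand-in for ConfigUtils.getOrderingField(props); the key name is illustrative.
  static String getOrderingField(Properties props) {
    return props.getProperty("hoodie.payload.ordering.field");
  }

  static Comparable<?> getOrderingValue(Properties props) {
    String orderingField = getOrderingField(props);
    if (orderingField == null || orderingField.isEmpty()) {
      return 0; // default ordering value: all records tie, so merge falls back to commit order
    }
    // ... the real implementation resolves the field's position in the schema and reads its value ...
    return resolveFromSchema(orderingField);
  }

  static Comparable<?> resolveFromSchema(String field) {
    return 1L; // placeholder for the schema-based lookup
  }

  public static void main(String[] args) {
    System.out.println(getOrderingValue(new Properties())); // prints 0
  }
}
```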
@@ -167,7 +167,7 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader,
         HoodieAvroUtils.removeFields(skeletonRequiredSchema, rowIndexColumn))

     //If we need to do position based merging with log files we will leave the row index column at the end
-    val dataProjection = if (getHasLogFiles && getShouldMergeUseRecordPosition) {
+    val dataProjection = if (getShouldMergeUseRecordPosition) {
Contributor:

Could we avoid unrelated changes in this PR?

Contributor Author:

moved the fg reader changes and schema handler test to #12340

       getIdentityProjection
     } else {
       projectRecord(dataRequiredSchema,
@@ -208,6 +208,10 @@ private static boolean isProjectionOfInternal(Schema sourceSchema,
   }

   public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) {
+    return findNestedFieldType(schema, fieldName, true);
+  }
+
+  public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName, boolean throwOnNotFound) {
     if (StringUtils.isNullOrEmpty(fieldName)) {
       return Option.empty();
     }
@@ -216,7 +220,11 @@ public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName, boolean throwOnNotFound) {
     for (String part : parts) {
       Schema.Field foundField = resolveNullableSchema(schema).getField(part);
       if (foundField == null) {
-        throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
+        if (throwOnNotFound) {
Contributor:

why do we trigger a lookup with findNestedFieldType in the first place if the ordering field is not set?
why can't we deduce from the table config that the ordering field is not set and bypass calling findNestedFieldType?

in other words, why do we need the throwOnNotFound argument?

Contributor Author:

The ordering field is set; it is just set to a nonexistent field. Per https://issues.apache.org/jira/browse/HUDI-8574, we have not been validating that the precombine field actually exists in the table schema, so the current behavior of the filegroup reader is to always throw an exception when trying to read a filegroup with log files. We don't want to completely break reading for those users. We could do a LOG.warn though?

+          throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
+        } else {
+          return Option.empty();
+        }
       }
       schema = foundField.schema();
     }
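The new overload makes the miss behavior explicit: with throwOnNotFound set to false, a field absent from the schema yields Option.empty() instead of a HoodieAvroSchemaException. A hedged sketch of the same contract, using a flat map as a stand-in for the Avro schema (all names here are hypothetical):

```java
import java.util.Map;
import java.util.Optional;

// Sketch of the throwOnNotFound contract added above, using a flat map in
// place of an Avro schema. A miss either throws or yields Optional.empty().
public class FieldTypeLookupSketch {

  static Optional<String> findFieldType(Map<String, String> schema, String fieldName, boolean throwOnNotFound) {
    String type = schema.get(fieldName);
    if (type == null) {
      if (throwOnNotFound) {
        throw new IllegalArgumentException(fieldName + " not a field in " + schema.keySet());
      }
      return Optional.empty(); // tolerate the miss; the caller falls back to a default
    }
    return Optional.of(type);
  }

  public static void main(String[] args) {
    Map<String, String> schema = Map.of("ts", "long", "id", "string");
    System.out.println(findFieldType(schema, "ts", true));       // Optional[long]
    System.out.println(findFieldType(schema, "missing", false)); // Optional.empty
  }
}
```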
@@ -61,6 +61,8 @@ public abstract class HoodieReaderContext<T> {
   private Boolean hasLogFiles = null;
   private Boolean hasBootstrapBaseFile = null;
   private Boolean needsBootstrapMerge = null;
+
+  // should we do position based merging for mor
   private Boolean shouldMergeUseRecordPosition = null;

   // Getter and Setter for schemaHandler
@@ -112,7 +112,9 @@ public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
       this.payloadClass = Option.empty();
     }
     this.orderingFieldName = Option.ofNullable(ConfigUtils.getOrderingField(props)).orElseGet(() -> hoodieTableMetaClient.getTableConfig().getPreCombineField());
-    this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName);
+
+    // Don't throw exception due to [HUDI-8574]
+    this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName, false);
Contributor:

why can't we check for isEmptyOrNull on orderingFieldName and then avoid calling findNestedFieldType.

Contributor Author:

See above comment; the ordering field might not actually exist in the schema.

     this.orderingFieldDefault = orderingFieldTypeOpt.map(type -> readerContext.castValue(0, type)).orElse(0);
     this.props = props;
     this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
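The `orderingFieldDefault` line above is what makes the lenient lookup safe: if the ordering field's type cannot be resolved, every record gets the same default ordering value, so merge comparisons tie rather than throw. A sketch of that fallback, with `castToType` standing in for `readerContext.castValue(0, type)`:

```java
import java.util.Optional;

// Sketch of how a missing ordering-field type degrades to a constant default,
// so comparisons between records become ties instead of errors.
public class OrderingDefaultSketch {

  // Stand-in for readerContext.castValue(0, type).
  static Comparable<?> castToType(int value, String type) {
    return "long".equals(type) ? (long) value : value;
  }

  static Comparable<?> orderingDefault(Optional<String> orderingFieldType) {
    // Mirrors: orderingFieldTypeOpt.map(type -> castValue(0, type)).orElse(0)
    return orderingFieldType.<Comparable<?>>map(t -> castToType(0, t)).orElse(0);
  }

  public static void main(String[] args) {
    System.out.println(orderingDefault(Optional.of("long"))); // 0 cast to the resolved type
    System.out.println(orderingDefault(Optional.empty()));    // plain default 0
  }
}
```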
@@ -96,7 +96,12 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
     this.readerContext = readerContext;
     this.storage = storage;
     this.hoodieBaseFileOption = fileSlice.getBaseFile();
+    readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent());
     this.logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    readerContext.setHasLogFiles(!this.logFiles.isEmpty());
+    if (readerContext.getHasLogFiles() && start != 0) {
+      throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file");
+    }
     this.props = props;
     this.start = start;
     this.length = length;
@@ -109,17 +114,12 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
     readerContext.setTablePath(tablePath);
     readerContext.setLatestCommitTime(latestCommitTime);
     boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, HoodieReaderConfig.MERGE_TYPE, true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
-    readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge);
-    readerContext.setHasLogFiles(!this.logFiles.isEmpty());
-    if (readerContext.getHasLogFiles() && start != 0) {
-      throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file");
-    }
-    readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent());
+    readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge && readerContext.getHasLogFiles());
     readerContext.setSchemaHandler(readerContext.supportsParquetRowIndex()
         ? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
         : new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
     this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
-    this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, tableConfig.getRecordMergeMode(), props, this.logFiles.isEmpty(), isSkipMerge, shouldUseRecordPosition);
+    this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, tableConfig.getRecordMergeMode(), props, !readerContext.getHasLogFiles(), isSkipMerge, shouldUseRecordPosition);
   }

   /**
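The constructor reordering above is not cosmetic: `setHasLogFiles` and `setHasBootstrapBaseFile` now run before the position-merge flag is derived, because `setShouldMergeUseRecordPosition` now consults `readerContext.getHasLogFiles()`. A minimal sketch of that ordering dependency, using a hypothetical context class rather than Hudi's:

```java
// Sketch of the initialization-order dependency fixed above: the position-merge
// flag must be derived *after* hasLogFiles is set, or it reads a stale default.
public class ReaderContextOrderSketch {
  private boolean hasLogFiles = false; // default before the setter runs
  private boolean shouldMergeUseRecordPosition;

  void setHasLogFiles(boolean v) {
    hasLogFiles = v;
  }

  void deriveShouldMergeUseRecordPosition(boolean requested, boolean skipMerge) {
    // Position-based merging only makes sense when there are log files to merge.
    shouldMergeUseRecordPosition = requested && !skipMerge && hasLogFiles;
  }

  public static void main(String[] args) {
    ReaderContextOrderSketch ctx = new ReaderContextOrderSketch();
    // Wrong order: derive first, set later => flag is false even with log files.
    ctx.deriveShouldMergeUseRecordPosition(true, false);
    ctx.setHasLogFiles(true);
    System.out.println(ctx.shouldMergeUseRecordPosition); // false (stale)

    // Correct order, as in the constructor after this change:
    ctx.setHasLogFiles(true);
    ctx.deriveShouldMergeUseRecordPosition(true, false);
    System.out.println(ctx.shouldMergeUseRecordPosition); // true
  }
}
```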
@@ -70,14 +70,7 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
   protected final HoodieReaderContext<T> readerContext;

   protected final TypedProperties properties;
-
-  protected final Option<HoodieRecordMerger> recordMerger;
-
-  protected final boolean hasBootstrapBaseFile;
-  protected boolean needsBootstrapMerge;
-
-  protected final boolean needsMORMerge;

   public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
                                             Schema dataSchema,
                                             Schema requestedSchema,
@@ -86,16 +79,12 @@ public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
                                             TypedProperties properties) {
     this.properties = properties;
     this.readerContext = readerContext;
-    this.hasBootstrapBaseFile = readerContext.getHasBootstrapBaseFile();
-    this.needsMORMerge = readerContext.getHasLogFiles();
-    this.recordMerger = readerContext.getRecordMerger();
     this.dataSchema = dataSchema;
     this.requestedSchema = requestedSchema;
     this.hoodieTableConfig = hoodieTableConfig;
     this.requiredSchema = prepareRequiredSchema();
     this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt);
     this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
-    readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge);
   }

   public Schema getDataSchema() {
@@ -147,25 +136,29 @@ protected InternalSchema doPruneInternalSchema(Schema requiredSchema, InternalSchema internalSchema) {

   private Schema generateRequiredSchema() {
     //might need to change this if other queries than mor have mandatory fields
-    if (!needsMORMerge) {
+    if (!readerContext.getHasLogFiles()) {
       return requestedSchema;
     }

     if (hoodieTableConfig.getRecordMergeMode() == RecordMergeMode.CUSTOM) {
-      if (!recordMerger.get().isProjectionCompatible()) {
+      if (!readerContext.getRecordMerger().get().isProjectionCompatible()) {
         return dataSchema;
       }
     }

     List<Schema.Field> addedFields = new ArrayList<>();
-    for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) {
+    for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, readerContext.getRecordMerger())) {
       if (!findNestedField(requestedSchema, field).isPresent()) {
-        Option<Schema.Field> foundFieldOpt = findNestedField(dataSchema, field);
+        Option<Schema.Field> foundFieldOpt = findNestedField(dataSchema, field);
         if (!foundFieldOpt.isPresent()) {
-          throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
+          //see [HUDI-8574]
+          if (!field.equals(hoodieTableConfig.getPreCombineField())) {
+            throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
+          }
+        } else {
+          Schema.Field foundField = foundFieldOpt.get();
+          addedFields.add(foundField);
         }
-        Schema.Field foundField = foundFieldOpt.get();
-        addedFields.add(foundField);
       }
     }

@@ -205,8 +198,9 @@ private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg, TypedProperties props,
   protected Schema prepareRequiredSchema() {
     Schema preReorderRequiredSchema = generateRequiredSchema();
     Pair<List<Schema.Field>, List<Schema.Field>> requiredFields = getDataAndMetaCols(preReorderRequiredSchema);
-    this.needsBootstrapMerge = hasBootstrapBaseFile && !requiredFields.getLeft().isEmpty() && !requiredFields.getRight().isEmpty();
-    return needsBootstrapMerge
+    readerContext.setNeedsBootstrapMerge(readerContext.getHasBootstrapBaseFile()
+        && !requiredFields.getLeft().isEmpty() && !requiredFields.getRight().isEmpty());
+    return readerContext.getNeedsBootstrapMerge()
         ? createSchemaFromFields(Stream.concat(requiredFields.getLeft().stream(), requiredFields.getRight().stream()).collect(Collectors.toList()))
         : preReorderRequiredSchema;
   }
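`generateRequiredSchema` now tolerates exactly one kind of missing mandatory field: the table's precombine field, which per HUDI-8574 may never have been validated against the schema; any other missing field still fails fast. A condensed sketch of the rule using plain collections (all names are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

// Condensed sketch of the tolerance rule above: only the (possibly invalid)
// precombine field may be absent from the schema; anything else is an error.
public class MandatoryFieldCheckSketch {

  static List<String> collectMergeFields(Set<String> schemaFields, List<String> mandatory, String preCombineField) {
    List<String> added = new ArrayList<>();
    for (String field : mandatory) {
      if (!schemaFields.contains(field)) {
        if (!field.equals(preCombineField)) {
          throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
        }
        // see [HUDI-8574]: tolerate a precombine field that never existed in the schema
      } else {
        added.add(field);
      }
    }
    return added;
  }

  public static void main(String[] args) {
    Set<String> schema = Set.of("_hoodie_record_key", "id", "val");
    // "ts" is the configured precombine field but is missing from the schema: tolerated.
    System.out.println(collectMergeFields(schema, Arrays.asList("_hoodie_record_key", "ts"), "ts"));
  }
}
```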
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.Types;
@@ -50,10 +51,18 @@ public HoodiePositionBasedSchemaHandler(HoodieReaderContext<T> readerContext,
     super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig, properties);
   }

+  private boolean morMergeNeedsPositionCol() {
+    return readerContext.supportsParquetRowIndex() && readerContext.getShouldMergeUseRecordPosition();
+  }
+
+  private boolean bootstrapMergeNeedsPositionCol() {
+    return readerContext.supportsParquetRowIndex() && readerContext.getNeedsBootstrapMerge();
+  }
+
   @Override
   protected Schema prepareRequiredSchema() {
     Schema preMergeSchema = super.prepareRequiredSchema();
-    return readerContext.getShouldMergeUseRecordPosition() && readerContext.getHasLogFiles()
+    return morMergeNeedsPositionCol()
         ? addPositionalMergeCol(preMergeSchema)
         : preMergeSchema;
   }
@@ -65,7 +74,7 @@ protected Option<InternalSchema> getInternalSchemaOpt(Option<InternalSchema> internalSchemaOpt) {

   @Override
   protected InternalSchema doPruneInternalSchema(Schema requiredSchema, InternalSchema internalSchema) {
-    if (!(readerContext.getShouldMergeUseRecordPosition() && readerContext.getHasLogFiles())) {
+    if (!morMergeNeedsPositionCol()) {
       return super.doPruneInternalSchema(requiredSchema, internalSchema);
     }

@@ -82,20 +91,24 @@ private static InternalSchema addPositionalMergeCol(InternalSchema internalSchema) {
   @Override
   public Pair<List<Schema.Field>,List<Schema.Field>> getBootstrapRequiredFields() {
     Pair<List<Schema.Field>,List<Schema.Field>> dataAndMetaCols = super.getBootstrapRequiredFields();
-    if (readerContext.supportsParquetRowIndex()) {
-      if (!dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty()) {
+    if (bootstrapMergeNeedsPositionCol() || morMergeNeedsPositionCol()) {
+      if (!dataAndMetaCols.getLeft().isEmpty()) {
         dataAndMetaCols.getLeft().add(getPositionalMergeField());
+      }
+      if (!dataAndMetaCols.getRight().isEmpty()) {
         dataAndMetaCols.getRight().add(getPositionalMergeField());
       }
     }
     return dataAndMetaCols;
   }

-  private static Schema addPositionalMergeCol(Schema input) {
+  @VisibleForTesting
+  static Schema addPositionalMergeCol(Schema input) {
     return appendFieldsToSchemaDedupNested(input, Collections.singletonList(getPositionalMergeField()));
   }

-  private static Schema.Field getPositionalMergeField() {
+  @VisibleForTesting
+  static Schema.Field getPositionalMergeField() {
     return new Schema.Field(ROW_INDEX_TEMPORARY_COLUMN_NAME,
         Schema.create(Schema.Type.LONG), "", -1L);
   }
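With `addPositionalMergeCol` and `getPositionalMergeField` now package-private and `@VisibleForTesting`, a unit test can assert that the temporary row-index column is appended exactly once. A test-style sketch in plain Avro; the column-name literal and the dedup logic are simplified stand-ins for Hudi's `appendFieldsToSchemaDedupNested`:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;

// Test-style sketch: append a temporary row-index column to a record schema,
// skipping the append if the column is already present (a simplified stand-in
// for appendFieldsToSchemaDedupNested, which also dedups nested fields).
public class PositionColSketch {
  static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = "_tmp_row_index"; // illustrative literal

  static Schema.Field getPositionalMergeField() {
    return new Schema.Field(ROW_INDEX_TEMPORARY_COLUMN_NAME, Schema.create(Schema.Type.LONG), "", -1L);
  }

  static Schema addPositionalMergeCol(Schema input) {
    if (input.getField(ROW_INDEX_TEMPORARY_COLUMN_NAME) != null) {
      return input; // dedup: never append the column twice
    }
    List<Schema.Field> fields = new ArrayList<>();
    for (Schema.Field f : input.getFields()) {
      // Fields must be copied; an Avro Field cannot belong to two schemas.
      fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
    }
    fields.add(getPositionalMergeField());
    return Schema.createRecord(input.getName(), input.getDoc(), input.getNamespace(), input.isError(), fields);
  }

  public static void main(String[] args) {
    List<Schema.Field> baseFields = new ArrayList<>();
    baseFields.add(new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null));
    Schema base = Schema.createRecord("rec", null, null, false, baseFields);

    Schema withPos = addPositionalMergeCol(base);
    System.out.println(withPos.getFields().size());                        // 2
    System.out.println(addPositionalMergeCol(withPos).getFields().size()); // still 2
  }
}
```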