-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-8551] Validate MOR Without Precombine Set and Fix Related Bugs #12317
base: master
Are you sure you want to change the base?
Changes from 5 commits
76fa927
9155119
c95f9fb
77f2b83
7643abc
e4efb8f
03b2522
03dd57a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -208,6 +208,10 @@ private static boolean isProjectionOfInternal(Schema sourceSchema, | |
} | ||
|
||
public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) { | ||
return findNestedFieldType(schema, fieldName, true); | ||
} | ||
|
||
public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName, boolean throwOnNotFound) { | ||
if (StringUtils.isNullOrEmpty(fieldName)) { | ||
return Option.empty(); | ||
} | ||
|
@@ -216,7 +220,11 @@ public static Option<Schema.Type> findNestedFieldType(Schema schema, String fiel | |
for (String part : parts) { | ||
Schema.Field foundField = resolveNullableSchema(schema).getField(part); | ||
if (foundField == null) { | ||
throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema); | ||
if (throwOnNotFound) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. whey do we trigger lookup using findNestedFieldType in the first place if ordering field is not set. in other words, why do we need throwOnNotFound argument. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The ordering field is set. It is just set to a nonexistent field. https://issues.apache.org/jira/browse/HUDI-8574 we have not been validating that the precombine actually exists in the table schema, so the current behavior of the filegroup reader will always throw an exception when trying to read a filegroup with log files. We don't want to completely break reading for those users. We could do a LOG.warn though? |
||
throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema); | ||
} else { | ||
return Option.empty(); | ||
} | ||
} | ||
schema = foundField.schema(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -112,7 +112,9 @@ public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext, | |
this.payloadClass = Option.empty(); | ||
} | ||
this.orderingFieldName = Option.ofNullable(ConfigUtils.getOrderingField(props)).orElseGet(() -> hoodieTableMetaClient.getTableConfig().getPreCombineField()); | ||
this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName); | ||
|
||
// Don't throw exception due to [HUDI-8574] | ||
this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName, false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why can't we check for isEmptyOrNull on orderingFieldName and then avoid calling findNestedFieldType. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See above comment, the ordering field might not actually exist in the schema |
||
this.orderingFieldDefault = orderingFieldTypeOpt.map(type -> readerContext.castValue(0, type)).orElse(0); | ||
this.props = props; | ||
this.internalSchema = readerContext.getSchemaHandler().getInternalSchema(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we avoid unrelated changes in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved the fg reader changes and schema handler test to #12340