Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/bot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ jobs:
mvn verify -Pintegration-tests -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-flink-datasource/hudi-flink $MVN_ARGS

docker-java17-test:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
strategy:
matrix:
include:
Expand Down Expand Up @@ -269,7 +269,7 @@ jobs:
./packaging/bundle-validation/run_docker_java17.sh

validate-bundles:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
strategy:
matrix:
include:
Expand Down
26 changes: 25 additions & 1 deletion azure-pipelines-20230430.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,31 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: $(MVN_OPTS_TEST) -Pfunctional-tests -pl $(JOB2_MODULES)
options: $(MVN_OPTS_TEST) -pl $(JOB2_MODULES) -Pfunctional-tests -debug
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g'
- script: |
grep "testcase" */target/surefire-reports/*.xml */*/target/surefire-reports/*.xml | awk -F'"' ' { print $6,$4,$2 } ' | sort -nr | head -n 100
displayName: Top 100 long-running testcases
- job: UT_FT_5
displayName: FT client/spark-client Long running
timeoutInMinutes: '150'
steps:
- task: Maven@4
displayName: maven install
inputs:
mavenPomFile: 'pom.xml'
goals: 'clean install'
options: $(MVN_OPTS_INSTALL)
publishJUnitResults: false
jdkVersionOption: '1.8'
- task: Maven@4
displayName: FT client/spark-client Long running
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: $(MVN_OPTS_TEST) -pl $(JOB2_MODULES) -Pisolated-tests
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx4g'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
Expand Down Expand Up @@ -101,13 +103,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.keygen.KeyGenUtils.getComplexKeygenErrorMessage;
import static org.apache.hudi.keygen.KeyGenUtils.isComplexKeyGeneratorWithSingleRecordKeyField;
import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;

/**
Expand Down Expand Up @@ -1323,7 +1328,7 @@ public final HoodieTable initTable(WriteOperationType operationType, Option<Stri
HoodieTable table = createTable(config, hadoopConf, metaClient);

// Validate table properties
metaClient.validateTableProperties(config.getProps());
validateTableProperties(metaClient.getTableConfig(), config.getProps());

switch (operationType) {
case INSERT:
Expand Down Expand Up @@ -1381,6 +1386,44 @@ protected void releaseResources(String instantTime) {
// do nothing here
}

/**
 * Validates the write config properties against the table's existing
 * configuration before a write operation proceeds.
 *
 * @param tableConfig existing table config of the table being written.
 * @param properties  properties from the write config.
 * @throws HoodieException if any property combination is invalid for this table.
 */
public void validateTableProperties(HoodieTableConfig tableConfig, Properties properties) {
  final boolean metaFieldsPopulated = tableConfig.populateMetaFields();

  // Once meta fields are disabled for a table, they can never be re-enabled.
  String populateMetaFieldsRequested = (String) properties.getOrDefault(
      HoodieTableConfig.POPULATE_META_FIELDS.key(),
      HoodieTableConfig.POPULATE_META_FIELDS.defaultValue().toString());
  if (!metaFieldsPopulated && Boolean.parseBoolean(populateMetaFieldsRequested)) {
    throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back");
  }

  // With meta fields disabled, only SimpleKeyGenerator, ComplexKeyGenerator,
  // or NonpartitionedKeyGenerator may be used.
  if (!metaFieldsPopulated) {
    String keyGenClass = properties.getProperty(
        HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator");
    boolean supportedKeyGen = keyGenClass.equals("org.apache.hudi.keygen.SimpleKeyGenerator")
        || keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator")
        || keyGenClass.equals("org.apache.hudi.keygen.ComplexKeyGenerator");
    if (!supportedKeyGen) {
      throw new HoodieException("Only simple, non-partitioned or complex key generator are supported when meta-fields are disabled. Used: " + keyGenClass);
    }
  }

  // Guard against the complex key generator single-record-key-field encoding
  // issue; the check can be turned off via the write config.
  if (config.enableComplexKeygenValidation()
      && isComplexKeyGeneratorWithSingleRecordKeyField(tableConfig)) {
    throw new HoodieException(getComplexKeygenErrorMessage("ingestion"));
  }

  // Consistent hashing bucket index is not supported on COW tables.
  if (tableConfig.getTableType() == HoodieTableType.COPY_ON_WRITE
      && "BUCKET".equals(properties.getProperty("hoodie.index.type"))
      && "CONSISTENT_HASHING".equals(properties.getProperty("hoodie.index.bucket.engine"))) {
    throw new HoodieException("Consistent hashing bucket index does not work with COW table. Use simple bucket index or an MOR table.");
  }
}

@Override
public void close() {
// Stop timeline-server if running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,36 @@ public class HoodieWriteConfig extends HoodieConfig {
"**Note** This is being actively worked on. Please use "
+ "`hoodie.datasource.write.keygenerator.class` instead.");

public static final ConfigProperty<Boolean> COMPLEX_KEYGEN_NEW_ENCODING = ConfigProperty
.key("hoodie.write.complex.keygen.new.encoding")
.defaultValue(false)
.markAdvanced()
.sinceVersion("0.14.2")
.supportedVersions("0.14.2")
.withDocumentation("If set to false, the record key field name is encoded and prepended "
+ "in the case where a single record key field is used in the complex key generator, "
+ "i.e., record keys stored in _hoodie_record_key meta field is in the format of "
+ "`<field_name>:<field_value>`, which conforms to the behavior "
+ "in 0.14.0 release and older. If set to true, the record key field name is not "
+ "encoded under the same case in the complex key generator, i.e., record keys stored "
+ "in _hoodie_record_key meta field is in the format of `<field_value>`, "
+ "which conforms to the behavior in 0.14.1 release.");

public static final ConfigProperty<Boolean> ENABLE_COMPLEX_KEYGEN_VALIDATION = ConfigProperty
.key("hoodie.write.complex.keygen.validation.enable")
.defaultValue(true)
.markAdvanced()
.sinceVersion("0.14.2")
.supportedVersions("0.14.2", "0.15.1", "1.0.3", "1.1.0")
.withDocumentation("This config only takes effect for writing table version 8 and below, "
+ "upgrade or downgrade. If set to true, the writer enables the validation on whether the "
+ "table uses the complex key generator with a single record key field, which can be affected "
+ "by a breaking change in 0.14.1, 0.15.0, 1.0.0, 1.0.1, 1.0.2 releases, causing key "
+ "encoding change and potential duplicates in the table. The validation fails the "
+ "pipeline if the table meets the condition for the user to take proper action. "
+ "The user can turn this validation off by setting the config to false, after "
+ "evaluating the table and situation and doing table repair if needed.");

public static final ConfigProperty<String> ROLLBACK_USING_MARKERS_ENABLE = ConfigProperty
.key("hoodie.rollback.using.markers")
.defaultValue("true")
Expand Down Expand Up @@ -1317,6 +1347,10 @@ public boolean shouldRollbackUsingMarkers() {
return getBoolean(ROLLBACK_USING_MARKERS_ENABLE);
}

/**
 * Returns whether the complex key generator validation is enabled,
 * per {@link #ENABLE_COMPLEX_KEYGEN_VALIDATION}.
 */
public boolean enableComplexKeygenValidation() {
  final boolean validationEnabled = getBoolean(ENABLE_COMPLEX_KEYGEN_VALIDATION);
  return validationEnabled;
}

public int getWriteBufferLimitBytes() {
return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
}
Expand Down Expand Up @@ -2745,6 +2779,11 @@ public Builder withRollbackUsingMarkers(boolean rollbackUsingMarkers) {
return this;
}

/**
 * Sets whether the complex key generator validation should run
 * ({@link #ENABLE_COMPLEX_KEYGEN_VALIDATION}).
 *
 * @param enableComplexKeygenValidation true to enable the validation.
 * @return this builder for chaining.
 */
public Builder withComplexKeygenValidation(boolean enableComplexKeygenValidation) {
  final String value = String.valueOf(enableComplexKeygenValidation);
  writeConfig.setValue(ENABLE_COMPLEX_KEYGEN_VALIDATION, value);
  return this;
}

public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
writeConfig.setValue(WRITE_BUFFER_LIMIT_BYTES_VALUE, String.valueOf(writeBufferLimit));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import java.util.Arrays;
Expand All @@ -29,6 +30,7 @@
*/
public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":";
private final SerializableFunctionUnchecked<GenericRecord, String> recordKeyFunction;

public ComplexAvroKeyGenerator(TypedProperties props) {
super(props);
Expand All @@ -37,18 +39,26 @@ public ComplexAvroKeyGenerator(TypedProperties props) {
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
this.recordKeyFunction = getRecordKeyFunc(KeyGenUtils.encodeSingleKeyFieldNameForComplexKeyGen(props));
}

/**
 * Generates the record key for the given Avro record.
 *
 * <p>Delegates to the record key function chosen at construction time, which
 * already accounts for the single-vs-multiple key field cases and the key
 * field name encoding setting, so no per-record branching is needed here.
 * (The pasted diff had left the superseded branching code in place after the
 * delegating return, creating unreachable duplicate logic — removed.)
 */
@Override
public String getRecordKey(GenericRecord record) {
  return recordKeyFunction.apply(record);
}

/**
 * Builds the partition path for the given Avro record by delegating to the
 * shared {@link KeyGenUtils} helper with this generator's settings.
 */
@Override
public String getPartitionPath(GenericRecord record) {
  return KeyGenUtils.getRecordPartitionPath(
      record,
      getPartitionPathFields(),
      hiveStylePartitioning,
      encodePartitionPath,
      isConsistentLogicalTimestampEnabled());
}

/**
 * Chooses the record key extraction function once, at construction time.
 *
 * @param encodeSingleKeyFieldName when true, a single record key field is
 *                                 still emitted in the legacy
 *                                 {@code <field_name>:<field_value>} form.
 * @return function mapping an Avro record to its record key string.
 */
private SerializableFunctionUnchecked<GenericRecord, String> getRecordKeyFunc(boolean encodeSingleKeyFieldName) {
  final boolean singleKeyField = getRecordKeyFieldNames().size() == 1;
  if (singleKeyField && !encodeSingleKeyFieldName) {
    // Single key field without name encoding: emit the bare field value.
    return record -> KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
  }
  // Multiple key fields, or single field with the legacy name:value encoding.
  return record -> KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.ReflectionUtils;
Expand All @@ -39,6 +41,8 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.config.HoodieWriteConfig.COMPLEX_KEYGEN_NEW_ENCODING;

public class KeyGenUtils {

protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
Expand All @@ -47,7 +51,7 @@ public class KeyGenUtils {
protected static final String HUDI_DEFAULT_PARTITION_PATH = PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
public static final String DEFAULT_RECORD_KEY_PARTS_SEPARATOR = ",";
public static final String DEFAULT_COMPOSITE_KEY_FILED_VALUE = ":";
public static final String DEFAULT_COLUMN_VALUE_SEPARATOR = ":";

public static final String RECORD_KEY_GEN_PARTITION_ID_CONFIG = "_hoodie.record.key.gen.partition.id";
public static final String RECORD_KEY_GEN_INSTANT_TIME_CONFIG = "_hoodie.record.key.gen.instant.time";
Expand Down Expand Up @@ -128,7 +132,7 @@ public static String[] extractRecordKeys(String recordKey) {

public static String[] extractRecordKeysByFields(String recordKey, List<String> fields) {
String[] fieldKV = recordKey.split(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
return Arrays.stream(fieldKV).map(kv -> kv.split(DEFAULT_COMPOSITE_KEY_FILED_VALUE, 2))
return Arrays.stream(fieldKV).map(kv -> kv.split(DEFAULT_COLUMN_VALUE_SEPARATOR, 2))
.filter(kvArray -> kvArray.length == 1 || fields.isEmpty() || (fields.contains(kvArray[0])))
.map(kvArray -> {
if (kvArray.length == 1) {
Expand All @@ -149,11 +153,11 @@ public static String getRecordKey(GenericRecord record, List<String> recordKeyFi
for (String recordKeyField : recordKeyFields) {
String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled);
if (recordKeyValue == null) {
recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + NULL_RECORDKEY_PLACEHOLDER + DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
recordKey.append(recordKeyField).append(DEFAULT_COLUMN_VALUE_SEPARATOR).append(NULL_RECORDKEY_PLACEHOLDER).append(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
} else if (recordKeyValue.isEmpty()) {
recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + EMPTY_RECORDKEY_PLACEHOLDER + DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
recordKey.append(recordKeyField).append(DEFAULT_COLUMN_VALUE_SEPARATOR).append(EMPTY_RECORDKEY_PLACEHOLDER).append(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
} else {
recordKey.append(recordKeyField + DEFAULT_COMPOSITE_KEY_FILED_VALUE + recordKeyValue + DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
recordKey.append(recordKeyField).append(DEFAULT_COLUMN_VALUE_SEPARATOR).append(recordKeyValue).append(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
keyIsNullEmpty = false;
}
}
Expand Down Expand Up @@ -260,4 +264,30 @@ public static List<String> getRecordKeyFields(TypedProperties props) {
/**
 * Returns true when record keys should be auto-generated, i.e. when the user
 * has not configured any record key field.
 */
public static boolean enableAutoGenerateRecordKeys(TypedProperties props) {
  boolean recordKeyFieldConfigured = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
  return !recordKeyFieldConfigured;
}

/**
 * Returns true when the table is configured with a complex key generator that
 * has exactly one record key field — the configuration affected by the key
 * encoding change.
 */
public static boolean isComplexKeyGeneratorWithSingleRecordKeyField(HoodieTableConfig tableConfig) {
  if (!KeyGeneratorType.isComplexKeyGenerator(tableConfig)) {
    return false;
  }
  Option<String[]> recordKeyFields = tableConfig.getRecordKeyFields();
  return recordKeyFields.isPresent() && recordKeyFields.get().length == 1;
}

/**
 * Builds the user-facing error message shown when the complex key generator
 * validation fails, pointing at the deployment guide for remediation.
 *
 * @param operation the operation being blocked (e.g. "ingestion"), embedded
 *                  in the message text.
 * @return the full error message.
 */
public static String getComplexKeygenErrorMessage(String operation) {
  return new StringBuilder()
      .append("This table uses the complex key generator with a single record key field. ")
      .append("If the table is written with Hudi 0.14.1, 0.15.0, 1.0.0, 1.0.1, or 1.0.2 release before, ")
      .append("the table may potentially contain duplicates due to a breaking change in the key encoding ")
      .append("in the _hoodie_record_key meta field (HUDI-7001) which is crucial for upserts. ")
      .append("Please take action based on the details on the deployment guide ")
      .append("(https://hudi.apache.org/docs/deployment#complex-key-generator) ")
      .append("before resuming the ").append(operation).append(" to the table. ")
      .append("If you're certain that the table is not affected by the key encoding change, set ")
      .append("`hoodie.write.complex.keygen.validation.enable=false` to skip this validation.")
      .toString();
}

/**
 * Returns true when the legacy {@code <field_name>:<field_value>} encoding
 * should be used for a single record key field in the complex key generator,
 * i.e. when the new-encoding config is not enabled.
 */
public static boolean encodeSingleKeyFieldNameForComplexKeyGen(TypedProperties props) {
  boolean useNewEncoding = ConfigUtils.getBooleanWithAltKeys(props, COMPLEX_KEYGEN_NEW_ENCODING);
  return !useNewEncoding;
}

/**
 * Returns true when the table may have used the new key encoding — only a
 * complex key generator with exactly one record key field is affected by the
 * encoding change.
 */
public static boolean mayUseNewEncodingForComplexKeyGen(HoodieTableConfig tableConfig) {
  return isComplexKeyGeneratorWithSingleRecordKeyField(tableConfig);
}
}
Loading
Loading