
Commit

[HUDI-7001] ComplexAvroKeyGenerator should represent single record key as the value string without composing the key field name (apache#9936)

hehuiyuan authored Nov 7, 2023
1 parent d0c78bf commit 5bbc025
Showing 8 changed files with 134 additions and 34 deletions.
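In effect, a ComplexAvroKeyGenerator configured with a single record key field now emits the bare field value instead of the earlier "field:value" form, while multi-field keys keep the composite format. Below is a minimal sketch of the before/after behavior, not part of the commit itself; the class name KeyFormatSketch is hypothetical, and the option names and constructor come from the diff that follows.

// Sketch only, not part of this commit: illustrates the key-format change for a
// record whose _row_key field holds "key1".
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

public class KeyFormatSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
    props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "timestamp");
    ComplexAvroKeyGenerator keyGen = new ComplexAvroKeyGenerator(props);
    // Before this commit: keyGen.getRecordKey(record) -> "_row_key:key1"
    // After this commit:  keyGen.getRecordKey(record) -> "key1"
    // With RECORDKEY_FIELD_NAME set to "_row_key,timestamp", the composite form
    // is unchanged, e.g. "_row_key:key1,timestamp:0".
  }
}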

@@ -41,6 +41,9 @@ public ComplexAvroKeyGenerator(TypedProperties props) {

   @Override
   public String getRecordKey(GenericRecord record) {
+    if (getRecordKeyFieldNames().size() == 1) {
+      return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
+    }
     return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
   }

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.keygen;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;

import static junit.framework.TestCase.assertEquals;

public class TestComplexAvroKeyGenerator {

  @Test
  public void testSingleValueKeyGenerator() {
    TypedProperties properties = new TypedProperties();
    properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
    properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "timestamp");
    ComplexAvroKeyGenerator compositeKeyGenerator = new ComplexAvroKeyGenerator(properties);
    assertEquals(compositeKeyGenerator.getRecordKeyFieldNames().size(), 1);
    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 1);
    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
    String rowKey = record.get("_row_key").toString();
    String partitionPath = record.get("timestamp").toString();
    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
    assertEquals(rowKey, hoodieKey.getRecordKey());
    assertEquals(partitionPath, hoodieKey.getPartitionPath());
  }

  @Test
  public void testMultipleValueKeyGenerator() {
    TypedProperties properties = new TypedProperties();
    properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key,timestamp");
    properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "rider,driver");
    ComplexAvroKeyGenerator compositeKeyGenerator = new ComplexAvroKeyGenerator(properties);
    assertEquals(compositeKeyGenerator.getRecordKeyFieldNames().size(), 2);
    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 2);
    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
    String rowKey =
        "_row_key" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
            + "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
    String partitionPath = record.get("rider").toString() + "/" + record.get("driver").toString();
    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
    assertEquals(rowKey, hoodieKey.getRecordKey());
    assertEquals(partitionPath, hoodieKey.getPartitionPath());
  }

  @Test
  public void testMultipleValueKeyGeneratorNonPartitioned() {
    TypedProperties properties = new TypedProperties();
    properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key,timestamp");
    properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "");
    ComplexAvroKeyGenerator compositeKeyGenerator = new ComplexAvroKeyGenerator(properties);
    assertEquals(compositeKeyGenerator.getRecordKeyFieldNames().size(), 2);
    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 0);
    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
    String rowKey =
        "_row_key" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
            + "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
    String partitionPath = "";
    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
    assertEquals(rowKey, hoodieKey.getRecordKey());
    assertEquals(partitionPath, hoodieKey.getPartitionPath());
  }
}

@@ -246,8 +246,10 @@ private <S> S combineCompositeRecordKeyInternal(
       // NOTE: If record-key part has already been a string [[toString]] will be a no-op
       S convertedKeyPart = emptyKeyPartHandler.apply(converter.apply(recordKeyParts[i]));

-      sb.appendJava(recordKeyFields.get(i));
-      sb.appendJava(COMPOSITE_KEY_FIELD_VALUE_INFIX);
+      if (recordKeyParts.length > 1) {
+        sb.appendJava(recordKeyFields.get(i));
+        sb.appendJava(COMPOSITE_KEY_FIELD_VALUE_INFIX);
+      }
       sb.append(convertedKeyPart);
       // This check is to validate that overall composite-key has at least one non-null, non-empty
       // segment
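The conditional above is the core of the fix: the key field name and its ":" infix are written only when the key has more than one part. Below is a standalone sketch of the resulting key shape; the class and method names are hypothetical, and this is plain Java rather than the Hudi helper itself, which builds keys through a StringBuilder wrapper and type converter.

// Illustrative sketch (not the Hudi source): how composite keys are shaped after
// this change. Separators match the tests above: "," between parts, ":" between
// field name and value (COMPOSITE_KEY_FIELD_VALUE_INFIX).
import java.util.Arrays;
import java.util.List;

public class CompositeKeySketch {
  // Mirrors the new behavior: single-field keys carry only the value.
  static String combineKey(List<String> fields, List<String> values) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < values.size(); i++) {
      if (i > 0) {
        sb.append(",");
      }
      if (values.size() > 1) {
        sb.append(fields.get(i)).append(":");
      }
      sb.append(values.get(i));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(combineKey(Arrays.asList("_row_key"), Arrays.asList("key1")));
    // -> key1
    System.out.println(combineKey(Arrays.asList("_row_key", "timestamp"), Arrays.asList("key1", "0")));
    // -> _row_key:key1,timestamp:0
  }
}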

@@ -480,16 +480,16 @@ private ExpectedResult(String[] evolvedRows, String[] rowsWithMeta, String[] row
"+I[Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
},
new String[] {
"+I[uuid:id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]",
"+I[uuid:id1, Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
"+I[uuid:id2, Stephen, null, 33, +I[2, null, s2, 2, null, null], {Stephen=3333.0}, [33.0], null, null, null]",
"+I[uuid:id3, Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
"+I[uuid:id4, Fabian, null, 31, +I[4, null, s4, 4, null, null], {Fabian=3131.0}, [31.0], null, null, null]",
"+I[uuid:id5, Sophia, null, 18, +I[5, null, s5, 5, null, null], {Sophia=1818.0}, [18.0, 18.0], null, null, null]",
"+I[uuid:id6, Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0}, [20.0], null, null, null]",
"+I[uuid:id7, Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0}, [44.0, 44.0], null, null, null]",
"+I[uuid:id8, Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
"+I[uuid:id9, Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
"+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]",
"+I[id1, Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
"+I[id2, Stephen, null, 33, +I[2, null, s2, 2, null, null], {Stephen=3333.0}, [33.0], null, null, null]",
"+I[id3, Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
"+I[id4, Fabian, null, 31, +I[4, null, s4, 4, null, null], {Fabian=3131.0}, [31.0], null, null, null]",
"+I[id5, Sophia, null, 18, +I[5, null, s5, 5, null, null], {Sophia=1818.0}, [18.0, 18.0], null, null, null]",
"+I[id6, Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0}, [20.0], null, null, null]",
"+I[id7, Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0}, [44.0, 44.0], null, null, null]",
"+I[id8, Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
"+I[id9, Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
},
new String[] {
"+I[1]",
@@ -530,18 +530,18 @@ private ExpectedResult(String[] evolvedRows, String[] rowsWithMeta, String[] row
"+I[Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
},
new String[] {
"+I[uuid:id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]",
"+I[uuid:id1, Danny, null, 23, +I[1, null, s1, 1, null, null], {Danny=2323.0}, [23.0, 23.0], null, null, null]",
"+I[uuid:id2, Stephen, null, 33, +I[2, null, s2, 2, null, null], {Stephen=3333.0}, [33.0], null, null, null]",
"+I[uuid:id3, Julian, null, 53, +I[3, null, s3, 3, null, null], {Julian=5353.0}, [53.0, 53.0], null, null, null]",
"+I[uuid:id4, Fabian, null, 31, +I[4, null, s4, 4, null, null], {Fabian=3131.0}, [31.0], null, null, null]",
"+I[uuid:id5, Sophia, null, 18, +I[5, null, s5, 5, null, null], {Sophia=1818.0}, [18.0, 18.0], null, null, null]",
"+I[uuid:id6, Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0}, [20.0], null, null, null]",
"+I[uuid:id7, Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0}, [44.0, 44.0], null, null, null]",
"+I[uuid:id8, Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
"+I[uuid:id9, Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
"+I[uuid:id1, Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
"+I[uuid:id3, Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
"+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, null]",
"+I[id1, Danny, null, 23, +I[1, null, s1, 1, null, null], {Danny=2323.0}, [23.0, 23.0], null, null, null]",
"+I[id2, Stephen, null, 33, +I[2, null, s2, 2, null, null], {Stephen=3333.0}, [33.0], null, null, null]",
"+I[id3, Julian, null, 53, +I[3, null, s3, 3, null, null], {Julian=5353.0}, [53.0, 53.0], null, null, null]",
"+I[id4, Fabian, null, 31, +I[4, null, s4, 4, null, null], {Fabian=3131.0}, [31.0], null, null, null]",
"+I[id5, Sophia, null, 18, +I[5, null, s5, 5, null, null], {Sophia=1818.0}, [18.0, 18.0], null, null, null]",
"+I[id6, Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0}, [20.0], null, null, null]",
"+I[id7, Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0}, [44.0, 44.0], null, null, null]",
"+I[id8, Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
"+I[id9, Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], {Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
"+I[id1, Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], {Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
"+I[id3, Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], {Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
},
new String[] {
"+I[1]",

@@ -21,13 +21,15 @@
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieDatasetBulkInsertHelper;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows;
 import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.testutils.DataSourceTestUtils;
 import org.apache.hudi.testutils.HoodieSparkClientTestBase;
@@ -142,8 +144,13 @@ private void testBulkInsertHelperFor(String keyGenClass, String recordKeyField)
     boolean isNonPartitionedKeyGen = keyGenClass.equals(NonpartitionedKeyGenerator.class.getName());
     boolean isComplexKeyGen = keyGenClass.equals(ComplexKeyGenerator.class.getName());

+    TypedProperties keyGenProperties = new TypedProperties();
+    keyGenProperties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), recordKeyField);
+    keyGenProperties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition");
+    ComplexKeyGenerator complexKeyGenerator = new ComplexKeyGenerator(keyGenProperties);
+
     result.toJavaRDD().foreach(entry -> {
-      String recordKey = isComplexKeyGen ? String.format("%s:%s", recordKeyField, entry.getAs(recordKeyField)) : entry.getAs(recordKeyField).toString();
+      String recordKey = isComplexKeyGen ? complexKeyGenerator.getRecordKey(entry) : entry.getAs(recordKeyField).toString();
       assertEquals(recordKey, entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)));

       String partitionPath = isNonPartitionedKeyGen ? HoodieTableMetadata.EMPTY_PARTITION_NAME : entry.getAs("partition").toString();
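A note on the test change above: deriving the expected key from an actual ComplexKeyGenerator instance, rather than hard-coding the "field:value" format, keeps the assertion aligned with whatever format the generator produces, so the test holds for both single-field and multi-field record keys.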

@@ -118,7 +118,7 @@ public void testSingleValueKeyGenerator() {
     String rowKey = record.get("_row_key").toString();
     String partitionPath = record.get("timestamp").toString();
     HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
-    assertEquals("_row_key:" + rowKey, hoodieKey.getRecordKey());
+    assertEquals(rowKey, hoodieKey.getRecordKey());
     assertEquals(partitionPath, hoodieKey.getPartitionPath());

     Row row = KeyGeneratorTestUtilities.getRow(record, HoodieTestDataGenerator.AVRO_SCHEMA,

@@ -414,7 +414,7 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
     {
       val keyGen = new ComplexKeyGenerator(getKeyConfig("field1,", "field1,", "false"))

-      val expectedKey = new HoodieKey("field1:field1", "field1")
+      val expectedKey = new HoodieKey("field1", "field1")

       assertEquals(expectedKey, keyGen.getKey(baseRecord))


@@ -1001,8 +1001,8 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     // Test insert into
     spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$day', 12)")
     checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
-      Seq("id:1", s"$escapedPathPart/12", 1, "a1", 10, 1000, day, 12),
-      Seq("id:2", s"$escapedPathPart/12", 2, "a2", 10, 1000, day, 12)
+      Seq("1", s"$escapedPathPart/12", 1, "a1", 10, 1000, day, 12),
+      Seq("2", s"$escapedPathPart/12", 2, "a2", 10, 1000, day, 12)
     )
     // Test merge into
     spark.sql(
@@ -1013,19 +1013,19 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
        |when matched then update set *
        |""".stripMargin)
     checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
-      Seq("id:1", s"$escapedPathPart/12", 1, "a1", 11, 1001, day, 12),
-      Seq("id:2", s"$escapedPathPart/12", 2, "a2", 10, 1000, day, 12)
+      Seq("1", s"$escapedPathPart/12", 1, "a1", 11, 1001, day, 12),
+      Seq("2", s"$escapedPathPart/12", 2, "a2", 10, 1000, day, 12)
     )
     // Test update
     spark.sql(s"update $tableName set value = value + 1 where id = 2")
     checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
-      Seq("id:1", s"$escapedPathPart/12", 1, "a1", 11, 1001, day, 12),
-      Seq("id:2", s"$escapedPathPart/12", 2, "a2", 11, 1000, day, 12)
+      Seq("1", s"$escapedPathPart/12", 1, "a1", 11, 1001, day, 12),
+      Seq("2", s"$escapedPathPart/12", 2, "a2", 11, 1000, day, 12)
     )
     // Test delete
     spark.sql(s"delete from $tableName where id = 1")
     checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, day, hh from $tableName order by id")(
-      Seq("id:2", s"$escapedPathPart/12", 2, "a2", 11, 1000, day, 12)
+      Seq("2", s"$escapedPathPart/12", 2, "a2", 11, 1000, day, 12)
     )
   }
 }
