[HUDI-7218] Integrate new HFile reader with file reader factory (#10330)
yihua committed Feb 27, 2024
1 parent e00e2d7 commit a058344
Showing 71 changed files with 2,520 additions and 922 deletions.
@@ -69,6 +69,7 @@
import java.util.TreeSet;

import static java.util.stream.Collectors.toList;
import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
import static org.apache.hudi.table.action.commit.HoodieDeleteHelper.createDeleteRecord;

/**
@@ -185,7 +186,7 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
List<String> foundRecordKeys = new ArrayList<>();
try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
.getFileReader(configuration, filePath)) {
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, configuration, filePath)) {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = HoodieTimer.start();
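For context on the API change in the hunk above: getFileReader now takes Hudi reader properties as its first argument, and DEFAULT_HUDI_CONFIG_FOR_READER is available for callers that have no write config in scope. Below is a minimal standalone sketch of the new call pattern; the class and method names are hypothetical, and the import paths are assumed from the packages referenced elsewhere in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;

import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;

public class BaseFileReaderSketch {
  // Hypothetical helper: opens a base file through the factory and returns its record count.
  // The reader-config argument (first parameter of getFileReader) is what this commit adds.
  public static long countRecords(Configuration hadoopConf, Path baseFilePath) throws IOException {
    try (HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
        .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, hadoopConf, baseFilePath)) {
      return reader.getTotalRecords();
    }
  }
}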
@@ -19,6 +19,7 @@
package org.apache.hudi.io;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -666,7 +667,8 @@ private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
return new HoodieAvroDataBlock(records, header, keyField);
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(
records, header, writeConfig.getHFileCompressionAlgorithm(), new Path(writeConfig.getBasePath()));
records, header, writeConfig.getHFileCompressionAlgorithm(), new Path(writeConfig.getBasePath()),
writeConfig.getBooleanOrDefault(HoodieReaderConfig.USE_NATIVE_HFILE_READER));
case PARQUET_DATA_BLOCK:
return new HoodieParquetDataBlock(
records,
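The HFILE_DATA_BLOCK branch above now forwards a boolean read from the write config, so the block later chooses between the new native HFile reader and the HBase-based one. A hedged sketch of that construction in isolation follows; the wrapper class and method are hypothetical, and the log-block import paths are assumptions rather than something this diff shows.

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.List;
import java.util.Map;

public class HFileDataBlockSketch {
  // Mirrors the hunk above: the final constructor argument decides whether the block's
  // contents are read back with the native HFile reader.
  public static HoodieHFileDataBlock newHFileDataBlock(HoodieWriteConfig writeConfig,
                                                       List<HoodieRecord> records,
                                                       Map<HeaderMetadataType, String> header) {
    return new HoodieHFileDataBlock(
        records, header, writeConfig.getHFileCompressionAlgorithm(), new Path(writeConfig.getBasePath()),
        writeConfig.getBooleanOrDefault(HoodieReaderConfig.USE_NATIVE_HFILE_READER));
  }
}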
@@ -460,7 +460,8 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {
}

long oldNumWrites = 0;
try (HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(this.recordMerger.getRecordType()).getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
try (HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(this.recordMerger.getRecordType())
.getFileReader(config, hoodieTable.getHadoopConf(), oldFilePath)) {
oldNumWrites = reader.getTotalRecords();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to check for merge data validation", e);
@@ -71,12 +71,12 @@ protected HoodieBaseFile getLatestBaseFile() {
}

protected HoodieFileReader createNewFileReader() throws IOException {
return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()).getFileReader(hoodieTable.getHadoopConf(),
new Path(getLatestBaseFile().getPath()));
return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
.getFileReader(config, hoodieTable.getHadoopConf(), new Path(getLatestBaseFile().getPath()));
}

protected HoodieFileReader createNewFileReader(HoodieBaseFile hoodieBaseFile) throws IOException {
return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType()).getFileReader(hoodieTable.getHadoopConf(),
new Path(hoodieBaseFile.getPath()));
return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
.getFileReader(config, hoodieTable.getHadoopConf(), new Path(hoodieBaseFile.getPath()));
}
}
@@ -507,6 +507,7 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
// Collect record keys from the files in parallel
HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(
engineContext,
dataWriteConfig,
partitionBaseFilePairs,
false,
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
@@ -864,7 +865,8 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus, String instantTime) {
processAndCommit(instantTime, () -> {
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, commitMetadata, instantTime, getRecordsGenerationParams());
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime, getRecordsGenerationParams());

// Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
@@ -880,7 +882,8 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD
public void update(HoodieCommitMetadata commitMetadata, HoodieData<HoodieRecord> records, String instantTime) {
processAndCommit(instantTime, () -> {
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, commitMetadata, instantTime, getRecordsGenerationParams());
HoodieTableMetadataUtil.convertMetadataToRecords(
engineContext, dataWriteConfig, commitMetadata, instantTime, getRecordsGenerationParams());
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(records, commitMetadata);
partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX, records.union(additionalUpdates));
return partitionToRecordMap;
@@ -1421,6 +1424,7 @@ private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceComm

return readRecordKeysFromBaseFiles(
engineContext,
dataWriteConfig,
partitionBaseFilePairs,
true,
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism(),
@@ -82,7 +82,7 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
HoodieFileReader baseFileReader = HoodieFileReaderFactory
.getReaderFactory(recordType)
.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
.getFileReader(writeConfig, hadoopConf, mergeHandle.getOldFilePath());
HoodieFileReader bootstrapFileReader = null;

Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
@@ -114,7 +114,7 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
baseFileReader,
HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath),
HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(writeConfig, bootstrapFileConfig, bootstrapFilePath),
mergeHandle.getPartitionFields(),
mergeHandle.getPartitionValues());
recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
@@ -19,33 +19,43 @@
package org.apache.hudi.testutils;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieRecord.COMMIT_SEQNO_METADATA_FIELD;
import static org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD;
import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD;
import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.HOODIE_CONSUME_COMMIT;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -126,8 +136,22 @@ public static Map<String, GenericRecord> getRecordsMap(HoodieWriteConfig config,
.map(partitionPath -> Paths.get(config.getBasePath(), partitionPath).toString())
.collect(Collectors.toList());
return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
hadoopConf, fullPartitionPaths, config.getBasePath(), jobConf, true).stream()
hadoopConf, fullPartitionPaths, config.getBasePath(), jobConf, true).stream()
.collect(Collectors.toMap(rec -> rec.get(RECORD_KEY_METADATA_FIELD).toString(), Function.identity()));
}

public static Stream<GenericRecord> readHFile(Configuration conf, String[] paths) {
List<GenericRecord> valuesAsList = new LinkedList<>();
for (String path : paths) {
try (HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, conf, new Path(path), HoodieFileFormat.HFILE)) {
valuesAsList.addAll(HoodieAvroHFileReaderImplBase.readAllRecords(reader)
.stream().map(e -> (GenericRecord) e).collect(Collectors.toList()));
} catch (IOException e) {
throw new HoodieException("Error reading HFile " + path, e);
}
}
return valuesAsList.stream();
}
}
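The readHFile helper added above replaces the HBase-scanner-based copy that is deleted from HoodieJavaClientTestHarness later in this commit. A hedged usage sketch follows, with placeholder paths and a hypothetical class name.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieRecord;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.testutils.GenericRecordValidationTestUtils.readHFile;

public class ReadHFileUsageSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Placeholder paths; in the tests these come from the table's base-file listing.
    String[] paths = {"/tmp/hoodie_table/partition/base_file.hfile"};
    List<GenericRecord> records = readHFile(conf, paths).collect(Collectors.toList());
    records.forEach(r -> System.out.println(r.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)));
  }
}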
@@ -193,7 +193,8 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati

baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
: Option.of(HoodieFileReaderFactory.getReaderFactory(recordType)
.getFileReader(config, table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
Iterator<HoodieRecord<T>> fileSliceReader = new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), writeConfig.getRecordMerger(),
tableConfig.getProps(),
@@ -221,7 +222,8 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
List<HoodieRecord<T>> records = new ArrayList<>();
clusteringOps.forEach(clusteringOp -> {
try (HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()))) {
try (HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType)
.getFileReader(getHoodieTable().getConfig(), getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()))) {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
Iterator<HoodieRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
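A standalone sketch of the base-file read path from readRecordsForGroupBaseFiles above, with the write config now threaded into getFileReader. The class and method names are hypothetical, the AVRO record type is assumed for simplicity, and the copy() call stands in for the cloning mentioned in the truncated NOTE comment; treat it as an assumption rather than the exact call.

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ClusteringBaseFileReadSketch {
  public static List<HoodieRecord> readBaseFileRecords(HoodieWriteConfig writeConfig,
                                                       Configuration hadoopConf,
                                                       Path dataFilePath) throws IOException {
    List<HoodieRecord> records = new ArrayList<>();
    try (HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
        .getFileReader(writeConfig, hadoopConf, dataFilePath)) {
      // Reader schema includes Hudi metadata fields, as in the hunk above.
      Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
      Iterator<HoodieRecord> recordIterator = reader.getRecordIterator(readerSchema);
      while (recordIterator.hasNext()) {
        // Clone each record so it does not keep references to the reader's internal buffers
        // (assumed copy() method; see the cloning note in the original code).
        records.add(recordIterator.next().copy());
      }
    }
    return records;
  }
}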
@@ -86,7 +86,8 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieAvroHFileReader;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -110,7 +111,6 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.util.Time;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
@@ -539,9 +539,10 @@ public void testVirtualKeysInBaseFiles() throws Exception {
table.getHoodieView().sync();
List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
HoodieAvroHFileReader hoodieHFileReader = new HoodieAvroHFileReader(context.getHadoopConf().get(), new Path(baseFile.getPath()),
new CacheConfig(context.getHadoopConf().get()));
List<IndexedRecord> records = HoodieAvroHFileReader.readAllRecords(hoodieHFileReader);
HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
writeConfig, context.getHadoopConf().get(), new Path(baseFile.getPath()));
List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
if (populateMetaFields) {
assertNotNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
@@ -959,10 +960,10 @@ private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable tabl
}
final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();

HoodieAvroHFileReader hoodieHFileReader = new HoodieAvroHFileReader(context.getHadoopConf().get(),
new Path(baseFile.getPath()),
new CacheConfig(context.getHadoopConf().get()));
List<IndexedRecord> records = HoodieAvroHFileReader.readAllRecords(hoodieHFileReader);
HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
table.getConfig(), context.getHadoopConf().get(), new Path(baseFile.getPath()));
List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
records.forEach(entry -> {
if (enableMetaFields) {
assertNotNull(((GenericRecord) entry).get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
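Both test hunks above follow the same pattern: obtain a reader from the factory, downcast it to HoodieAvroHFileReaderImplBase, and read every record. A hedged standalone sketch of that pattern follows; the class and method names are hypothetical and the AVRO record type mirrors the tests.

import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;
import java.util.List;

public class HFileBaseFileReadSketch {
  // The factory chooses the concrete HFile reader implementation (native or HBase-based)
  // from the supplied config; the downcast exposes readAllRecords as used in the tests above.
  public static List<IndexedRecord> readAllRecords(HoodieWriteConfig writeConfig,
                                                   Configuration hadoopConf,
                                                   Path baseFilePath) throws IOException {
    try (HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
            .getFileReader(writeConfig, hadoopConf, baseFilePath)) {
      return HoodieAvroHFileReaderImplBase.readAllRecords(reader);
    }
  }
}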
@@ -17,7 +17,6 @@

package org.apache.hudi.testutils;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
@@ -65,7 +64,6 @@
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.JavaHoodieIndexFactory;
import org.apache.hudi.io.storage.HoodieHFileUtils;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -76,17 +74,12 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.utils.HoodieWriterClientTestHarness;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -98,7 +91,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -109,9 +101,8 @@
import java.util.stream.Stream;

import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.io.storage.HoodieAvroHFileReader.SCHEMA_KEY;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.apache.hudi.testutils.GenericRecordValidationTestUtils.readHFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertLinesMatch;
@@ -978,7 +969,7 @@ public long countRecordsOptionallySince(String basePath, HoodieTimeline commitTi
}
}).count();
} else if (paths[0].endsWith(HoodieFileFormat.HFILE.getFileExtension())) {
Stream<GenericRecord> genericRecordStream = readHFile(paths);
Stream<GenericRecord> genericRecordStream = readHFile(context.getHadoopConf().get(), paths);
if (lastCommitTimeOpt.isPresent()) {
return genericRecordStream.filter(gr -> HoodieTimeline.compareTimestamps(lastCommitTimeOpt.get(), HoodieActiveTimeline.LESSER_THAN,
gr.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()))
@@ -993,38 +984,6 @@ public long countRecordsOptionallySince(String basePath, HoodieTimeline commitTi
}
}

public Stream<GenericRecord> readHFile(String[] paths) {
// TODO: this should be ported to use HoodieStorageReader
List<GenericRecord> valuesAsList = new LinkedList<>();

FileSystem fs = HadoopFSUtils.getFs(paths[0], context.getHadoopConf().get());
CacheConfig cacheConfig = new CacheConfig(fs.getConf());
Schema schema = null;
for (String path : paths) {
try {
HFile.Reader reader =
HoodieHFileUtils.createHFileReader(fs, new Path(path), cacheConfig, fs.getConf());
if (schema == null) {
schema = new Schema.Parser().parse(new String(reader.getHFileInfo().get(getUTF8Bytes(SCHEMA_KEY))));
}
HFileScanner scanner = reader.getScanner(false, false);
if (!scanner.seekTo()) {
// EOF reached
continue;
}

do {
Cell c = scanner.getCell();
byte[] value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
valuesAsList.add(HoodieAvroUtils.bytesToAvro(value, schema));
} while (scanner.next());
} catch (IOException e) {
throw new HoodieException("Error reading hfile " + path + " as a dataframe", e);
}
}
return valuesAsList.stream();
}

public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, HoodieIndex.IndexType indexType,
HoodieFailedWritesCleaningPolicy cleaningPolicy) {
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().withPath(basePath)
(Diffs for the remaining changed files in this commit are not shown here.)
