[HUDI-7342] Use BaseFileUtils to hide format-specific logic in HoodiePartitionMetadata (#10568)
yihua committed Feb 27, 2024
1 parent 005c758 commit e00e2d7
Showing 4 changed files with 57 additions and 42 deletions.
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -18,40 +18,26 @@

 package org.apache.hudi.common.model;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.HoodieAvroWriteSupport;
-import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Writer;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 /**
  * The metadata that goes into the meta file in each partition.
  */
@@ -152,34 +138,7 @@ private String getMetafileExtension() {
    */
   private void writeMetafile(Path filePath) throws IOException {
     if (format.isPresent()) {
-      Schema schema = HoodieAvroUtils.getRecordKeySchema();
-
-      switch (format.get()) {
-        case PARQUET:
-          // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
-          // parameters are not important.
-          MessageType type = Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
-          HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(type, schema, Option.empty(), new Properties());
-          try (ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
-            for (String key : props.stringPropertyNames()) {
-              writeSupport.addFooterMetadata(key, props.getProperty(key));
-            }
-          }
-          break;
-        case ORC:
-          // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
-          // parameters are not important.
-          OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
-              .setSchema(AvroOrcUtils.createOrcSchema(schema));
-          try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
-            for (String key : props.stringPropertyNames()) {
-              writer.addUserMetadata(key, ByteBuffer.wrap(getUTF8Bytes(props.getProperty(key))));
-            }
-          }
-          break;
-        default:
-          throw new HoodieException("Unsupported format for partition metafiles: " + format.get());
-      }
+      BaseFileUtils.getInstance(format.get()).writeMetaFile(fs, filePath, props);
     } else {
       // Backwards compatible properties file format
       FSDataOutputStream os = fs.create(filePath, true);
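Note: BaseFileUtils.getInstance is not introduced by this commit; the import above is unchanged context, since the factory was already in use. For readers new to the class, a minimal sketch of the dispatch it performs, assuming the two concrete implementations touched further down; the exact body and error handling are an assumption, not the committed code:

  // Sketch of the factory in BaseFileUtils; the per-format targets are implied
  // by this diff, the error branch below is an assumption.
  public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
    if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
      // Parquet metafiles carry the properties as footer key/value metadata.
      return new ParquetUtils();
    }
    if (HoodieFileFormat.ORC.equals(fileFormat)) {
      // ORC metafiles carry the properties as user metadata on the writer.
      return new OrcUtils();
    }
    throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
  }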
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -33,11 +33,14 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 /**
@@ -216,4 +219,16 @@ public abstract Map<String, String> readFooter(Configuration configuration, bool
    * @return The subclass's {@link HoodieFileFormat}.
    */
   public abstract HoodieFileFormat getFormat();
+
+  /**
+   * Writes properties to the meta file.
+   *
+   * @param fs       {@link FileSystem} instance.
+   * @param filePath file path to write to.
+   * @param props    properties to write.
+   * @throws IOException upon write error.
+   */
+  public abstract void writeMetaFile(FileSystem fs,
+                                     Path filePath,
+                                     Properties props) throws IOException;
 }
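To illustrate the new contract end to end, a usage sketch mirroring how HoodiePartitionMetadata.writeMetafile invokes the method above; the property keys and target path are hypothetical stand-ins:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hudi.common.model.HoodieFileFormat;
  import org.apache.hudi.common.util.BaseFileUtils;

  import java.util.Properties;

  public class WriteMetaFileExample {
    public static void main(String[] args) throws Exception {
      // Properties to persist in the metafile; keys are illustrative.
      Properties props = new Properties();
      props.setProperty("commitTime", "20240227000000000");
      props.setProperty("partitionDepth", "1");

      FileSystem fs = FileSystem.get(new Configuration());
      // Hypothetical target path; real callers derive it from the partition path.
      Path metaFilePath = new Path("/tmp/hoodie/p1/.hoodie_partition_metadata.parquet");

      // One polymorphic call replaces the old per-format switch.
      BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).writeMetaFile(fs, metaFilePath, props);
    }
  }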
hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -32,6 +32,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -41,6 +42,7 @@
 import org.apache.orc.Reader.Options;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -50,10 +52,12 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.BinaryUtil.toBytes;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 /**
  * Utility functions for ORC files.
@@ -265,4 +269,18 @@ public long getRowCount(Configuration conf, Path orcFilePath) {
       throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io);
     }
   }
+
+  @Override
+  public void writeMetaFile(FileSystem fs, Path filePath, Properties props) throws IOException {
+    // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
+    // parameters are not important.
+    Schema schema = HoodieAvroUtils.getRecordKeySchema();
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
+        .setSchema(AvroOrcUtils.createOrcSchema(schema));
+    try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
+      for (String key : props.stringPropertyNames()) {
+        writer.addUserMetadata(key, ByteBuffer.wrap(getUTF8Bytes(props.getProperty(key))));
+      }
+    }
+  }
 }
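As a quick sanity check on the ORC side, the user metadata written above can be read back; a sketch assuming a metafile already produced by writeMetaFile, using the same org.apache.orc reader API this class already relies on:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.orc.OrcFile;
  import org.apache.orc.Reader;

  import java.nio.ByteBuffer;
  import java.nio.charset.StandardCharsets;

  public class ReadOrcMetaFileExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Hypothetical path previously written by OrcUtils.writeMetaFile.
      Path metaFilePath = new Path("/tmp/hoodie/p1/.hoodie_partition_metadata.orc");

      try (Reader reader = OrcFile.createReader(metaFilePath, OrcFile.readerOptions(conf))) {
        for (String key : reader.getMetadataKeys()) {
          // writeMetaFile stored each value as UTF-8 bytes; decode for display.
          ByteBuffer value = reader.getMetadataValue(key);
          System.out.println(key + " = " + StandardCharsets.UTF_8.decode(value));
        }
      }
    }
  }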
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.util;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
@@ -32,20 +33,24 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +64,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collector;
@@ -280,6 +286,23 @@ public long getRowCount(Configuration conf, Path parquetFilePath) {
     return rowCount;
   }
 
+  @Override
+  public void writeMetaFile(FileSystem fs, Path filePath, Properties props) throws IOException {
+    // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
+    // parameters are not important.
+    Schema schema = HoodieAvroUtils.getRecordKeySchema();
+    MessageType type = Types.buildMessage()
+        .optional(PrimitiveType.PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+    HoodieAvroWriteSupport writeSupport =
+        new HoodieAvroWriteSupport(type, schema, Option.empty(), new Properties());
+    try (ParquetWriter writer = new ParquetWriter(
+        filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
+      for (String key : props.stringPropertyNames()) {
+        writeSupport.addFooterMetadata(key, props.getProperty(key));
+      }
+    }
+  }
+
   static class RecordKeysFilterFunction implements Function<String, Boolean> {
 
     private final Set<String> candidateKeys;
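And the Parquet counterpart: the footer key/value metadata written above can be read back through ParquetFileReader, which this class already imports. A sketch; note that readFooter(conf, path) is deprecated in newer Parquet releases but still available:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.hadoop.ParquetFileReader;
  import org.apache.parquet.hadoop.metadata.ParquetMetadata;

  import java.util.Map;

  public class ReadParquetMetaFileExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Hypothetical path previously written by ParquetUtils.writeMetaFile.
      Path metaFilePath = new Path("/tmp/hoodie/p1/.hoodie_partition_metadata.parquet");

      // The footer carries the Properties added via addFooterMetadata.
      ParquetMetadata footer = ParquetFileReader.readFooter(conf, metaFilePath);
      Map<String, String> keyValueMetaData = footer.getFileMetaData().getKeyValueMetaData();
      keyValueMetaData.forEach((k, v) -> System.out.println(k + " = " + v));
    }
  }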
