From e00e2d7e896ba4d75a5578ee69f4ce653e050008 Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Sun, 28 Jan 2024 23:42:07 -0800
Subject: [PATCH] [HUDI-7342] Use BaseFileUtils to hide format-specific logic
 in HoodiePartitionMetadata (#10568)

---
 .../common/model/HoodiePartitionMetadata.java | 43 +------------------
 .../hudi/common/util/BaseFileUtils.java       | 15 +++++++
 .../org/apache/hudi/common/util/OrcUtils.java | 18 ++++++++
 .../apache/hudi/common/util/ParquetUtils.java | 23 ++++++++++
 4 files changed, 57 insertions(+), 42 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index ad5912ba8b9c..2b63433bef46 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -18,40 +18,26 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.HoodieAvroWriteSupport;
-import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Writer;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-
 /**
  * The metadata that goes into the meta file in each partition.
  */
@@ -152,34 +138,7 @@ private String getMetafileExtension() {
    */
   private void writeMetafile(Path filePath) throws IOException {
     if (format.isPresent()) {
-      Schema schema = HoodieAvroUtils.getRecordKeySchema();
-
-      switch (format.get()) {
-        case PARQUET:
-          // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
-          // parameters are not important.
-          MessageType type = Types.buildMessage().optional(PrimitiveTypeName.INT64).named("dummyint").named("dummy");
-          HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(type, schema, Option.empty(), new Properties());
-          try (ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
-            for (String key : props.stringPropertyNames()) {
-              writeSupport.addFooterMetadata(key, props.getProperty(key));
-            }
-          }
-          break;
-        case ORC:
-          // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
-          // parameters are not important.
-          OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
-              .setSchema(AvroOrcUtils.createOrcSchema(schema));
-          try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
-            for (String key : props.stringPropertyNames()) {
-              writer.addUserMetadata(key, ByteBuffer.wrap(getUTF8Bytes(props.getProperty(key))));
-            }
-          }
-          break;
-        default:
-          throw new HoodieException("Unsupported format for partition metafiles: " + format.get());
-      }
+      BaseFileUtils.getInstance(format.get()).writeMetaFile(fs, filePath, props);
     } else {
       // Backwards compatible properties file format
       FSDataOutputStream os = fs.create(filePath, true);

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index d402f58a40a1..dd2eb7ad5c0f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -33,11 +33,14 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 /**
@@ -216,4 +219,16 @@ public abstract Map<String, String> readFooter(Configuration configuration, bool
    * @return The subclass's {@link HoodieFileFormat}.
    */
   public abstract HoodieFileFormat getFormat();
+
+  /**
+   * Writes properties to the meta file.
+   *
+   * @param fs       {@link FileSystem} instance.
+   * @param filePath file path to write to.
+   * @param props    properties to write.
+   * @throws IOException upon write error.
+   */
+  public abstract void writeMetaFile(FileSystem fs,
+                                     Path filePath,
+                                     Properties props) throws IOException;
 }

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index 66e9ab237fcc..0d3342626ae3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -32,6 +32,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -41,6 +42,7 @@
 import org.apache.orc.Reader.Options;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -50,10 +52,12 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.BinaryUtil.toBytes;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 /**
  * Utility functions for ORC files.
@@ -265,4 +269,18 @@ public long getRowCount(Configuration conf, Path orcFilePath) {
       throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io);
     }
   }
+
+  @Override
+  public void writeMetaFile(FileSystem fs, Path filePath, Properties props) throws IOException {
+    // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
+    // parameters are not important.
+    Schema schema = HoodieAvroUtils.getRecordKeySchema();
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(fs.getConf()).fileSystem(fs)
+        .setSchema(AvroOrcUtils.createOrcSchema(schema));
+    try (Writer writer = OrcFile.createWriter(filePath, writerOptions)) {
+      for (String key : props.stringPropertyNames()) {
+        writer.addUserMetadata(key, ByteBuffer.wrap(getUTF8Bytes(props.getProperty(key))));
+      }
+    }
+  }
 }

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index a1e51cd69d42..0a4c5691df31 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.util;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
@@ -32,6 +33,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
@@ -39,13 +41,16 @@
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +64,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collector;
@@ -280,6 +286,23 @@ public long getRowCount(Configuration conf, Path parquetFilePath) {
     return rowCount;
   }
 
+  @Override
+  public void writeMetaFile(FileSystem fs, Path filePath, Properties props) throws IOException {
+    // Since we are only interested in saving metadata to the footer, the schema, blocksizes and other
+    // parameters are not important.
+    Schema schema = HoodieAvroUtils.getRecordKeySchema();
+    MessageType type = Types.buildMessage()
+        .optional(PrimitiveType.PrimitiveTypeName.INT64).named("dummyint").named("dummy");
+    HoodieAvroWriteSupport writeSupport =
+        new HoodieAvroWriteSupport(type, schema, Option.empty(), new Properties());
+    try (ParquetWriter writer = new ParquetWriter(
+        filePath, writeSupport, CompressionCodecName.UNCOMPRESSED, 1024, 1024)) {
+      for (String key : props.stringPropertyNames()) {
+        writeSupport.addFooterMetadata(key, props.getProperty(key));
+      }
+    }
+  }
+
   static class RecordKeysFilterFunction implements Function<String, Boolean> {
 
     private final Set<String> candidateKeys;
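
A minimal sketch of how the refactored API is exercised, mirroring the call that
replaces the format switch in HoodiePartitionMetadata.writeMetafile. The class
name, table path, metafile name, and property values below are hypothetical;
BaseFileUtils.getInstance(HoodieFileFormat) and writeMetaFile(FileSystem, Path,
Properties) are the methods this patch introduces or dispatches through:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;

import java.io.IOException;
import java.util.Properties;

public class WriteMetaFileSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Properties to persist as footer metadata in the partition metafile
    // (hypothetical keys and values for illustration).
    Properties props = new Properties();
    props.setProperty("commitTime", "20240128234207000");
    props.setProperty("partitionDepth", "3");

    // Hypothetical metafile path inside one partition of a Hudi table.
    Path filePath = new Path("/tmp/hudi_table/2024/01/28/.hoodie_partition_metadata.parquet");

    // The per-format branching formerly inlined in HoodiePartitionMetadata is now
    // hidden behind the abstract writeMetaFile: the same call serves PARQUET and
    // ORC, and unsupported formats are rejected when resolving the instance.
    BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).writeMetaFile(fs, filePath, props);
  }
}

On the read side, the same BaseFileUtils instance exposes the readFooter
counterpart visible in the BaseFileUtils hunk above, so callers can retrieve
these footer entries without format-specific code either.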