From 686f07147499f06eeb78c9643aba499e20002676 Mon Sep 17 00:00:00 2001 From: Maksim Konstantinov Date: Wed, 13 Nov 2024 07:09:08 -0800 Subject: [PATCH] GH-3035: ParquetRewriter: Add a column renaming feature (#3036) --- .../hadoop/rewrite/ParquetRewriter.java | 137 ++++++++-- .../hadoop/rewrite/RewriteOptions.java | 81 ++++-- .../hadoop/rewrite/ParquetRewriterTest.java | 251 +++++++++++++----- 3 files changed, 363 insertions(+), 106 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 2ff9c0ea34..9535b4335d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -109,7 +109,7 @@ * Please note the schema of all inputFiles must be the same, otherwise the rewrite will fail. *
 * <p>
 * <h2>Applying column transformations</h2>
 * <p>
- * Some supported column transformations: pruning, masking, encrypting, changing a codec.
+ * Some supported column transformations: pruning, masking, renaming, encrypting, changing a codec.
 * See {@link RewriteOptions} and {@link RewriteOptions.Builder} for the full list with description.
 * <p>
 * <h2>Joining with extra files with a different schema</h2>
@@ -149,18 +149,23 @@ public class ParquetRewriter implements Closeable { private final IndexCache.CacheStrategy indexCacheStrategy; private final boolean overwriteInputWithJoinColumns; private final InternalFileEncryptor nullColumnEncryptor; + private final Map renamedColumns; public ParquetRewriter(RewriteOptions options) throws IOException { this.newCodecName = options.getNewCodecName(); this.indexCacheStrategy = options.getIndexCacheStrategy(); this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns(); + this.renamedColumns = options.getRenameColumns(); ParquetConfiguration conf = options.getParquetConfiguration(); - OutputFile out = options.getParquetOutputFile(); - inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf)); - inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf)); + this.inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf)); + this.inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf)); + this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns()); + this.extraMetaData = getExtraMetadata(options); ensureSameSchema(inputFiles); ensureSameSchema(inputFilesToJoin); ensureRowCount(); + ensureRenamingCorrectness(outSchema, renamedColumns); + OutputFile out = options.getParquetOutputFile(); LOG.info( "Start rewriting {} input file(s) {} to {}", inputFiles.size() + inputFilesToJoin.size(), @@ -168,9 +173,6 @@ public ParquetRewriter(RewriteOptions options) throws IOException { .collect(Collectors.toList()), out); - this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns()); - this.extraMetaData = getExtraMetadata(options); - if (options.getMaskColumns() != null) { this.maskColumns = new HashMap<>(); for (Map.Entry col : options.getMaskColumns().entrySet()) { @@ -184,9 +186,9 @@ public ParquetRewriter(RewriteOptions options) throws IOException { } ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE; - writer = new ParquetFileWriter( + this.writer = new ParquetFileWriter( out, - outSchema, + renamedColumns.isEmpty() ? outSchema : getSchemaWithRenamedColumns(this.outSchema), writerMode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, @@ -200,7 +202,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException { this.nullColumnEncryptor = null; } else { this.nullColumnEncryptor = new InternalFileEncryptor(options.getFileEncryptionProperties()); - List columns = outSchema.getColumns(); + List columns = + getSchemaWithRenamedColumns(this.outSchema).getColumns(); for (int i = 0; i < columns.size(); i++) { writer.getEncryptor() .getColumnSetup(ColumnPath.get(columns.get(i).getPath()), true, i); @@ -223,8 +226,8 @@ public ParquetRewriter( this.writer = writer; this.outSchema = outSchema; this.newCodecName = codecName; - extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData()); - extraMetaData.put( + this.extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData()); + this.extraMetaData.put( ORIGINAL_CREATED_BY_KEY, originalCreatedBy != null ? 
originalCreatedBy @@ -239,6 +242,7 @@ public ParquetRewriter( this.indexCacheStrategy = IndexCache.CacheStrategy.NONE; this.overwriteInputWithJoinColumns = false; this.nullColumnEncryptor = null; + this.renamedColumns = new HashMap<>(); } private MessageType getSchema() { @@ -266,6 +270,27 @@ private MessageType getSchema() { } } + private MessageType getSchemaWithRenamedColumns(MessageType schema) { + List fields = schema.getFields().stream() + .map(type -> { + if (!renamedColumns.containsKey(type.getName())) { + return type; + } else if (type.isPrimitive()) { + return new PrimitiveType( + type.getRepetition(), + type.asPrimitiveType().getPrimitiveTypeName(), + renamedColumns.get(type.getName())); + } else { + return new GroupType( + type.getRepetition(), + renamedColumns.get(type.getName()), + type.asGroupType().getFields()); + } + }) + .collect(Collectors.toList()); + return new MessageType(schema.getName(), fields); + } + private Map getExtraMetadata(RewriteOptions options) { List allFiles; if (options.getIgnoreJoinFilesMetadata()) { @@ -338,6 +363,21 @@ private void ensureSameSchema(Queue inputFileReaders) { } } + private void ensureRenamingCorrectness(MessageType schema, Map renameMap) { + Set columns = schema.getFields().stream().map(Type::getName).collect(Collectors.toSet()); + renameMap.forEach((src, dst) -> { + if (!columns.contains(src)) { + String msg = String.format("Column to rename '%s' is not found in input files schema", src); + LOG.error(msg); + throw new IllegalArgumentException(msg); + } else if (columns.contains(dst)) { + String msg = String.format("Renamed column target name '%s' is already present in a schema", dst); + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + }); + } + @Override public void close() throws IOException { writer.end(extraMetaData); @@ -421,6 +461,27 @@ public void processBlocks() throws IOException { if (readerToJoin != null) readerToJoin.close(); } + private ColumnPath normalizeFieldsInPath(ColumnPath path) { + if (renamedColumns.isEmpty()) { + return path; + } else { + String[] pathArray = path.toArray(); + pathArray[0] = renamedColumns.getOrDefault(pathArray[0], pathArray[0]); + return ColumnPath.get(pathArray); + } + } + + private PrimitiveType normalizeNameInType(PrimitiveType type) { + if (renamedColumns.isEmpty()) { + return type; + } else { + return new PrimitiveType( + type.getRepetition(), + type.asPrimitiveType().getPrimitiveTypeName(), + renamedColumns.getOrDefault(type.getName(), type.getName())); + } + } + private void processBlock( TransParquetFileReader reader, int blockIdx, @@ -431,7 +492,28 @@ private void processBlock( if (chunk.isEncrypted()) { throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted"); } - ColumnDescriptor descriptor = outSchema.getColumns().get(outColumnIdx); + + ColumnChunkMetaData chunkNormalized = chunk; + if (!renamedColumns.isEmpty()) { + // Keep an eye if this get stale because of ColumnChunkMetaData change + chunkNormalized = ColumnChunkMetaData.get( + normalizeFieldsInPath(chunk.getPath()), + normalizeNameInType(chunk.getPrimitiveType()), + chunk.getCodec(), + chunk.getEncodingStats(), + chunk.getEncodings(), + chunk.getStatistics(), + chunk.getFirstDataPageOffset(), + chunk.getDictionaryPageOffset(), + chunk.getValueCount(), + chunk.getTotalSize(), + chunk.getTotalUncompressedSize(), + chunk.getSizeStatistics()); + } + + ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx); + ColumnDescriptor descriptorRenamed = + 
getSchemaWithRenamedColumns(outSchema).getColumns().get(outColumnIdx); BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx); String originalCreatedBy = reader.getFileMetaData().getCreatedBy(); @@ -443,13 +525,21 @@ private void processBlock( // Mask column and compress it again. MaskMode maskMode = maskColumns.get(chunk.getPath()); if (maskMode.equals(MaskMode.NULLIFY)) { - Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition(); + Type.Repetition repetition = + descriptorOriginal.getPrimitiveType().getRepetition(); if (repetition.equals(Type.Repetition.REQUIRED)) { - throw new IOException( - "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified"); + throw new IOException("Required column [" + + descriptorOriginal.getPrimitiveType().getName() + "] cannot be nullified"); } nullifyColumn( - reader, blockIdx, descriptor, chunk, writer, newCodecName, encryptColumn, originalCreatedBy); + reader, + blockIdx, + descriptorOriginal, + chunk, + writer, + newCodecName, + encryptColumn, + originalCreatedBy); } else { throw new UnsupportedOperationException("Only nullify is supported for now"); } @@ -462,7 +552,7 @@ private void processBlock( } // Translate compression and/or encryption - writer.startColumn(descriptor, chunk.getValueCount(), newCodecName); + writer.startColumn(descriptorRenamed, chunk.getValueCount(), newCodecName); processChunk( reader, blockMetaData.getRowCount(), @@ -480,7 +570,8 @@ private void processBlock( BloomFilter bloomFilter = indexCache.getBloomFilter(chunk); ColumnIndex columnIndex = indexCache.getColumnIndex(chunk); OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk); - writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex); + writer.appendColumnChunk( + descriptorRenamed, reader.getStream(), chunkNormalized, bloomFilter, columnIndex, offsetIndex); } } @@ -522,7 +613,7 @@ private void processChunk( } if (bloomFilter != null) { - writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter); + writer.addBloomFilter(normalizeFieldsInPath(chunk.getPath()).toDotString(), bloomFilter); } reader.setStreamPosition(chunk.getStartingPos()); @@ -580,7 +671,7 @@ private void processChunk( dataPageAAD); statistics = convertStatistics( originalCreatedBy, - chunk.getPrimitiveType(), + normalizeNameInType(chunk.getPrimitiveType()), headerV1.getStatistics(), columnIndex, pageOrdinal, @@ -648,7 +739,7 @@ private void processChunk( dataPageAAD); statistics = convertStatistics( originalCreatedBy, - chunk.getPrimitiveType(), + normalizeNameInType(chunk.getPrimitiveType()), headerV2.getStatistics(), columnIndex, pageOrdinal, @@ -887,7 +978,7 @@ private void nullifyColumn( CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(newCodecName); // Create new schema that only has the current column - MessageType newSchema = newSchema(outSchema, descriptor); + MessageType newSchema = getSchemaWithRenamedColumns(newSchema(outSchema, descriptor)); ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore( compressor, newSchema, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index a69403f464..f85b65ea3d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -20,8 
+20,11 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -49,6 +52,7 @@ public class RewriteOptions { private final List pruneColumns; private final CompressionCodecName newCodecName; private final Map maskColumns; + private final Map renameColumns; private final List encryptColumns; private final FileEncryptionProperties fileEncryptionProperties; private final IndexCache.CacheStrategy indexCacheStrategy; @@ -63,6 +67,7 @@ private RewriteOptions( List pruneColumns, CompressionCodecName newCodecName, Map maskColumns, + Map renameColumns, List encryptColumns, FileEncryptionProperties fileEncryptionProperties, IndexCache.CacheStrategy indexCacheStrategy, @@ -75,6 +80,7 @@ private RewriteOptions( this.pruneColumns = pruneColumns; this.newCodecName = newCodecName; this.maskColumns = maskColumns; + this.renameColumns = renameColumns; this.encryptColumns = encryptColumns; this.fileEncryptionProperties = fileEncryptionProperties; this.indexCacheStrategy = indexCacheStrategy; @@ -192,6 +198,10 @@ public Map getMaskColumns() { return maskColumns; } + public Map getRenameColumns() { + return renameColumns; + } + public List getEncryptColumns() { return encryptColumns; } @@ -221,6 +231,7 @@ public static class Builder { private List pruneColumns; private CompressionCodecName newCodecName; private Map maskColumns; + private Map renameColumns; private List encryptColumns; private FileEncryptionProperties fileEncryptionProperties; private IndexCache.CacheStrategy indexCacheStrategy = IndexCache.CacheStrategy.NONE; @@ -432,6 +443,19 @@ public Builder mask(Map maskColumns) { return this; } + /** + * Set the columns to be renamed. + *
+     * <p>
+ * Note that nested columns can't be renamed, in case of GroupType column only top level column can be renamed. + * + * @param renameColumns map where keys are original names and values are new names + * @return self + */ + public Builder renameColumns(Map renameColumns) { + this.renameColumns = renameColumns; + return this; + } + /** * Set the columns to encrypt. *
     * <p>
@@ -551,6 +575,28 @@ public Builder ignoreJoinFilesMetadata(boolean ignoreJoinFilesMetadata) { * @return a RewriterOptions */ public RewriteOptions build() { + checkPreconditions(); + return new RewriteOptions( + conf, + inputFiles, + (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()), + outputFile, + pruneColumns, + newCodecName, + maskColumns, + renameColumns == null + ? new HashMap<>() + : renameColumns.entrySet().stream() + .collect(Collectors.toMap(x -> x.getKey().trim(), x -> x.getValue() + .trim())), + encryptColumns, + fileEncryptionProperties, + indexCacheStrategy, + overwriteInputWithJoinColumns, + ignoreJoinFilesMetadata); + } + + private void checkPreconditions() { Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "Input file is required"); Preconditions.checkArgument(outputFile != null, "Output file is required"); @@ -561,7 +607,6 @@ public RewriteOptions build() { !maskColumns.containsKey(pruneColumn), "Cannot prune and mask same column"); } } - if (encryptColumns != null) { for (String pruneColumn : pruneColumns) { Preconditions.checkArgument( @@ -570,6 +615,26 @@ public RewriteOptions build() { } } + if (renameColumns != null) { + Set nullifiedColumns = maskColumns == null + ? new HashSet<>() + : maskColumns.entrySet().stream() + .filter(x -> x.getValue() == MaskMode.NULLIFY) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + renameColumns.forEach((colSrc, colDst) -> { + Preconditions.checkArgument( + colSrc != null && !colSrc.trim().isEmpty(), "Renamed column source name can't be empty"); + Preconditions.checkArgument( + colDst != null && !colDst.trim().isEmpty(), "Renamed column target name can't be empty"); + Preconditions.checkArgument( + !nullifiedColumns.contains(colSrc), "Cannot nullify and rename the same column"); + Preconditions.checkArgument( + !colSrc.contains(".") && !colDst.contains("."), + "Renamed column can't be nested, in case of GroupType column only a top level column can be renamed"); + }); + } + if (encryptColumns != null && !encryptColumns.isEmpty()) { Preconditions.checkArgument( fileEncryptionProperties != null, @@ -581,20 +646,6 @@ public RewriteOptions build() { encryptColumns != null && !encryptColumns.isEmpty(), "Encrypt columns is required when FileEncryptionProperties is set"); } - - return new RewriteOptions( - conf, - inputFiles, - (inputFilesToJoin != null ? 
inputFilesToJoin : new ArrayList<>()), - outputFile, - pruneColumns, - newCodecName, - maskColumns, - encryptColumns, - fileEncryptionProperties, - indexCacheStrategy, - overwriteInputWithJoinColumns, - ignoreJoinFilesMetadata); } } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java index 34c90a4641..c1da97c403 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.hadoop.rewrite; +import static java.util.Collections.emptyMap; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; @@ -181,10 +182,10 @@ private void testPruneSingleColumnTranslateCodec(List inputPaths) throws E null); // Verify the data are not changed for the columns not pruned - validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false); + validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null, false, emptyMap()); // Verify the page index - validatePageIndex(new HashSet<>(), false); + validatePageIndex(new HashSet<>(), false, emptyMap()); // Verify original.created.by is preserved validateCreatedBy(); @@ -199,7 +200,7 @@ public void setUp() { @Test public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { add(new Path(inputFiles.get(0).getFileName())); @@ -210,8 +211,8 @@ public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception { @Test public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { add(new Path(inputFiles.get(0).getFileName())); @@ -252,10 +253,10 @@ private void testPruneNullifyTranslateCodec(List inputPaths) throws Except null); // Verify the data are not changed for the columns not pruned - validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), null, false); + validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), null, false, emptyMap()); // Verify the page index - validatePageIndex(ImmutableSet.of("Links.Forward"), false); + validatePageIndex(ImmutableSet.of("Links.Forward"), false, emptyMap()); // Verify original.created.by is preserved validateCreatedBy(); @@ -264,7 +265,7 @@ private void testPruneNullifyTranslateCodec(List inputPaths) throws Except @Test public void testPruneNullifyTranslateCodecSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -276,8 +277,8 @@ public void testPruneNullifyTranslateCodecSingleFile() throws Exception { @Test public void testPruneNullifyTranslateCodecTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { @@ -327,7 +328,8 @@ private void testPruneEncryptTranslateCodec(List inputPaths) throws Except fileDecryptionProperties); // Verify the data are not changed for the columns not pruned - 
validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false); + validateColumnData( + new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false, emptyMap()); // Verify column encryption ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties); @@ -349,7 +351,7 @@ private void testPruneEncryptTranslateCodec(List inputPaths) throws Except @Test public void testPruneEncryptTranslateCodecSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -361,8 +363,8 @@ public void testPruneEncryptTranslateCodecSingleFile() throws Exception { @Test public void testPruneEncryptTranslateCodecTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { @@ -488,10 +490,10 @@ private void testNullifyAndEncryptColumn(List inputPaths) throws Exception // Verify the data are not changed for non-encrypted and non-masked columns. // Also make sure the masked column is nullified. - validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false); + validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false, emptyMap()); // Verify the page index - validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false); + validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false, emptyMap()); // Verify the column is encrypted ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties); @@ -511,7 +513,7 @@ private void testNullifyAndEncryptColumn(List inputPaths) throws Exception @Test public void testNullifyEncryptSingleFile() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -523,8 +525,8 @@ public void testNullifyEncryptSingleFile() throws Exception { @Test public void testNullifyEncryptTwoFiles() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); List inputPaths = new ArrayList() { { @@ -537,8 +539,8 @@ public void testNullifyEncryptTwoFiles() throws Exception { @Test public void testMergeTwoFilesOnly() throws Exception { - ensureContainsGzipFile(); - ensureContainsUncompressedFile(); + addGzipInputFile(); + addUncompressedInputFile(); // Only merge two files but do not change anything. 
List inputPaths = new ArrayList<>(); @@ -571,27 +573,103 @@ public void testMergeTwoFilesOnly() throws Exception { null); // Verify the merged data are not changed - validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false); + validateColumnData(Collections.emptySet(), Collections.emptySet(), null, false, emptyMap()); // Verify the page index - validatePageIndex(new HashSet<>(), false); + validatePageIndex(new HashSet<>(), false, emptyMap()); // Verify original.created.by is preserved validateCreatedBy(); validateRowGroupRowCount(); } + @Test + public void testMergeTwoFilesOnlyRenameColumn() throws Exception { + addGzipInputFile(); + addUncompressedInputFile(); + + Map renameColumns = ImmutableMap.of("Name", "NameRenamed"); + List pruneColumns = ImmutableList.of("Gender"); + String[] encryptColumns = {"DocId"}; + FileEncryptionProperties fileEncryptionProperties = + EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false); + List inputPaths = + inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()); + RewriteOptions.Builder builder = createBuilder(inputPaths); + RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy) + .renameColumns(ImmutableMap.of("Name", "NameRenamed")) + .prune(pruneColumns) + .transform(CompressionCodecName.SNAPPY) + .encrypt(Arrays.asList(encryptColumns)) + .encryptionProperties(fileEncryptionProperties) + .build(); + + rewriter = new ParquetRewriter(options); + rewriter.processBlocks(); + rewriter.close(); + + FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties(); + + // Verify the schema is not changed + ParquetMetadata pmd = + ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER); + MessageType schema = pmd.getFileMetaData().getSchema(); + MessageType expectSchema = createSchemaWithRenamed(); + assertEquals(expectSchema, schema); + + verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.SNAPPY), fileDecryptionProperties); // Verify codec + // Verify the merged data are not changed + validateColumnData( + new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties, false, renameColumns); + validatePageIndex(ImmutableSet.of("DocId"), false, renameColumns); // Verify the page index + validateCreatedBy(); // Verify original.created.by is preserved + validateRowGroupRowCount(); + + ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties); + assertFalse(metaData.getBlocks().isEmpty()); + Set encryptedColumns = new HashSet<>(Arrays.asList(encryptColumns)); + for (BlockMetaData blockMetaData : metaData.getBlocks()) { + List columns = blockMetaData.getColumns(); + for (ColumnChunkMetaData column : columns) { + if (encryptedColumns.contains(column.getPath().toDotString())) { + assertTrue(column.isEncrypted()); + } else { + assertFalse(column.isEncrypted()); + } + } + } + } + @Test(expected = InvalidSchemaException.class) public void testMergeTwoFilesWithDifferentSchema() throws Exception { - testMergeTwoFilesWithDifferentSchemaSetup(true); + testMergeTwoFilesWithDifferentSchemaSetup(true, null, null); } @Test(expected = InvalidSchemaException.class) public void testMergeTwoFilesToJoinWithDifferentSchema() throws Exception { - testMergeTwoFilesWithDifferentSchemaSetup(false); + testMergeTwoFilesWithDifferentSchemaSetup(false, null, null); + } + + @Test(expected = IllegalArgumentException.class) + public void 
testMergeTwoFilesWithWrongDestinationRenamedColumn() throws Exception { + testMergeTwoFilesWithDifferentSchemaSetup( + null, ImmutableMap.of("WrongColumnName", "WrongColumnNameRenamed"), null); } - public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInputFile) throws Exception { + @Test(expected = IllegalArgumentException.class) + public void testMergeTwoFilesWithWrongSourceRenamedColumn() throws Exception { + testMergeTwoFilesWithDifferentSchemaSetup(null, ImmutableMap.of("Name", "DocId"), null); + } + + @Test(expected = IllegalArgumentException.class) + public void testMergeTwoFilesNullifyAndRenamedSameColumn() throws Exception { + testMergeTwoFilesWithDifferentSchemaSetup( + null, ImmutableMap.of("Name", "NameRenamed"), ImmutableMap.of("Name", MaskMode.NULLIFY)); + } + + public void testMergeTwoFilesWithDifferentSchemaSetup( + Boolean wrongSchemaInInputFile, Map renameColumns, Map maskColumns) + throws Exception { MessageType schema1 = new MessageType( "schema", new PrimitiveType(OPTIONAL, INT64, "DocId"), @@ -620,27 +698,32 @@ public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInput .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE) .withWriterVersion(writerVersion) .build()); - if (wrongSchemaInInputFile) { - inputFiles.add(new TestFileBuilder(conf, schema2) - .withNumRecord(numRecord) - .withCodec("UNCOMPRESSED") - .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE) - .withWriterVersion(writerVersion) - .build()); - } else { - inputFilesToJoin.add(new TestFileBuilder(conf, schema2) - .withNumRecord(numRecord) - .withCodec("UNCOMPRESSED") - .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE) - .withWriterVersion(writerVersion) - .build()); + if (wrongSchemaInInputFile != null) { + if (wrongSchemaInInputFile) { + inputFiles.add(new TestFileBuilder(conf, schema2) + .withNumRecord(numRecord) + .withCodec("UNCOMPRESSED") + .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE) + .withWriterVersion(writerVersion) + .build()); + } else { + inputFilesToJoin.add(new TestFileBuilder(conf, schema2) + .withNumRecord(numRecord) + .withCodec("UNCOMPRESSED") + .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE) + .withWriterVersion(writerVersion) + .build()); + } } RewriteOptions.Builder builder = createBuilder( inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()), inputFilesToJoin.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()), false); - RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build(); + RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy) + .renameColumns(renameColumns) + .mask(maskColumns) + .build(); // This should throw an exception because the schemas are different rewriter = new ParquetRewriter(options); @@ -648,7 +731,7 @@ public void testMergeTwoFilesWithDifferentSchemaSetup(boolean wrongSchemaInInput @Test public void testRewriteFileWithMultipleBlocks() throws Exception { - ensureContainsGzipFile(); + addGzipInputFile(); List inputPaths = new ArrayList() { { @@ -823,12 +906,13 @@ public void testOneInputFileManyInputFilesToJoinSetup(boolean joinColumnsOverwri new HashSet<>(pruneColumns), maskColumns.keySet(), fileDecryptionProperties, - joinColumnsOverwrite); // Verify data + joinColumnsOverwrite, + emptyMap()); // Verify data validateSchemaWithGenderColumnPruned(true); // Verify schema validateCreatedBy(); // Verify original.created.by assertEquals(inputBloomFilters.keySet(), rBloomFilters); // Verify bloom filters verifyCodec(outputFile, 
ImmutableSet.of(CompressionCodecName.ZSTD), fileDecryptionProperties); // Verify codec - validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite); + validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite, emptyMap()); } private void testOneInputFileManyInputFilesToJoinSetup() throws IOException { @@ -884,11 +968,26 @@ private MessageType createSchemaToJoin() { new PrimitiveType(REPEATED, BINARY, "Forward"))); } + private MessageType createSchemaWithRenamed() { + return new MessageType( + "schema", + new PrimitiveType(OPTIONAL, INT64, "DocId"), + new PrimitiveType(REQUIRED, BINARY, "NameRenamed"), + new PrimitiveType(REPEATED, FLOAT, "FloatFraction"), + new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"), + new GroupType( + OPTIONAL, + "Links", + new PrimitiveType(REPEATED, BINARY, "Backward"), + new PrimitiveType(REPEATED, BINARY, "Forward"))); + } + private void validateColumnData( Set prunePaths, Set nullifiedPaths, FileDecryptionProperties fileDecryptionProperties, - Boolean joinColumnsOverwrite) + Boolean joinColumnsOverwrite, + Map renameColumns) throws IOException { ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)) .withConf(conf) @@ -901,7 +1000,7 @@ private void validateColumnData( List filesJoined = inputFilesToJoin.stream() .flatMap(x -> Arrays.stream(x.getFileContent())) .collect(Collectors.toList()); - BiFunction groups = (name, rowIdx) -> { + BiFunction groupsExpected = (name, rowIdx) -> { if (!filesMain.get(0).getType().containsField(name) || joinColumnsOverwrite && !filesJoined.isEmpty() @@ -915,50 +1014,53 @@ private void validateColumnData( int totalRows = inputFiles.stream().mapToInt(x -> x.getFileContent().length).sum(); for (int i = 0; i < totalRows; i++) { - Group group = reader.read(); - assertNotNull(group); + Group groupActual = reader.read(); + assertNotNull(groupActual); if (!prunePaths.contains("DocId")) { if (nullifiedPaths.contains("DocId")) { - assertThrows(RuntimeException.class, () -> group.getLong("DocId", 0)); + assertThrows(RuntimeException.class, () -> groupActual.getLong("DocId", 0)); } else { assertEquals( - group.getLong("DocId", 0), groups.apply("DocId", i).getLong("DocId", 0)); + groupActual.getLong("DocId", 0), + groupsExpected.apply("DocId", i).getLong("DocId", 0)); } } if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) { + String colName = renameColumns.getOrDefault("Name", "Name"); assertArrayEquals( - group.getBinary("Name", 0).getBytes(), - groups.apply("Name", i).getBinary("Name", 0).getBytes()); + groupActual.getBinary(colName, 0).getBytes(), + groupsExpected.apply("Name", i).getBinary("Name", 0).getBytes()); } if (!prunePaths.contains("Gender") && !nullifiedPaths.contains("Gender")) { assertArrayEquals( - group.getBinary("Gender", 0).getBytes(), - groups.apply("Gender", i).getBinary("Gender", 0).getBytes()); + groupActual.getBinary("Gender", 0).getBytes(), + groupsExpected.apply("Gender", i).getBinary("Gender", 0).getBytes()); } if (!prunePaths.contains("FloatFraction") && !nullifiedPaths.contains("FloatFraction")) { assertEquals( - group.getFloat("FloatFraction", 0), - groups.apply("FloatFraction", i).getFloat("FloatFraction", 0), + groupActual.getFloat("FloatFraction", 0), + groupsExpected.apply("FloatFraction", i).getFloat("FloatFraction", 0), 0); } if (!prunePaths.contains("DoubleFraction") && !nullifiedPaths.contains("DoubleFraction")) { assertEquals( - group.getDouble("DoubleFraction", 0), - groups.apply("DoubleFraction", 
i).getDouble("DoubleFraction", 0), + groupActual.getDouble("DoubleFraction", 0), + groupsExpected.apply("DoubleFraction", i).getDouble("DoubleFraction", 0), 0); } - Group subGroup = group.getGroup("Links", 0); + Group subGroup = groupActual.getGroup("Links", 0); if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) { assertArrayEquals( subGroup.getBinary("Backward", 0).getBytes(), - groups.apply("Links", i) + groupsExpected + .apply("Links", i) .getGroup("Links", 0) .getBinary("Backward", 0) .getBytes()); @@ -970,7 +1072,8 @@ private void validateColumnData( } else { assertArrayEquals( subGroup.getBinary("Forward", 0).getBytes(), - groups.apply("Links", i) + groupsExpected + .apply("Links", i) .getGroup("Links", 0) .getBinary("Forward", 0) .getBytes()); @@ -1014,13 +1117,22 @@ interface CheckedFunction { R apply(T t) throws IOException; } + private ColumnPath normalizeFieldsInPath(ColumnPath path, Map renameColumns) { + String[] pathArray = path.toArray(); + if (renameColumns != null) { + pathArray[0] = renameColumns.getOrDefault(pathArray[0], pathArray[0]); + } + return ColumnPath.get(pathArray); + } + /** * Verify the page index is correct. * * @param exclude the columns to exclude from comparison, for example because they were nullified. * @param joinColumnsOverwrite whether a join columns overwrote existing overlapping columns. */ - private void validatePageIndex(Set exclude, boolean joinColumnsOverwrite) throws Exception { + private void validatePageIndex(Set exclude, boolean joinColumnsOverwrite, Map renameColumns) + throws Exception { class BlockMeta { final TransParquetFileReader reader; final BlockMetaData blockMeta; @@ -1058,6 +1170,8 @@ class BlockMeta { List inBlocksJoined = blockMetaExtractor.apply( inputFilesToJoin.stream().map(EncryptionTestFile::getFileName).collect(Collectors.toList())); List outBlocks = blockMetaExtractor.apply(ImmutableList.of(outputFile)); + Map renameColumnsInverted = + renameColumns.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); for (int blockIdx = 0; blockIdx < outBlocks.size(); blockIdx++) { BlockMetaData outBlockMeta = outBlocks.get(blockIdx).blockMeta; TransParquetFileReader outReader = outBlocks.get(blockIdx).reader; @@ -1066,17 +1180,18 @@ class BlockMeta { TransParquetFileReader inReader; BlockMetaData inBlockMeta; ColumnChunkMetaData inChunk; - if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath()) + ColumnPath colPath = normalizeFieldsInPath(outChunk.getPath(), renameColumnsInverted); + if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(colPath) || joinColumnsOverwrite && !inBlocksJoined.isEmpty() - && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())) { + && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(colPath)) { inReader = inBlocksJoined.get(blockIdx).reader; inBlockMeta = inBlocksJoined.get(blockIdx).blockMeta; - inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(outChunk.getPath()); + inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(colPath); } else { inReader = inBlocksMain.get(blockIdx).reader; inBlockMeta = inBlocksMain.get(blockIdx).blockMeta; - inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(outChunk.getPath()); + inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(colPath); } ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk); @@ -1284,13 +1399,13 @@ conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER) assertEquals(expectSchema, actualSchema); } - 
private void ensureContainsGzipFile() {
+    private void addGzipInputFile() {
         if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
             inputFiles.add(this.gzipEncryptionTestFileWithoutBloomFilterColumn);
         }
     }
 
-    private void ensureContainsUncompressedFile() {
+    private void addUncompressedInputFile() {
         if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) {
             inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
         }
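
For reference, a minimal usage sketch of the renaming feature added by this patch. It assumes the existing single-input RewriteOptions.Builder(Configuration, Path, Path) constructor; the file paths and the "Name" -> "NameRenamed" mapping are illustrative only.

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RenameColumnSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Illustrative locations; any readable Parquet input works.
        Path input = new Path("file:///tmp/input.parquet");
        Path output = new Path("file:///tmp/output-renamed.parquet");

        RewriteOptions options = new RewriteOptions.Builder(conf, input, output)
                // Rename the top-level column "Name" to "NameRenamed".
                // Nested fields cannot be renamed, and a column that is being
                // nullified cannot also be renamed; both cases fail validation
                // with an IllegalArgumentException.
                .renameColumns(Collections.singletonMap("Name", "NameRenamed"))
                .build();

        // Rewrite all row groups into the output file with the renamed schema.
        try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
            rewriter.processBlocks();
        }
    }
}

Columns not listed in the map keep their names; the rename is applied to the output schema, column paths, and column chunk metadata, while the page data itself is copied as-is unless other transformations (for example a codec change) are also requested.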