HIVE-26913: Addendum: Missing ParquetMetadata initialization in VectorizedParquetInputFormat (Denys Kuzmenko, reviewed by Ayush Saxena)

Closes #4136
deniskuzZ authored May 4, 2023
1 parent 2c2fc19 commit 36e6560
Showing 3 changed files with 113 additions and 1 deletion.
@@ -230,6 +230,7 @@ private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReader
     ParquetMetadata parquetMetadata = footerData != null ?
         ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), ParquetMetadataConverter.NO_FILTER) :
         ParquetFileReader.readFooter(job, path);
+    inputFormat.setMetadata(parquetMetadata);
 
     MessageType fileSchema = parquetMetadata.getFileMetaData().getSchema();
     MessageType typeWithIds = null;
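
For context, the fix above follows a read-once, share-everywhere pattern: the Parquet footer is read a single time (from the cached footer data when available) and then handed to the input format, so downstream readers do not read it again. Below is a minimal sketch of that pattern; FooterAwareInputFormat and FooterReuseSketch are hypothetical stand-ins invented for this illustration, while ParquetFileReader, ParquetMetadata and MessageType are real Parquet APIs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

// Hypothetical stand-in for an input format that can carry a pre-read footer.
class FooterAwareInputFormat {
  private ParquetMetadata metadata;

  // If this setter is never called, each reader has to invoke readFooter
  // again; that redundant read is what the addendum eliminates.
  void setMetadata(ParquetMetadata metadata) {
    this.metadata = metadata;
  }

  MessageType fileSchema() {
    return metadata.getFileMetaData().getSchema();
  }
}

public class FooterReuseSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]); // path to any local Parquet file

    // Read the footer exactly once ...
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, path);

    // ... then share it instead of re-reading it per record reader.
    FooterAwareInputFormat inputFormat = new FooterAwareInputFormat();
    inputFormat.setMetadata(footer);
    System.out.println(inputFormat.fileSchema());
  }
}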
@@ -261,7 +261,7 @@ public void testHiveDeleteFilterWithFilteredParquetBlock() {
    * @return JobConf instance
    * @throws HiveException any failure during job creation
    */
-  private JobConf prepareMockJob(Schema schema, Path dataFilePath) throws HiveException {
+  static JobConf prepareMockJob(Schema schema, Path dataFilePath) throws HiveException {
     StructObjectInspector oi = (StructObjectInspector) IcebergObjectInspector.create(schema);
     String hiveColumnNames = String.join(",", oi.getAllStructFieldRefs().stream()
         .map(sf -> sf.getFieldName()).collect(Collectors.toList()));
@@ -287,6 +287,7 @@ private JobConf prepareMockJob(Schema schema, Path dataFilePath) throws HiveException {
     rbCtx.init(oi, new String[0]);
     mapWork.setVectorMode(true);
     mapWork.setVectorizedRowBatchCtx(rbCtx);
+    mapWork.deriveLlap(conf, false);
     Utilities.setMapWork(vectorJob, mapWork);
     return vectorJob;
   }
@@ -0,0 +1,110 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.mr.hive.vector;

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat.CompatibilityTaskAttemptContextImpl;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
import org.apache.iceberg.types.Types;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.InputFile;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import static org.apache.iceberg.mr.hive.vector.TestHiveIcebergVectorization.prepareMockJob;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;

public class TestHiveVectorizedReader {

  private static final Schema SCHEMA = new Schema(
      required(1, "data", Types.StringType.get()),
      required(2, "id", Types.LongType.get()),
      required(3, "date", Types.StringType.get()));

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  private TestHelper helper;
  private InputFormatConfig.ConfigBuilder builder;

  private final FileFormat fileFormat = FileFormat.PARQUET;

  @Before
  public void before() throws IOException, HiveException {
    File location = temp.newFolder(fileFormat.name());
    Assert.assertTrue(location.delete());

    Configuration conf = prepareMockJob(SCHEMA, new Path(location.toString()));
    conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
    HadoopTables tables = new HadoopTables(conf);

    helper = new TestHelper(conf, tables, location.toString(), SCHEMA, null, fileFormat, temp);
    builder = new InputFormatConfig.ConfigBuilder(conf).readFrom(location.toString())
        .useHiveRows();
  }

  @Test
  public void testRecordReaderShouldReuseFooter() throws IOException, InterruptedException {
    helper.createUnpartitionedTable();
    List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
    helper.appendToTable(null, expectedRecords);

    TaskAttemptContext context = new CompatibilityTaskAttemptContextImpl(builder.conf(), new TaskAttemptID(), null);
    IcebergInputFormat<?> inputFormat = new IcebergInputFormat<>();
    List<InputSplit> splits = inputFormat.getSplits(context);

    try (MockedStatic<ParquetFileReader> mockedParquetFileReader = Mockito.mockStatic(ParquetFileReader.class,
        Mockito.CALLS_REAL_METHODS)) {
      for (InputSplit split : splits) {
        try (RecordReader<Void, ?> reader = inputFormat.createRecordReader(split, context)) {
          reader.initialize(split, context);
        }
      }
      mockedParquetFileReader.verify(times(1), () ->
          ParquetFileReader.readFooter(any(InputFile.class), any(ParquetMetadataConverter.MetadataFilter.class))
      );
    }
  }
}
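
The test above relies on Mockito's static mocking with CALLS_REAL_METHODS: ParquetFileReader.readFooter still executes its real body, but every call is recorded, so verify(times(1), ...) can assert the footer is read exactly once across split planning and record-reader initialization. A minimal, self-contained sketch of that pattern, assuming a hypothetical Footers helper and mockito-inline (or mockito-core 5.x) on the classpath:

import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class MockedStaticSketch {

  // Hypothetical helper standing in for ParquetFileReader.
  static class Footers {
    static String read(String path) {
      return "footer-of-" + path;
    }
  }

  public static void main(String[] args) {
    // CALLS_REAL_METHODS keeps the real static behavior while
    // recording each invocation for later verification.
    try (MockedStatic<Footers> mocked =
        Mockito.mockStatic(Footers.class, Mockito.CALLS_REAL_METHODS)) {
      String footer = Footers.read("part-0.parquet"); // real body runs
      System.out.println(footer);                     // prints footer-of-part-0.parquet
      mocked.verify(() -> Footers.read("part-0.parquet"), Mockito.times(1));
    }
  }
}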
