diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 9223f5b4c076..6566c7b1f117 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -230,6 +230,7 @@ private static RecordReader parquetRecordReade
     ParquetMetadata parquetMetadata = footerData != null ?
         ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), ParquetMetadataConverter.NO_FILTER) :
         ParquetFileReader.readFooter(job, path);
+    inputFormat.setMetadata(parquetMetadata);
 
     MessageType fileSchema = parquetMetadata.getFileMetaData().getSchema();
     MessageType typeWithIds = null;
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
index d01e7edea329..dfad82947172 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
@@ -261,7 +261,7 @@ public void testHiveDeleteFilterWithFilteredParquetBlock() {
    * @return JobConf instance
    * @throws HiveException any failure during job creation
    */
-  private JobConf prepareMockJob(Schema schema, Path dataFilePath) throws HiveException {
+  static JobConf prepareMockJob(Schema schema, Path dataFilePath) throws HiveException {
     StructObjectInspector oi = (StructObjectInspector) IcebergObjectInspector.create(schema);
     String hiveColumnNames = String.join(",", oi.getAllStructFieldRefs().stream()
         .map(sf -> sf.getFieldName()).collect(Collectors.toList()));
@@ -287,6 +287,7 @@ private JobConf prepareMockJob(Schema schema, Path dataFilePath) throws HiveExce
     rbCtx.init(oi, new String[0]);
     mapWork.setVectorMode(true);
     mapWork.setVectorizedRowBatchCtx(rbCtx);
+    mapWork.deriveLlap(conf, false);
     Utilities.setMapWork(vectorJob, mapWork);
     return vectorJob;
   }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveVectorizedReader.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveVectorizedReader.java
new file mode 100644
index 000000000000..9decd05ad7e8
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveVectorizedReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat.CompatibilityTaskAttemptContextImpl;
+import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.InputFile;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import static org.apache.iceberg.mr.hive.vector.TestHiveIcebergVectorization.prepareMockJob;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+
+public class TestHiveVectorizedReader {
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "data", Types.StringType.get()),
+      required(2, "id", Types.LongType.get()),
+      required(3, "date", Types.StringType.get()));
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private TestHelper helper;
+  private InputFormatConfig.ConfigBuilder builder;
+
+  private final FileFormat fileFormat = FileFormat.PARQUET;
+
+  @Before
+  public void before() throws IOException, HiveException {
+    File location = temp.newFolder(fileFormat.name());
+    Assert.assertTrue(location.delete());
+
+    Configuration conf = prepareMockJob(SCHEMA, new Path(location.toString()));
+    conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
+    HadoopTables tables = new HadoopTables(conf);
+
+    helper = new TestHelper(conf, tables, location.toString(), SCHEMA, null, fileFormat, temp);
+    builder = new InputFormatConfig.ConfigBuilder(conf).readFrom(location.toString())
+        .useHiveRows();
+  }
+
+  @Test
+  public void testRecordReaderShouldReuseFooter() throws IOException, InterruptedException {
+    helper.createUnpartitionedTable();
+    List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+    helper.appendToTable(null, expectedRecords);
+
+    TaskAttemptContext context = new CompatibilityTaskAttemptContextImpl(builder.conf(), new TaskAttemptID(), null);
+    IcebergInputFormat<Record> inputFormat = new IcebergInputFormat<>();
+    List<InputSplit> splits = inputFormat.getSplits(context);
+
+    try (MockedStatic<ParquetFileReader> mockedParquetFileReader = Mockito.mockStatic(ParquetFileReader.class,
+        Mockito.CALLS_REAL_METHODS)) {
+      for (InputSplit split : splits) {
+        try (RecordReader<Void, Record> reader = inputFormat.createRecordReader(split, context)) {
+          reader.initialize(split, context);
+        }
+      }
+      mockedParquetFileReader.verify(times(1), () ->
+          ParquetFileReader.readFooter(any(InputFile.class), any(ParquetMetadataConverter.MetadataFilter.class))
+      );
+    }
+  }
+
+}