diff --git a/parquet-common/src/main/java/org/apache/parquet/io/ParquetFileRange.java b/parquet-common/src/main/java/org/apache/parquet/io/ParquetFileRange.java new file mode 100644 index 0000000000..fea9a9d926 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/ParquetFileRange.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.io; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +/** + * Class to define a file range for a parquet file and to + * hold future data for any ongoing read for that range. + */ +public class ParquetFileRange { + + /** + * Start position in file. + */ + private final long offset; + + /** + * Length of data to be read from position. + */ + private final int length; + + /** + * A future object to hold future for ongoing read. + */ + private CompletableFuture dataReadFuture; + + public ParquetFileRange(long offset, int length) { + this.offset = offset; + this.length = length; + } + + public long getOffset() { + return offset; + } + + public int getLength() { + return length; + } + + public CompletableFuture getDataReadFuture() { + return dataReadFuture; + } + + public void setDataReadFuture(CompletableFuture dataReadFuture) { + this.dataReadFuture = dataReadFuture; + } + + @Override + public String toString() { + return "range[" + this.offset + "," + (this.offset + (long)this.length) + ")"; + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java index ed20163613..5e05c4f3aa 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; /** * {@code SeekableInputStream} is an interface with the methods needed by @@ -105,4 +107,21 @@ public abstract class SeekableInputStream extends InputStream { */ public abstract void readFully(ByteBuffer buf) throws IOException; + /** + * Read a set of file ranges in a vectored manner. + * @throws UnsupportedOperationException if not available in this class/runtime. + */ + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + + throw new UnsupportedOperationException("Vectored IO is not supported for " + this); + } + + /** + * Is the {@link #readVectored(List, IntFunction)} method available. 
+ */ + public boolean readVectoredAvailable() { + return false; + } + } diff --git a/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java b/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java index 340baec02e..363afce335 100644 --- a/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java +++ b/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java @@ -27,10 +27,15 @@ import java.security.PrivilegedAction; import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static org.apache.parquet.Exceptions.throwIfInstance; public class DynMethods { + private static final Logger LOG = LoggerFactory.getLogger(DynMethods.class); + /** * Convenience wrapper class around {@link java.lang.reflect.Method}. * @@ -243,7 +248,8 @@ public Builder impl(String className, String methodName, Class... argClasses) Class targetClass = Class.forName(className, true, loader); impl(targetClass, methodName, argClasses); } catch (ClassNotFoundException e) { - // not the right implementation + // class not found on supplied classloader. + LOG.debug("failed to load class {}", className, e); } return this; } @@ -281,6 +287,7 @@ public Builder impl(Class targetClass, String methodName, Class... argClas targetClass.getMethod(methodName, argClasses), name); } catch (NoSuchMethodException e) { // not the right implementation + LOG.debug("failed to load method {} from class {}", methodName, targetClass, e); } return this; } @@ -311,6 +318,8 @@ public Builder ctorImpl(Class targetClass, Class... argClasses) { .buildChecked(); } catch (NoSuchMethodException e) { // not the right implementation + LOG.debug("failed to load constructor arity {} from class {}", + argClasses.length, targetClass, e); } return this; } @@ -327,6 +336,9 @@ public Builder ctorImpl(String className, Class... argClasses) { .buildChecked(); } catch (NoSuchMethodException e) { // not the right implementation + LOG.debug("failed to load constructor arity {} from class {}", + argClasses.length, className, e); + } return this; } @@ -349,7 +361,8 @@ public Builder hiddenImpl(String className, String methodName, Class... argCl Class targetClass = Class.forName(className, true, loader); hiddenImpl(targetClass, methodName, argClasses); } catch (ClassNotFoundException e) { - // not the right implementation + // class not found on supplied classloader. + LOG.debug("failed to load class {}", className, e); } return this; } @@ -388,6 +401,7 @@ public Builder hiddenImpl(Class targetClass, String methodName, Class... a this.method = new UnboundMethod(hidden, name); } catch (SecurityException | NoSuchMethodException e) { // unusable or not the right implementation + LOG.debug("failed to load method {} from class {}", methodName, targetClass, e); } return this; } diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md index c27c5f2fc1..156dd1b710 100644 --- a/parquet-hadoop/README.md +++ b/parquet-hadoop/README.md @@ -486,3 +486,11 @@ If `false`, key material is stored in separate new files, created in the same fo **Description:** Length of key encryption keys (KEKs), randomly generated by parquet key management tools. Can be 128, 192 or 256 bits. **Default value:** `128` +--- + +**Property:** `parquet.hadoop.vectored.io.enabled` +**Description:** Flag to enable use of the FileSystem Vector IO API on Hadoop releases which support the feature. 
+If `true` then an attempt will be made to dynamically load the relevant classes; +if not found then the library will use the classic non-vectored reads: it is safe to enable this option on older releases. +**Default value:** `false` + diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index e96830cc28..7598af4b44 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -79,6 +79,11 @@ ${hadoop.version} provided + org.apache.parquet parquet-jackson diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index 8f0d8d8933..e0118d8b7d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -27,6 +27,7 @@ import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; +import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.util.HadoopCodecs; import java.util.Map; @@ -37,6 +38,7 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY; @@ -54,6 +56,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax, boolean usePageChecksumVerification, boolean useBloomFilter, boolean useOffHeapDecryptBuffer, + boolean useHadoopVectoredIO, FilterCompat.Filter recordFilter, MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -64,7 +67,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax, FileDecryptionProperties fileDecryptionProperties) { super( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, - usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, codecFactory, allocator, + usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIO, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties ); this.conf = conf; @@ -111,6 +114,7 @@ public Builder(Configuration conf, Path filePath) { usePageChecksumVerification)); useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true)); useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false)); + useHadoopVectoredIo(conf.getBoolean(HADOOP_VECTORED_IO_ENABLED, ParquetInputFormat.HADOOP_VECTORED_IO_DEFAULT)); withCodecFactory(HadoopCodecs.newFactory(conf, 0)); withRecordFilter(getFilter(conf)); withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); @@ -128,7 +132,7 @@ public ParquetReadOptions build() { } return new HadoopReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, - useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, + useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, 
useOffHeapDecryptBuffer, useHadoopVectoredIo, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties, conf, fileDecryptionProperties); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index dc130ee8d2..495d8eb7ec 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -41,6 +41,7 @@ public class ParquetReadOptions { private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true; private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true; private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true; + private static final boolean HADOOP_VECTORED_IO_ENABLED_DEFAULT = false; private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false; private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true; @@ -54,6 +55,7 @@ public class ParquetReadOptions { private final boolean usePageChecksumVerification; private final boolean useBloomFilter; private final boolean useOffHeapDecryptBuffer; + private final boolean useHadoopVectoredIO; private final FilterCompat.Filter recordFilter; private final ParquetMetadataConverter.MetadataFilter metadataFilter; private final CompressionCodecFactory codecFactory; @@ -70,6 +72,7 @@ public class ParquetReadOptions { boolean usePageChecksumVerification, boolean useBloomFilter, boolean useOffHeapDecryptBuffer, + boolean useHadoopVectoredIO, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -85,6 +88,7 @@ public class ParquetReadOptions { this.usePageChecksumVerification = usePageChecksumVerification; this.useBloomFilter = useBloomFilter; this.useOffHeapDecryptBuffer = useOffHeapDecryptBuffer; + this.useHadoopVectoredIO = useHadoopVectoredIO; this.recordFilter = recordFilter; this.metadataFilter = metadataFilter; this.codecFactory = codecFactory; @@ -126,6 +130,10 @@ public boolean usePageChecksumVerification() { return usePageChecksumVerification; } + public boolean useHadoopVectoredIO() { + return useHadoopVectoredIO; + } + public FilterCompat.Filter getRecordFilter() { return recordFilter; } @@ -173,6 +181,7 @@ public static class Builder { protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT; protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT; protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT; + protected boolean useHadoopVectoredIo = HADOOP_VECTORED_IO_ENABLED_DEFAULT; protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT; protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT; protected boolean useBloomFilter = BLOOM_FILTER_ENABLED_DEFAULT; @@ -226,6 +235,11 @@ public Builder useRecordFilter() { return this; } + public Builder useHadoopVectoredIo(boolean useHadoopVectoredIo) { + this.useHadoopVectoredIo = useHadoopVectoredIo; + return this; + } + public Builder useColumnIndexFilter(boolean useColumnIndexFilter) { this.useColumnIndexFilter = useColumnIndexFilter; return this; @@ -320,6 +334,7 @@ public Builder copy(ParquetReadOptions options) { useDictionaryFilter(options.useDictionaryFilter); useRecordFilter(options.useRecordFilter); withRecordFilter(options.recordFilter); + 
useHadoopVectoredIo(options.useHadoopVectoredIO); withMetadataFilter(options.metadataFilter); withCodecFactory(options.codecFactory); withAllocator(options.allocator); @@ -338,7 +353,7 @@ public ParquetReadOptions build() { return new ParquetReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, - useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, + useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index f63367022e..c4a2b66ab7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -53,8 +53,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import java.util.stream.Stream; import java.util.zip.CRC32; import org.apache.hadoop.conf.Configuration; @@ -63,6 +64,8 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; @@ -102,6 +105,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; +import org.apache.parquet.hadoop.util.vectorio.BindingUtils; import org.apache.parquet.internal.column.columnindex.ColumnIndex; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter; @@ -110,6 +114,7 @@ import org.apache.parquet.internal.hadoop.metadata.IndexReference; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.ParquetFileRange; import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; @@ -126,6 +131,8 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; + public static final long HADOOP_VECTORED_READ_TIMEOUT_SECONDS = 300; + private final ParquetMetadataConverter converter; private final CRC32 crc; @@ -986,9 +993,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE } // actually read all the chunks ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); - for (ConsecutivePartList consecutiveChunks : allParts) { - consecutiveChunks.readAll(f, builder); - } + readAllPartsVectoredOrNormal(allParts, builder); for (Chunk chunk : builder.build()) { readChunkPages(chunk, block, rowGroup); } @@ -1064,6 +1069,69 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r return internalReadFilteredRowGroup(block, 
rowRanges, getColumnIndexStore(blockIndex)); } + /** + * Read data in all parts via either vectored IO or serial IO. + * @param allParts all parts to be read. + * @param builder used to build chunk list to read the pages for the different columns. + * @throws IOException any IOE. + */ + private void readAllPartsVectoredOrNormal(List allParts, ChunkListBuilder builder) + throws IOException { + boolean isVectoredIO = options.useHadoopVectoredIO() + && f.readVectoredAvailable() + && partsLengthValidForVectoredIO(allParts); + if (isVectoredIO) { + readVectored(allParts, builder); + } else { + for (ConsecutivePartList consecutiveChunks : allParts) { + consecutiveChunks.readAll(f, builder); + } + } + } + + /** + * Vectored IO doesn't support reading ranges of size greater than + * Integer.MAX_VALUE. + * @param allParts all parts to read. + * @return true or false. + */ + private boolean partsLengthValidForVectoredIO(List allParts) { + for (ConsecutivePartList consecutivePart : allParts) { + if (consecutivePart.length >= Integer.MAX_VALUE) { + LOG.debug("Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO", consecutivePart.length); + return false; + } + } + return true; + } + + /** + * Read all parts through vectored IO. + * @param allParts all parts to be read. + * @param builder used to build chunk list to read the pages for the different columns. + * @throws IOException any IOE. + */ + private void readVectored(List allParts, + ChunkListBuilder builder) throws IOException { + + List ranges = new ArrayList<>(allParts.size()); + for (ConsecutivePartList consecutiveChunks : allParts) { + Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE, + "Invalid length %s for vectored read operation. It must be less than max integer value.", + consecutiveChunks.length); + ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) consecutiveChunks.length)); + } + LOG.debug("Doing vectored IO for ranges {}", ranges); + ByteBufferAllocator allocator = options.getAllocator(); + //blocking or asynchronous vectored read. + f.readVectored(ranges, allocator::allocate); + int k = 0; + for (ConsecutivePartList consecutivePart : allParts) { + ParquetFileRange currRange = ranges.get(k++); + consecutivePart.readFromVectoredRange(currRange, builder); + } + } + /** * Reads all the columns requested from the row group at the current file position. It may skip specific pages based * on the column indexes according to the actual filter. As the rows are not aligned among the pages of the different @@ -1141,10 +1209,7 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData bloc } } } - // actually read all the chunks - for (ConsecutivePartList consecutiveChunks : allParts) { - consecutiveChunks.readAll(f, builder); - } + readAllPartsVectoredOrNormal(allParts, builder); for (Chunk chunk : builder.build()) { readChunkPages(chunk, block, rowGroup); } @@ -1872,6 +1937,32 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx } } + /** + * Populate data in a parquet file range from vectored range. + * @param currRange range to populated. + * @param builder used to build chunk list to read the pages for the different columns. + * @throws IOException if there is an error while reading from the stream. 
+ */ + public void readFromVectoredRange(ParquetFileRange currRange, + ChunkListBuilder builder) throws IOException { + ByteBuffer buffer; + try { + LOG.debug("Waiting for vectored read to finish for range {} ", currRange); + buffer = BindingUtils.awaitFuture(currRange.getDataReadFuture(), + HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS); + // report in a counter the data we just scanned + BenchmarkCounter.incrementBytesRead(currRange.getLength()); + } catch (TimeoutException e) { + String error = String.format("Timeout while fetching result for %s", currRange); + LOG.error(error, e); + throw new IOException(error, e); + } + ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer); + for (ChunkDescriptor descriptor : chunks) { + builder.add(descriptor, stream.sliceBuffers(descriptor.size), f); + } + } + /** * @return the position following the last byte of these chunks */ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index 1bfd4b20f0..c916b2c7ae 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -163,6 +163,17 @@ public class ParquetInputFormat extends FileInputFormat { private static final int MIN_FOOTER_CACHE_SIZE = 100; + /** + * Key to enable/disable vectored io while reading parquet files: + * {@value}. + */ + public static final String HADOOP_VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled"; + + /** + * Default value of parquet.hadoop.vectored.io.enabled is {@value}. + */ + public static final boolean HADOOP_VECTORED_IO_DEFAULT = false; + public static void setTaskSideMetaData(Job job, boolean taskSideMetadata) { ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java index 0f8cdbb551..ccc0fa5841 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java @@ -20,12 +20,23 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge; import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.ParquetFileRange; + import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; /** * SeekableInputStream implementation that implements read(ByteBuffer) for * Hadoop 1 FSDataInputStream. + * It implements {@link #readVectored(List, IntFunction)}) by + * handing off to VectorIOBridge which uses reflection to offer the API if it is + * found. + * The return value of {@link #readVectoredAvailable()} reflects the availability of the + * API. 
*/ class H1SeekableInputStream extends DelegatingSeekableInputStream { @@ -56,4 +67,13 @@ public void readFully(byte[] bytes, int start, int len) throws IOException { stream.readFully(bytes, start, len); } + @Override + public boolean readVectoredAvailable() { + return VectorIOBridge.bridgeAvailable(); + } + + @Override + public void readVectored(List ranges, IntFunction allocate) throws IOException { + VectorIOBridge.readVectoredRanges(stream, ranges, allocate); + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java index 4bbbb8ed1f..f6f56650e2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java @@ -20,15 +20,24 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge; import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.ParquetFileRange; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; /** * SeekableInputStream implementation for FSDataInputStream that implements * ByteBufferReadable in Hadoop 2. + * It implements {@link #readVectored(List, IntFunction)}) by + * handing off to VectorIOBridge which uses reflection to offer the API if it is + * found. + * The return value of {@link #readVectoredAvailable()} reflects the availability of the + * API. */ class H2SeekableInputStream extends DelegatingSeekableInputStream { @@ -83,6 +92,16 @@ public int read(ByteBuffer buf) throws IOException { } } + @Override + public boolean readVectoredAvailable() { + return VectorIOBridge.bridgeAvailable(); + } + + @Override + public void readVectored(List ranges, IntFunction allocate) throws IOException { + VectorIOBridge.readVectoredRanges(stream, ranges, allocate); + } + public static void readFully(Reader reader, ByteBuffer buf) throws IOException { // unfortunately the Hadoop 2 APIs do not have a 'readFully' equivalent for the byteBuffer read // calls. The read(ByteBuffer) call might read fewer than byteBuffer.hasRemaining() bytes. Thus we diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java new file mode 100644 index 0000000000..176998bbc6 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UncheckedIOException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.parquet.util.DynMethods; + +/** + * Package-private binding utils. + */ +public final class BindingUtils { + private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class); + + + private BindingUtils() { + } + + /** + * Given a future, evaluate it. + *

+ * Any exception generated in the future is
+ * extracted and rethrown.
+ * + * @param future future to evaluate + * @param timeout timeout to wait + * @param unit time unit. + * @param type of the result. + * + * @return the result, if all went well. + * + * @throws InterruptedIOException future was interrupted + * @throws IOException if something went wrong + * @throws RuntimeException any nested RTE thrown + * @throws TimeoutException the future timed out. + */ + public static T awaitFuture(final Future future, + final long timeout, + final TimeUnit unit) + throws InterruptedIOException, IOException, RuntimeException, + TimeoutException { + try { + LOG.debug("Awaiting future"); + return future.get(timeout, unit); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException(e.toString()) + .initCause(e); + } catch (ExecutionException e) { + return raiseInnerCause(e); + } + } + + /** + * From the inner cause of an execution exception, extract the inner cause + * if it is an IOE or RTE. + * This will always raise an exception, either the inner IOException, + * an inner RuntimeException, or a new IOException wrapping the raised + * exception. + * + * @param e exception. + * @param type of return value. + * + * @return nothing, ever. + * + * @throws IOException either the inner IOException, or a wrapper around + * any non-Runtime-Exception + * @throws RuntimeException if that is the inner cause. + */ + public static T raiseInnerCause(final ExecutionException e) + throws IOException { + throw unwrapInnerException(e); + } + + /** + * From the inner cause of an execution exception, extract the inner cause + * to an IOException, raising RuntimeExceptions and Errors immediately. + *
+ * 1. If it is an IOE: return it.
+ * 2. If it is a {@link UncheckedIOException}: return the cause.
+ * 3. Completion/Execution Exceptions: extract and repeat.
+ * 4. If it is an RTE or Error: throw.
+ * 5. Any other type: wrap in an IOE.
+ * + * Recursively handles wrapped Execution and Completion Exceptions in + * case something very complicated has happened. + * + * @param e exception. + * + * @return an IOException extracted or built from the cause. + * + * @throws RuntimeException if that is the inner cause. + * @throws Error if that is the inner cause. + */ + @SuppressWarnings("ChainOfInstanceofChecks") + public static IOException unwrapInnerException(final Throwable e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + return (IOException) cause; + } else if (cause instanceof UncheckedIOException) { + // this is always an IOException + return ((UncheckedIOException) cause).getCause(); + } else if (cause instanceof CompletionException) { + return unwrapInnerException(cause); + } else if (cause instanceof ExecutionException) { + return unwrapInnerException(cause); + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else if (cause != null) { + // other type: wrap with a new IOE + return new IOException(cause); + } else { + // this only happens if there was no cause. + return new IOException(e); + } + } + + /** + * Get an invocation from the source class, which will be unavailable() if + * the class is null or the method isn't found. + * + * @param return type + * @param source source. If null, the method is a no-op. + * @param returnType return type class (unused) + * @param name method name + * @param parameterTypes parameters + * + * @return the method or "unavailable" + */ + static DynMethods.UnboundMethod loadInvocation( + Class source, + Class returnType, String name, + Class... parameterTypes) { + + if (source != null) { + final DynMethods.UnboundMethod m = new DynMethods + .Builder(name) + .impl(source, name, parameterTypes) + .orNoop() + .build(); + if (m.isNoop()) { + // this is a sign of a mismatch between this class's expected + // signatures and actual ones. + // log at debug. + LOG.debug("Failed to load method {} from {}", name, source); + } else { + LOG.debug("Found method {} from {}", name, source); + } + return m; + } else { + return noop(name); + } + } + + /** + * Create a no-op method. + * + * @param name method name + * + * @return a no-op method. + */ + static DynMethods.UnboundMethod noop(final String name) { + return new DynMethods.Builder(name) + .orNoop().build(); + } + + /** + * Given a sequence of methods, verify that they are all available. + * + * @param methods methods + * + * @return true if they are all implemented + */ + static boolean implemented(DynMethods.UnboundMethod... methods) { + for (DynMethods.UnboundMethod method : methods) { + if (method.isNoop()) { + return false; + } + } + return true; + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java new file mode 100644 index 0000000000..1937976626 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.parquet.io.ParquetFileRange; +import org.apache.parquet.util.DynMethods; + +import static java.util.Objects.requireNonNull; +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.implemented; +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.loadInvocation; + +/** + * Class to bridge to the FileRange class through reflection. + * A singleton is created when the class is loaded, so there is no need + * to repeat the reflection process on every API call. + *

+ * The Hadoop FileRange class is an interface with getters and setters; + * to instantiate the static method {@code createFileRange()} is used. + */ +final class FileRangeBridge { + + private static final Logger LOG = LoggerFactory.getLogger(FileRangeBridge.class); + + /** Class to bridge to: {@value}. */ + public static final String CLASSNAME = "org.apache.hadoop.fs.FileRange"; + + /** + * The singleton instance of the bridge. + */ + private static final FileRangeBridge INSTANCE = new FileRangeBridge(); + + /** + * Is the bridge available? + */ + private final boolean available; + private final Class fileRangeInterface; + private final DynMethods.UnboundMethod _getData; + private final DynMethods.UnboundMethod _setData; + private final DynMethods.UnboundMethod _getLength; + private final DynMethods.UnboundMethod _getOffset; + private final DynMethods.UnboundMethod _getReference; + private final DynMethods.UnboundMethod createFileRange; + + /** + * Constructor. + * This loads the real FileRange and its methods, if present. + */ + FileRangeBridge() { + + // try to load the class + Class loadedClass; + try { + loadedClass = this.getClass().getClassLoader().loadClass(CLASSNAME); + } catch (ReflectiveOperationException e) { + LOG.debug("No {}", CLASSNAME, e); + loadedClass = null; + } + fileRangeInterface = loadedClass; + + // as loadInvocation returns a no-op if the class is null, this sequence + // will either construct in the list of operations or leave them with no-ops + + _getOffset = loadInvocation(loadedClass, long.class, "getOffset"); + _getLength = loadInvocation(loadedClass, int.class, "getLength"); + _getData = loadInvocation(loadedClass, null, "getData"); + _setData = loadInvocation(loadedClass, void.class, "setData", CompletableFuture.class); + _getReference = loadInvocation(loadedClass, Object.class, "getReference"); + // static interface method to create an instance. + createFileRange = loadInvocation(fileRangeInterface, + Object.class, "createFileRange", + long.class, + int.class, + Object.class); + + // we are available only if the class is present and all methods are found + // the checks for the method are extra paranoia, but harmless + available = loadedClass != null + && implemented( + createFileRange, + _getOffset, + _getLength, + _getData, + _setData, + _getReference); + + LOG.debug("FileRangeBridge availability: {}", available); + } + + /** + * Is the bridge available? + * + * @return true iff the bridge is present. + */ + public boolean available() { + return available; + } + + /** + * Check that the bridge is available. + * + * @throws UnsupportedOperationException if it is not. + */ + private void checkAvailable() { + if (!available()) { + throw new UnsupportedOperationException("Interface " + CLASSNAME + " not found"); + } + } + + /** + * Get the file range class. + * + * @return the file range implementation class, if present. + */ + public Class getFileRangeInterface() { + return fileRangeInterface; + } + + /** + * Instantiate. + * + * @param offset offset in file + * @param length length of data to read. + * @param reference nullable reference to store in the range. + * + * @return a VectorFileRange wrapping a FileRange + * + * @throws RuntimeException if the range cannot be instantiated + * @throws IllegalStateException if the API is not available. 
+ */ + public WrappedFileRange createFileRange( + final long offset, + final int length, + final Object reference) { + + checkAvailable(); + return new WrappedFileRange(createFileRange.invoke(null, offset, length, reference)); + } + + /** + * Build a WrappedFileRange from a ParquetFileRange; + * the reference field of the wrapped object refers + * back to the object passed in. + * + * @param in input. + * + * @return a wrapper around a FileRange instance + */ + public FileRangeBridge.WrappedFileRange toFileRange(final ParquetFileRange in) { + // create a new wrapped file range, fill in and then + // get the instance + return createFileRange(in.getOffset(), in.getLength(), in); + } + + @Override + public String toString() { + return "FileRangeBridge{" + + "available=" + available + + ", fileRangeInterface=" + fileRangeInterface + + ", _getOffset=" + _getOffset + + ", _getLength=" + _getLength + + ", _getData=" + _getData + + ", _setData=" + _setData + + ", _getReference=" + _getReference + + ", createFileRange=" + createFileRange + + '}'; + } + + /** + * Get the singleton instance. + * + * @return instance. + */ + public static FileRangeBridge instance() { + return INSTANCE; + } + + /** + * Is the bridge available for use? + * + * @return true if the vector IO API is available. + */ + public static boolean bridgeAvailable() { + return instance().available(); + } + + /** + * Wraps a Vector {@code FileRange} instance through reflection. + */ + class WrappedFileRange { + + /** + * The wrapped {@code FileRange} instance. + */ + private final Object fileRange; + + /** + * Instantiate. + * + * @param fileRange non null {@code FileRange} instance. + */ + WrappedFileRange(final Object fileRange) { + this.fileRange = requireNonNull(fileRange); + } + + public long getOffset() { + return _getOffset.invoke(fileRange); + } + + public int getLength() { + return _getLength.invoke(fileRange); + } + + public CompletableFuture getData() { + return _getData.invoke(fileRange); + } + + public void setData(final CompletableFuture data) { + _setData.invoke(fileRange, data); + } + + public Object getReference() { + return _getReference.invoke(fileRange); + } + + /** + * Get the wrapped fileRange. + * + * @return the fileRange. + */ + public Object getFileRange() { + return fileRange; + } + + @Override + public String toString() { + return "WrappedFileRange{" + + "fileRange=" + fileRange + + '}'; + } + } + +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java new file mode 100644 index 0000000000..ba94c2e976 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.IntFunction; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.parquet.io.ParquetFileRange; +import org.apache.parquet.util.DynMethods; + +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.loadInvocation; + +/** + * Vector IO bridge. + *

+ * This loads the {@code PositionedReadable} method:
+ *   void readVectored(List[? extends FileRange] ranges,
+ *     IntFunction[ByteBuffer] allocate) throws IOException
+ * It is made slightly easier because of type erasure; the signature of the
+ * function is actually {@code Void.class method(List.class, IntFunction.class)}.
+ *
+ * There are some counters to aid in testing; the {@link #toString()} method + * will print them and the loaded method, for use in tests and debug logs. + */ +public final class VectorIOBridge { + + private static final Logger LOG = LoggerFactory.getLogger(VectorIOBridge.class); + + /** + * readVectored Method to look for. + */ + private static final String READ_VECTORED = "readVectored"; + + /** + * hasCapability Method name. + * {@code boolean hasCapability(String capability);} + */ + private static final String HAS_CAPABILITY = "hasCapability"; + + /** + * hasCapability() probe for vectored IO api implementation. + */ + static final String VECTOREDIO_CAPABILITY = "in:readvectored"; + + /** + * The singleton instance of the bridge. + */ + private static final VectorIOBridge INSTANCE = new VectorIOBridge(); + + /** + * readVectored() method. + */ + private final DynMethods.UnboundMethod readVectored; + + /** + * {@code boolean StreamCapabilities.hasCapability(String)}. + */ + private final DynMethods.UnboundMethod hasCapabilityMethod; + + /** + * How many vector read calls made. + */ + private final AtomicLong vectorReads = new AtomicLong(); + + /** + * How many blocks were read. + */ + private final AtomicLong blocksRead = new AtomicLong(); + + /** + * How many bytes were read. + */ + private final AtomicLong bytesRead = new AtomicLong(); + + /** + * Constructor. package private for testing. + */ + private VectorIOBridge() { + + readVectored = loadInvocation(PositionedReadable.class, + Void.TYPE, READ_VECTORED, List.class, IntFunction.class); + LOG.debug("Vector IO availability; ", available()); + + + // if readVectored is present, so is hasCapabilities(). + hasCapabilityMethod = loadInvocation(FSDataInputStream.class, + boolean.class, HAS_CAPABILITY, String.class); + + } + + /** + * Is the bridge available. + * + * @return true if readVectored() is available. + */ + public boolean available() { + return !readVectored.isNoop() && FileRangeBridge.bridgeAvailable(); + } + + /** + * Check that the bridge is available. + * + * @throws UnsupportedOperationException if it is not. + */ + private void checkAvailable() { + if (!available()) { + throw new UnsupportedOperationException("Hadoop VectorIO not found"); + } + } + + /** + * Read data in a list of file ranges. + * Returns when the data reads are active; possibly executed + * in a blocking sequence of reads, possibly scheduled on different + * threads. + * The {@link ParquetFileRange} parameters all have their + * data read futures set to the range reads of the associated + * operations; callers must await these to complete. + * + * @param stream stream from where the data has to be read. + * @param ranges parquet file ranges. + * @param allocate allocate function to allocate memory to hold data. + * @throws UnsupportedOperationException if the API is not available. + */ + public static void readVectoredRanges( + final FSDataInputStream stream, + final List ranges, + final IntFunction allocate) { + + final VectorIOBridge bridge = availableInstance(); + final FileRangeBridge rangeBridge = FileRangeBridge.instance(); + // Setting the parquet range as a reference. + List fileRanges = ranges.stream() + .map(rangeBridge::toFileRange) + .collect(Collectors.toList()); + bridge.readWrappedRanges(stream, fileRanges, allocate); + + // copy back the completable futures from the scheduled + // vector reads to the ParquetFileRange entries passed in. 
+ fileRanges.forEach(fileRange -> { + // toFileRange() sets up this back reference + ParquetFileRange parquetFileRange = (ParquetFileRange) fileRange.getReference(); + parquetFileRange.setDataReadFuture(fileRange.getData()); + }); + //throw new RuntimeException("readVectoredRanges"); + + } + + + /** + * Read data in a list of wrapped file ranges, extracting the inner + * instances and then executing. + * Returns when the data reads are active; possibly executed + * in a blocking sequence of reads, possibly scheduled on different + * threads. + * + * @param stream stream from where the data has to be read. + * @param ranges wrapped file ranges. + * @param allocate allocate function to allocate memory to hold data. + */ + private void readWrappedRanges( + final PositionedReadable stream, + final List ranges, + final IntFunction allocate) { + + // update the counters. + vectorReads.incrementAndGet(); + blocksRead.addAndGet(ranges.size()); + // extract the instances the wrapped ranges refer to; update the + // bytes read counter. + List instances = ranges.stream() + .map(r -> { + bytesRead.addAndGet(r.getLength()); + return r.getFileRange(); + }) + .collect(Collectors.toList()); + LOG.debug("readVectored with {} ranges on stream {}", + ranges.size(), stream); + readVectored.invoke(stream, instances, allocate); + } + + @Override + public String toString() { + return "VectorIOBridge{" + + "readVectored=" + readVectored + + ", vectorReads=" + vectorReads.get() + + ", blocksRead=" + blocksRead.get() + + ", bytesRead=" + bytesRead.get() + + '}'; + } + + /** + * Does a stream implement StreamCapability and, if so, is the capability + * available. + * If the method is not found, this predicate will return false. + * @param stream input stream to query. + * @param capability the capability to look for. + * @return true if the stream declares the capability is available. + */ + public boolean hasCapability(final FSDataInputStream stream, + final String capability) { + + if (hasCapabilityMethod.isNoop()) { + return false; + } else { + return hasCapabilityMethod.invoke(stream, capability); + } + } + + + /** + * How many vector read calls have been made? + * @return the count of vector reads. + */ + public long getVectorReads() { + return vectorReads.get(); + } + + /** + * How many blocks were read? + * @return the count of blocks read. + */ + public long getBlocksRead() { + return blocksRead.get(); + } + + /** + * How many bytes of data have been read. + * @return the count of bytes read. + */ + public long getBytesRead() { + return bytesRead.get(); + } + + /** + * Reset all counters; for testing. + * Non-atomic. + */ + void resetCounters() { + vectorReads.set(0); + blocksRead.set(0); + bytesRead.set(0); + } + + /** + * Get the singleton instance. + * + * @return instance. + */ + public static VectorIOBridge instance() { + return INSTANCE; + } + + /** + * Is the bridge available for use? + * + * @return true if the vector IO API is available. + */ + public static boolean bridgeAvailable() { + return instance().available(); + } + + /** + * Get the instance verifying that the API is available. + * @return an available instance, always + * @throws UnsupportedOperationException if it is not. 
+ */ + public static VectorIOBridge availableInstance() { + final VectorIOBridge bridge = instance(); + bridge.checkAvailable(); + return bridge; + } + +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java new file mode 100644 index 0000000000..6c927e7c08 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Vector IO support for Hadoop 3.3.5 runtimes. + * Uses reflect so will compile against older versions, + * but will not actually work. + */ +package org.apache.parquet.hadoop.util.vectorio; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java index a6d2732d1b..8ca40484e5 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java @@ -42,15 +42,21 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.UUID; import static java.lang.Thread.sleep; import static org.apache.parquet.schema.OriginalType.UTF8; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +@RunWith(Parameterized.class) public class TestInputFormatColumnProjection { public static final String FILE_CONTENT = "" + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," + @@ -96,7 +102,19 @@ protected void map(Void key, Group value, Context context) @Rule public TemporaryFolder temp = new TemporaryFolder(); + @Parameterized.Parameters(name = "vectored : {0}") + public static List params() { + return Arrays.asList(true, false); + } + /** + * Read type: true for vectored IO. 
+ */ + private final boolean readType; + + public TestInputFormatColumnProjection(boolean readType) { + this.readType = readType; + } @Test public void testProjectionSize() throws Exception { Assume.assumeTrue( // only run this test for Hadoop 2 @@ -115,6 +133,8 @@ public void testProjectionSize() throws Exception { outputFolder.delete(); Configuration conf = new Configuration(); + // set the vector IO option + conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType); // set the projection schema conf.set("parquet.read.schema", Types.buildMessage() .required(BINARY).as(UTF8).named("char") @@ -164,7 +184,8 @@ public void testProjectionSize() throws Exception { bytesRead = Reader.bytesReadCounter.getValue(); } - Assert.assertTrue("Should read less than 10% of the input file size", + Assert.assertTrue("Should read (" + bytesRead + " bytes)" + + " less than 10% of the input file size (" + bytesWritten + ")", bytesRead < (bytesWritten / 10)); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 8dcbf4acf4..94ee413fd6 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -89,10 +89,13 @@ import org.apache.parquet.hadoop.example.GroupWriteSupport; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class TestParquetFileWriter { private static final Logger LOG = LoggerFactory.getLogger(TestParquetFileWriter.class); @@ -125,13 +128,34 @@ public class TestParquetFileWriter { @Rule public final TemporaryFolder temp = new TemporaryFolder(); + @Parameterized.Parameters(name = "vectored : {0}") + public static List params() { + return Arrays.asList(true, false); + } + + /** + * Read type: true for vectored IO. 
+ */ + private final boolean readType; + + public TestParquetFileWriter(boolean readType) { + this.readType = readType; + } + + private Configuration getTestConfiguration() { + Configuration conf = new Configuration(); + // set the vector IO option + conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType); + return conf; + } + @Test public void testWriteMode() throws Exception { File testFile = temp.newFile(); MessageType schema = MessageTypeParser.parseMessageType( "message m { required group a {required binary b;} required group " + "c { required int64 d; }}"); - Configuration conf = new Configuration(); + Configuration conf = getTestConfiguration(); ParquetFileWriter writer = null; boolean exceptionThrown = false; @@ -160,7 +184,7 @@ public void testWriteRead() throws Exception { testFile.delete(); Path path = new Path(testFile.toURI()); - Configuration configuration = new Configuration(); + Configuration configuration = getTestConfiguration(); ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); w.start(); @@ -444,10 +468,11 @@ public void testAlignmentWithPadding() throws Exception { File testFile = temp.newFile(); Path path = new Path(testFile.toURI()); - Configuration conf = new Configuration(); + Configuration conf = getTestConfiguration(); // Disable writing out checksums as hardcoded byte offsets in assertions below expect it conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); + // uses the test constructor ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 120, 60); @@ -553,7 +578,7 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception { File testFile = temp.newFile(); Path path = new Path(testFile.toURI()); - Configuration conf = new Configuration(); + Configuration conf = getTestConfiguration(); // Disable writing out checksums as hardcoded byte offsets in assertions below expect it conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); @@ -686,7 +711,7 @@ public void testWriteReadStatistics() throws Exception { testFile.delete(); Path path = new Path(testFile.toURI()); - Configuration configuration = new Configuration(); + Configuration configuration = getTestConfiguration(); configuration.setBoolean("parquet.strings.signed-min-max.enabled", true); MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b (UTF8);} required group c { required int64 d; }}"); @@ -774,7 +799,7 @@ public void testMetaDataFile() throws Exception { File testDir = temp.newFolder(); Path testDirPath = new Path(testDir.toURI()); - Configuration configuration = new Configuration(); + Configuration configuration = getTestConfiguration(); final FileSystem fs = testDirPath.getFileSystem(configuration); enforceEmptyDir(configuration, testDirPath); @@ -826,7 +851,7 @@ public void testWriteReadStatisticsAllNulls() throws Exception { Path path = new Path(testFile.toURI()); MessageType schema = MessageTypeParser.parseMessageType(writeSchema); - Configuration configuration = new Configuration(); + Configuration configuration = getTestConfiguration(); configuration.setBoolean("parquet.strings.signed-min-max.enabled", true); GroupWriteSupport.setSchema(schema, configuration); @@ -1000,7 +1025,7 @@ public void testMergeFooters() { */ @Test public void testWriteMetadataFileWithRelativeOutputPath() throws IOException { - Configuration conf = new Configuration(); + Configuration conf = getTestConfiguration(); FileSystem fs = FileSystem.get(conf); Path relativeRoot = new 
Path("target/_test_relative"); Path qualifiedRoot = fs.makeQualified(relativeRoot); @@ -1026,7 +1051,7 @@ public void testColumnIndexWriteRead() throws Exception { testFile.delete(); Path path = new Path(testFile.toURI()); - Configuration configuration = new Configuration(); + Configuration configuration = getTestConfiguration(); ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); w.start(); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java index 188a79643d..80baaa68a4 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java @@ -30,6 +30,7 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -63,9 +64,13 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.schema.MessageTypeParser; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class TestInputOutputFormat { private static final Logger LOG = LoggerFactory.getLogger(TestInputOutputFormat.class); @@ -82,9 +87,24 @@ public class TestInputOutputFormat { private Class> readMapperClass; private Class> writeMapperClass; + @Parameterized.Parameters(name = "vectored : {0}") + public static List params() { + return Arrays.asList(true, false); + } + + /** + * Read type: true for vectored IO. + */ + private final boolean readType; + + public TestInputOutputFormat(boolean readType) { + this.readType = readType; + } @Before public void setUp() { conf = new Configuration(); + // set the vector IO option + conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType); writeSchema = "message example {\n" + "required int32 line;\n" + "required binary content;\n" + @@ -335,8 +355,9 @@ public void testReadWriteWithCounter() throws Exception { assertTrue(value(readJob, "parquet", "bytesread") > 0L); assertTrue(value(readJob, "parquet", "bytestotal") > 0L); - assertTrue(value(readJob, "parquet", "bytesread") - == value(readJob, "parquet", "bytestotal")); + assertEquals("bytestotal != bytesread", + value(readJob, "parquet", "bytestotal"), + value(readJob, "parquet", "bytesread")); //not testing the time read counter since it could be zero due to the size of data is too small } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java new file mode 100644 index 0000000000..5119177553 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNoException; + +/** + * Test the {@link FileRangeBridge} class; requires + * Hadoop 3.3.5+ + */ +public class TestFileRangeBridge { + + private static final Logger LOG = LoggerFactory.getLogger(TestFileRangeBridge.class); + + /** + * Class required for tests to run: {@value }. + * Probed in setup. + */ + static final String CLASSNAME = "org.apache.hadoop.fs.FileRange"; + + @Before + public void setUp() { + + // look for the FileRange; if not found, skip + try { + this.getClass().getClassLoader().loadClass(CLASSNAME); + } catch (ReflectiveOperationException e) { + assumeNoException(e); + } + } + + /** + * Attempt to explicit instantiate the bridge; + * package private constructor. + */ + @Test + public void testInstantiate() throws Throwable { + new FileRangeBridge(); + } + + /** + * Is the shared bridge instance non-null, and does the + * {@link FileRangeBridge#bridgeAvailable()} predicate + * return true. + */ + @Test + public void testFileRangeBridgeAvailable() throws Throwable { + assertNotNull("FileRangeBridge instance null", FileRangeBridge.instance()); + assertTrue("Bridge not available", FileRangeBridge.bridgeAvailable()); + } + + /** + * Create a range and validate the getters return the original values. + */ + @Test + public void testCreateFileRange() { + Object reference = "backref"; + FileRangeBridge.WrappedFileRange range = FileRangeBridge.instance() + .createFileRange(512L, 16384, reference); + LOG.info("created range {}", range); + assertNotNull("null range", range); + assertNotNull("null range instance", range.getFileRange()); + assertEquals("offset of " + range, 512L, range.getOffset()); + assertEquals("length of " + range, 16384, range.getLength()); + assertSame("backref of " + range, reference, range.getReference()); + + // this isn't set until readVectored() is called + assertNull("non-null range future", range.getData()); + } + + /** + * Verify there is no parameter validation. + */ + @Test + public void testCreateInvalidRange() { + FileRangeBridge.instance() + .createFileRange(-1, -1, null); + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java new file mode 100644 index 0000000000..8b6c7347e8 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.IntFunction; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ByteBufferPool; +import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.io.ParquetFileRange; + +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.awaitFuture; +import static org.apache.parquet.hadoop.util.vectorio.VectorIOBridge.VECTOREDIO_CAPABILITY; +import static org.apache.parquet.hadoop.util.vectorio.VectorIOBridge.instance; +import static org.apache.parquet.hadoop.util.vectorio.VectorIOBridge.readVectoredRanges; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; + +/** + * Test the vector IO bridge. + * Much of this is lifted from hadoop-common test + * {@code AbstractContractVectoredReadTest}; + * with other utility methods from + * {@code org.apache.hadoop.fs.contract.ContractTestUtils}. + */ +public class TestVectorIOBridge { + private static final int DATASET_LEN = 64 * 1024; + private static final byte[] DATASET = dataset(DATASET_LEN, 'a', 32); + private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; + + /** + * Timeout in seconds for vectored read operation in tests: {@value}. + */ + private static final int VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS = 5 * 60; + + /** + * Relative vectored path. + */ + private final Path vectoredPath = new Path("target/test/vectored"); + + private final HeapByteBufferAllocator bufferAllocator = new HeapByteBufferAllocator(); + private final ElasticByteBufferPool pool = + new ElasticByteBufferPool(); + + private final IntFunction allocate = value -> { + return pool.getBuffer(false, value); + }; + + private FileSystem fileSystem; + private Path testFilePath; + + public TestVectorIOBridge() { + + } + + @Before + public void setUp() throws IOException { + // skip the tests if the FileRangeBridge does not load. 
+ assumeTrue("Bridge not available", FileRangeBridge.bridgeAvailable()); + + fileSystem = FileSystem.getLocal(new Configuration()); + testFilePath = fileSystem.makeQualified(vectoredPath); + createFile(fileSystem, testFilePath, DATASET); + } + + @After + public void tearDown() throws IOException { + if (fileSystem != null) { + fileSystem.delete(testFilePath, false); + } + } + + public FileSystem getFileSystem() { + return fileSystem; + } + + /** + * If the file range bridge is available, so must be the vector io bridge. + */ + @Test + public void testVectorIOBridgeAvailable() throws Throwable { + assertTrue("VectorIOBridge not available", VectorIOBridge.bridgeAvailable()); + } + + /** + * Create a dataset for use in the tests; all data is in the range + * base to (base+modulo-1) inclusive. + * + * @param len length of data + * @param base base of the data + * @param modulo the modulo + * + * @return the newly generated dataset + */ + private static byte[] dataset(int len, int base, int modulo) { + byte[] dataset = new byte[len]; + for (int i = 0; i < len; i++) { + dataset[i] = (byte) (base + (i % modulo)); + } + return dataset; + } + + /** + * Utility to return buffers back to the pool once all + * data has been read for each file range. + * + * @param fileRanges list of file range. + * @param pool buffer pool. + * + * @throws IOException any IOE + * @throws TimeoutException ideally this should never occur. + */ + public static void returnBuffersToPoolPostRead(List fileRanges, + ByteBufferPool pool) + throws IOException, TimeoutException { + for (ParquetFileRange range : fileRanges) { + ByteBuffer buffer = awaitFuture(range.getDataReadFuture(), + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + pool.putBuffer(buffer); + } + } + + /** + * Create a file. + * + * @param fs filesystem + * @param path path to write + * @param data source dataset. Can be null + * + * @throws IOException on any problem + */ + public static void createFile(FileSystem fs, + Path path, + byte[] data) throws IOException { + try (FSDataOutputStream stream = fs.create(path, true)) { + if (data != null && data.length > 0) { + stream.write(data); + } + } + } + + /** + * Open the test file. + * @return test file input stream + * @throws IOException failure to open + */ + private FSDataInputStream openTestFile() throws IOException { + return getFileSystem().open(testFilePath); + } + + /** + * Read a list of ranges, all adjacent. + */ + @Test + public void testVectoredReadMultipleRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + fileRanges.add(range(i * 100, 100)); + } + try (FSDataInputStream in = fs.open(testFilePath)) { + readVectoredRanges(in, fileRanges, allocate); + CompletableFuture[] completableFutures = new CompletableFuture[fileRanges.size()]; + int i = 0; + for (ParquetFileRange res : fileRanges) { + completableFutures[i++] = res.getDataReadFuture(); + } + CompletableFuture combinedFuture = CompletableFuture.allOf(completableFutures); + combinedFuture.get(); + + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + /** + * VectorIO and readFully() can coexist. 
+ */ + @Test + public void testVectoredReadAndReadFully() throws Exception { + final int offset = 100; + final int length = 256; + List fileRanges = ranges(offset, length); + + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + byte[] readFullRes = new byte[length]; + in.readFully(offset, readFullRes); + ByteBuffer vecRes = awaitFuture(fileRanges.get(0).getDataReadFuture(), + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + assertDatasetEquals(0, "readFully", + vecRes, length, readFullRes); + } finally { + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + /** + * As the minimum seek value is 4*1024, none of the test ranges + * will get merged. + */ + @Test + public void testDisjointRanges() throws Exception { + List fileRanges = ranges( + 0, 100, + 4_000 + 101, 100, + 16_000 + 101, 100); + + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + /** + * Verify the stream really implements the readVectored API, rather + * than fall back to the base implementation. + */ + @Test + public void testStreamImplementsReadVectored() throws Exception { + + try (FSDataInputStream in = openTestFile()) { + final boolean streamDoesNativeVectorIO = instance().hasCapability( + in, + VECTOREDIO_CAPABILITY); + assertTrue("capability " + VECTOREDIO_CAPABILITY + " not supported by " + in, + streamDoesNativeVectorIO); + } + } + + /** + * As the minimum seek value is 4*1024, all the below ranges + * will get merged into one. + */ + @Test + public void testAllRangesMergedIntoOne() throws Exception { + List fileRanges = ranges( + 0, 100, + 4_000 + 101, 100, + 16_000 + 101, 100); + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + /** + * As the minimum seek value is 4*1024, the first three ranges will be + * merged into one and the other two will remain as they are. + */ + @Test + public void testSomeRangesMergedSomeUnmerged() throws Exception { + List fileRanges = ranges( + 8 * 1024, 100, + 14 * 1024, 100, + 10 * 1024, 100, + 2 * 1024 - 101, 100, + 40 * 1024, 1024); + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testOverlappingRanges() throws Exception { + List fileRanges = getSampleOverlappingRanges(); + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testSameRanges() throws Exception { + // Same ranges are a special case of overlapping ranges. 
+ List fileRanges = getSampleSameRanges(); + + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testSomeRandomNonOverlappingRanges() throws Exception { + List fileRanges = ranges( + 500, 100, + 1000, 200, + 50, 10, + 10, 5); + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testConsecutiveRanges() throws Exception { + List fileRanges = ranges( + 500, 100, + 600, 200, + 800, 100); + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testNegativeLengthRange() throws Exception { + verifyExceptionalVectoredRead(getFileSystem(), ranges(1, -50), + IllegalArgumentException.class); + } + + /** + * Negative ranges are rejected; the inner cause is an + * {@code EOFException}. + */ + @Test + public void testNegativeOffsetRange() throws Exception { + final RuntimeException ex = verifyExceptionalVectoredRead( + getFileSystem(), ranges(-1, 50), + RuntimeException.class); + if (!(ex.getCause() instanceof EOFException)) { + throw ex; + } + } + + /** + * Classic seek/read read after vectored IO. + */ + @Test + public void testNormalReadAfterVectoredRead() throws Exception { + List fileRanges = getSampleNonOverlappingRanges(); + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + + // read starting 200 bytes + byte[] res = new byte[200]; + in.read(res, 0, 200); + ByteBuffer buffer = ByteBuffer.wrap(res); + assertDatasetEquals(0, "normal_read", buffer, 200, DATASET); + assertEquals("Vectored read shouldn't change file pointer.", 200, in.getPos()); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + /** + * Vectored IO after Classic seek/read. + */ + @Test + public void testVectoredReadAfterNormalRead() throws Exception { + List fileRanges = getSampleNonOverlappingRanges(); + try (FSDataInputStream in = openTestFile()) { + // read starting 200 bytes + byte[] res = new byte[200]; + in.read(res, 0, 200); + ByteBuffer buffer = ByteBuffer.wrap(res); + assertDatasetEquals(0, "normal_read", buffer, 200, DATASET); + assertEquals("Vectored read shouldn't change file pointer.", 200, in.getPos()); + readVectoredRanges(in, fileRanges, allocate); + + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testMultipleVectoredReads() throws Exception { + List fileRanges1 = getSampleNonOverlappingRanges(); + + List fileRanges2 = getSampleNonOverlappingRanges(); + try (FSDataInputStream in = openTestFile()) { + + readVectoredRanges(in, fileRanges1, allocate); + readVectoredRanges(in, fileRanges2, allocate); + + validateVectoredReadResult(fileRanges2, DATASET); + validateVectoredReadResult(fileRanges1, DATASET); + returnBuffersToPoolPostRead(fileRanges1, pool); + returnBuffersToPoolPostRead(fileRanges2, pool); + } + } + + /** + * Test to validate EOF ranges. Default implementation fails with EOFException + * while reading the ranges. 
Some implementations, such as S3 or a checksum filesystem, fail fast + * as they already have the file length calculated. + * + * @throws Exception + */ +/* +@Test +public void testEOFRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = ranges(DATASET_LEN, 100); + try (FSDataInputStream in = fs.open(testFilePath)) { + in.readVectored(fileRanges, allocate); + readVectoredRanges(in, fileRanges, allocate); + for (FileRange res : fileRanges) { + CompletableFuture data = res.getData(); + interceptFuture(EOFException.class, + "", + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + data); + } + } +}*/ + + /** + * Create a ParquetFileRange instance. + * @param offset offset in file + * @param length range length + * @return a range + */ + private ParquetFileRange range(final long offset, final int length) { + return new ParquetFileRange(offset, length); + } + + /** + * Create a list of ranges where the arguments are expected to + * be pairs of int offset and range length. + * @param args an even-numbered list. + * @return a list of ranges. + */ + private List ranges(int... args) { + final int len = args.length; + assertTrue("range argument length of " + len + " is not even", + (len % 2) == 0); + List fileRanges = new ArrayList<>(); + for (int i = 0; i < len; i += 2) { + fileRanges.add(range(args[i], args[i + 1])); + } + return fileRanges; + } + + protected List getSampleNonOverlappingRanges() { + return ranges(0, 100, 110, 50); + } + + protected List getSampleOverlappingRanges() { + return ranges( + 100, 500, + 400, 500); + } + + protected List getConsecutiveRanges() { + return ranges( + 100, 500, + 600, 500); + } + + protected List getSampleSameRanges() { + return ranges( + 8_000, 1000, + 8_000, 1000, + 8_000, 1000); + } + + /** + * Assert that the data read matches the dataset at the given offset. + * This helps verify that the seek process is moving the read pointer + * to the correct location in the file. + * + * @param readOffset the offset in the file where the read began. + * @param operation operation name for the assertion. + * @param data data read in. + * @param length length of data to check. + * @param originalData original data. + */ + public static void assertDatasetEquals( + final int readOffset, + final String operation, + final ByteBuffer data, + int length, byte[] originalData) { + for (int i = 0; i < length; i++) { + int o = readOffset + i; + assertEquals(operation + " with read offset " + readOffset + + ": data[" + i + "] != DATASET[" + o + "]", + originalData[o], data.get()); + } + } + + /** + * Utility to validate vectored read results. + * + * @param fileRanges input ranges. + * @param originalData original data. + * + * @throws IOException any ioe. 
+ */ + public static void validateVectoredReadResult(List fileRanges, + byte[] originalData) + throws IOException, TimeoutException { + CompletableFuture[] completableFutures = new CompletableFuture[fileRanges.size()]; + int i = 0; + for (ParquetFileRange res : fileRanges) { + completableFutures[i++] = res.getDataReadFuture(); + } + CompletableFuture combinedFuture = CompletableFuture.allOf(completableFutures); + awaitFuture(combinedFuture, + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + + for (ParquetFileRange res : fileRanges) { + CompletableFuture data = res.getDataReadFuture(); + ByteBuffer buffer = awaitFuture(data, + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + assertDatasetEquals((int) res.getOffset(), "vecRead", + buffer, res.getLength(), originalData); + } + } + + /** + * Validate that exceptions must be thrown during a vectored + * read operation with specific input ranges. + * + * @param fs FileSystem instance. + * @param fileRanges input file ranges. + * @param clazz type of exception expected. + * + * @throws Exception any other IOE. + */ + protected T verifyExceptionalVectoredRead( + FileSystem fs, + List fileRanges, + Class clazz) throws Exception { + + try (FSDataInputStream in = fs.open(testFilePath)) { + readVectoredRanges(in, fileRanges, allocate); + fail("expected error reading " + in); + // for the compiler + return null; + } catch (AssertionError e) { + throw e; + } catch (Exception e) { + if (!clazz.isAssignableFrom(e.getClass())) { + throw e; + } + return (T) e; + } + } +}
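Taken together, these tests show the calling pattern the bridge is meant to support: build a list of ParquetFileRange objects, hand them to VectorIOBridge.readVectoredRanges() with a buffer allocator, then await each range's future. The fragment below is an illustrative sketch of that pattern only, not part of the change; it reuses the helpers exercised above (VectorIOBridge.readVectoredRanges, BindingUtils.awaitFuture, ParquetFileRange), while the readRanges method name and the fs/path variables are placeholders introduced here for the example.

    // Illustrative sketch only; assumes a Hadoop release where the vector IO API is available.
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.vectorio.BindingUtils;
    import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
    import org.apache.parquet.io.ParquetFileRange;

    static void readRanges(FileSystem fs, Path path) throws Exception {
      // ranges to fetch: offset + length, as in the tests above
      List<ParquetFileRange> ranges = new ArrayList<>();
      ranges.add(new ParquetFileRange(0, 100));        // first 100 bytes
      ranges.add(new ParquetFileRange(4_096, 1_024));  // 1 KiB starting at 4 KiB

      try (FSDataInputStream in = fs.open(path)) {
        // issue the vectored read through the reflection bridge
        VectorIOBridge.readVectoredRanges(in, ranges, ByteBuffer::allocate);

        // each range's future completes once its buffer has been filled
        for (ParquetFileRange range : ranges) {
          ByteBuffer data = BindingUtils.awaitFuture(
              range.getDataReadFuture(), 5, TimeUnit.MINUTES);
          // ... consume data ...
        }
      }
    }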