diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index 48c79fa5eb..8d1199d192 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -19,8 +19,13 @@
 package org.apache.parquet.hadoop.util;
 
+import static org.apache.parquet.hadoop.util.wrapped.io.FutureIO.awaitFuture;
+
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -29,6 +34,24 @@
 public class HadoopInputFile implements InputFile {
 
+ /**
+ * openFile() option name for setting the read policy: {@value}.
+ */
+ private static final String OPENFILE_READ_POLICY_KEY = "fs.option.openfile.read.policy";
+
+ /**
+ * Read policy when opening parquet files: {@value}.
+ * <p>Policy-aware stores pick the first policy they recognize in the list.
+ * Everything recognizes "random";
+ * "vector" came in with Hadoop 3.4.0, while "parquet" came with Hadoop 3.4.1.
+ * "parquet" means "this is a Parquet file, so be clever about footers, prefetch,
+ * and expect vector and/or random IO".
+ * <p>In Hadoop 3.4.1, "parquet" and "vector" are both mapped to "random" for the
+ * S3A connector, but as the ABFS and GCS connectors do footer caching, they
+ * may use it as a hint to say "fetch the footer and keep it in memory".
+ */
+ private static final String PARQUET_READ_POLICY = "parquet, vector, random, adaptive";
+
 private final FileSystem fs;
 private final FileStatus stat;
 private final Configuration conf;
@@ -70,9 +93,38 @@
 public long getLength() {
 return stat.getLen();
 }
 
+ /**
+ * Open the file.
+ * <p>Uses {@code FileSystem.openFile()} so that
+ * the existing FileStatus can be passed down: this saves a HEAD request
+ * on cloud storage and is ignored everywhere else.
+ *
+ * @return the input stream.
+ *
+ * @throws InterruptedIOException future was interrupted
+ * @throws IOException if something went wrong
+ * @throws RuntimeException any nested RTE thrown
+ */
@Override
public SeekableInputStream newStream() throws IOException {
- return HadoopStreams.wrap(fs.open(stat.getPath()));
+ FSDataInputStream stream;
+ try {
+ // this method is async so that implementations may do async HEAD
+ // requests. Not done in S3A/ABFS when a file status is passed down (as is done here)
+ final CompletableFuture<FSDataInputStream> future = fs.openFile(stat.getPath())
+     .withFileStatus(stat)
+     .opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY)
+     .build();
+ stream = awaitFuture(future);
+ * Any exception generated in the future is
+ * extracted and rethrown.
+ *
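For reviewers who want to exercise the Hadoop `openFile()` builder API that this patch adopts, here is a minimal, self-contained sketch. It is illustrative only, not code from this PR: the `s3a://` path is a placeholder, `OpenFileDemo` is a hypothetical class, and it assumes Hadoop 3.3.0+, where `FileSystem.openFile()` and its builder exist.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class OpenFileDemo {
  public static void main(String[] args) throws Exception {
    // Placeholder URI: any Hadoop-supported filesystem scheme works.
    Path path = new Path("s3a://bucket/data/file.parquet");
    Configuration conf = new Configuration();
    FileSystem fs = path.getFileSystem(conf);

    // Passing the FileStatus down lets cloud connectors skip their own
    // HEAD probe, which is exactly what the patch does in newStream().
    FileStatus status = fs.getFileStatus(path);

    // openFile() returns a builder; build() starts the (possibly async) open.
    CompletableFuture<FSDataInputStream> future = fs.openFile(path)
        .withFileStatus(status)
        .opt("fs.option.openfile.read.policy", "parquet, vector, random, adaptive")
        .build();

    // future.get() wraps failures in ExecutionException; the patch instead
    // uses FutureIO.awaitFuture() to extract and rethrow the underlying cause.
    try (FSDataInputStream in = future.get()) {
      byte[] magic = new byte[4];
      in.readFully(0, magic); // positioned read; a Parquet file starts with "PAR1"
      System.out.println(new String(magic, StandardCharsets.US_ASCII));
    }
  }
}
```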
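The trailing javadoc fragment ("Any exception generated in the future is extracted and rethrown") describes the `awaitFuture` helper statically imported at the top of the patch. The sketch below shows that extract-and-rethrow pattern, consistent with the `@throws` clauses documented on `newStream()`; it is an assumption-labelled illustration, not the actual `FutureIO` implementation.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class AwaitSketch {
  /**
   * Await a future; any exception generated in the future is
   * extracted and rethrown rather than surfacing as ExecutionException.
   */
  public static <T> T await(CompletableFuture<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      // "future was interrupted": restore the flag, convert to an IOE subclass
      Thread.currentThread().interrupt();
      throw (InterruptedIOException)
          new InterruptedIOException("interrupted waiting for stream").initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause; // "if something went wrong"
      }
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause; // "any nested RTE thrown"
      }
      throw new IOException(cause); // checked non-IOE causes get wrapped
    }
  }
}
```

Rethrowing the extracted cause keeps stack traces pointed at the real failure rather than the future plumbing, which matters because for most filesystems the "async" open completes synchronously anyway.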