From ab31f8fb646dcaa0f04c1d82d90851cef164781c Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 26 Nov 2024 15:40:25 +0000 Subject: [PATCH] GH-3078: Use Hadoop FileSystem.openFile() to open files * Open files with FileSystem.openFile(), passing in file status * And read policy of "parquet, vector, random, adaptive" --- .../parquet/hadoop/util/HadoopInputFile.java | 54 ++++++++++++++++++- .../hadoop/util/wrapped/io/FutureIO.java | 23 ++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java index 48c79fa5eb..8d1199d192 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java @@ -19,8 +19,13 @@ package org.apache.parquet.hadoop.util; +import static org.apache.parquet.hadoop.util.wrapped.io.FutureIO.awaitFuture; + import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -29,6 +34,24 @@ public class HadoopInputFile implements InputFile { + /** + * openFile() option name for setting the read policy: {@value}. + */ + private static final String OPENFILE_READ_POLICY_KEY = "fs.option.openfile.read.policy"; + + /** + * Read policy when opening parquet files: {@value}. + *

Policy-aware stores pick the first policy they recognize in the list. + * everything recognizes "random"; + * "vector" came in with 3.4.0, while "parquet" came with Hadoop 3.4.1 + * parquet means "this is a Parquet file, so be clever about footers, prefetch, + * and expect vector and/or random IO". + *

In Hadoop 3.4.1, "parquet" and "vector" are both mapped to "random" for the + * S3A connector, but as the ABFS and GCS connectors do footer caching, they + * may use it as a hint to say "fetch the footer and keep it in memory" + */ + private static final String PARQUET_READ_POLICY = "parquet, vector, random, adaptive"; + private final FileSystem fs; private final FileStatus stat; private final Configuration conf; @@ -70,9 +93,38 @@ public long getLength() { return stat.getLen(); } + /** + * Open the file. + *

Uses {@code FileSystem.openFile()} so that + * the existing FileStatus can be passed down: saves a HEAD request on cloud storage. + * and ignored everywhere else. + * + * @return the input stream. + * + * @throws InterruptedIOException future was interrupted + * @throws IOException if something went wrong + * @throws RuntimeException any nested RTE thrown + */ @Override public SeekableInputStream newStream() throws IOException { - return HadoopStreams.wrap(fs.open(stat.getPath())); + FSDataInputStream stream; + try { + // this method is async so that implementations may do async HEAD head + // requests. Not done in S3A/ABFS when a file status passed down (as is done here) + final CompletableFuture future = fs.openFile(stat.getPath()) + .withFileStatus(stat) + .opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY) + .build(); + stream = awaitFuture(future); + } catch (RuntimeException e) { + // S3A < 3.3.5 would raise illegal path exception if the openFile path didn't + // equal the path in the FileStatus; Hive virtual FS could create this condition. + // As the path to open is derived from stat.getPath(), this condition seems + // near-impossible to create -but is handled here for due diligence. + stream = fs.open(stat.getPath()); + } + + return HadoopStreams.wrap(stream); } @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java index 47f4e959b4..23533c75bf 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java @@ -70,6 +70,29 @@ public static T awaitFuture(final Future future, final long timeout, fina } } + /** + * Given a future, evaluate it. + *

+ * Any exception generated in the future is + * extracted and rethrown. + *

+ * @param future future to evaluate + * @param type of the result. + * @return the result, if all went well. + * @throws InterruptedIOException future was interrupted + * @throws IOException if something went wrong + * @throws RuntimeException any nested RTE thrown + */ + public static T awaitFuture(final Future future) + throws InterruptedIOException, IOException, RuntimeException { + try { + return future.get(); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException(e.toString()).initCause(e); + } catch (ExecutionException e) { + throw unwrapInnerException(e); + } + } /** * From the inner cause of an execution exception, extract the inner cause * to an IOException, raising Errors immediately.