GH-3078: Use Hadoop FileSystem.openFile() to open files
* Open files with FileSystem.openFile(), passing in file status
* And read policy of "parquet, vector, random, adaptive"
steveloughran committed Nov 27, 2024
1 parent 7b599ed commit ab31f8f
Showing 2 changed files with 76 additions and 1 deletion.
@@ -19,8 +19,13 @@

package org.apache.parquet.hadoop.util;

import static org.apache.parquet.hadoop.util.wrapped.io.FutureIO.awaitFuture;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -29,6 +34,24 @@

public class HadoopInputFile implements InputFile {

/**
* openFile() option name for setting the read policy: {@value}.
*/
private static final String OPENFILE_READ_POLICY_KEY = "fs.option.openfile.read.policy";

/**
* Read policy when opening parquet files: {@value}.
* <p>Policy-aware stores pick the first policy they recognize in the list.
* Everything recognizes "random"; "vector" came in with Hadoop 3.4.0,
* while "parquet" came with Hadoop 3.4.1.
* "parquet" means "this is a Parquet file, so be clever about footers, prefetch,
* and expect vector and/or random IO".
* <p>In Hadoop 3.4.1, "parquet" and "vector" are both mapped to "random" by the
* S3A connector, but as the ABFS and GCS connectors do footer caching, they
* may use it as a hint to say "fetch the footer and keep it in memory".
*/
private static final String PARQUET_READ_POLICY = "parquet, vector, random, adaptive";

private final FileSystem fs;
private final FileStatus stat;
private final Configuration conf;
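
For illustration, a minimal sketch (with a hypothetical s3a:// path) of how the same policy string could be requested directly through the openFile() builder. opt() marks the option as advisory, so file systems which do not recognize the key simply ignore it; must() would instead fail the build on such stores.

// hypothetical path; any Hadoop Path works
Path path = new Path("s3a://bucket/data/part-0000.parquet");
FileSystem fs = path.getFileSystem(conf);
// request the parquet-friendly read policy; unknown keys are ignored with opt()
CompletableFuture<FSDataInputStream> future = fs.openFile(path)
    .opt("fs.option.openfile.read.policy", "parquet, vector, random, adaptive")
    .build();
FSDataInputStream in = FutureIO.awaitFuture(future);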
@@ -70,9 +93,38 @@ public long getLength() {
return stat.getLen();
}

/**
* Open the file.
* <p>Uses {@code FileSystem.openFile()} so that
* the existing FileStatus can be passed down: this saves a HEAD request on
* cloud storage and is ignored everywhere else.
*
* @return the input stream.
*
* @throws InterruptedIOException future was interrupted
* @throws IOException if something went wrong
* @throws RuntimeException any nested RTE thrown
*/
@Override
public SeekableInputStream newStream() throws IOException {
return HadoopStreams.wrap(fs.open(stat.getPath()));
FSDataInputStream stream;
try {
// this method is async so that implementations may do async HEAD
// requests; not done in S3A/ABFS when a file status is passed down (as is done here).
final CompletableFuture<FSDataInputStream> future = fs.openFile(stat.getPath())
.withFileStatus(stat)
.opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY)
.build();
stream = awaitFuture(future);
} catch (RuntimeException e) {
// S3A < 3.3.5 would raise an illegal path exception if the openFile path didn't
// equal the path in the FileStatus; Hive virtual FS could create this condition.
// As the path to open is derived from stat.getPath(), this condition seems
// near-impossible to create, but it is handled here for due diligence.
stream = fs.open(stat.getPath());
}

return HadoopStreams.wrap(stream);
}

@Override
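The change is transparent to callers of HadoopInputFile. A usage sketch (the path below is a hypothetical example): fromPath() builds the InputFile, and newStream() now opens it with the file status and read policy passed down.

Configuration conf = new Configuration();
Path path = new Path("s3a://bucket/warehouse/part-0000.parquet");
HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf);
try (SeekableInputStream stream = inputFile.newStream()) {
  // on stores which understand the policy, the stream is tuned for
  // Parquet-style footer and random IO; elsewhere the hint is ignored
  byte[] magic = new byte[4];
  stream.readFully(magic, 0, 4); // "PAR1" for a Parquet file
}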
@@ -70,6 +70,29 @@ public static <T> T awaitFuture(final Future<T> future, final long timeout, final TimeUnit unit)
}
}

/**
* Given a future, evaluate it.
* <p>
* Any exception generated in the future is
* extracted and rethrown.
* </p>
* @param future future to evaluate
* @param <T> type of the result.
* @return the result, if all went well.
* @throws InterruptedIOException future was interrupted
* @throws IOException if something went wrong
* @throws RuntimeException any nested RTE thrown
*/
public static <T> T awaitFuture(final Future<T> future)
throws InterruptedIOException, IOException, RuntimeException {
try {
return future.get();
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException(e.toString()).initCause(e);
} catch (ExecutionException e) {
throw unwrapInnerException(e);
}
}
/**
* From the inner cause of an execution exception, extract the inner cause
* to an IOException, raising Errors immediately.
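A brief sketch of the new overload in use (variable names are illustrative): it blocks on the future, converting InterruptedException into InterruptedIOException and unwrapping an ExecutionException into the underlying IOException or RuntimeException.

CompletableFuture<FSDataInputStream> future =
    fs.openFile(status.getPath()).withFileStatus(status).build();
FSDataInputStream stream = FutureIO.awaitFuture(future);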
