Move all use of the dynamic methods into HadoopFileIO
* Eases future upgrades of Hadoop dependencies.
* Updates uses of FileSystem.open() where the file read policy
  is known and/or exists()/getFileStatus() calls are executed
  immediately before.
* Uses HadoopFileIO in footer reading as well as file reading.

This appears to cover the core production use; CLI operations
are left unchanged.

Change-Id: Id1c35619a04a500c7cccd131358b22eaa1e0f984
steveloughran committed Jul 15, 2024
1 parent 530d6c3 commit 80dd0d8
Showing 7 changed files with 176 additions and 15 deletions.
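For context, a minimal before/after sketch of the calling pattern this commit moves to. The HadoopFileIO method names come from the diff below; the fs and summaryFile variables are hypothetical.

// Before: a separate exists() probe plus open(), i.e. two round trips on object stores.
if (fs.exists(summaryFile)) {
  try (FSDataInputStream in = fs.open(summaryFile)) {
    // ... read the summary
  }
}

// After: one status probe, reused when opening, with an explicit read policy (false = sequential).
FileStatus status = HadoopFileIO.getFileStatusOrNull(fs, summaryFile);
if (status != null) {
  try (FSDataInputStream in = HadoopFileIO.openFile(fs, status, false)) {
    // ... read the summary
  }
}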
@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.hadoop.util.HadoopFileIO;

public class HadoopFSKeyMaterialStore implements FileKeyMaterialStore {
public static final String KEY_MATERIAL_FILE_PREFIX = "_KEY_MATERIAL_FOR_";
@@ -73,7 +74,7 @@ public String getKeyMaterial(String keyIDInFile) throws ParquetCryptoRuntimeExce
}

private void loadKeyMaterialMap() {
try (FSDataInputStream keyMaterialStream = hadoopFileSystem.open(keyMaterialFile)) {
try (FSDataInputStream keyMaterialStream = HadoopFileIO.openFile(hadoopFileSystem, keyMaterialFile, false)) {
JsonNode keyMaterialJson = objectMapper.readTree(keyMaterialStream);
keyMaterialMap =
objectMapper.readValue(keyMaterialJson.traverse(), new TypeReference<Map<String, String>>() {});
@@ -104,6 +104,7 @@
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopFileIO;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.hadoop.util.wrapped.io.FutureIO;
@@ -438,19 +439,37 @@ public static List<Footer> readSummaryFile(Configuration configuration, FileStat
return footersFromSummaryFile(parent, mergedFooters);
}

/**
* Read the summary metadata for a path: if {@code skipRowGroups} is true, first look for
* {@code basePath/_common_metadata}.
* If row groups are required, or the common file was not found, look for {@code basePath/_metadata}.
* @param configuration configuration to use
* @param basePath parent path
* @param skipRowGroups should row groups be excluded from the summary
* @return the metadata or null if no summary was found.
* @throws IOException failure to load a summary.
*/
static ParquetMetadata readSummaryMetadata(Configuration configuration, Path basePath, boolean skipRowGroups)
throws IOException {
Path metadataFile = new Path(basePath, PARQUET_METADATA_FILE);
Path commonMetaDataFile = new Path(basePath, PARQUET_COMMON_METADATA_FILE);
FileSystem fileSystem = basePath.getFileSystem(configuration);
if (skipRowGroups && fileSystem.exists(commonMetaDataFile)) {
if (skipRowGroups) {
// reading the summary file that does not contain the row groups
LOG.info("reading summary file: {}", commonMetaDataFile);
return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups));
} else if (fileSystem.exists(metadataFile)) {
// fetch the file status to save another probe when opening the file
FileStatus commonFileStatus = HadoopFileIO.getFileStatusOrNull(fileSystem, commonMetaDataFile);
if (commonFileStatus != null) {
LOG.info("reading summary file: {}", commonMetaDataFile);
return readFooter(configuration, commonFileStatus, filter(true));
}
}
// row groups required or common metadata file not found: try to read the file-specific metadata
FileStatus metadataFileStatus = HadoopFileIO.getFileStatusOrNull(fileSystem, metadataFile);
if (metadataFileStatus != null) {
LOG.info("reading summary file: {}", metadataFile);
return readFooter(configuration, metadataFile, filter(skipRowGroups));
return readFooter(configuration, metadataFileStatus, filter(skipRowGroups));
} else {
// no metadata files found
return null;
}
}
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.parquet.hadoop.util;

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.wrapped.io.DynamicWrappedIO;

/**
* Class for enhanced FileIO, calling {@link DynamicWrappedIO} as appropriate.
* If/when Parquet sets a baseline Hadoop release in which the relevant methods can be
* invoked directly, this class is where the changes would be made.
*/
public class HadoopFileIO {

/**
* Get the status of a file; if the file is not found, downgrade to null.
* @param fileSystem FileSystem to use
* @param filePath file to probe
* @return the status or null
* @throws IOException IO failure other than FileNotFoundException
*/
public static FileStatus getFileStatusOrNull(final FileSystem fileSystem, final Path filePath) throws IOException {
final FileStatus commonFileStatus;
try {
commonFileStatus = fileSystem.getFileStatus(filePath);
} catch (FileNotFoundException e) {
// file does not exist
return null;
}
return commonFileStatus;
}

/**
* Open a file for reading using a read policy appropriate for the purpose,
* passing in a status object containing the filename, length and possibly more.
* <p>
* Note that for filesystems with lazy IO, existence checks may be delayed until
* the first read() operation.
*
* @param fileSystem FileSystem to use
* @param status file status
* @param randomIO is the file a parquet data file to be read with random IO?
*
* @return an input stream.
*
* @throws IOException failure to open the file.
*/
public static FSDataInputStream openFile(
final FileSystem fileSystem, final FileStatus status, final boolean randomIO) throws IOException {
return DynamicWrappedIO.openFile(fileSystem, status.getPath(), status, readPolicies(randomIO));
}

/**
* Open a file for reading using a read policy appropriate for the purpose.
* <p>
* Note that for filesystems with lazy IO, existence checks may be delayed until
* the first read() operation.
*
* @param fileSystem FileSystem to use
* @param path path to file
* @param randomIO is the file a parquet data file to be read with random IO?
*
* @return an input stream.
*
* @throws IOException failure to open the file.
*/
public static FSDataInputStream openFile(final FileSystem fileSystem, final Path path, final boolean randomIO)
throws IOException {
return DynamicWrappedIO.openFile(fileSystem, path, null, readPolicies(randomIO));
}

/**
* Choose the read policies for the desired purpose.
* @param randomIO is the file a parquet data file to be read with random IO?
* @return appropriate policy string
*/
private static String readPolicies(final boolean randomIO) {
return randomIO ? DynamicWrappedIO.PARQUET_READ_POLICIES : DynamicWrappedIO.SEQUENTIAL_READ_POLICIES;
}
}
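A rough usage sketch of the helper class above, assuming a caller that wants to read the tail of a parquet file; the path and variable names are illustrative only.

Configuration conf = new Configuration();
Path file = new Path("/warehouse/table/part-00000.parquet"); // hypothetical path
FileSystem fs = file.getFileSystem(conf);

// One probe; returns null instead of throwing when the file is absent.
FileStatus status = HadoopFileIO.getFileStatusOrNull(fs, file);
if (status != null) {
  // Parquet data files are read with random IO, so randomIO = true selects the parquet read policies.
  try (FSDataInputStream in = HadoopFileIO.openFile(fs, status, true)) {
    in.seek(status.getLen() - 8); // 4-byte footer length + "PAR1" magic
    // ... read the footer length and magic bytes
  }
}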
@@ -24,21 +24,40 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.wrapped.io.DynamicWrappedIO;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

/**
* Create a Hadoop input file.
*/
public class HadoopInputFile implements InputFile {

private final FileSystem fs;
private final FileStatus stat;
private final Configuration conf;

/**
* Open from a path.
* <p>
* Includes a {@code getFileStatus()} call to fill in {@link #stat}.
* @param path path of the file
* @param conf configuration.
* @return the input file
* @throws IOException IO failure creating the FS or retrieving the FileStatus
*/
public static HadoopInputFile fromPath(Path path, Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
return new HadoopInputFile(fs, fs.getFileStatus(path), conf);
}

/**
* Create from a path, raising a RuntimeException on any IO failure.
* See {@link #fromPath(Path, Configuration)}.
* @param path path of the file
* @param conf configuration.
* @return the input file
* @throws RuntimeException IO failure creating the FS or retrieving the FileStatus
*/
public static HadoopInputFile fromPathUnchecked(Path path, Configuration conf) {
try {
return fromPath(path, conf);
@@ -47,6 +66,16 @@ public static HadoopInputFile fromPathUnchecked(Path path, Configuration conf) {
}
}

/**
* Create from a file status, which determines both the path and the file length.
* <p>
* When opening a file from an object store, this may eliminate the overhead
* of a HEAD request when later opening the file.
* @param stat status of the file
* @param conf configuration
* @return the input file
* @throws IOException IO failure creating the FS
*/
public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf) throws IOException {
FileSystem fs = stat.getPath().getFileSystem(conf);
return new HadoopInputFile(fs, stat, conf);
@@ -73,7 +102,7 @@ public long getLength() {

@Override
public SeekableInputStream newStream() throws IOException {
return HadoopStreams.wrap(DynamicWrappedIO.openFile(fs, stat, DynamicWrappedIO.PARQUET_READ_POLICIES));
return HadoopStreams.wrap(HadoopFileIO.openFile(fs, stat, true));
}

@Override
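A small sketch, assuming the caller already holds FileStatus entries from a directory listing, of how fromStatus() avoids the extra getFileStatus()/HEAD request described above; tableDir, fs and conf are hypothetical.

for (FileStatus st : fs.listStatus(tableDir)) {
  // The listing already carries the path and length, so no further probe is needed.
  HadoopInputFile inputFile = HadoopInputFile.fromStatus(st, conf);
  // newStream() now opens via HadoopFileIO.openFile(fs, stat, true), i.e. with random-IO read policies.
  try (SeekableInputStream in = inputFile.newStream()) {
    // ... read the footer and row groups
  }
}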
@@ -103,6 +103,11 @@ public final class DynamicWrappedIO {
*/
public static final String PARQUET_READ_POLICIES = "parquet, columnar, vector, random";

/**
* Read policy for sequential files: {@value}.
*/
public static final String SEQUENTIAL_READ_POLICIES = "sequential";

/**
* Was wrapped IO loaded?
* In the hadoop codebase, this is true.
@@ -378,12 +383,17 @@ public static boolean isAvailable() {
* the file status.
* <p>
* If not, falls back to the classic {@code fs.open(Path)} call.
* @param fs filesystem
* <p>
* Note that for filesystems with lazy IO, existence checks may be delayed until
* the first read() operation.
* @param fileSystem FileSystem to use
* @param path path of the file to open
* @param status nullable file status
* @param policy read policy
* @throws IOException any IO failure.
*/
public static FSDataInputStream openFile(FileSystem fs, FileStatus status, String policy) throws IOException {
public static FSDataInputStream openFile(
FileSystem fileSystem, Path path, @Nullable FileStatus status, String policy) throws IOException {

final DynamicWrappedIO instance = DynamicWrappedIO.instance();
FSDataInputStream stream;
if (instance.fileSystem_openFile_available()) {
@@ -394,10 +404,10 @@ public static FSDataInputStream openFile(FileSystem fs, FileStatus status, Strin
// For other stores, it ultimately invokes the classic open(Path)
// call so is no more expensive than before.
LOG.debug("Opening file {} through fileSystem_openFile() with policy {}", status, policy);
stream = instance.fileSystem_openFile(fs, status.getPath(), policy, status, null, null);
stream = instance.fileSystem_openFile(fileSystem, path, policy, status, null, null);
} else {
LOG.debug("Opening file {} through open()", status);
stream = fs.open(status.getPath());
stream = fileSystem.open(path);
}
return stream;
}
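For reference, a sketch of how the HadoopFileIO wrappers above end up calling this method; the constants and signature come from this class, while the variables are assumed.

// Random IO for a data file: a known status is passed in along with the parquet read policies.
FSDataInputStream data = DynamicWrappedIO.openFile(
    fileSystem, dataStatus.getPath(), dataStatus, DynamicWrappedIO.PARQUET_READ_POLICIES);

// Sequential IO for a summary file: no status, sequential policy; where the openFile()
// builder is unavailable this falls back to the classic fileSystem.open(path) call.
FSDataInputStream summary = DynamicWrappedIO.openFile(
    fileSystem, summaryPath, null, DynamicWrappedIO.SEQUENTIAL_READ_POLICIES);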
@@ -29,6 +29,7 @@
import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
import static org.apache.parquet.hadoop.util.HadoopFileIO.openFile;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
@@ -98,7 +99,6 @@
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.hadoop.util.wrapped.io.DynamicWrappedIO;
import org.apache.parquet.internal.column.columnindex.BinaryTruncator;
import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
@@ -652,7 +652,7 @@ public void testAlignmentWithPadding() throws Exception {
long fileLen = stat.getLen();

long footerLen;
try (FSDataInputStream data = DynamicWrappedIO.openFile(fs, stat, DynamicWrappedIO.PARQUET_READ_POLICIES)) {
try (FSDataInputStream data = openFile(fs, stat, true)) {
data.seek(fileLen - 8); // 4-byte offset + "PAR1"
footerLen = BytesUtils.readIntLittleEndian(data);
}
@@ -40,6 +40,7 @@
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.hadoop.util.HadoopFileIO;
import org.apache.parquet.io.ParquetFileRange;
import org.junit.After;
import org.junit.Before;
@@ -176,7 +177,7 @@ public static void createFile(FileSystem fs, Path path, byte[] data) throws IOEx
* @throws IOException failure to open
*/
private FSDataInputStream openTestFile() throws IOException {
return getFileSystem().open(testFilePath);
return HadoopFileIO.openFile(getFileSystem(), testFilePath, true);
}

/**