apacheGH-3078. openFile() invocation/failure tests.
The builder API is a real PITA to mock, even though it
works great in application code.

Uses the o.a.h.fs.impl implementation class instead, and I've created
HADOOP-19355 ("Mark FutureDataInputStreamBuilderImpl as VisibleForTesting")
to make it clear that this is allowed.
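
As a rough sketch of what that means in practice: rather than mocking the FutureDataInputStreamBuilder interface, a test can subclass FutureDataInputStreamBuilderImpl and hand back a preset future from build(). This mirrors the StubOpenFileBuilder added in the test file below; the class name here is illustrative only.

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;

// Illustrative sketch only: mirrors the StubOpenFileBuilder added in this commit.
final class PresetOpenFileBuilder extends FutureDataInputStreamBuilderImpl {

  /** The future handed back by build(). */
  private final CompletableFuture<FSDataInputStream> result;

  PresetOpenFileBuilder(FileSystem fs, Path path, CompletableFuture<FSDataInputStream> result) {
    super(fs, path);
    this.result = result;
  }

  @Override
  public CompletableFuture<FSDataInputStream> build() {
    // return the preset future rather than opening anything
    return result;
  }
}
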
steveloughran committed Dec 3, 2024
1 parent ab31f8f commit 79a4c04
Showing 3 changed files with 369 additions and 2 deletions.
@@ -110,7 +110,7 @@ public SeekableInputStream newStream() throws IOException {
FSDataInputStream stream;
try {
// this method is async so that implementations may do async HEAD head
// requests. Not done in S3A/ABFS when a file status passed down (as is done here)
// requests, such as S3A/ABFS when a file status is passed down.
final CompletableFuture<FSDataInputStream> future = fs.openFile(stat.getPath())
.withFileStatus(stat)
.opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY)
@@ -121,7 +121,14 @@ public SeekableInputStream newStream() throws IOException {
// equal the path in the FileStatus; Hive virtual FS could create this condition.
// As the path to open is derived from stat.getPath(), this condition seems
// near-impossible to create -but is handled here for due diligence.
stream = fs.open(stat.getPath());
try {
stream = fs.open(stat.getPath());
} catch (IOException | RuntimeException ex) {
// a failure on this attempt attaches the failure of the openFile() call
// as a suppressed exception, so its stack trace is preserved.
ex.addSuppressed(e);
throw ex;
}
}

return HadoopStreams.wrap(stream);
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

/**
* Based on {@code org.apache.hadoop.fs.FileSystemTestHelper},
* this class exports the package-private {@code FileSystem}
* methods which can be used to push FS instances into the
* map of URI -> FS instance.
* <p>
* This makes it easy to add mocked filesystem instances
* to the map, which will then be picked up by any
* code retrieving an FS instance for that URI.
* <p>
* The API is stable and used elsewhere. What is important
* is to remove FS instances after each test case.
* {@link #cleanFilesystemCache()} cleans the entire cache
* and should be used in teardown methods.
*/
public final class FileSystemTestBinder {

/**
* Empty configuration.
* Part of the FileSystem method signatures, but not used behind them.
*/
public static final Configuration CONF = new Configuration(false);

/**
* Inject a filesystem into the cache.
* @param uri filesystem URI
* @param fs filesystem to inject
* @throws UncheckedIOException Hadoop UGI problems.
*/
public static void addFileSystemForTesting(URI uri, FileSystem fs) {
try {
FileSystem.addFileSystemForTesting(uri, CONF, fs);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
* Clean up the filesystem cache.
* This swallows any IOE nominally raised in the process, to ensure
* it can safely be invoked in teardowns.
*/
public static void cleanFilesystemCache() {
try {
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
} catch (IOException ignored) {
// Ignore the exception: if getCurrentUser() fails, it would
// have been impossible to add mock instances to a per-user cache.
}
}
}
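
A minimal usage sketch (assuming a JUnit 4 test in the style of TestHadoopOpenFile below): bind a mock instance to a URI in setup, and always clean the cache in teardown. The class and field names here are hypothetical.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestBinder;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mockito;

// Hypothetical test class, for illustration only.
public class ExampleMockFsTest {

  private FileSystem mockFs;

  @Before
  public void setUp() throws Exception {
    mockFs = Mockito.mock(FileSystem.class);
    Mockito.doReturn(new Configuration(false)).when(mockFs).getConf();
    // any code retrieving an FS instance for "mock://path/" now gets the mock
    FileSystemTestBinder.addFileSystemForTesting(new URI("mock://path/"), mockFs);
  }

  @After
  public void tearDown() {
    // evict all cached instances for the current user, including the mock
    FileSystemTestBinder.cleanFilesystemCache();
  }
}
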
@@ -0,0 +1,283 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.parquet.hadoop.util;

import static org.apache.hadoop.fs.FileSystemTestBinder.addFileSystemForTesting;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestBinder;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.parquet.io.SeekableInputStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/**
* Test suite to validate behavior of opening files through
* use of FileSystem.openFile(), especially fallback
* to the original FileSystem.open() method when
* openFile() raises an IllegalArgumentException or other RTE.
* <p>
* These tests use classes in the package {@code org.apache.hadoop.fs.impl}.
* Although an implementation package, it is tagged as {@code LimitedPrivate("Filesystems")};
* it is already used outside the Hadoop codebase (e.g. Google GCS).
*/
public class TestHadoopOpenFile {

private static final int FIRST = MockHadoopInputStream.TEST_ARRAY[0];
private URI fsUri;
private FileStatus status;
private Path path;
private Configuration conf;
private IOException fileNotFound;
private IllegalArgumentException illegal;

@Before
public void setUp() throws URISyntaxException {
// the scheme "mock:" is used not only to be confident that our injected
// instance is picked up, but also to ensure that there is no
// contamination of any real scheme, such as file:
fsUri = new URI("mock://path/");
path = new Path("mock://path/file");

conf = new Configuration(false);
fileNotFound = new FileNotFoundException("file not found");
illegal = new IllegalArgumentException("illegal");
status = new FileStatus(10, false, 1, 1, 0, path);
}

/**
* Clean up the entire FS cache for the current user.
*/
@After
public void tearDown() {
FileSystemTestBinder.cleanFilesystemCache();
}

/**
* The healthy path for opening a file.
*/
@Test
public void testOpenFileGoodPath() throws Throwable {
final FileSystem mockFS = prepareMockFS();
final FSDataInputStream in = new FSDataInputStream(new MockHadoopInputStream());
final StubOpenFileBuilder opener = new StubOpenFileBuilder(mockFS, path, CompletableFuture.completedFuture(in));
doReturn(opener).when(mockFS).openFile(path);

// this looks up the FS binding via the status file path.
openAndRead(fileFromStatus());

// The fallback call of open(path) never took place.
Mockito.verify(mockFS, never()).open(path);
}

/**
* The openFile() call raises a RuntimeException, which is caught; the
* classic open() call is then invoked.
*/
@Test
public void testOpenFileEarlyFailure() throws Throwable {
final FileSystem mockFS = prepareMockFS();
final FSDataInputStream in = new FSDataInputStream(new MockHadoopInputStream());

Mockito.doThrow(illegal).when(mockFS).openFile(path);
doReturn(in).when(mockFS).open(path);

// this looks up the FS binding via the status file path.
openAndRead(fileFromStatus());
}

/**
* openFile() fails during execution of the completable future,
* raising an RTE.
* Again, this triggers a fallback to open().
*/
@Test
public void testOpenFileLateFailure() throws Throwable {
final FileSystem mockFS = prepareMockFS();
final FSDataInputStream in = new FSDataInputStream(new MockHadoopInputStream());

final StubOpenFileBuilder opener = new StubOpenFileBuilder(
mockFS, path, CompletableFuture.completedFuture(null).thenApply((f) -> {
throw illegal;
}));
doReturn(opener).when(mockFS).openFile(path);
doReturn(in).when(mockFS).open(path);

openAndRead(fileFromStatus());
}

/**
* Open a stream, read the first byte, and assert that it matches
* what is expected.
*
* @param inputFile input file
*
* @throws IOException failure to open
*/
private static void openAndRead(final HadoopInputFile inputFile) throws IOException {
try (SeekableInputStream stream = inputFile.newStream()) {
Assert.assertEquals("byte read", FIRST, stream.read());
}
}

/**
* If openFile() raises an IOException within the future,
* then it is thrown and the classic open() call is never invoked.
*/
@Test
public void testOpenFileRaisesIOException() throws Throwable {
final FileSystem mockFS = prepareMockFS();

final StubOpenFileBuilder opener = new StubOpenFileBuilder(
mockFS, path, CompletableFuture.completedFuture(null).thenApply((f) -> {
// throw a wrapped IOE
throw new UncheckedIOException(fileNotFound);
}));
doReturn(opener).when(mockFS).openFile(path);

final HadoopInputFile inputFile = fileFromStatus();
Assert.assertThrows(FileNotFoundException.class, inputFile::newStream);
Mockito.verify(mockFS, never()).open(path);
}

/**
* If openFile() raises a RuntimeException, it is caught and the
* classic open() call is invoked.
* If that call raises an IOE, the IOE is thrown and the caught RTE
* is added to its suppressed list.
*/
@Test
public void testOpenFileDoubleFailure() throws Throwable {
final FileSystem mockFS = prepareMockFS();

Mockito.doThrow(illegal).when(mockFS).openFile(path);
Mockito.doThrow(fileNotFound).when(mockFS).open(path);

// this looks up the FS binding via the status file path.
final HadoopInputFile inputFile = fileFromStatus();

final FileNotFoundException caught = Assert.assertThrows(FileNotFoundException.class, inputFile::newStream);
Assert.assertSame(fileNotFound, caught);
final Throwable[] suppressed = caught.getSuppressed();
Assert.assertEquals("number of suppressed exceptions", 1, suppressed.length);
Assert.assertSame(illegal, suppressed[0]);
}

/**
* The handling of a double RTE is the same as that of
* the sequence RTE then IOE.
*/
@Test
public void testOpenFileDoubleRTE() throws Throwable {
final FileSystem mockFS = prepareMockFS();

Mockito.doThrow(illegal).when(mockFS).openFile(path);
NullPointerException npe = new NullPointerException("null");
Mockito.doThrow(npe).when(mockFS).open(path);

// this looks up the FS binding via the status file path.
final HadoopInputFile inputFile = fileFromStatus();

final NullPointerException caught = Assert.assertThrows(NullPointerException.class, inputFile::newStream);
Assert.assertSame(npe, caught);
final Throwable[] suppressed = caught.getSuppressed();
Assert.assertEquals("number of suppressed exceptions", 1, suppressed.length);
Assert.assertSame(illegal, suppressed[0]);
}

/**
* Create a mock FileSystem with the foundational operations
* mocked. The FS is added as the binding for the mock URI.
*
* @return a mock FileSystem
*
* @throws IOException stub signature only.
*/
private FileSystem prepareMockFS() throws IOException {
final FileSystem mockFS = mock(FileSystem.class);
doNothing().when(mockFS).close();
doReturn(conf).when(mockFS).getConf();
doReturn(status).when(mockFS).getFileStatus(path);

// register the FS instance under the mock URI
addFileSystemForTesting(fsUri, mockFS);
return mockFS;
}

/**
* Build an input file from the status field.
* @return an input file.
* @throws IOException failure to create the associated filesystem.
*/
private HadoopInputFile fileFromStatus() throws IOException {
return HadoopInputFile.fromStatus(status, conf);
}

/**
* Stub implementation of {@link FutureDataInputStreamBuilder}.
* Trying to mock the interface is troublesome, as the interface has gained
* new methods over time; instead, this uses the base implementation class
* within o.a.h.fs.impl.
*/
private static final class StubOpenFileBuilder extends FutureDataInputStreamBuilderImpl {

/**
* The future to return as the result of {@link #build()}.
*/
private final CompletableFuture<FSDataInputStream> result;

/**
* Create the builder. The FS and path must be non-null.
*
* @param fileSystem fs
* @param path path to open
* @param result builder result.
*/
private StubOpenFileBuilder(
final FileSystem fileSystem, Path path, final CompletableFuture<FSDataInputStream> result) {
super(fileSystem, path);
this.result = result;
}

@Override
public CompletableFuture<FSDataInputStream> build()
throws IllegalArgumentException, UnsupportedOperationException {
return result;
}
}
}
