From 863a1f98ea81e3b697ade1b10d0923829c0ce6ac Mon Sep 17 00:00:00 2001
From: Steve Loughran <stevel@cloudera.com>
Date: Mon, 9 Dec 2019 18:51:55 +0000
Subject: [PATCH 1/2] HDFS-15042 Add more tests for
 ByteBufferPositionedReadable.

Changes

* Javadocs in ByteBufferPositionedReadable are a bit stricter.
* switch to parameterized JUnit test runs for on/off heap buffers
* factor put main assertions into higher level asserts

New tests
* negative offset for reads of actual/empty buffers
* read position > EOF.
* empty files
* verify that interleaving simple reads/seeks work.

Change-Id: I27294b15f7cad8e1b87150d9e9fac0623fc44c45
---
 .../fs/ByteBufferPositionedReadable.java      |  43 +-
 .../apache/hadoop/fs/FSExceptionMessages.java |   6 +
 .../hadoop/crypto/TestCryptoStreams.java      |   9 +-
 .../apache/hadoop/hdfs/DFSInputStream.java    |   4 +
 .../hadoop/hdfs/TestByteBufferPread.java      | 366 ++++++++++++++----
 5 files changed, 334 insertions(+), 94 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
index f8282d88c46c3..0578c63617316 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
@@ -25,9 +25,24 @@
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
+ * The javadocs for this interface follows RFC 2119 rules regarding the use of
+ * MUST, MUST NOT, MAY, and SHALL.
+ * <p>
  * Implementers of this interface provide a positioned read API that writes to a
  * {@link ByteBuffer} rather than a {@code byte[]}.
- *
+ * <p>
+ * <b>Thread safety</b>
+ * <p>
+ * These operations doe not change the current offset of a stream as returned
+ * by {@link Seekable#getPos()} and MUST BE thread-safe.
+ * Implementations MAY block other readers while one read operation
+ * is in progress; this MAY include blocking <i>all</i> read() calls.
+ * <p>
+ * <b>Concurrent file access</b>
+ * <p>
+ * No guarantees are made as to when or whether changes made to a file
+ * are visible to readers of this API reading the data through an already
+ * open stream instance.
  * @see PositionedReadable
  * @see ByteBufferReadable
  */
@@ -54,12 +69,11 @@ public interface ByteBufferPositionedReadable {
    * stream supports this interface, otherwise they might get a
    * {@link UnsupportedOperationException}.
    * <p>
-   * Implementations should treat 0-length requests as legitimate, and must not
+   * Implementations MUST treat 0-length requests as legitimate, and MUST NOT
    * signal an error upon their receipt.
-   * <p>
-   * This does not change the current offset of a file, and is thread-safe.
-   *
-   * @param position position within file
+   * The {@code position} offset MUST BE zero or positive; if negative
+   * an EOFException SHALL BE raised.
+   * @param position position within stream. This MUST be &gt;=0.
    * @param buf the ByteBuffer to receive the results of the read operation.
    * @return the number of bytes read, possibly zero, or -1 if reached
    *         end-of-stream
@@ -78,12 +92,23 @@ public interface ByteBufferPositionedReadable {
    * {@link #read(long, ByteBuffer)}, the difference is that this method is
    * guaranteed to read data until the {@link ByteBuffer} is full, or until
    * the end of the data stream is reached.
-   *
-   * @param position position within file
+   * <p>
+   * The {@code position} offset MUST BE zero or positive; if negative
+   * an EOFException SHALL BE raised.
+   * <p>
+   * Implementations MUST treat 0-length requests as legitimate -and, and so
+   * MUST NOT signal an error if the read position was valid.
+   * For zero-byte reads, read position greater {@code getPos()} MAY be
+   * considered valid; a negative position MUST always fail.
+   * <p>
+   * If the EOF was reached before the buffer was completely read -the
+   * state of the buffer is undefined. It may be unchanged or it may be
+   * partially or completely overwritten.
+   * @param position position within stream. This must be &gt;=0.
    * @param buf the ByteBuffer to receive the results of the read operation.
    * @throws IOException if there is some error performing the read
    * @throws EOFException the end of the data was reached before
-   * the read operation completed
+   * the read operation completed, or the position value is negative.
    * @see #read(long, ByteBuffer)
    */
   void readFully(long position, ByteBuffer buf) throws IOException;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
index f4616f1d72bc7..9f1568137952b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
@@ -35,6 +35,12 @@ public class FSExceptionMessages {
   public static final String NEGATIVE_SEEK =
     "Cannot seek to a negative offset";
 
+  /**
+   * Negative offset read forbidden : {@value}
+   */
+  public static final String NEGATIVE_POSITION_READ =
+    "Cannot read from a negative position";
+
   /**
    * Seeks : {@value}
    */
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
index 73c6249612387..6f03f2faac736 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
@@ -45,6 +45,9 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.hadoop.fs.FSExceptionMessages.CANNOT_SEEK_PAST_EOF;
+import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
+import static org.apache.hadoop.fs.FSExceptionMessages.NEGATIVE_POSITION_READ;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
 
 public class TestCryptoStreams extends CryptoStreamsTestBase {
@@ -339,16 +342,16 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
       }
 
       if (position > length) {
-        throw new IOException("Cannot read after EOF.");
+        throw new EOFException(CANNOT_SEEK_PAST_EOF);
       }
       if (position < 0) {
-        throw new IOException("Cannot read to negative offset.");
+        throw new EOFException(NEGATIVE_POSITION_READ);
       }
 
       checkStream();
 
       if (position + buf.remaining() > length) {
-        throw new EOFException("Reach the end of stream.");
+        throw new EOFException(EOF_IN_READ_FULLY);
       }
 
       buf.put(data, (int) position, buf.remaining());
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 7b664e4f31164..846146f232bad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -94,6 +94,7 @@
 
 import javax.annotation.Nonnull;
 
+import static org.apache.hadoop.fs.FSExceptionMessages.NEGATIVE_POSITION_READ;
 import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
 
 /****************************************************************
@@ -1684,6 +1685,9 @@ public int read(long position, final ByteBuffer buf) throws IOException {
   @Override
   public void readFully(long position, final ByteBuffer buf)
       throws IOException {
+    if (position < 0) {
+      throw new EOFException(NEGATIVE_POSITION_READ);
+    }
     int nread = 0;
     while (buf.hasRemaining()) {
       int nbytes = read(position + nread, buf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
index 1c7f1500f3689..2ddfe5614d7fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
@@ -17,21 +17,34 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Random;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
+import static org.apache.hadoop.fs.FSExceptionMessages.NEGATIVE_POSITION_READ;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -41,22 +54,55 @@
  * This class tests the DFS positional read functionality on a single node
  * mini-cluster. These tests are inspired from {@link TestPread}. The tests
  * are much less comprehensive than other pread tests because pread already
- * internally uses {@link ByteBuffer}s.
+ * internally uses ByteBuffers.
  */
+@SuppressWarnings({"NestedAssignment", "StaticNonFinalField"})
+@RunWith(Parameterized.class)
 public class TestByteBufferPread {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestByteBufferPread.class);
+
   private static MiniDFSCluster cluster;
   private static FileSystem fs;
   private static byte[] fileContents;
   private static Path testFile;
+  private static Path emptyFile;
   private static Random rand;
 
   private static final long SEED = 0xDEADBEEFL;
   private static final int BLOCK_SIZE = 4096;
   private static final int FILE_SIZE = 12 * BLOCK_SIZE;
 
+  public static final int HALF_SIZE = FILE_SIZE / 2;
+  public static final int QUARTER_SIZE = FILE_SIZE / 4;
+  public static final int EOF_POS = FILE_SIZE -1;
+
+  private final boolean useHeap;
+
+  private ByteBuffer buffer;
+
+  private ByteBuffer emptyBuffer;
+
+  /**
+   * This test suite is parameterized for the different heap allocation
+   * options.
+   * @return a list of allocation policies to test.
+   */
+  @Parameterized.Parameters(name = "heap={0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {true},
+        {false},
+    });
+  }
+
+  public TestByteBufferPread(final boolean useHeap) {
+    this.useHeap = useHeap;
+  }
+
   @BeforeClass
-  public static void setup() throws IOException {
+  public static void setupClass() throws IOException {
     // Setup the cluster with a small block size so we can create small files
     // that span multiple blocks
     Configuration conf = new Configuration();
@@ -73,39 +119,27 @@ public static void setup() throws IOException {
     try (FSDataOutputStream out = fs.create(testFile, (short) 3)) {
       out.write(fileContents);
     }
+    emptyFile = new Path("/byte-buffer-pread-emptyfile.dat");
+    ContractTestUtils.touch(fs, emptyFile);
   }
 
-  /**
-   * Test preads with {@link java.nio.HeapByteBuffer}s.
-   */
-  @Test
-  public void testPreadWithHeapByteBuffer() throws IOException {
-    testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-    testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-    testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-    testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-    testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-    testPreadFullyWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-  }
-
-  /**
-   * Test preads with {@link java.nio.DirectByteBuffer}s.
-   */
-  @Test
-  public void testPreadWithDirectByteBuffer() throws IOException {
-    testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
-    testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
-    testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
-    testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
-    testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
-    testPreadFullyWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+  @Before
+  public void setup() throws IOException {
+    if (useHeap) {
+      buffer = ByteBuffer.allocate(FILE_SIZE);
+      emptyBuffer = ByteBuffer.allocate(0);
+    } else {
+      buffer = ByteBuffer.allocateDirect(FILE_SIZE);
+      emptyBuffer = ByteBuffer.allocateDirect(0);
+    }
   }
 
   /**
    * Reads the entire testFile using the pread API and validates that its
-   * contents are properly loaded into the supplied {@link ByteBuffer}.
+   * contents are properly loaded into the {@link ByteBuffer}.
    */
-  private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException {
+  @Test
+  public void testPreadWithByteBuffer() throws IOException {
     int bytesRead;
     int totalBytesRead = 0;
     try (FSDataInputStream in = fs.open(testFile)) {
@@ -113,33 +147,67 @@ private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException {
         totalBytesRead += bytesRead;
         // Check that each call to read changes the position of the ByteBuffer
         // correctly
-        assertEquals(totalBytesRead, buffer.position());
+        assertBufferPosition(totalBytesRead);
       }
 
       // Make sure the buffer is full
-      assertFalse(buffer.hasRemaining());
+      assertBufferIsFull();
       // Make sure the contents of the read buffer equal the contents of the
       // file
-      buffer.position(0);
-      byte[] bufferContents = new byte[FILE_SIZE];
-      buffer.get(bufferContents);
-      assertArrayEquals(bufferContents, fileContents);
+      assertBufferEqualsFileContents(0, FILE_SIZE, 0);
     }
   }
 
+  /**
+   * Assert that the value of {@code buffer.position()} equals
+   * the expected value.
+   * @param pos required position.
+   */
+  private void assertBufferPosition(final int pos) {
+    assertEquals("Buffer position",
+        pos, buffer.position());
+  }
+
+  /**
+   * Assert the stream is at the given position.
+   * @param in stream
+   * @param pos required position
+   * @throws IOException seek() failure.
+   */
+  private void assertStreamPosition(final Seekable in, long pos)
+      throws IOException {
+    assertEquals("Buffer position",
+        pos, in.getPos());
+  }
+
+  /**
+   * Assert that the buffer is full.
+   */
+  private void assertBufferIsFull() {
+    assertFalse("Buffer is not full", buffer.hasRemaining());
+  }
+
+  /**
+   * Assert that the buffer is not full full.
+   */
+  private void assertBufferIsNotFull() {
+    assertTrue("Buffer is full", buffer.hasRemaining());
+  }
+
   /**
    * Attempts to read the testFile into a {@link ByteBuffer} that is already
    * full, and validates that doing so does not change the contents of the
    * supplied {@link ByteBuffer}.
    */
-  private void testPreadWithFullByteBuffer(ByteBuffer buffer)
+  @Test
+  public void testPreadWithFullByteBuffer()
           throws IOException {
     // Load some dummy data into the buffer
     byte[] existingBufferBytes = new byte[FILE_SIZE];
     rand.nextBytes(existingBufferBytes);
     buffer.put(existingBufferBytes);
     // Make sure the buffer is full
-    assertFalse(buffer.hasRemaining());
+    assertBufferIsFull();
 
     try (FSDataInputStream in = fs.open(testFile)) {
       // Attempt to read into the buffer, 0 bytes should be read since the
@@ -148,7 +216,7 @@ private void testPreadWithFullByteBuffer(ByteBuffer buffer)
 
       // Double check the buffer is still full and its contents have not
       // changed
-      assertFalse(buffer.hasRemaining());
+      assertBufferIsFull();
       buffer.position(0);
       byte[] bufferContents = new byte[FILE_SIZE];
       buffer.get(bufferContents);
@@ -161,32 +229,32 @@ private void testPreadWithFullByteBuffer(ByteBuffer buffer)
    * {@link ByteBuffer#limit()} on the buffer. Validates that only half of the
    * testFile is loaded into the buffer.
    */
-  private void testPreadWithLimitedByteBuffer(
-          ByteBuffer buffer) throws IOException {
+  @Test
+  public void testPreadWithLimitedByteBuffer() throws IOException {
     int bytesRead;
     int totalBytesRead = 0;
     // Set the buffer limit to half the size of the file
-    buffer.limit(FILE_SIZE / 2);
+    buffer.limit(HALF_SIZE);
 
     try (FSDataInputStream in = fs.open(testFile)) {
+      in.seek(EOF_POS);
       while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
         totalBytesRead += bytesRead;
         // Check that each call to read changes the position of the ByteBuffer
         // correctly
-        assertEquals(totalBytesRead, buffer.position());
+        assertBufferPosition(totalBytesRead);
       }
 
       // Since we set the buffer limit to half the size of the file, we should
       // have only read half of the file into the buffer
-      assertEquals(totalBytesRead, FILE_SIZE / 2);
+      assertEquals(HALF_SIZE, totalBytesRead);
       // Check that the buffer is full and the contents equal the first half of
       // the file
-      assertFalse(buffer.hasRemaining());
-      buffer.position(0);
-      byte[] bufferContents = new byte[FILE_SIZE / 2];
-      buffer.get(bufferContents);
-      assertArrayEquals(bufferContents,
-              Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
+      assertBufferIsFull();
+      assertBufferEqualsFileContents(0, HALF_SIZE, 0);
+
+      // position hasn't changed
+      assertStreamPosition(in, EOF_POS);
     }
   }
 
@@ -194,97 +262,231 @@ private void testPreadWithLimitedByteBuffer(
    * Reads half of the testFile into the {@link ByteBuffer} by setting the
    * {@link ByteBuffer#position()} the half the size of the file. Validates that
    * only half of the testFile is loaded into the buffer.
+   * <p>
+   * This test interleaves reading from the stream by the classic input
+   * stream API, verifying those bytes are also as expected.
+   * This lets us validate the requirement that these positions reads must
+   * not interfere with the conventional read sequence.
    */
-  private void testPreadWithPositionedByteBuffer(
-          ByteBuffer buffer) throws IOException {
+  @Test
+  public void testPreadWithPositionedByteBuffer() throws IOException {
     int bytesRead;
     int totalBytesRead = 0;
     // Set the buffer position to half the size of the file
-    buffer.position(FILE_SIZE / 2);
+    buffer.position(HALF_SIZE);
+    int counter = 0;
 
     try (FSDataInputStream in = fs.open(testFile)) {
+      assertEquals("Byte read from stream",
+          fileContents[counter++], in.read());
       while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
         totalBytesRead += bytesRead;
         // Check that each call to read changes the position of the ByteBuffer
         // correctly
-        assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position());
+        assertBufferPosition(totalBytesRead + HALF_SIZE);
+        // read the next byte.
+        assertEquals("Byte read from stream",
+            fileContents[counter++], in.read());
       }
 
       // Since we set the buffer position to half the size of the file, we
       // should have only read half of the file into the buffer
-      assertEquals(totalBytesRead, FILE_SIZE / 2);
+      assertEquals("bytes read",
+          HALF_SIZE, totalBytesRead);
       // Check that the buffer is full and the contents equal the first half of
       // the file
-      assertFalse(buffer.hasRemaining());
-      buffer.position(FILE_SIZE / 2);
-      byte[] bufferContents = new byte[FILE_SIZE / 2];
-      buffer.get(bufferContents);
-      assertArrayEquals(bufferContents,
-              Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
+      assertBufferIsFull();
+      assertBufferEqualsFileContents(HALF_SIZE, HALF_SIZE, 0);
     }
   }
 
+  /**
+   * Assert the buffer ranges matches that in the file.
+   * @param bufferPosition buffer position
+   * @param length length of data to check
+   * @param fileOffset offset in file.
+   */
+  private void assertBufferEqualsFileContents(int bufferPosition,
+      int length,
+      int fileOffset) {
+    buffer.position(bufferPosition);
+    byte[] bufferContents = new byte[length];
+    buffer.get(bufferContents);
+    assertArrayEquals(
+        "Buffer data from [" + bufferPosition + "-" + length + "]",
+        bufferContents,
+        Arrays.copyOfRange(fileContents, fileOffset, fileOffset + length));
+  }
+
   /**
    * Reads half of the testFile into the {@link ByteBuffer} by specifying a
    * position for the pread API that is half of the file size. Validates that
    * only half of the testFile is loaded into the buffer.
    */
-  private void testPositionedPreadWithByteBuffer(
-          ByteBuffer buffer) throws IOException {
+  @Test
+  public void testPositionedPreadWithByteBuffer() throws IOException {
     int bytesRead;
     int totalBytesRead = 0;
 
     try (FSDataInputStream in = fs.open(testFile)) {
       // Start reading from halfway through the file
-      while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2,
+      while ((bytesRead = in.read(totalBytesRead + HALF_SIZE,
               buffer)) > 0) {
         totalBytesRead += bytesRead;
         // Check that each call to read changes the position of the ByteBuffer
         // correctly
-        assertEquals(totalBytesRead, buffer.position());
+        assertBufferPosition(totalBytesRead);
       }
 
       // Since we starting reading halfway through the file, the buffer should
       // only be half full
-      assertEquals(totalBytesRead, FILE_SIZE / 2);
-      assertEquals(buffer.position(), FILE_SIZE / 2);
-      assertTrue(buffer.hasRemaining());
+      assertEquals("bytes read", HALF_SIZE, totalBytesRead);
+      assertBufferPosition(HALF_SIZE);
+      assertBufferIsNotFull();
       // Check that the buffer contents equal the second half of the file
-      buffer.position(0);
-      byte[] bufferContents = new byte[FILE_SIZE / 2];
-      buffer.get(bufferContents);
-      assertArrayEquals(bufferContents,
-              Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE));
+      assertBufferEqualsFileContents(0, HALF_SIZE, HALF_SIZE);
     }
   }
 
   /**
    * Reads the entire testFile using the preadFully API and validates that its
-   * contents are properly loaded into the supplied {@link ByteBuffer}.
+   * contents are properly loaded into the {@link ByteBuffer}.
    */
-  private void testPreadFullyWithByteBuffer(ByteBuffer buffer)
-          throws IOException {
+  @Test
+  public void testPreadFullyWithByteBuffer() throws IOException {
     int totalBytesRead = 0;
     try (FSDataInputStream in = fs.open(testFile)) {
       in.readFully(totalBytesRead, buffer);
       // Make sure the buffer is full
-      assertFalse(buffer.hasRemaining());
+      assertBufferIsFull();
       // Make sure the contents of the read buffer equal the contents of the
       // file
-      buffer.position(0);
-      byte[] bufferContents = new byte[FILE_SIZE];
-      buffer.get(bufferContents);
-      assertArrayEquals(bufferContents, fileContents);
+      assertBufferEqualsFileContents(0, FILE_SIZE, 0);
+    }
+  }
+
+  /**
+   * readFully past the end of the file into an empty buffer; expect this
+   * to be a no-op.
+   */
+  @Test
+  public void testPreadFullyPastEOFEmptyByteBuffer() throws IOException {
+    try (FSDataInputStream in = fs.open(testFile)) {
+      in.readFully(FILE_SIZE + 10, emptyBuffer);
+    }
+  }
+
+  /**
+   * Reads from a negative position -expects a failure.
+   * Also uses the new openFile() API to improve its coverage.
+   */
+  @Test
+  public void testPreadFullyNegativeOffset() throws Exception {
+    try (FSDataInputStream in = fs.openFile(testFile).build().get()) {
+      in.seek(QUARTER_SIZE);
+      intercept(EOFException.class, NEGATIVE_POSITION_READ,
+          () -> in.readFully(-1, buffer));
+      // the stream position has not changed.
+      assertStreamPosition(in, QUARTER_SIZE);
+    }
+  }
+
+  /**
+   * Read fully with a start position past the EOF -expects a failure.
+   */
+  @Test
+  public void testPreadFullyPositionPastEOF() throws Exception {
+    try (FSDataInputStream in = fs.openFile(testFile).build().get()) {
+      in.seek(QUARTER_SIZE);
+      intercept(EOFException.class, EOF_IN_READ_FULLY,
+          () -> in.readFully(FILE_SIZE * 2, buffer));
+      // the stream position has not changed.
+      assertStreamPosition(in, QUARTER_SIZE);
+    }
+  }
+
+  /**
+   * Read which goes past the EOF; expects a failure.
+   * The final state of the buffer is undefined; it may fail fast or fail late.
+   * Also uses the new openFile() API to improve its coverage.
+   */
+  @Test
+  public void testPreadFullySpansEOF() throws Exception {
+    try (FSDataInputStream in = fs.openFile(testFile).build().get()) {
+      intercept(EOFException.class, EOF_IN_READ_FULLY,
+          () -> in.readFully(FILE_SIZE - 10, buffer));
+      if (buffer.position() > 0) {
+        // this implementation does not do a range check before the read;
+        // it got partway through before failing.
+        // this is not an error -just inefficient.
+        LOG.warn("Buffer reads began before range checks with {}", in);
+      }
+    }
+  }
+
+  /**
+   * Reads from a negative position into an empty buffer -expects a failure.
+   * That is: position checks happen before any probes for an empty buffer.
+   */
+  @Test
+  public void testPreadFullyEmptyBufferNegativeOffset() throws Exception {
+    try (FSDataInputStream in = fs.openFile(testFile).build().get()) {
+      intercept(EOFException.class, NEGATIVE_POSITION_READ,
+          () -> in.readFully(-1, emptyBuffer));
+    }
+  }
+
+  /**
+   * Reads from position 0 on an empty file into a non-empty buffer
+   * will fail as there is not enough data.
+   */
+  @Test
+  public void testPreadFullyEmptyFile() throws Exception {
+    try (FSDataInputStream in = fs.openFile(emptyFile).build().get()) {
+      intercept(EOFException.class, EOF_IN_READ_FULLY,
+          () -> in.readFully(0, buffer));
+    }
+  }
+
+  /**
+   * Reads from position 0 on an empty file into an empty buffer,
+   * This MUST succeed.
+   */
+  @Test
+  public void testPreadFullyEmptyFileEmptyBuffer() throws Exception {
+    try (FSDataInputStream in = fs.openFile(emptyFile).build().get()) {
+      in.readFully(0, emptyBuffer);
+      assertStreamPosition(in, 0);
+    }
+  }
+
+  /**
+   * Pread from position 0 of an empty file.
+   * This MUST succeed, but 0 bytes will be read.
+   */
+  @Test
+  public void testPreadEmptyFile() throws Exception {
+    try (FSDataInputStream in = fs.openFile(emptyFile).build().get()) {
+      int bytesRead = in.read(0, buffer);
+      assertEquals("bytes read from empty file", -1, bytesRead);
+      assertStreamPosition(in, 0);
     }
   }
 
   @AfterClass
   public static void shutdown() throws IOException {
     try {
-      fs.delete(testFile, false);
-      fs.close();
+      if (fs != null) {
+        fs.delete(testFile, false);
+        fs.delete(emptyFile, false);
+        fs.close();
+      }
     } finally {
-      cluster.shutdown(true);
+      fs = null;
+      if (cluster != null) {
+        cluster.shutdown(true);
+        cluster = null;
+      }
     }
   }
 }

From ac0bcd8e379f186a5d244284ef4ffcd640c89ed6 Mon Sep 17 00:00:00 2001
From: Steve Loughran <stevel@cloudera.com>
Date: Fri, 27 Nov 2020 14:01:00 +0000
Subject: [PATCH 2/2] HDFS-15042. Javaoc and checkstyle on FSExceptionMessages.

Change-Id: I4c8b17dd897a11d4c8e22b9cbc366455c958242c
---
 .../org/apache/hadoop/fs/FSExceptionMessages.java  | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
index 9f1568137952b..c74c1a6485105 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
@@ -20,29 +20,29 @@
 
 /**
  * Standard strings to use in exception messages in filesystems
- * HDFS is used as the reference source of the strings
+ * HDFS is used as the reference source of the strings.
  */
 public class FSExceptionMessages {
 
   /**
-   * The operation failed because the stream is closed: {@value}
+   * The operation failed because the stream is closed: {@value}.
    */
   public static final String STREAM_IS_CLOSED = "Stream is closed!";
 
   /**
-   * Negative offset seek forbidden : {@value}
+   * Negative offset seek forbidden : {@value}.
    */
   public static final String NEGATIVE_SEEK =
-    "Cannot seek to a negative offset";
+      "Cannot seek to a negative offset";
 
   /**
-   * Negative offset read forbidden : {@value}
+   * Negative offset read forbidden : {@value}.
    */
   public static final String NEGATIVE_POSITION_READ =
-    "Cannot read from a negative position";
+      "Cannot read from a negative position";
 
   /**
-   * Seeks : {@value}
+   * Seeks : {@value}.
    */
   public static final String CANNOT_SEEK_PAST_EOF =
       "Attempted to seek or read past the end of the file";