diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
index f8282d88c46c3..0578c63617316 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
@@ -25,9 +25,24 @@
import org.apache.hadoop.classification.InterfaceStability;
/**
+ * The javadocs for this interface follows RFC 2119 rules regarding the use of
+ * MUST, MUST NOT, MAY, and SHALL.
+ *
* Implementers of this interface provide a positioned read API that writes to a
* {@link ByteBuffer} rather than a {@code byte[]}.
- *
+ *
+ * Thread safety
+ *
+ * These operations doe not change the current offset of a stream as returned
+ * by {@link Seekable#getPos()} and MUST BE thread-safe.
+ * Implementations MAY block other readers while one read operation
+ * is in progress; this MAY include blocking all read() calls.
+ *
+ * Concurrent file access
+ *
+ * No guarantees are made as to when or whether changes made to a file
+ * are visible to readers of this API reading the data through an already
+ * open stream instance.
* @see PositionedReadable
* @see ByteBufferReadable
*/
@@ -54,12 +69,11 @@ public interface ByteBufferPositionedReadable {
* stream supports this interface, otherwise they might get a
* {@link UnsupportedOperationException}.
*
- * Implementations should treat 0-length requests as legitimate, and must not
+ * Implementations MUST treat 0-length requests as legitimate, and MUST NOT
* signal an error upon their receipt.
- *
- * This does not change the current offset of a file, and is thread-safe.
- *
- * @param position position within file
+ * The {@code position} offset MUST BE zero or positive; if negative
+ * an EOFException SHALL BE raised.
+ * @param position position within stream. This MUST be >=0.
* @param buf the ByteBuffer to receive the results of the read operation.
* @return the number of bytes read, possibly zero, or -1 if reached
* end-of-stream
@@ -78,12 +92,23 @@ public interface ByteBufferPositionedReadable {
* {@link #read(long, ByteBuffer)}, the difference is that this method is
* guaranteed to read data until the {@link ByteBuffer} is full, or until
* the end of the data stream is reached.
- *
- * @param position position within file
+ *
+ * The {@code position} offset MUST BE zero or positive; if negative
+ * an EOFException SHALL BE raised.
+ *
+ * Implementations MUST treat 0-length requests as legitimate -and, and so
+ * MUST NOT signal an error if the read position was valid.
+ * For zero-byte reads, read position greater {@code getPos()} MAY be
+ * considered valid; a negative position MUST always fail.
+ *
+ * If the EOF was reached before the buffer was completely read -the
+ * state of the buffer is undefined. It may be unchanged or it may be
+ * partially or completely overwritten.
+ * @param position position within stream. This must be >=0.
* @param buf the ByteBuffer to receive the results of the read operation.
* @throws IOException if there is some error performing the read
* @throws EOFException the end of the data was reached before
- * the read operation completed
+ * the read operation completed, or the position value is negative.
* @see #read(long, ByteBuffer)
*/
void readFully(long position, ByteBuffer buf) throws IOException;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
index f4616f1d72bc7..c74c1a6485105 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
@@ -20,23 +20,29 @@
/**
* Standard strings to use in exception messages in filesystems
- * HDFS is used as the reference source of the strings
+ * HDFS is used as the reference source of the strings.
*/
public class FSExceptionMessages {
/**
- * The operation failed because the stream is closed: {@value}
+ * The operation failed because the stream is closed: {@value}.
*/
public static final String STREAM_IS_CLOSED = "Stream is closed!";
/**
- * Negative offset seek forbidden : {@value}
+ * Negative offset seek forbidden : {@value}.
*/
public static final String NEGATIVE_SEEK =
- "Cannot seek to a negative offset";
+ "Cannot seek to a negative offset";
/**
- * Seeks : {@value}
+ * Negative offset read forbidden : {@value}.
+ */
+ public static final String NEGATIVE_POSITION_READ =
+ "Cannot read from a negative position";
+
+ /**
+ * Seeks : {@value}.
*/
public static final String CANNOT_SEEK_PAST_EOF =
"Attempted to seek or read past the end of the file";
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
index 73c6249612387..6f03f2faac736 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
@@ -45,6 +45,9 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.hadoop.fs.FSExceptionMessages.CANNOT_SEEK_PAST_EOF;
+import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
+import static org.apache.hadoop.fs.FSExceptionMessages.NEGATIVE_POSITION_READ;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
public class TestCryptoStreams extends CryptoStreamsTestBase {
@@ -339,16 +342,16 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
}
if (position > length) {
- throw new IOException("Cannot read after EOF.");
+ throw new EOFException(CANNOT_SEEK_PAST_EOF);
}
if (position < 0) {
- throw new IOException("Cannot read to negative offset.");
+ throw new EOFException(NEGATIVE_POSITION_READ);
}
checkStream();
if (position + buf.remaining() > length) {
- throw new EOFException("Reach the end of stream.");
+ throw new EOFException(EOF_IN_READ_FULLY);
}
buf.put(data, (int) position, buf.remaining());
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 7b664e4f31164..846146f232bad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -94,6 +94,7 @@
import javax.annotation.Nonnull;
+import static org.apache.hadoop.fs.FSExceptionMessages.NEGATIVE_POSITION_READ;
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
/****************************************************************
@@ -1684,6 +1685,9 @@ public int read(long position, final ByteBuffer buf) throws IOException {
@Override
public void readFully(long position, final ByteBuffer buf)
throws IOException {
+ if (position < 0) {
+ throw new EOFException(NEGATIVE_POSITION_READ);
+ }
int nread = 0;
while (buf.hasRemaining()) {
int nbytes = read(position + nread, buf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
index 1c7f1500f3689..2ddfe5614d7fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
@@ -17,21 +17,34 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Random;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
+import static org.apache.hadoop.fs.FSExceptionMessages.NEGATIVE_POSITION_READ;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -41,22 +54,55 @@
* This class tests the DFS positional read functionality on a single node
* mini-cluster. These tests are inspired from {@link TestPread}. The tests
* are much less comprehensive than other pread tests because pread already
- * internally uses {@link ByteBuffer}s.
+ * internally uses ByteBuffers.
*/
+@SuppressWarnings({"NestedAssignment", "StaticNonFinalField"})
+@RunWith(Parameterized.class)
public class TestByteBufferPread {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestByteBufferPread.class);
+
private static MiniDFSCluster cluster;
private static FileSystem fs;
private static byte[] fileContents;
private static Path testFile;
+ private static Path emptyFile;
private static Random rand;
private static final long SEED = 0xDEADBEEFL;
private static final int BLOCK_SIZE = 4096;
private static final int FILE_SIZE = 12 * BLOCK_SIZE;
+ public static final int HALF_SIZE = FILE_SIZE / 2;
+ public static final int QUARTER_SIZE = FILE_SIZE / 4;
+ public static final int EOF_POS = FILE_SIZE -1;
+
+ private final boolean useHeap;
+
+ private ByteBuffer buffer;
+
+ private ByteBuffer emptyBuffer;
+
+ /**
+ * This test suite is parameterized for the different heap allocation
+ * options.
+ * @return a list of allocation policies to test.
+ */
+ @Parameterized.Parameters(name = "heap={0}")
+ public static Collection