Skip to content

Commit

Permalink
Fix Len = 0 and Insufficient Buffer behaviours for positioned reads (#…
Browse files Browse the repository at this point in the history
…203)

## Description of change

According to InputStream documentation Len = 0 should return 0

`If len is zero, then no bytes are read and 0 is returned;` 

Following the InputStream implementation, this commit is adding argument
checks to Positioned Reads. This validation is implemented in the
SeekableInputStream to ensure all subclasses behave the same way in the
unhappy path.

Checks are done in the same order with InputStream implementation. 

#### Relevant issues

#201

#### Does this contribution introduce any breaking changes to the
existing APIs or behaviors?

Yes. For a 0-length read to a 0-sized object read will now return 0
instead of -1.
Insufficient Buffer Capacity will now throw IndexOutOfBoundsException
instead of IllegalArgumentException.


#### Does this contribution introduce any new public APIs or behaviors?
No, it changes existing public API behaviour. 


#### How was the contribution tested?
Added new unit tests. 

#### Does this contribution need a changelog entry?
- [N/A] I have updated the CHANGELOG or README if appropriate

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).
  • Loading branch information
fuatbasik authored Jan 7, 2025
1 parent 6555978 commit 32946d0
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ public int read() throws IOException {
@Override
public int read(byte @NonNull [] buffer, int offset, int length) throws IOException {
throwIfClosed("cannot read from closed stream");
validatePositionedReadArgs(position, buffer, offset, length);

if (this.position >= getContentLength()) {
if (length == 0) {
return 0;
} else if (this.position >= getContentLength()) {
return EOF;
}

Expand Down Expand Up @@ -188,23 +191,30 @@ public long getPos() {
* Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
* reached. Leaves the position of the stream unaltered.
*
* @param buf buffer to read data into
* @param off start position in buffer at which data is written
* @param n the number of bytes to read; the n-th byte should be the last byte of the stream.
* @param buffer buffer to read data into
* @param offset start position in buffer at which data is written
* @param length the number of bytes to read; the n-th byte should be the last byte of the stream.
* @return the total number of bytes read into the buffer
*/
@Override
public int readTail(byte[] buf, int off, int n) throws IOException {
public int readTail(byte[] buffer, int offset, int length) throws IOException {
throwIfClosed("cannot read from closed stream");
validatePositionedReadArgs(position, buffer, offset, length);

if (length == 0) {
return 0;
}

return this.telemetry.measureVerbose(
() ->
Operation.builder()
.name(OPERATION_READ)
.attribute(StreamAttributes.variant(FLAVOR_TAIL))
.attribute(StreamAttributes.uri(this.s3URI))
.attribute(StreamAttributes.range(getContentLength() - n, getContentLength() - 1))
.attribute(
StreamAttributes.range(getContentLength() - length, getContentLength() - 1))
.build(),
() -> logicalIO.readTail(buf, off, n));
() -> logicalIO.readTail(buffer, offset, length));
}

/**
Expand All @@ -221,7 +231,7 @@ public void close() throws IOException {
.attribute(
StreamAttributes.streamRelativeTimestamp(System.nanoTime() - streamBirth))
.build(),
() -> this.logicalIO.close());
this.logicalIO::close);

// Flush telemetry after a stream closes to have full coverage of all operations of this stream
this.telemetry.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.io.InputStream;
import software.amazon.s3.analyticsaccelerator.common.Preconditions;

/**
* A SeekableInputStream is like a conventional InputStream but equipped with two additional
Expand Down Expand Up @@ -57,4 +58,36 @@ public abstract class SeekableInputStream extends InputStream {
* @throws IOException if an error occurs while reading the file
*/
public abstract int readTail(byte[] buf, int off, int n) throws IOException;

/**
* Validates the arguments for a read operation. This method is available to use in all subclasses
* to ensure consistency.
*
* @param position the position to read from
* @param buffer the buffer to read into
* @param offset the offset in the buffer to start writing at
* @param length the number of bytes to read
* @throws IllegalArgumentException if the position, offset or length is negative
* @throws NullPointerException if the buffer is null
* @throws IndexOutOfBoundsException if the offset or length are invalid for the given buffer
*/
protected void validatePositionedReadArgs(long position, byte[] buffer, int offset, int length) {
Preconditions.checkNotNull(buffer, "Null destination buffer");
Preconditions.checkArgument(length >= 0, "Length is negative");
Preconditions.checkArgument(offset >= 0, "Offset is negative");

// TODO: S3A throws an EOFException here, S3FileIO does IllegalArgumentException
// TODO: https://github.com/awslabs/analytics-accelerator-s3/issues/84
Preconditions.checkArgument(position >= 0, "Position is negative");
Preconditions.checkPositionIndex(
length,
buffer.length - offset,
"Too many bytes for destination buffer "
+ ": request length="
+ length
+ ", with offset ="
+ offset
+ "; buffer capacity ="
+ (buffer.length - offset));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void testSeekToVeryEnd() throws IOException {
}

@Test
void testSeekAfterEnd() throws IOException {
void testSeekAfterEnd() {
S3SeekableInputStream stream = getTestStream();
assertDoesNotThrow(() -> stream.seek(Long.MAX_VALUE));
assertDoesNotThrow(() -> stream.seek(TEST_DATA.length()));
Expand Down Expand Up @@ -247,13 +247,15 @@ void testReadWithBufferOutOfBounds() throws IOException {
try (S3SeekableInputStream stream = getTestStream()) {

// Read beyond EOF, expect all bytes to be read and pos to be EOF.
assertEquals(TEST_DATA.length(), stream.read(new byte[20], 0, TEST_DATA.length() + 20));
assertEquals(
TEST_DATA.length(),
stream.read(new byte[TEST_DATA.length() + 20], 0, TEST_DATA.length() + 20));
assertEquals(20, stream.getPos());

// Read beyond EOF after a seek, expect only num bytes read to be equal to that left in the
// stream, and pos to be EOF.
stream.seek(18);
assertEquals(2, stream.read(new byte[20], 0, TEST_DATA.length() + 20));
assertEquals(2, stream.read(new byte[TEST_DATA.length() + 20], 0, TEST_DATA.length() + 20));
assertEquals(20, stream.getPos());
}
}
Expand All @@ -268,7 +270,7 @@ void testReadTailWithInvalidArgument() throws IOException {
// 100K is bigger than test data size
assertThrows(IllegalArgumentException.class, () -> stream.readTail(new byte[103], 0, 100));
// Requesting more data than byte buffer size
assertThrows(IllegalArgumentException.class, () -> stream.readTail(new byte[10], 0, 100));
assertThrows(IndexOutOfBoundsException.class, () -> stream.readTail(new byte[10], 0, 100));
}
}

Expand Down Expand Up @@ -419,6 +421,26 @@ public void testSeekOnClosedStream() throws IOException {
assertThrows(IOException.class, () -> seekableInputStream.seek(0));
}

@Test
public void testZeroLengthRead() throws IOException {
S3SeekableInputStream seekableInputStream = getTestStream();
assertEquals(0, seekableInputStream.read(new byte[0], 0, 0));
assertEquals(0, seekableInputStream.readTail(new byte[0], 0, 0));
}

@Test
public void testInsufficientBuffer() throws IOException {
S3SeekableInputStream seekableInputStream = getTestStream();
SpotBugsLambdaWorkaround.assertReadResult(
IndexOutOfBoundsException.class, () -> seekableInputStream.read(new byte[8], 1, 8), -1);
SpotBugsLambdaWorkaround.assertReadResult(
IndexOutOfBoundsException.class, () -> seekableInputStream.read(new byte[0], 0, 8), -1);
SpotBugsLambdaWorkaround.assertReadResult(
IndexOutOfBoundsException.class, () -> seekableInputStream.readTail(new byte[0], 0, 8), -1);
SpotBugsLambdaWorkaround.assertReadResult(
IndexOutOfBoundsException.class, () -> seekableInputStream.readTail(new byte[0], 0, 8), -1);
}

private S3SeekableInputStream getTestStream() {
return new S3SeekableInputStream(TEST_URI, fakeLogicalIO, TestTelemetry.DEFAULT);
}
Expand All @@ -435,10 +457,10 @@ private S3SeekableInputStream getTestStreamWithContent(String content, S3URI s3U
PhysicalIOConfiguration.DEFAULT);

return new S3SeekableInputStream(
TEST_URI,
s3URI,
new ParquetLogicalIOImpl(
TEST_OBJECT,
new PhysicalIOImpl(TEST_OBJECT, metadataStore, blobStore, TestTelemetry.DEFAULT),
s3URI,
new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT),
TestTelemetry.DEFAULT,
LogicalIOConfiguration.DEFAULT,
new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)),
Expand Down

0 comments on commit 32946d0

Please sign in to comment.