Fix Len = 0 and Insufficient Buffer behaviours for positioned reads #203

Merged
merged 2 commits into from
Jan 7, 2025
@@ -134,8 +134,11 @@ public int read() throws IOException {
@Override
public int read(byte @NonNull [] buffer, int offset, int length) throws IOException {
throwIfClosed("cannot read from closed stream");
validatePositionedReadArgs(position, buffer, offset, length);

if (this.position >= getContentLength()) {
if (length == 0) {
return 0;
} else if (this.position >= getContentLength()) {

Shall we move this validation as well into the method that performs the other validations, so that all the validations are in the same place?

Contributor Author

This is a good recommendation. I did not move this validation because the full content length might not be available to other streams that implement this class. In the S3 case we know the size thanks to the object metadata. Let me know what you think.

return EOF;
}
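
The ordering in this hunk (validate the arguments, then short-circuit zero-length reads, then check for end of stream) matches the contract of java.io.InputStream#read(byte[], int, int), which returns 0 when the requested length is zero and only otherwise attempts to read at least one byte. A minimal sketch of that ordering follows, assuming a hypothetical in-memory stream (ZeroLengthReadSketch is not part of this PR) in place of the S3-backed one:

```java
import java.io.InputStream;
import java.util.Objects;

// Hypothetical in-memory stream, used only to illustrate the order of checks.
class ZeroLengthReadSketch extends InputStream {
  private static final int EOF = -1;

  private final byte[] content;
  private int position;

  ZeroLengthReadSketch(byte[] content) {
    this.content = content.clone();
  }

  @Override
  public int read() {
    return position < content.length ? (content[position++] & 0xFF) : EOF;
  }

  @Override
  public int read(byte[] buffer, int offset, int length) {
    // 1. Reject malformed arguments, regardless of the stream position.
    if (offset < 0 || length < 0) {
      throw new IllegalArgumentException("Offset or length is negative");
    }
    Objects.requireNonNull(buffer, "Null destination buffer");
    if (length > buffer.length - offset) {
      throw new IndexOutOfBoundsException("Too many bytes for destination buffer");
    }

    // 2. A zero-length read returns 0, even when the stream is at its end.
    if (length == 0) {
      return 0;
    }

    // 3. Only now does the end-of-stream check apply.
    if (position >= content.length) {
      return EOF;
    }

    int bytesRead = Math.min(length, content.length - position);
    System.arraycopy(content, position, buffer, offset, bytesRead);
    position += bytesRead;
    return bytesRead;
  }
}
```

With the checks in this order, a caller that has already drained the stream still gets 0 for a zero-length request and -1 for any positive length. Swapping steps 2 and 3 would return -1 in both cases.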

@@ -188,23 +191,30 @@ public long getPos() {
* Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
* reached. Leaves the position of the stream unaltered.
*
* @param buf buffer to read data into
* @param off start position in buffer at which data is written
* @param n the number of bytes to read; the n-th byte should be the last byte of the stream.
* @param buffer buffer to read data into
* @param offset start position in buffer at which data is written
* @param length the number of bytes to read; the n-th byte should be the last byte of the stream.
* @return the total number of bytes read into the buffer
*/
@Override
public int readTail(byte[] buf, int off, int n) throws IOException {
public int readTail(byte[] buffer, int offset, int length) throws IOException {
throwIfClosed("cannot read from closed stream");
validatePositionedReadArgs(position, buffer, offset, length);

if (length == 0) {
return 0;
}

return this.telemetry.measureVerbose(
() ->
Operation.builder()
.name(OPERATION_READ)
.attribute(StreamAttributes.variant(FLAVOR_TAIL))
.attribute(StreamAttributes.uri(this.s3URI))
.attribute(StreamAttributes.range(getContentLength() - n, getContentLength() - 1))
.attribute(
StreamAttributes.range(getContentLength() - length, getContentLength() - 1))
.build(),
() -> logicalIO.readTail(buf, off, n));
() -> logicalIO.readTail(buffer, offset, length));
}
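
The telemetry range attribute above is computed as getContentLength() - length through getContentLength() - 1, both inclusive. The arithmetic can be sketched as below. TailRangeSketch and tailRange are hypothetical names, and the bounds check is an assumption about where an oversized request would be rejected, not a description of the library's code:

```java
// Hypothetical helper illustrating the inclusive byte range of a tail read.
class TailRangeSketch {
  /** Returns {firstByte, lastByte}, both inclusive, for the last `length` bytes. */
  static long[] tailRange(long contentLength, int length) {
    // Assumption: requests longer than the content are rejected somewhere upstream.
    if (length < 0 || length > contentLength) {
      throw new IllegalArgumentException("Invalid tail length: " + length);
    }
    return new long[] {contentLength - length, contentLength - 1};
  }
}
```

For a 20-byte object, tailRange(20, 5) gives {15, 19}. For length 0 the same expression gives {20, 19}, a range whose start lies past its end. The early return added in this hunk means a zero-length tail read never reaches the point where that range would be built.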

/**
@@ -221,7 +231,7 @@ public void close() throws IOException {
.attribute(
StreamAttributes.streamRelativeTimestamp(System.nanoTime() - streamBirth))
.build(),
() -> this.logicalIO.close());
this.logicalIO::close);

// Flush telemetry after a stream closes to have full coverage of all operations of this stream
this.telemetry.flush();
@@ -17,6 +17,7 @@

import java.io.IOException;
import java.io.InputStream;
import software.amazon.s3.analyticsaccelerator.common.Preconditions;

/**
* A SeekableInputStream is like a conventional InputStream but equipped with two additional
@@ -57,4 +58,38 @@ public abstract class SeekableInputStream extends InputStream {
* @throws IOException if an error occurs while reading the file
*/
public abstract int readTail(byte[] buf, int off, int n) throws IOException;

/**
* Validates the arguments for a read operation. This method is available to use in all subclasses
* to ensure consistency.
*
* @param position the position to read from
* @param buffer the buffer to read into
* @param offset the offset in the buffer to start writing at
* @param length the number of bytes to read
* @throws IllegalArgumentException if the position, offset or length is negative
* @throws NullPointerException if the buffer is null
* @throws IndexOutOfBoundsException if the offset or length are invalid for the given buffer
*/
protected void validatePositionedReadArgs(long position, byte[] buffer, int offset, int length) {
Contributor

Maybe add a unit test for this method? It should be quick.

Contributor Author

I didn't, because it is a protected method and I am not sure we want to expose it. Instead, I added tests to the S3SeekableInputStream class, which is publicly exposed. Let me know if you think this is a good approach.

Contributor

Happy with this approach!
Just FYI, I think there is a VisibleForTesting annotation that helps with this.

Preconditions.checkArgument(offset >= 0, "Offset is negative");
Preconditions.checkArgument(length >= 0, "Length is negative");

// TODO: S3A throws an EOFException here, S3FileIO does IllegalArgumentException
// TODO: https://github.com/awslabs/analytics-accelerator-s3/issues/84
Preconditions.checkArgument(position >= 0, "Position is negative");

if (buffer == null) {
throw new NullPointerException("Null destination buffer");
Contributor

Why not just have both of these as Preconditions checks?

Contributor Author

That makes perfect sense. Doing it now.

} else if (length > buffer.length - offset) {
throw new IndexOutOfBoundsException(
"Too many bytes for destination buffer "
+ ": request length="
+ length
+ ", with offset ="
+ offset
+ "; buffer capacity ="
+ (buffer.length - offset));
}
}
}
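
The review thread above suggests expressing the null and capacity checks as precondition checks too. A minimal, self-contained sketch of what that could look like follows, keeping the exception types documented in the Javadoc. PositionedReadArgsSketch and its local checkArgument helper are hypothetical stand-ins. The project's own Preconditions class is not used here, and only its checkArgument(boolean, String) form appears in this diff:

```java
import java.util.Objects;

// Hypothetical stand-alone version of the argument validation in this diff.
class PositionedReadArgsSketch {

  static void validatePositionedReadArgs(long position, byte[] buffer, int offset, int length) {
    // Negative values are argument errors (IllegalArgumentException).
    checkArgument(offset >= 0, "Offset is negative");
    checkArgument(length >= 0, "Length is negative");
    checkArgument(position >= 0, "Position is negative");

    // A missing buffer is a NullPointerException, as in the Javadoc.
    Objects.requireNonNull(buffer, "Null destination buffer");

    // Comparing against (buffer.length - offset) avoids the int overflow
    // that (offset + length) could hit for very large arguments.
    if (length > buffer.length - offset) {
      throw new IndexOutOfBoundsException(
          "Too many bytes for destination buffer: request length="
              + length
              + ", with offset="
              + offset
              + "; buffer capacity="
              + (buffer.length - offset));
    }
  }

  // Local stand-in for a checkArgument-style precondition helper.
  private static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }
}
```

Under these checks, a request such as (new byte[8], offset 1, length 8) fails because only 7 bytes of capacity remain after the offset. A 20-byte buffer with a requested length of 40 is rejected for the same reason, which is why the existing out-of-bounds test in this PR had to allocate a larger buffer.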
@@ -140,7 +140,7 @@ void testSeekToVeryEnd() throws IOException {
}

@Test
void testSeekAfterEnd() throws IOException {
void testSeekAfterEnd() {
S3SeekableInputStream stream = getTestStream();
assertDoesNotThrow(() -> stream.seek(Long.MAX_VALUE));
assertDoesNotThrow(() -> stream.seek(TEST_DATA.length()));
@@ -247,13 +247,15 @@ void testReadWithBufferOutOfBounds() throws IOException {
try (S3SeekableInputStream stream = getTestStream()) {

// Read beyond EOF, expect all bytes to be read and pos to be EOF.
assertEquals(TEST_DATA.length(), stream.read(new byte[20], 0, TEST_DATA.length() + 20));
assertEquals(
TEST_DATA.length(),
stream.read(new byte[TEST_DATA.length() + 20], 0, TEST_DATA.length() + 20));
assertEquals(20, stream.getPos());

// Read beyond EOF after a seek, expect only num bytes read to be equal to that left in the
// stream, and pos to be EOF.
stream.seek(18);
assertEquals(2, stream.read(new byte[20], 0, TEST_DATA.length() + 20));
assertEquals(2, stream.read(new byte[TEST_DATA.length() + 20], 0, TEST_DATA.length() + 20));
assertEquals(20, stream.getPos());
}
}
@@ -268,7 +270,7 @@ void testReadTailWithInvalidArgument() throws IOException {
// 100K is bigger than test data size
assertThrows(IllegalArgumentException.class, () -> stream.readTail(new byte[103], 0, 100));
// Requesting more data than byte buffer size
assertThrows(IllegalArgumentException.class, () -> stream.readTail(new byte[10], 0, 100));
assertThrows(IndexOutOfBoundsException.class, () -> stream.readTail(new byte[10], 0, 100));
}
}

@@ -419,6 +421,26 @@ public void testSeekOnClosedStream() throws IOException {
assertThrows(IOException.class, () -> seekableInputStream.seek(0));
}

@Test
public void testZeroLengthRead() throws IOException {
S3SeekableInputStream seekableInputStream = getTestStream();
assertEquals(0, seekableInputStream.read(new byte[0], 0, 0));
assertEquals(0, seekableInputStream.readTail(new byte[0], 0, 0));
}

@Test
public void testInsufficientBuffer() throws IOException {
S3SeekableInputStream seekableInputStream = getTestStream();
SpotBugsLambdaWorkaround.assertReadResult(
IndexOutOfBoundsException.class, () -> seekableInputStream.read(new byte[8], 1, 8), -1);
SpotBugsLambdaWorkaround.assertReadResult(
IndexOutOfBoundsException.class, () -> seekableInputStream.read(new byte[0], 0, 8), -1);
SpotBugsLambdaWorkaround.assertReadResult(
IndexOutOfBoundsException.class, () -> seekableInputStream.readTail(new byte[0], 0, 8), -1);
SpotBugsLambdaWorkaround.assertReadResult(
IndexOutOfBoundsException.class, () -> seekableInputStream.readTail(new byte[0], 0, 8), -1);
}

private S3SeekableInputStream getTestStream() {
return new S3SeekableInputStream(TEST_URI, fakeLogicalIO, TestTelemetry.DEFAULT);
}
@@ -435,10 +457,10 @@ private S3SeekableInputStream getTestStreamWithContent(String content, S3URI s3U
PhysicalIOConfiguration.DEFAULT);

return new S3SeekableInputStream(
TEST_URI,
s3URI,
new ParquetLogicalIOImpl(
TEST_OBJECT,
new PhysicalIOImpl(TEST_OBJECT, metadataStore, blobStore, TestTelemetry.DEFAULT),
s3URI,
new PhysicalIOImpl(s3URI, metadataStore, blobStore, TestTelemetry.DEFAULT),
TestTelemetry.DEFAULT,
LogicalIOConfiguration.DEFAULT,
new ParquetColumnPrefetchStore(LogicalIOConfiguration.DEFAULT)),