Skip to content

Commit

Permalink
Add support to seek beyond end of stream (#192)
Browse files Browse the repository at this point in the history
## Description of change
This pull request adds support for seeking beyond the end of the stream.
Since seek is performed lazily, this is the correct behaviour. It also adds
closed-stream checks to read and seek operations.

With this PR, we are reconciling the behaviour of S3SeekableInputStream with its
documentation.

#### Relevant issues
[Issue#83
](#83)

#### Does this contribution introduce any breaking changes to the
existing APIs or behaviors?
Yes. Seek will no longer throw an EOFException when the position to seek to is >=
contentLength.

#### Does this contribution introduce any new public APIs or behaviors?
Yes. Seeking beyond the end of the stream no longer throws. Read/seek
operations on a closed stream now throw an IOException.


#### How was the contribution tested?
Added new unit tests and property based tests. Removed %size from
`seekChangesPosition` property tests to ensure validSizes can go beyond
stream size.

#### Does this contribution need a changelog entry?
- [X] I have updated the CHANGELOG or README if appropriate

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).
  • Loading branch information
fuatbasik authored Dec 12, 2024
1 parent b60e2c1 commit 9225c5c
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package software.amazon.s3.analyticsaccelerator;

import java.io.EOFException;
import java.io.IOException;
import lombok.NonNull;
import software.amazon.s3.analyticsaccelerator.common.Preconditions;
Expand All @@ -38,6 +37,7 @@ public class S3SeekableInputStream extends SeekableInputStream {
private final Telemetry telemetry;
private final S3URI s3URI;
private long position;
private boolean closed;
private static final int EOF = -1;

private static final String OPERATION_READ = "stream.read";
Expand All @@ -60,6 +60,7 @@ public class S3SeekableInputStream extends SeekableInputStream {
this.logicalIO = logicalIO;
this.telemetry = telemetry;
this.position = 0;
this.closed = false;
}

/**
Expand All @@ -75,6 +76,8 @@ public class S3SeekableInputStream extends SeekableInputStream {
*/
@Override
public int read() throws IOException {
throwIfClosed("cannot read from closed stream");

// -1 if we are past the end of the stream
if (this.position >= getContentLength()) {
return EOF;
Expand Down Expand Up @@ -130,6 +133,8 @@ public int read() throws IOException {
*/
@Override
public int read(byte @NonNull [] buffer, int offset, int length) throws IOException {
throwIfClosed("cannot read from closed stream");

if (this.position >= getContentLength()) {
return EOF;
}
Expand Down Expand Up @@ -163,13 +168,9 @@ public void seek(long pos) throws IOException {
// TODO: S3A throws an EOFException here, S3FileIO does IllegalArgumentException
// TODO: https://github.com/awslabs/analytics-accelerator-s3/issues/84
Preconditions.checkArgument(pos >= 0, "position must be non-negative");
throwIfClosed("cannot seek on closed stream");

// TODO: seeking past the end of the stream should be allowed.
// TODO: https://github.com/awslabs/analytics-accelerator-s3/issues/83
if (pos >= getContentLength()) {
throw new EOFException("zero-indexed seek position must be less than the object size");
}

// As we are seeking lazily, we support seeking beyond the stream size.
this.position = pos;
}

Expand All @@ -194,6 +195,7 @@ public long getPos() {
*/
@Override
public int readTail(byte[] buf, int off, int n) throws IOException {
throwIfClosed("cannot read from closed stream");
return this.telemetry.measureVerbose(
() ->
Operation.builder()
Expand Down Expand Up @@ -223,6 +225,7 @@ public void close() throws IOException {

// Flush telemetry after a stream closes to have full coverage of all operations of this stream
this.telemetry.flush();
this.closed = true;
}

/**
Expand All @@ -246,4 +249,10 @@ private int advancePosition(int bytesRead) {
}
return bytesRead;
}

/**
 * Guards an operation that requires the stream to still be open.
 *
 * @param msg message used for the exception raised when the stream is closed
 * @throws IOException if the {@code closed} flag is set
 */
private void throwIfClosed(String msg) throws IOException {
  if (!closed) {
    return;
  }
  throw new IOException(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,9 @@ public int readTail(byte[] buf, int off, int n) throws IOException {
public int read() throws IOException {
return this.delegate.read();
}

/**
 * Closes this stream by forwarding the call to the wrapped {@code delegate}.
 *
 * @throws IOException propagated from the delegate's {@code close()}
 */
@Override
public void close() throws IOException {
this.delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ boolean seekChangesPosition(
throws IOException {
try (InMemoryS3SeekableInputStream s =
new InMemoryS3SeekableInputStream("test-bucket", "test-key", size)) {

int jumpInSideObject = pos % size;
s.seek(jumpInSideObject);
return s.getPos() == jumpInSideObject;
s.seek(pos);
return s.getPos() == pos;
}
}

Expand Down Expand Up @@ -84,4 +82,13 @@ void canCloseStreamMultipleTimes(@ForAll("streamSizes") int size) throws IOExcep
s.close();
s.close();
}

@Property
void accessToClosedStreamThrows(@ForAll("streamSizes") int size) throws IOException {
  // Given: a stream of the generated size that has already been closed
  InMemoryS3SeekableInputStream closedStream =
      new InMemoryS3SeekableInputStream("test-bucket", "test-key", size);
  closedStream.close();

  // Then: seeking and reading are both rejected with an IOException
  assertThrows(IOException.class, () -> closedStream.seek(123));
  assertThrows(IOException.class, () -> closedStream.read());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_MB;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -142,13 +141,12 @@ void testSeekToVeryEnd() throws IOException {

@Test
void testSeekAfterEnd() throws IOException {
  // Given: a stream over TEST_DATA, opened with try-with-resources so it is always closed
  try (S3SeekableInputStream stream = getTestStream()) {
    // When/Then: seek is lazy, so any non-negative position is accepted, including
    // positions at or beyond the end of the object. No EOFException is expected.
    assertDoesNotThrow(() -> stream.seek(Long.MAX_VALUE));

    // Seeking exactly to the end records that position
    assertDoesNotThrow(() -> stream.seek(TEST_DATA.length()));
    assertEquals(TEST_DATA.length(), stream.getPos());

    // Seeking past the end records that position as well
    assertDoesNotThrow(() -> stream.seek(TEST_DATA.length() + 10));
    assertEquals(TEST_DATA.length() + 10, stream.getPos());
  }
}

@Test
Expand All @@ -168,11 +166,7 @@ void testReadOnEmptyObject() throws IOException {
void testInvalidSeek() throws IOException {
// Given
try (S3SeekableInputStream stream = getTestStream()) {

// When: seek is to an invalid position then exception is thrown
assertThrows(Exception.class, () -> stream.seek(TEST_DATA.length()));
assertThrows(Exception.class, () -> stream.seek(TEST_DATA.length() + 10));
assertThrows(Exception.class, () -> stream.seek(Long.MAX_VALUE));
assertThrows(Exception.class, () -> stream.seek(-1));
assertThrows(Exception.class, () -> stream.seek(Long.MIN_VALUE));
}
Expand Down Expand Up @@ -405,6 +399,26 @@ void testMultiThreadUsage() throws IOException, InterruptedException {
}
}

@Test
public void testReadOnClosedStream() throws IOException {
  // Given: a stream that has been closed
  S3SeekableInputStream closedStream = getTestStream();
  closedStream.close();

  // Then: every read variant throws an IOException. The trailing -1 is the value handed to
  // the helper as the expected read result.
  SpotBugsLambdaWorkaround.assertReadResult(IOException.class, closedStream::read, -1);
  SpotBugsLambdaWorkaround.assertReadResult(
      IOException.class, () -> closedStream.read(new byte[8]), -1);
  SpotBugsLambdaWorkaround.assertReadResult(
      IOException.class, () -> closedStream.read(new byte[8], 0, 8), -1);
  SpotBugsLambdaWorkaround.assertReadResult(
      IOException.class, () -> closedStream.readTail(new byte[8], 0, 8), -1);
}

@Test
public void testSeekOnClosedStream() throws IOException {
  // Given: a stream that has been closed
  S3SeekableInputStream closedStream = getTestStream();
  closedStream.close();

  // Then: seeking, even to a valid position, throws an IOException
  assertThrows(IOException.class, () -> closedStream.seek(0));
}

/**
 * Builds a fresh stream for tests from {@code TEST_URI}, {@code fakeLogicalIO} and {@code
 * TestTelemetry.DEFAULT}.
 *
 * @return a new {@link S3SeekableInputStream}; the caller is responsible for closing it
 */
private S3SeekableInputStream getTestStream() {
return new S3SeekableInputStream(TEST_URI, fakeLogicalIO, TestTelemetry.DEFAULT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,26 @@ public static <T extends Throwable, C extends Closeable> void assertThrowsClosab
}
fail(String.format("Exception of type '%s' was expected. Nothing was thrown", expectedType));
}

/**
 * Asserts that a value-returning call, typically a {@code read} method of a class extending
 * {@code InputStream}, throws an exception of the expected type. SpotBugs wants the return value
 * of such calls to be checked, so the result is captured and compared here rather than
 * discarded.
 *
 * @see <a href="https://spotbugs.readthedocs.io/en/stable/bugDescriptions.html">SpotBugs bug
 *     descriptions</a>
 * @param expectedType exception type expected
 * @param executable code that returns something, and is expected to throw
 * @param expected value the result is compared against; only reached when nothing was thrown,
 *     in which case the assertion fails regardless
 * @param <T> exception type
 * @param <R> return type
 */
public static <T extends Throwable, R> void assertReadResult(
    Class<T> expectedType, ThrowingSupplier<R> executable, R expected) {
  R result;
  try {
    result = executable.get();
  } catch (Throwable throwable) {
    assertInstanceOf(expectedType, throwable);
    return;
  }

  // Reaching this point means the executable returned normally, which is always a failure.
  // The comparison is kept outside the try block so that its AssertionError cannot be caught
  // above and mistaken for the expected exception (e.g. when expectedType is Throwable).
  String nothingThrown =
      String.format("Exception of type '%s' was expected. Nothing was thrown", expectedType);
  assertEquals(expected, result, nothingThrown);
  fail(nothingThrown);
}
}

0 comments on commit 9225c5c

Please sign in to comment.