Skip to content

Commit 065c271

Browse files
fuatbasikahmarsuhail
authored andcommitted
Add support to seek beyond end of stream (#192)
## Description of change This pull request adds support to seek beyond stream end. Since seek is done lazily this is the correct behaviour. It also adds closed checks to read operations. With this PR, we are reconciling S3SeekableStream behaviour with its' documentation. #### Relevant issues [Issue#83 ](#83) #### Does this contribution introduce any breaking changes to the existing APIs or behaviors? Yes. Seek will no longer Throw EOF exception when position to seek is > contentLength. #### Does this contribution introduce any new public APIs or behaviors? Yes, Seek to beyond stream end will no longer throw. Read/Seek operations on closed stream will throw. #### How was the contribution tested? Added new unit tests and property based tests. Removed %size from `seekChangesPosition` property tests to ensure validSizes can go beyond stream size. #### Does this contribution need a changelog entry? - [X] I have updated the CHANGELOG or README if appropriate --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/).
1 parent 1db412f commit 065c271

File tree

5 files changed

+80
-23
lines changed

5 files changed

+80
-23
lines changed

input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStream.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package software.amazon.s3.analyticsaccelerator;
1717

18-
import java.io.EOFException;
1918
import java.io.IOException;
2019
import lombok.NonNull;
2120
import software.amazon.s3.analyticsaccelerator.common.Preconditions;
@@ -38,6 +37,7 @@ public class S3SeekableInputStream extends SeekableInputStream {
3837
private final Telemetry telemetry;
3938
private final S3URI s3URI;
4039
private long position;
40+
private boolean closed;
4141
private static final int EOF = -1;
4242

4343
private static final String OPERATION_READ = "stream.read";
@@ -60,6 +60,7 @@ public class S3SeekableInputStream extends SeekableInputStream {
6060
this.logicalIO = logicalIO;
6161
this.telemetry = telemetry;
6262
this.position = 0;
63+
this.closed = false;
6364
}
6465

6566
/**
@@ -75,6 +76,8 @@ public class S3SeekableInputStream extends SeekableInputStream {
7576
*/
7677
@Override
7778
public int read() throws IOException {
79+
throwIfClosed("cannot read from closed stream");
80+
7881
// -1 if we are past the end of the stream
7982
if (this.position >= getContentLength()) {
8083
return EOF;
@@ -130,6 +133,8 @@ public int read() throws IOException {
130133
*/
131134
@Override
132135
public int read(byte @NonNull [] buffer, int offset, int length) throws IOException {
136+
throwIfClosed("cannot read from closed stream");
137+
133138
if (this.position >= getContentLength()) {
134139
return EOF;
135140
}
@@ -163,13 +168,9 @@ public void seek(long pos) throws IOException {
163168
// TODO: S3A throws an EOFException here, S3FileIO does IllegalArgumentException
164169
// TODO: https://github.com/awslabs/analytics-accelerator-s3/issues/84
165170
Preconditions.checkArgument(pos >= 0, "position must be non-negative");
171+
throwIfClosed("cannot seek on closed stream");
166172

167-
// TODO: seeking past the end of the stream should be allowed.
168-
// TODO: https://github.com/awslabs/analytics-accelerator-s3/issues/83
169-
if (pos >= getContentLength()) {
170-
throw new EOFException("zero-indexed seek position must be less than the object size");
171-
}
172-
173+
// As we are seeking lazily, we support seek beyond the stream size .
173174
this.position = pos;
174175
}
175176

@@ -194,6 +195,7 @@ public long getPos() {
194195
*/
195196
@Override
196197
public int readTail(byte[] buf, int off, int n) throws IOException {
198+
throwIfClosed("cannot read from closed stream");
197199
return this.telemetry.measureVerbose(
198200
() ->
199201
Operation.builder()
@@ -223,6 +225,7 @@ public void close() throws IOException {
223225

224226
// Flush telemetry after a stream closes to have full coverage of all operations of this stream
225227
this.telemetry.flush();
228+
this.closed = true;
226229
}
227230

228231
/**
@@ -246,4 +249,10 @@ private int advancePosition(int bytesRead) {
246249
}
247250
return bytesRead;
248251
}
252+
253+
private void throwIfClosed(String msg) throws IOException {
254+
if (closed) {
255+
throw new IOException(msg);
256+
}
257+
}
249258
}

input-stream/src/referenceTest/java/software/amazon/s3/analyticsaccelerator/property/InMemoryS3SeekableInputStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,9 @@ public int readTail(byte[] buf, int off, int n) throws IOException {
111111
public int read() throws IOException {
112112
return this.delegate.read();
113113
}
114+
115+
@Override
116+
public void close() throws IOException {
117+
this.delegate.close();
118+
}
114119
}

input-stream/src/referenceTest/java/software/amazon/s3/analyticsaccelerator/property/SeekableStreamPropertiesTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,8 @@ boolean seekChangesPosition(
4343
throws IOException {
4444
try (InMemoryS3SeekableInputStream s =
4545
new InMemoryS3SeekableInputStream("test-bucket", "test-key", size)) {
46-
47-
int jumpInSideObject = pos % size;
48-
s.seek(jumpInSideObject);
49-
return s.getPos() == jumpInSideObject;
46+
s.seek(pos);
47+
return s.getPos() == pos;
5048
}
5149
}
5250

@@ -84,4 +82,13 @@ void canCloseStreamMultipleTimes(@ForAll("streamSizes") int size) throws IOExcep
8482
s.close();
8583
s.close();
8684
}
85+
86+
@Property
87+
void accessToClosedStreamThrows(@ForAll("streamSizes") int size) throws IOException {
88+
InMemoryS3SeekableInputStream s =
89+
new InMemoryS3SeekableInputStream("test-bucket", "test-key", size);
90+
s.close();
91+
assertThrows(IOException.class, () -> s.seek(123));
92+
assertThrows(IOException.class, () -> s.read());
93+
}
8794
}

input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamTest.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_MB;
2121

2222
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23-
import java.io.EOFException;
2423
import java.io.IOException;
2524
import java.nio.ByteBuffer;
2625
import java.nio.charset.StandardCharsets;
@@ -142,13 +141,12 @@ void testSeekToVeryEnd() throws IOException {
142141

143142
@Test
144143
void testSeekAfterEnd() throws IOException {
145-
// Given
146-
try (S3SeekableInputStream stream =
147-
new S3SeekableInputStream(TEST_URI, fakeLogicalIO, TestTelemetry.DEFAULT)) {
148-
149-
// When: we seek past EOF we get EOFException
150-
assertThrows(EOFException.class, () -> stream.seek(TEST_DATA.length() + 1));
151-
}
144+
S3SeekableInputStream stream = getTestStream();
145+
assertDoesNotThrow(() -> stream.seek(Long.MAX_VALUE));
146+
assertDoesNotThrow(() -> stream.seek(TEST_DATA.length()));
147+
assertEquals(TEST_DATA.length(), stream.getPos());
148+
assertDoesNotThrow(() -> stream.seek(TEST_DATA.length() + 10));
149+
assertEquals(TEST_DATA.length() + 10, stream.getPos());
152150
}
153151

154152
@Test
@@ -168,11 +166,7 @@ void testReadOnEmptyObject() throws IOException {
168166
void testInvalidSeek() throws IOException {
169167
// Given
170168
try (S3SeekableInputStream stream = getTestStream()) {
171-
172169
// When: seek is to an invalid position then exception is thrown
173-
assertThrows(Exception.class, () -> stream.seek(TEST_DATA.length()));
174-
assertThrows(Exception.class, () -> stream.seek(TEST_DATA.length() + 10));
175-
assertThrows(Exception.class, () -> stream.seek(Long.MAX_VALUE));
176170
assertThrows(Exception.class, () -> stream.seek(-1));
177171
assertThrows(Exception.class, () -> stream.seek(Long.MIN_VALUE));
178172
}
@@ -405,6 +399,26 @@ void testMultiThreadUsage() throws IOException, InterruptedException {
405399
}
406400
}
407401

402+
@Test
403+
public void testReadOnClosedStream() throws IOException {
404+
S3SeekableInputStream seekableInputStream = getTestStream();
405+
seekableInputStream.close();
406+
SpotBugsLambdaWorkaround.assertReadResult(IOException.class, seekableInputStream::read, -1);
407+
SpotBugsLambdaWorkaround.assertReadResult(
408+
IOException.class, () -> seekableInputStream.read(new byte[8]), -1);
409+
SpotBugsLambdaWorkaround.assertReadResult(
410+
IOException.class, () -> seekableInputStream.read(new byte[8], 0, 8), -1);
411+
SpotBugsLambdaWorkaround.assertReadResult(
412+
IOException.class, () -> seekableInputStream.readTail(new byte[8], 0, 8), -1);
413+
}
414+
415+
@Test
416+
public void testSeekOnClosedStream() throws IOException {
417+
S3SeekableInputStream seekableInputStream = getTestStream();
418+
seekableInputStream.close();
419+
assertThrows(IOException.class, () -> seekableInputStream.seek(0));
420+
}
421+
408422
private S3SeekableInputStream getTestStream() {
409423
return new S3SeekableInputStream(TEST_URI, fakeLogicalIO, TestTelemetry.DEFAULT);
410424
}

input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/SpotBugsLambdaWorkaround.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,26 @@ public static <T extends Throwable, C extends Closeable> void assertThrowsClosab
4848
}
4949
fail(String.format("Exception of type '%s' was expected. Nothing was thrown", expectedType));
5050
}
51+
52+
/**
53+
* In situations where a read method of a class extending InputStream, spotbugs wants to assert
54+
* return value is checked (@link https://spotbugs.readthedocs.io/en/stable/bugDescriptions.html).
55+
*
56+
* @param expectedType exception type expected
57+
* @param executable code that returns something, and expected to throw
58+
* @param expected expected return value, though will never be executed as executable is throwing
59+
* @param <T> exception type
60+
* @param <R> return type
61+
*/
62+
public static <T extends Throwable, R> void assertReadResult(
63+
Class<T> expectedType, ThrowingSupplier<R> executable, R expected) {
64+
try {
65+
R result = executable.get();
66+
assertEquals(expected, result);
67+
} catch (Throwable throwable) {
68+
assertInstanceOf(expectedType, throwable);
69+
return;
70+
}
71+
fail(String.format("Exception of type '%s' was expected. Nothing was thrown", expectedType));
72+
}
5173
}

0 commit comments

Comments
 (0)