Skip to content

Commit 628f5a6

Browse files
committed
Replacing SeekableByteChannelChunkedNioStream
1 parent 26147aa commit 628f5a6

File tree

3 files changed

+87
-129
lines changed

3 files changed

+87
-129
lines changed

src/main/java/com/emc/mongoose/storage/driver/coop/netty/NettyStorageDriverBase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.emc.mongoose.base.logging.Loggers;
2222
import com.emc.mongoose.storage.driver.coop.CoopStorageDriverBase;
2323
import com.emc.mongoose.storage.driver.coop.netty.data.DataItemFileRegion;
24-
import com.emc.mongoose.storage.driver.coop.netty.data.PartialChunkedNioStream;
2524
import com.emc.mongoose.storage.driver.coop.netty.data.SeekableByteChannelChunkedNioStream;
2625
import com.github.akurilov.commons.collection.Range;
2726
import com.github.akurilov.commons.concurrent.ThreadUtil;
@@ -461,7 +460,7 @@ protected final void sendRequestData(final Channel channel, final O op) throws I
461460
final var srcPath = dataOp.srcPath();
462461
if (0 < dataItem.size() && (null == srcPath || srcPath.isEmpty())) {
463462
if (sslFlag) {
464-
channel.write(new PartialChunkedNioStream(dataItem));
463+
channel.write(new SeekableByteChannelChunkedNioStream(dataItem));
465464
} else {
466465
channel.write(new DataItemFileRegion(dataItem));
467466
}

src/main/java/com/emc/mongoose/storage/driver/coop/netty/data/PartialChunkedNioStream.java

Lines changed: 0 additions & 114 deletions
This file was deleted.

src/main/java/com/emc/mongoose/storage/driver/coop/netty/data/SeekableByteChannelChunkedNioStream.java

Lines changed: 86 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,109 @@
22

33
import static com.emc.mongoose.base.storage.driver.StorageDriver.BUFF_SIZE_MAX;
44

5-
import io.netty.handler.stream.ChunkedNioStream;
6-
75
import java.io.IOException;
6+
import java.nio.ByteBuffer;
7+
import java.nio.channels.ReadableByteChannel;
88
import java.nio.channels.SeekableByteChannel;
99

10-
/**
11-
Created by andrey on 24.04.17.
12-
*/
13-
public final class SeekableByteChannelChunkedNioStream
14-
extends ChunkedNioStream {
10+
import io.netty.buffer.ByteBuf;
11+
import io.netty.buffer.ByteBufAllocator;
12+
import io.netty.channel.ChannelHandlerContext;
13+
import io.netty.handler.stream.ChunkedInput;
1514

15+
/**
16+
* Modified version of io.netty.handler.stream.ChunkedNioStream.
17+
* Ensures that the final call to readChunk() writes only a partial chunk to the
18+
* given byte channel and isEndOfInput() causes the stream to end.
19+
*/
20+
public final class SeekableByteChannelChunkedNioStream implements ChunkedInput<ByteBuf> {
21+
private final ReadableByteChannel in;
22+
private final int chunkSize;
23+
private final ByteBuffer byteBuffer;
1624
private final long sizeToTransfer;
1725

18-
public SeekableByteChannelChunkedNioStream(final SeekableByteChannel sbc)
19-
throws IOException {
26+
private long bytesTransferred = 0;
27+
28+
public SeekableByteChannelChunkedNioStream(final SeekableByteChannel sbc) throws IOException {
2029
this(sbc, sbc.size());
2130
}
2231

2332
private SeekableByteChannelChunkedNioStream(final SeekableByteChannel sbc, final long sizeToTransfer) {
24-
super(sbc, sizeToTransfer > BUFF_SIZE_MAX ? BUFF_SIZE_MAX : (int) sizeToTransfer);
33+
this.in = sbc;
34+
this.chunkSize = (int) (sizeToTransfer > BUFF_SIZE_MAX ? BUFF_SIZE_MAX : sizeToTransfer);
35+
this.byteBuffer = ByteBuffer.allocate(chunkSize);
2536
this.sizeToTransfer = sizeToTransfer;
2637
}
2738

39+
@Override
40+
public final boolean isEndOfInput() {
41+
return bytesTransferred == sizeToTransfer;
42+
}
43+
44+
@Override
45+
public void close() throws Exception {
46+
in.close();
47+
}
48+
49+
@Deprecated
50+
@Override
51+
public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
52+
return readChunk(ctx.alloc());
53+
}
54+
55+
@Override
56+
public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
57+
if (isEndOfInput()) {
58+
return null;
59+
}
60+
61+
int nextChunkSize = chunkSize;
62+
long bytesRemaining = length() - progress();
63+
64+
// Is there less than a chunk size of data remaining?
65+
if (bytesRemaining < chunkSize) {
66+
// Limit the byte buffer and next chunk size
67+
byteBuffer.limit((int) bytesRemaining);
68+
nextChunkSize = (int) bytesRemaining;
69+
}
70+
71+
// Read into the byte buffer
72+
int readBytes = byteBuffer.position();
73+
while (true) {
74+
int localReadBytes = in.read(byteBuffer);
75+
if (localReadBytes < 0) {
76+
break;
77+
}
78+
readBytes += localReadBytes;
79+
bytesTransferred += localReadBytes;
80+
if (readBytes == nextChunkSize) {
81+
break;
82+
}
83+
}
84+
85+
// Write from the byte buffer
86+
byteBuffer.flip();
87+
boolean release = true;
88+
ByteBuf buffer = allocator.buffer(byteBuffer.remaining());
89+
try {
90+
buffer.writeBytes(byteBuffer);
91+
byteBuffer.clear();
92+
release = false;
93+
return buffer;
94+
} finally {
95+
if (release) {
96+
buffer.release();
97+
}
98+
}
99+
}
100+
28101
@Override
29102
public long length() {
30103
return sizeToTransfer;
31104
}
32105

33106
@Override
34-
public final boolean isEndOfInput() {
35-
return sizeToTransfer == transferredBytes();
107+
public long progress() {
108+
return bytesTransferred;
36109
}
37-
}
110+
}

0 commit comments

Comments
 (0)