3838import java .nio .ByteBuffer ;
3939import java .util .Arrays ;
4040import java .util .HashMap ;
41+ import java .util .HashSet ;
4142import java .util .Map ;
43+ import java .util .Set ;
4244import java .util .concurrent .Callable ;
4345import java .util .concurrent .CompletionService ;
4446import java .util .concurrent .ExecutorCompletionService ;
@@ -90,6 +92,13 @@ static class BlockReaderInfo {
9092 * using it for the next stripe.
9193 */
9294 boolean shouldSkip = false ;
95+ /**
96+ * We use this field to indicate whether we should retry the current reader before
97+ * we mark current reader skipped. possibly retry the same node so that transient errors don't
98+ * result in application level failures (e.g. Datanode could have closed the connection
99+ * because the client is idle for too long).
100+ */
101+ boolean retryCurrentReader = true ;
93102
94103 BlockReaderInfo (BlockReader reader , DatanodeInfo dn , long offset ) {
95104 this .reader = reader ;
@@ -104,6 +113,14 @@ void setOffset(long offset) {
104113 void skip () {
105114 this .shouldSkip = true ;
106115 }
116+
117+ public boolean isRetryCurrentReader () {
118+ return retryCurrentReader ;
119+ }
120+
121+ public void setRetryCurrentReader (boolean retryCurrentReader ) {
122+ this .retryCurrentReader = retryCurrentReader ;
123+ }
107124 }
108125
109126 private final Map <Future <BlockReadStats >, Integer > futures =
@@ -174,11 +191,26 @@ void updateState4SuccessRead(StripingChunkReadResult result) {
174191
175192 private void checkMissingBlocks () throws IOException {
176193 if (alignedStripe .missingChunksNum > parityBlkNum ) {
177- clearFutures ();
178- throw new IOException (alignedStripe .missingChunksNum
179- + " missing blocks, the stripe is: " + alignedStripe
180- + "; locatedBlocks is: " + dfsStripedInputStream .getLocatedBlocks ());
194+ if (countOfNullReaderInfos (readerInfos ) < parityBlkNum ) {
195+ clearFutures ();
196+ throw new IOException (alignedStripe .missingChunksNum
197+ + " missing blocks, the stripe is: " + alignedStripe
198+ + "; locatedBlocks is: " + dfsStripedInputStream .getLocatedBlocks ());
199+ }
200+ }
201+ }
202+
203+ private int countOfNullReaderInfos (BlockReaderInfo [] readerInfos ) {
204+ if (readerInfos == null ) {
205+ return 0 ;
181206 }
207+ int count = 0 ;
208+ for (int i = 0 ; i < readerInfos .length ; i ++) {
209+ if (readerInfos [i ] == null ) {
210+ count ++;
211+ }
212+ }
213+ return count ;
182214 }
183215
184216 /**
@@ -187,6 +219,23 @@ private void checkMissingBlocks() throws IOException {
187219 */
188220 private void readDataForDecoding () throws IOException {
189221 prepareDecodeInputs ();
222+
223+ if (alignedStripe .missingChunksNum > parityBlkNum ) {
224+ Set <Integer > recoveredIndexes = new HashSet <>();
225+ if (countOfNullReaderInfos (readerInfos ) >= parityBlkNum ) {
226+ for (int index = 0 ; index < dataBlkNum + parityBlkNum ; index ++) {
227+ if (readerInfos [index ] == null ) {
228+ alignedStripe .chunks [index ].state = StripingChunk .REQUESTED ;
229+ recoveredIndexes .add (index );
230+ }
231+ }
232+ }
233+
234+ for (int recoveredIndex : recoveredIndexes ) {
235+ alignedStripe .missingChunksNum --;
236+ }
237+ }
238+
190239 for (int i = 0 ; i < dataBlkNum ; i ++) {
191240 Preconditions .checkNotNull (alignedStripe .chunks [i ]);
192241 if (alignedStripe .chunks [i ].state == StripingChunk .REQUESTED ) {
@@ -332,7 +381,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
332381 }
333382
334383 /**
335- * read the whole stripe. do decoding if necessary
384+ * Read the whole stripe. do decoding if necessary.
336385 */
337386 void readStripe () throws IOException {
338387 try {
@@ -349,7 +398,7 @@ void readStripe() throws IOException {
349398 if (alignedStripe .missingChunksNum > 0 ) {
350399 checkMissingBlocks ();
351400 readDataForDecoding ();
352- // read parity chunks
401+ // Read parity chunks.
353402 readParityChunks (alignedStripe .missingChunksNum );
354403 }
355404 } catch (IOException e ) {
@@ -359,7 +408,7 @@ void readStripe() throws IOException {
359408 // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
360409
361410 // Input buffers for potential decode operation, which remains null until
362- // first read failure
411+ // first read failure.
363412 while (!futures .isEmpty ()) {
364413 try {
365414 long beginReadMS = Time .monotonicNow ();
@@ -384,8 +433,12 @@ void readStripe() throws IOException {
384433 }
385434 } else {
386435 returnedChunk .state = StripingChunk .MISSING ;
387- // close the corresponding reader
436+ // Close the corresponding reader.
388437 dfsStripedInputStream .closeReader (readerInfos [r .index ]);
438+ if (readerInfos [r .index ].isRetryCurrentReader ()) {
439+ readerInfos [r .index ].setRetryCurrentReader (false );
440+ readerInfos [r .index ] = null ;
441+ }
389442
390443 final int missing = alignedStripe .missingChunksNum ;
391444 alignedStripe .missingChunksNum ++;
@@ -399,7 +452,7 @@ void readStripe() throws IOException {
399452 DFSClient .LOG .error (err , ie );
400453 dfsStripedInputStream .close ();
401454 clearFutures ();
402- // Don't decode if read interrupted
455+ // Don't decode if read interrupted.
403456 throw new InterruptedIOException (err );
404457 }
405458 }
0 commit comments