StripeReader.java
@@ -17,20 +17,19 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 import org.apache.hadoop.io.erasurecode.ECChunk;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
-import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.Time;
 
 import java.io.IOException;
@@ -120,6 +119,7 @@ void skip() {
   protected final RawErasureDecoder decoder;
   protected final DFSStripedInputStream dfsStripedInputStream;
   private long readTo = -1;
+  private final int readDNMaxAttempts;
 
   protected ECChunk[] decodeInputs;
 
@@ -138,6 +138,8 @@ void skip() {
     this.corruptedBlocks = corruptedBlocks;
     this.decoder = decoder;
     this.dfsStripedInputStream = dfsStripedInputStream;
+    this.readDNMaxAttempts = dfsStripedInputStream.getDFSClient()
+        .getConf().getStripedReadDnMaxAttempts();
 
     service = new ExecutorCompletionService<>(
         dfsStripedInputStream.getStripedReadsThreadPool());
@@ -233,47 +235,60 @@ private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
 
   private int readToBuffer(BlockReader blockReader,
       DatanodeInfo currentNode, ByteBufferStrategy strategy,
-      ExtendedBlock currentBlock) throws IOException {
+      LocatedBlock currentBlock, int chunkIndex, long offsetInBlock)
+      throws IOException {
     final int targetLength = strategy.getTargetLength();
-    int length = 0;
-    try {
-      while (length < targetLength) {
-        int ret = strategy.readFromBlock(blockReader);
-        if (ret < 0) {
-          throw new IOException("Unexpected EOS from the reader");
-        }
-        length += ret;
-      }
-      return length;
-    } catch (ChecksumException ce) {
-      DFSClient.LOG.warn("Found Checksum error for "
-          + currentBlock + " from " + currentNode
-          + " at " + ce.getPos());
-      //Clear buffer to make next decode success
-      strategy.getReadBuffer().clear();
-      // we want to remember which block replicas we have tried
-      corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
-      if (blockReader != null) {
-        blockReader.close();
-      }
-      throw ce;
-    } catch (IOException e) {
-      DFSClient.LOG.warn("Exception while reading from "
-          + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
-          + currentNode, e);
-      //Clear buffer to make next decode success
-      strategy.getReadBuffer().clear();
-      if (blockReader != null) {
-        blockReader.close();
-      }
-      throw e;
-    }
-  }
+    int curAttempts = 0;
+    while (true) {
+      int length = 0;
+      try {
+        while (length < targetLength) {
+          int ret = strategy.readFromBlock(blockReader);
+          if (ret < 0) {
+            throw new IOException("Unexpected EOS from the reader");
+          }
+          length += ret;
+        }
+        return length;
+      } catch (ChecksumException ce) {
+        DFSClient.LOG.warn("Found Checksum error for {} from {} at {}",
+            currentBlock, currentNode, ce.getPos());
+        //Clear buffer to make next decode success
+        strategy.getReadBuffer().clear();
+        // we want to remember which block replicas we have tried
+        corruptedBlocks.addCorruptedBlock(currentBlock.getBlock(), currentNode);
+        if (blockReader != null) {
+          blockReader.close();
+        }
+        throw ce;
+      } catch (IOException e) {
+        //Clear buffer to make next decode success
+        strategy.getReadBuffer().clear();
+        if (blockReader != null) {
+          blockReader.close();
+        }
+        if (curAttempts < readDNMaxAttempts) {
+          curAttempts++;
+          if (dfsStripedInputStream.createBlockReader(currentBlock,
+              offsetInBlock, targetBlocks,
+              readerInfos, chunkIndex, readTo)) {
+            blockReader = readerInfos[chunkIndex].reader;
+            DFSClient.LOG.warn("Reconnect to {} for block {}",
+                currentNode.getInfoAddr(), currentBlock.getBlock());
+            continue;
+          }
+        }
+        DFSClient.LOG.warn("Exception while reading from {} of {} from {}",
+            currentBlock, dfsStripedInputStream.getSrc(), currentNode, e);
+        throw e;
+      }
+    }
+  }
 
   private Callable<BlockReadStats> readCells(final BlockReader reader,
       final DatanodeInfo datanode, final long currentReaderOffset,
       final long targetReaderOffset, final ByteBufferStrategy[] strategies,
-      final ExtendedBlock currentBlock) {
+      final LocatedBlock currentBlock, final int chunkIndex) {
     return () -> {
       // reader can be null if getBlockReaderWithRetry failed or
       // the reader hit exception before
@@ -290,8 +305,9 @@ private Callable<BlockReadStats> readCells(final BlockReader reader,
 
       int ret = 0;
       for (ByteBufferStrategy strategy : strategies) {
-        int bytesReead = readToBuffer(reader, datanode, strategy, currentBlock);
-        ret += bytesReead;
+        int bytesRead = readToBuffer(reader, datanode, strategy, currentBlock,
+            chunkIndex, alignedStripe.getOffsetInBlock() + ret);
+        ret += bytesRead;
       }
       return new BlockReadStats(ret, reader.isShortCircuit(),
           reader.getNetworkDistance());
@@ -324,7 +340,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
         readerInfos[chunkIndex].datanode,
         readerInfos[chunkIndex].blockReaderOffset,
         alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
-        block.getBlock());
+        block, chunkIndex);
 
     Future<BlockReadStats> request = service.submit(readCallable);
     futures.put(request, chunkIndex);
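The core of the change is the new `while (true)` loop in `readToBuffer`: a plain `IOException` now closes the failed reader, rebuilds one via `createBlockReader` (reopening at `offsetInBlock`, which `readCells` computes as `alignedStripe.getOffsetInBlock() + ret` so the retry restarts at the current strategy's position), and re-reads the chunk at most `readDNMaxAttempts` extra times; a `ChecksumException` still fails fast so the corrupt replica is recorded. A minimal sketch of the same pattern, with a hypothetical `ChunkSource` standing in for `BlockReader`/`createBlockReader`:

```java
import java.io.IOException;

// Minimal sketch of the reconnect-and-retry pattern above; ChunkSource and
// its methods are hypothetical stand-ins for the HDFS classes.
public class RetryingChunkRead {
  interface ChunkSource {
    int read() throws IOException;               // cf. strategy.readFromBlock()
    ChunkSource reconnect() throws IOException;  // cf. createBlockReader(...)
  }

  static int readWithRetry(ChunkSource source, int targetLength,
      int maxAttempts) throws IOException {
    int curAttempts = 0;
    while (true) {
      int length = 0;                  // restart the chunk on every attempt
      try {
        while (length < targetLength) {
          int ret = source.read();
          if (ret < 0) {
            throw new IOException("Unexpected EOS from the reader");
          }
          length += ret;
        }
        return length;                 // full chunk read: done
      } catch (IOException e) {
        if (curAttempts < maxAttempts) {
          curAttempts++;
          source = source.reconnect(); // new reader against the same replica
          continue;                    // retry the whole chunk
        }
        throw e;                       // attempts exhausted: propagate
      }
    }
  }
}
```

The buffer clear (`strategy.getReadBuffer().clear()`) before each retry is what makes restarting the chunk from the beginning safe; the sketch models it by resetting `length` to zero.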
HdfsClientConfigKeys.java
@@ -532,6 +532,13 @@ interface StripedRead {
      * span 6 DNs, so this default value accommodates 3 read streams
      */
     int THREADPOOL_SIZE_DEFAULT = 18;
+    /**
+     * The number of times to reconnect to the DN
+     * during the striped read process; the default is 1.
+     */
+    String DATANODE_MAX_ATTEMPTS = PREFIX +
+        "datanode.max.attempts";
+    int DATANODE_MAX_ATTEMPTS_DEFAULT = 1;
   }
 
   /** dfs.http.client configuration properties */
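For orientation: `PREFIX` in the `StripedRead` interface expands to `dfs.client.read.striped.`, so the new constant resolves to the property name documented in hdfs-default.xml below. A small sketch of that resolution, with the prefix inlined as a literal rather than taken from the real class:

```java
// Sketch of how the new key name is assembled; the PREFIX literal here
// mirrors HdfsClientConfigKeys.StripedRead.PREFIX.
public class KeyResolution {
  static final String PREFIX = "dfs.client.read.striped.";
  static final String DATANODE_MAX_ATTEMPTS = PREFIX + "datanode.max.attempts";

  public static void main(String[] args) {
    // Prints: dfs.client.read.striped.datanode.max.attempts
    System.out.println(DATANODE_MAX_ATTEMPTS);
  }
}
```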
DfsClientConf.java
@@ -167,6 +167,7 @@ public class DfsClientConf {
   private final boolean deadNodeDetectionEnabled;
   private final long leaseHardLimitPeriod;
   private final boolean recoverLeaseOnCloseException;
+  private final int stripedReadDnMaxAttempts;
 
   public DfsClientConf(Configuration conf) {
     // The hdfsTimeout is currently the same as the ipc timeout
@@ -296,6 +297,13 @@ public DfsClientConf(Configuration conf) {
     Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
         HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
         " must be greater than 0.");
+    stripedReadDnMaxAttempts =
+        conf.getInt(
+            HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS,
+            HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS_DEFAULT);
+    Preconditions.checkArgument(stripedReadDnMaxAttempts > 0, "The value of " +
+        HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS +
+        " must be greater than 0.");
     replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);
 
     leaseHardLimitPeriod =
@@ -703,6 +711,13 @@ public boolean isDeadNodeDetectionEnabled() {
     return deadNodeDetectionEnabled;
   }
 
+  /**
+   * @return the stripedReadDnMaxAttempts
+   */
+  public int getStripedReadDnMaxAttempts() {
+    return stripedReadDnMaxAttempts;
+  }
+
   /**
    * @return the leaseHardLimitPeriod
    */
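The `checkArgument` guard means a non-positive setting fails fast when the client configuration is built, rather than being silently clamped. A hedged sketch of that behavior (assuming `DfsClientConf` lives in `org.apache.hadoop.hdfs.client.impl`, as in current Hadoop, with the key literal inlined):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

// Sketch: a non-positive value is rejected at DfsClientConf construction.
public class ValidateMaxAttempts {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("dfs.client.read.striped.datanode.max.attempts", 0);
    try {
      new DfsClientConf(conf);
    } catch (IllegalArgumentException e) {
      // Expected message: "The value of
      // dfs.client.read.striped.datanode.max.attempts must be greater than 0."
      System.out.println(e.getMessage());
    }
  }
}
```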
DataNode.java
@@ -4388,4 +4388,11 @@ boolean isSlownode() {
   public BlockPoolManager getBlockPoolManager() {
     return blockPoolManager;
   }
+
+  @VisibleForTesting
+  public void closeDataXceiverServer() {
+    if (xserver != null) {
+      xserver.closeAllPeers();
+    }
+  }
 }
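The `@VisibleForTesting` hook presumably exists so a test can sever every open DataXceiver connection and force the client down the new retry path. A sketch of how a regression test might use it, assuming a `MiniDFSCluster` and a pre-written striped file at `/ec/file` (EC policy setup and file creation omitted):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

// Sketch: drop all DN connections mid-read; with max.attempts > 1 the
// striped read should reconnect instead of failing.
public class StripedReadRetrySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt("dfs.client.read.striped.datanode.max.attempts", 2);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(9).build();
    try {
      cluster.waitActive();
      Path file = new Path("/ec/file");   // assumed pre-written EC file
      try (FSDataInputStream in = cluster.getFileSystem().open(file)) {
        byte[] buf = new byte[4096];
        in.read(buf);                     // opens the block readers
        for (DataNode dn : cluster.getDataNodes()) {
          dn.closeDataXceiverServer();    // the hook added above
        }
        in.read(buf);                     // exercises the retry path
      }
    } finally {
      cluster.shutdown();
    }
  }
}
```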
hdfs-default.xml
@@ -4625,6 +4625,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.read.striped.datanode.max.attempts</name>
+  <value>1</value>
+  <description>
+    The number of times to reconnect to the DN during
+    the striped read process; the default is 1.
+  </description>
+</property>
+
 <property>
   <name>dfs.client.replica.accessor.builder.classes</name>
   <value></value>
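Deployments that want more than one reconnect attempt would override the default, either in hdfs-site.xml or programmatically on the client. A sketch of the programmatic form (the NameNode URI and the value of 3 are illustrative):

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: raise the striped-read reconnect budget for one client.
public class ConfigureStripedReadRetries {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Equivalent to setting the property above in hdfs-site.xml.
    conf.setInt("dfs.client.read.striped.datanode.max.attempts", 3);
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://nn:8020"), conf)) {
      fs.open(new Path("/ec/file")).close();  // reads now retry reconnects
    }
  }
}
```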