From 6e194caa89697e957bfcf916341f26a74f60de31 Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Tue, 11 Jul 2023 20:34:35 +0800 Subject: [PATCH 01/14] rebase trunk and move curAttempts++ --- .../org/apache/hadoop/hdfs/StripeReader.java | 94 +++++++++++-------- .../hdfs/client/HdfsClientConfigKeys.java | 7 ++ .../hdfs/client/impl/DfsClientConf.java | 15 +++ .../src/main/resources/hdfs-default.xml | 9 ++ 4 files changed, 87 insertions(+), 38 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java index bc39bace79588..d128d0f8b690e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java @@ -17,20 +17,19 @@ */ package org.apache.hadoop.hdfs; -import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats; import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk; -import org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe; import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; import org.apache.hadoop.io.erasurecode.ECChunk; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; -import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; +import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Time; import java.io.IOException; @@ -120,6 +119,7 @@ void skip() { protected final RawErasureDecoder decoder; protected final DFSStripedInputStream dfsStripedInputStream; private long readTo = -1; + protected final int readDNMaxAttempts; protected ECChunk[] decodeInputs; @@ -138,6 +138,8 @@ void skip() { this.corruptedBlocks = corruptedBlocks; this.decoder = decoder; this.dfsStripedInputStream = dfsStripedInputStream; + this.readDNMaxAttempts = dfsStripedInputStream.getDFSClient() + .getConf().getStripedReadDnMaxAttempts(); service = new ExecutorCompletionService<>( dfsStripedInputStream.getStripedReadsThreadPool()); @@ -233,47 +235,62 @@ private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) { private int readToBuffer(BlockReader blockReader, DatanodeInfo currentNode, ByteBufferStrategy strategy, - ExtendedBlock currentBlock) throws IOException { + LocatedBlock currentBlock, int chunkIndex) throws IOException { final int targetLength = strategy.getTargetLength(); - int length = 0; - try { - while (length < targetLength) { - int ret = strategy.readFromBlock(blockReader); - if (ret < 0) { - throw new IOException("Unexpected EOS from the reader"); + int curAttempts = 0; + while (true) { + int length = 0; + try { + while (length < targetLength) { + int ret = strategy.readFromBlock(blockReader); + if (ret < 0) { + throw new IOException("Unexpected EOS from the reader"); + } + length += ret; } - length += ret; - } - return length; - } catch (ChecksumException ce) { - DFSClient.LOG.warn("Found Checksum error for " - + currentBlock + 
" from " + currentNode - + " at " + ce.getPos()); - //Clear buffer to make next decode success - strategy.getReadBuffer().clear(); - // we want to remember which block replicas we have tried - corruptedBlocks.addCorruptedBlock(currentBlock, currentNode); - if (blockReader != null) { - blockReader.close(); - } - throw ce; - } catch (IOException e) { - DFSClient.LOG.warn("Exception while reading from " - + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from " - + currentNode, e); - //Clear buffer to make next decode success - strategy.getReadBuffer().clear(); - if (blockReader != null) { - blockReader.close(); + return length; + } catch (ChecksumException ce) { + DFSClient.LOG.warn("Found Checksum error for " + + currentBlock + " from " + currentNode + + " at " + ce.getPos()); + //Clear buffer to make next decode success + strategy.getReadBuffer().clear(); + // we want to remember which block replicas we have tried + corruptedBlocks.addCorruptedBlock(currentBlock.getBlock(), currentNode); + if (blockReader != null) { + blockReader.close(); + } + throw ce; + } catch (IOException e) { + //Clear buffer to make next decode success + strategy.getReadBuffer().clear(); + if (blockReader != null) { + blockReader.close(); + } + if (curAttempts < readDNMaxAttempts) { + curAttempts++; + if (dfsStripedInputStream.createBlockReader(currentBlock, + alignedStripe.getOffsetInBlock(), targetBlocks, + readerInfos, chunkIndex, readTo)) { + blockReader = readerInfos[chunkIndex].reader; + String msg = "Reconnect to " + currentNode.getInfoAddr() + + " for block " + currentBlock.getBlock(); + DFSClient.LOG.warn(msg); + continue; + } + } + DFSClient.LOG.warn("Exception while reading from " + + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from " + + currentNode, e); + throw e; } - throw e; } } private Callable readCells(final BlockReader reader, final DatanodeInfo datanode, final long currentReaderOffset, final long targetReaderOffset, final ByteBufferStrategy[] strategies, - final ExtendedBlock currentBlock) { + final LocatedBlock currentBlock, final int chunkIndex) { return () -> { // reader can be null if getBlockReaderWithRetry failed or // the reader hit exception before @@ -290,7 +307,8 @@ private Callable readCells(final BlockReader reader, int ret = 0; for (ByteBufferStrategy strategy : strategies) { - int bytesReead = readToBuffer(reader, datanode, strategy, currentBlock); + int bytesReead = readToBuffer(reader, datanode, strategy, currentBlock, + chunkIndex); ret += bytesReead; } return new BlockReadStats(ret, reader.isShortCircuit(), @@ -324,7 +342,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex) readerInfos[chunkIndex].datanode, readerInfos[chunkIndex].blockReaderOffset, alignedStripe.getOffsetInBlock(), getReadStrategies(chunk), - block.getBlock()); + block, chunkIndex); Future request = service.submit(readCallable); futures.put(request, chunkIndex); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 2044530506757..d0e5bd7017681 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -532,6 +532,13 @@ interface StripedRead { * span 6 DNs, so this default value accommodates 3 read streams */ int 
THREADPOOL_SIZE_DEFAULT = 18; + /** + * The number of times to reconnect to DN + * during the striped read process, the default is 1. + */ + String DATANODE_MAX_ATTEMPTS = PREFIX + + "datanode.max.attempts"; + int DATANODE_MAX_ATTEMPTS_DEFAULT = 1; } /** dfs.http.client configuration properties */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index dfc89adeab3bd..4b5fbaef0dd59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -167,6 +167,7 @@ public class DfsClientConf { private final boolean deadNodeDetectionEnabled; private final long leaseHardLimitPeriod; private final boolean recoverLeaseOnCloseException; + private final int stripedReadDnMaxAttempts; public DfsClientConf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout @@ -296,6 +297,13 @@ public DfsClientConf(Configuration conf) { Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " + HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY + " must be greater than 0."); + stripedReadDnMaxAttempts = + conf.getInt( + HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, + HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS_DEFAULT); + Preconditions.checkArgument(stripedReadDnMaxAttempts > 0, "The value of " + + HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS + + " must be greater than 0."); replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf); leaseHardLimitPeriod = @@ -703,6 +711,13 @@ public boolean isDeadNodeDetectionEnabled() { return deadNodeDetectionEnabled; } + /** + * @return the stripedReadDnMaxAttempts + */ + public int getStripedReadDnMaxAttempts() { + return stripedReadDnMaxAttempts; + } + /** * @return the leaseHardLimitPeriod */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 41dfbbca443fe..4552e789526dd 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4625,6 +4625,15 @@ + + dfs.client.read.striped.datanode.max.attempts + 1 + + The number of times to reconnect to DN during + the striped read process, the default is 1. 
+ + + dfs.client.replica.accessor.builder.classes From 7fca0c417ad91ce40d4676880589369b8589414a Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Mon, 6 Nov 2023 21:36:40 +0800 Subject: [PATCH 02/14] fix checkstyle --- .../src/main/java/org/apache/hadoop/hdfs/StripeReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java index d128d0f8b690e..16ce4df97d290 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java @@ -119,7 +119,7 @@ void skip() { protected final RawErasureDecoder decoder; protected final DFSStripedInputStream dfsStripedInputStream; private long readTo = -1; - protected final int readDNMaxAttempts; + private final int readDNMaxAttempts; protected ECChunk[] decodeInputs; From 950e686f09aa14f39105b3d9745d35b24ed87753 Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Mon, 6 Nov 2023 21:59:10 +0800 Subject: [PATCH 03/14] add unit test --- .../TestDFSStripedInputStreamWithTimeout.java | 168 ++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java new file mode 100644 index 0000000000000..dbaf0068e0a83 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.io.erasurecode.CodecUtil; +import org.apache.hadoop.io.erasurecode.ErasureCodeNative; +import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.io.IOException; +import java.util.Arrays; + +public class TestDFSStripedInputStreamWithTimeout { + + public static final Logger LOG = + LoggerFactory.getLogger(TestDFSStripedInputStreamWithTimeout.class); + + private MiniDFSCluster cluster; + private Configuration conf = new Configuration(); + private DistributedFileSystem fs; + private final Path dirPath = new Path("/striped"); + private Path filePath = new Path(dirPath, "file"); + private ErasureCodingPolicy ecPolicy; + private short dataBlocks; + private short parityBlocks; + private int cellSize; + private final int stripesPerBlock = 2; + private int blockSize; + private int blockGroupSize; + + @Rule + public Timeout globalTimeout = new Timeout(300000); + + public ErasureCodingPolicy getEcPolicy() { + return StripedFileTestUtil.getDefaultECPolicy(); + } + + @Before + public void setup() throws IOException { + /* + * Initialize erasure coding policy. 
+ */ + ecPolicy = getEcPolicy(); + dataBlocks = (short) ecPolicy.getNumDataUnits(); + parityBlocks = (short) ecPolicy.getNumParityUnits(); + cellSize = ecPolicy.getCellSize(); + blockSize = stripesPerBlock * cellSize; + blockGroupSize = dataBlocks * blockSize; + System.out.println("EC policy = " + ecPolicy); + + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); + + conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 1000); + // SET CONFIG FOR HDFS CLIENT + conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000); + conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, 3); + + if (ErasureCodeNative.isNativeCodeLoaded()) { + conf.set( + CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY, + NativeRSRawErasureCoderFactory.CODER_NAME); + } + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, + GenericTestUtils.getRandomizedTempPath()); + SimulatedFSDataset.setFactory(conf); + startUp(); + } + + private void startUp() throws IOException { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes( + dataBlocks + parityBlocks).build(); + cluster.waitActive(); + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + } + fs = cluster.getFileSystem(); + fs.enableErasureCodingPolicy(getEcPolicy().getName()); + fs.mkdirs(dirPath); + fs.getClient() + .setErasureCodingPolicy(dirPath.toString(), ecPolicy.getName()); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testPreadTimeout() throws Exception { + final int numBlocks = 2; + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + stripesPerBlock, false, ecPolicy); + final int fileSize = numBlocks * blockGroupSize; + + LocatedBlocks lbs = fs.getClient().namenode. + getBlockLocations(filePath.toString(), 0, fileSize); + + for (LocatedBlock lb : lbs.getLocatedBlocks()) { + assert lb instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock) (lb); + for (int i = 0; i < dataBlocks; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, + stripesPerBlock * cellSize, + bg.getBlock().getGenerationStamp()); + blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); + cluster.injectBlocks(i, Arrays.asList(blk), + bg.getBlock().getBlockPoolId()); + } + } + + DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(), + filePath.toString(), false, ecPolicy, null); + int bufLen = 1024 * 100; + byte[] buf = new byte[bufLen]; + int readTotal = 0; + try { + while (readTotal < fileSize) { + in.seek(readTotal); + int nread = in.read(buf, 0, bufLen); + readTotal += nread; + // Simulated time-consuming processing operations, such as UDF. + Thread.sleep(10); + } + Assert.assertEquals("Success to read striped file.", fileSize, readTotal); + } catch (Exception e) { + Assert.fail("Fail to read striped time out. 
"); + } + in.close(); + } +} From c245be4b63f1e621b10038294979b2b10907367f Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Tue, 5 Dec 2023 15:43:48 +0800 Subject: [PATCH 04/14] add testReadFileWithAttempt for unit test --- .../TestDFSStripedInputStreamWithTimeout.java | 52 ++++++++++++------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java index dbaf0068e0a83..145d37a579fe6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java @@ -87,8 +87,7 @@ public void setup() throws IOException { conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 1000); // SET CONFIG FOR HDFS CLIENT - conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000); - conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, 3); + conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000); if (ErasureCodeNative.isNativeCodeLoaded()) { conf.set( @@ -132,7 +131,6 @@ public void testPreadTimeout() throws Exception { LocatedBlocks lbs = fs.getClient().namenode. getBlockLocations(filePath.toString(), 0, fileSize); - for (LocatedBlock lb : lbs.getLocatedBlocks()) { assert lb instanceof LocatedStripedBlock; LocatedStripedBlock bg = (LocatedStripedBlock) (lb); @@ -145,24 +143,42 @@ public void testPreadTimeout() throws Exception { bg.getBlock().getBlockPoolId()); } } + try { + testReadFileWithAttempt(1); + Assert.fail("It Should fail to read striped time out with 1 attempt . "); + } catch (Exception e) { + Assert.assertTrue( + "Throw IOException error message with 4 missing blocks. ", + e.getMessage().contains("4 missing blocks")); + } - DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(), - filePath.toString(), false, ecPolicy, null); - int bufLen = 1024 * 100; - byte[] buf = new byte[bufLen]; - int readTotal = 0; try { - while (readTotal < fileSize) { - in.seek(readTotal); - int nread = in.read(buf, 0, bufLen); - readTotal += nread; - // Simulated time-consuming processing operations, such as UDF. - Thread.sleep(10); - } - Assert.assertEquals("Success to read striped file.", fileSize, readTotal); + testReadFileWithAttempt(3); } catch (Exception e) { - Assert.fail("Fail to read striped time out. "); + Assert.fail("It Should successfully read striped file with 3 attempts. "); + } + } + + private void testReadFileWithAttempt(int attempt) throws Exception { + cluster.getConfiguration(0) + .setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, + attempt); + DistributedFileSystem fs = + (DistributedFileSystem) cluster.getNewFileSystemInstance(0); + try(DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(), + filePath.toString(), false, ecPolicy, null)){ + int bufLen = 1024 * 100; + byte[] buf = new byte[bufLen]; + int readTotal = 0; + in.seek(readTotal); + int nread = in.read(buf, 0, bufLen); + // Simulated time-consuming processing operations, such as UDF. 
+ Thread.sleep(2000); + in.seek(nread); + // StripeRange 6MB + bufLen = 1024 * 1024 * 6; + buf = new byte[bufLen]; + in.read(buf, 0, bufLen); } - in.close(); } } From ee6a4f1816edd8e59308d80ee7850c7750cf6f68 Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Wed, 6 Dec 2023 14:51:48 +0800 Subject: [PATCH 05/14] fix chekstyle. --- .../hdfs/TestDFSStripedInputStreamWithTimeout.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java index 145d37a579fe6..d90c7fb91de3b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java @@ -84,10 +84,7 @@ public void setup() throws IOException { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); - conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 1000); - // SET CONFIG FOR HDFS CLIENT - conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000); if (ErasureCodeNative.isNativeCodeLoaded()) { conf.set( @@ -160,12 +157,15 @@ public void testPreadTimeout() throws Exception { } private void testReadFileWithAttempt(int attempt) throws Exception { + // set dfs client config cluster.getConfiguration(0) .setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, attempt); - DistributedFileSystem fs = + cluster.getConfiguration(0) + .setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000); + DistributedFileSystem newFs = (DistributedFileSystem) cluster.getNewFileSystemInstance(0); - try(DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(), + try(DFSStripedInputStream in = new DFSStripedInputStream(newFs.getClient(), filePath.toString(), false, ecPolicy, null)){ int bufLen = 1024 * 100; byte[] buf = new byte[bufLen]; @@ -173,7 +173,7 @@ private void testReadFileWithAttempt(int attempt) throws Exception { in.seek(readTotal); int nread = in.read(buf, 0, bufLen); // Simulated time-consuming processing operations, such as UDF. 
- Thread.sleep(2000); + Thread.sleep(10000); in.seek(nread); // StripeRange 6MB bufLen = 1024 * 1024 * 6; From 38a508ffe9911b4e4f718a330e3acb781fb1079b Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Mon, 6 Nov 2023 21:36:40 +0800 Subject: [PATCH 06/14] fix checkstyle --- .../main/java/org/apache/hadoop/hdfs/StripeReader.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java index 16ce4df97d290..4b569435367e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java @@ -278,11 +278,19 @@ private int readToBuffer(BlockReader blockReader, DFSClient.LOG.warn(msg); continue; } +<<<<<<< HEAD } DFSClient.LOG.warn("Exception while reading from " + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from " + currentNode, e); throw e; +======= + DFSClient.LOG.warn("Exception while reading from " + + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from " + + currentNode, e); + throw e; + } +>>>>>>> e471564ed1f4 (fix checkstyle) } } } From 38541b586859b89a56d4983b6fe69a0bfbc14dce Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Mon, 6 Nov 2023 21:59:10 +0800 Subject: [PATCH 07/14] add unit test --- .../main/java/org/apache/hadoop/hdfs/StripeReader.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java index 4b569435367e2..16ce4df97d290 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java @@ -278,19 +278,11 @@ private int readToBuffer(BlockReader blockReader, DFSClient.LOG.warn(msg); continue; } -<<<<<<< HEAD } DFSClient.LOG.warn("Exception while reading from " + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from " + currentNode, e); throw e; -======= - DFSClient.LOG.warn("Exception while reading from " - + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from " - + currentNode, e); - throw e; - } ->>>>>>> e471564ed1f4 (fix checkstyle) } } } From 89dd926cb3a584f70386802b279f8f21e7493524 Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Tue, 11 Jul 2023 20:34:35 +0800 Subject: [PATCH 08/14] add dfs.client.read.striped.datanode.max.attempts to fix read ec issue --- .../src/main/java/org/apache/hadoop/hdfs/StripeReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java index 16ce4df97d290..d128d0f8b690e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java @@ -119,7 +119,7 @@ void skip() { protected final RawErasureDecoder decoder; protected final DFSStripedInputStream dfsStripedInputStream; private long readTo = -1; - private final int readDNMaxAttempts; + protected final int readDNMaxAttempts; protected ECChunk[] 
decodeInputs; From e9c0a4c9464665212a5d72740b92b39af106f1cb Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Mon, 6 Nov 2023 21:59:10 +0800 Subject: [PATCH 09/14] add unit test --- .../hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java index d90c7fb91de3b..a1ff537d75533 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java @@ -84,7 +84,7 @@ public void setup() throws IOException { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); - conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 1000); + conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 500); if (ErasureCodeNative.isNativeCodeLoaded()) { conf.set( From 41d27d8354bc4d1ffaf005e8f7e859d06e3a303b Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Wed, 17 Apr 2024 18:21:53 +0800 Subject: [PATCH 10/14] add offsetInBlock to avoid reading duplicate data from datanode. --- .../java/org/apache/hadoop/hdfs/StripeReader.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java index d128d0f8b690e..2b097af0af674 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java @@ -235,7 +235,8 @@ private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) { private int readToBuffer(BlockReader blockReader, DatanodeInfo currentNode, ByteBufferStrategy strategy, - LocatedBlock currentBlock, int chunkIndex) throws IOException { + LocatedBlock currentBlock, int chunkIndex, long offsetInBlock) + throws IOException { final int targetLength = strategy.getTargetLength(); int curAttempts = 0; while (true) { @@ -270,7 +271,7 @@ private int readToBuffer(BlockReader blockReader, if (curAttempts < readDNMaxAttempts) { curAttempts++; if (dfsStripedInputStream.createBlockReader(currentBlock, - alignedStripe.getOffsetInBlock(), targetBlocks, + offsetInBlock, targetBlocks, readerInfos, chunkIndex, readTo)) { blockReader = readerInfos[chunkIndex].reader; String msg = "Reconnect to " + currentNode.getInfoAddr() @@ -307,9 +308,9 @@ private Callable readCells(final BlockReader reader, int ret = 0; for (ByteBufferStrategy strategy : strategies) { - int bytesReead = readToBuffer(reader, datanode, strategy, currentBlock, - chunkIndex); - ret += bytesReead; + int bytesRead = readToBuffer(reader, datanode, strategy, currentBlock, + chunkIndex, alignedStripe.getOffsetInBlock() + ret); + ret += bytesRead; } return new BlockReadStats(ret, reader.isShortCircuit(), reader.getNetworkDistance()); From 0d9cdfc03cded92e8d52e6aabef1c28ce37780ba Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Fri, 19 Apr 2024 15:37:35 +0800 Subject: [PATCH 11/14] fix check style && log. 
--- .../java/org/apache/hadoop/hdfs/StripeReader.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java index 2b097af0af674..dfaf076de7f98 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java @@ -251,9 +251,8 @@ private int readToBuffer(BlockReader blockReader, } return length; } catch (ChecksumException ce) { - DFSClient.LOG.warn("Found Checksum error for " - + currentBlock + " from " + currentNode - + " at " + ce.getPos()); + DFSClient.LOG.warn("Found Checksum error for {} from {} at {}", + currentBlock, currentNode, ce.getPos()); //Clear buffer to make next decode success strategy.getReadBuffer().clear(); // we want to remember which block replicas we have tried @@ -274,15 +273,13 @@ private int readToBuffer(BlockReader blockReader, offsetInBlock, targetBlocks, readerInfos, chunkIndex, readTo)) { blockReader = readerInfos[chunkIndex].reader; - String msg = "Reconnect to " + currentNode.getInfoAddr() - + " for block " + currentBlock.getBlock(); - DFSClient.LOG.warn(msg); + DFSClient.LOG.warn("Reconnect to {} for block {}", + currentNode.getInfoAddr(), currentBlock.getBlock()); continue; } } - DFSClient.LOG.warn("Exception while reading from " - + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from " - + currentNode, e); + DFSClient.LOG.warn("Exception while reading from {} of {} from {}", + currentBlock, dfsStripedInputStream.getSrc(), currentNode, e); throw e; } } From 6bc49155c1b7657f6bfe98f0be35f8de9e6cec7b Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Fri, 19 Apr 2024 20:52:29 +0800 Subject: [PATCH 12/14] fix tests --- .../org/apache/hadoop/hdfs/server/datanode/DataNode.java | 7 +++++++ .../hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java | 5 ++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index ef778791cfd9c..24a8757ddd05c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -4388,4 +4388,11 @@ boolean isSlownode() { public BlockPoolManager getBlockPoolManager() { return blockPoolManager; } + + @VisibleForTesting + public void closeDataXceiverServer() { + if (xserver != null) { + xserver.closeAllPeers(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java index a1ff537d75533..cd4e41c2f833e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java @@ -173,7 +173,10 @@ private void testReadFileWithAttempt(int attempt) throws Exception { in.seek(readTotal); int nread = in.read(buf, 0, bufLen); // Simulated time-consuming processing operations, such 
as UDF. - Thread.sleep(10000); + // And datanodes close connect because of socket timeout. + cluster.dataNodes.forEach(dn -> { + dn.getDatanode().closeDataXceiverServer(); + }); in.seek(nread); // StripeRange 6MB bufLen = 1024 * 1024 * 6; From 1a28725cb6bbb6d5a3b1f4ad5ee4fbe8e2540f33 Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Mon, 22 Apr 2024 09:36:24 +0800 Subject: [PATCH 13/14] fix checkstyle --- .../hadoop-hdfs/src/main/resources/hdfs-default.xml | 2 +- .../hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 4552e789526dd..a7912f8276499 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4633,7 +4633,7 @@ the striped read process, the default is 1. - + dfs.client.replica.accessor.builder.classes diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java index cd4e41c2f833e..58572112701b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStreamWithTimeout.java @@ -174,9 +174,7 @@ private void testReadFileWithAttempt(int attempt) throws Exception { int nread = in.read(buf, 0, bufLen); // Simulated time-consuming processing operations, such as UDF. // And datanodes close connect because of socket timeout. - cluster.dataNodes.forEach(dn -> { - dn.getDatanode().closeDataXceiverServer(); - }); + cluster.dataNodes.forEach(dn -> dn.getDatanode().closeDataXceiverServer()); in.seek(nread); // StripeRange 6MB bufLen = 1024 * 1024 * 6; From 1b1f4c077c98d932c7b4f938d3d418d3bfd0e248 Mon Sep 17 00:00:00 2001 From: xiezhineng Date: Wed, 2 Jul 2025 10:28:36 +0800 Subject: [PATCH 14/14] fix Variable 'readDNMaxAttempts' to be private --- .../src/main/java/org/apache/hadoop/hdfs/StripeReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java index dfaf076de7f98..4a420dcf23fce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java @@ -119,7 +119,7 @@ void skip() { protected final RawErasureDecoder decoder; protected final DFSStripedInputStream dfsStripedInputStream; private long readTo = -1; - protected final int readDNMaxAttempts; + private final int readDNMaxAttempts; protected ECChunk[] decodeInputs;
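
Usage note (illustrative sketch, not part of the patches above): this series adds the client-side property dfs.client.read.striped.datanode.max.attempts (HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, default 1), which bounds how many times StripeReader#readToBuffer recreates a block reader and reconnects to a DataNode after an IOException before the read fails. The snippet below shows one way a client could raise that limit programmatically; the file path and the assumption that fs.defaultFS already points at the target HDFS cluster are placeholders for illustration only, not part of the change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

import java.io.IOException;

/**
 * Minimal sketch: read an erasure-coded (striped) file with the DataNode
 * reconnect limit raised from the default of 1 to 3. The path below is a
 * placeholder; fs.defaultFS is assumed to point at the target cluster.
 */
public class StripedReadRetryExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Equivalent to setting dfs.client.read.striped.datanode.max.attempts
    // in the client-side hdfs-site.xml.
    conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, 3);

    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(new Path("/striped/file"))) {
      byte[] buf = new byte[64 * 1024];
      int n;
      while ((n = in.read(buf)) != -1) {
        // Process the bytes; a slow consumer between reads is the kind of
        // pause that can let DataNodes drop idle connections, which the
        // reconnect logic in StripeReader#readToBuffer is meant to tolerate.
      }
    }
  }
}

Setting the same key in hdfs-site.xml on the client has the same effect, as documented by the hdfs-default.xml entry added in PATCH 01.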