
Commit 768a5d2

PARQUET-2171. Vector IO through self-contained shim classes.
Takes the vector IO shim code from the hadoop-shim library and reimplements it through the DynMethods class. Needs:
* lower-level tests of behaviour
* review of exception handling/unwrapping

PARQUET-2171. Vector IO
* tests working
* debug logs added to DynMethods
* purge shim library refs from pom.xml
* move all static methods to BindingUtils; might be good to split into public/private classes, or move the "await for" support to VectorIOBridge and keep the rest hidden

PARQUET-2171. Vector IO
Copy the VectoredIO contract tests to the parquet code and adapt. All tests are working except a probe to verify that the vectored IO API is directly implemented by the RawLocal stream. That is odd, as the hasCapabilities probe should be in Hadoop 3.3.5.

PARQUET-2171. Vector IO
Fix the name of the hasCapabilities method to look for, so the test works; add test javadocs.

PARQUET-2171. Vector IO: checkstyles.

PARQUET-2171. Tests all working.
Testing found a couple of bugs with the initial Vector IO PR:
* fixed ParquetFileReader and parameterized the tests which had failed, so as to ensure coverage
* TestVectorIOBridge uses checksumfs, which does implement vector IO (which implies all file:// links get the speedup)
* improved assertions in the failing tests, which were all reporting mismatches in the data read, expected vs. actual

As well as these changes, I've also locally run the entire suite with a core-site.xml set to turn on vector IO; this is how the regressions were identified.
1 parent 9be37b0 commit 768a5d2
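The heart of the change is binding Hadoop's vectored-read API reflectively through parquet-common's DynMethods, so parquet-hadoop no longer needs the hadoop-shim library and still runs against older Hadoop releases. A minimal sketch of that binding pattern, assuming the Hadoop 3.3.5 PositionedReadable.readVectored() signature; this helper is illustrative, not the commit's actual VectorIOBridge:

import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.IntFunction;

import org.apache.parquet.util.DynMethods;

// Illustrative shim, not the commit's VectorIOBridge.
final class ReadVectoredBinding {
  // Resolve PositionedReadable.readVectored(List, IntFunction) once;
  // orNoop() substitutes a no-op when the runtime Hadoop lacks it.
  private static final DynMethods.UnboundMethod READ_VECTORED =
      new DynMethods.Builder("readVectored")
          .impl("org.apache.hadoop.fs.PositionedReadable",
              "readVectored", List.class, IntFunction.class)
          .orNoop()
          .build();

  private ReadVectoredBinding() {}

  // true when the reflective lookup found the method
  static boolean available() {
    return !READ_VECTORED.isNoop();
  }

  // invoke against a stream; silently a no-op when unavailable
  static void readVectored(Object stream, List<?> fileRanges,
      IntFunction<ByteBuffer> allocate) {
    READ_VECTORED.invoke(stream, fileRanges, allocate);
  }
}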

File tree: 18 files changed (+1608 -108 lines)

parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java

Lines changed: 11 additions & 3 deletions
@@ -109,11 +109,19 @@ public abstract class SeekableInputStream extends InputStream {
 
   /**
    * Read a set of file ranges in a vectored manner.
+   * @throws UnsupportedOperationException if not available in this class/runtime.
    */
   public void readVectored(List<ParquetFileRange> ranges,
-      IntFunction<ByteBuffer> allocate) throws IOException {
-    throw new IOException(String.format("Vectored io is not supported for %s .",
-        this.getClass().getCanonicalName()));
+      IntFunction<ByteBuffer> allocate) throws IOException {
+
+    throw new UnsupportedOperationException("Vectored IO is not supported for " + this);
+  }
+
+  /**
+   * Is the {@link #readVectored(List, IntFunction)} method available.
+   */
+  public boolean readVectoredAvailable() {
+    return false;
   }
 
 }
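A hedged usage sketch of the new contract: callers probe readVectoredAvailable() and fall back to serial reads otherwise. The ParquetFileRange (offset, length) constructor and its getters are assumed from this commit's API; stream is any SeekableInputStream.

List<ParquetFileRange> ranges = Arrays.asList(
    new ParquetFileRange(0L, 1024),          // (offset, length) ctor assumed
    new ParquetFileRange(1_048_576L, 2048));
if (stream.readVectoredAvailable()) {
  // vectored path: ranges are filled asynchronously via their futures
  stream.readVectored(ranges, ByteBuffer::allocate);
} else {
  // serial fallback through the long-standing seek/readFully contract
  for (ParquetFileRange range : ranges) {
    byte[] bytes = new byte[range.getLength()];
    stream.seek(range.getOffset());
    stream.readFully(bytes, 0, bytes.length);
  }
}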

parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java

Lines changed: 16 additions & 2 deletions
@@ -27,10 +27,15 @@
 import java.security.PrivilegedAction;
 import java.util.Arrays;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.apache.parquet.Exceptions.throwIfInstance;
 
 public class DynMethods {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DynMethods.class);
+
   /**
    * Convenience wrapper class around {@link java.lang.reflect.Method}.
    *
@@ -243,7 +248,8 @@ public Builder impl(String className, String methodName, Class<?>... argClasses)
       Class<?> targetClass = Class.forName(className, true, loader);
       impl(targetClass, methodName, argClasses);
     } catch (ClassNotFoundException e) {
-      // not the right implementation
+      // class not found on supplied classloader.
+      LOG.debug("failed to load class {}", className, e);
     }
     return this;
   }
@@ -281,6 +287,7 @@ public Builder impl(Class<?> targetClass, String methodName, Class<?>... argClas
           targetClass.getMethod(methodName, argClasses), name);
     } catch (NoSuchMethodException e) {
       // not the right implementation
+      LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
     }
     return this;
   }
@@ -311,6 +318,8 @@ public Builder ctorImpl(Class<?> targetClass, Class<?>... argClasses) {
           .buildChecked();
     } catch (NoSuchMethodException e) {
       // not the right implementation
+      LOG.debug("failed to load constructor arity {} from class {}",
+          argClasses.length, targetClass, e);
     }
     return this;
   }
@@ -327,6 +336,9 @@ public Builder ctorImpl(String className, Class<?>... argClasses) {
           .buildChecked();
     } catch (NoSuchMethodException e) {
       // not the right implementation
+      LOG.debug("failed to load constructor arity {} from class {}",
+          argClasses.length, className, e);
+
     }
     return this;
   }
@@ -349,7 +361,8 @@ public Builder hiddenImpl(String className, String methodName, Class<?>... argCl
       Class<?> targetClass = Class.forName(className, true, loader);
       hiddenImpl(targetClass, methodName, argClasses);
     } catch (ClassNotFoundException e) {
-      // not the right implementation
+      // class not found on supplied classloader.
+      LOG.debug("failed to load class {}", className, e);
     }
     return this;
   }
@@ -388,6 +401,7 @@ public Builder hiddenImpl(Class<?> targetClass, String methodName, Class<?>... a
       this.method = new UnboundMethod(hidden, name);
     } catch (SecurityException | NoSuchMethodException e) {
       // unusable or not the right implementation
+      LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
     }
     return this;
   }
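Because every failed candidate is now traced at debug level, the builder's silent fallback chain becomes observable. A small example of such a chain; the first class is Hadoop's real StreamCapabilities interface, the second is a purely hypothetical legacy candidate:

// candidates are tried in order; each miss now emits a debug log
DynMethods.UnboundMethod hasCapability = new DynMethods.Builder("hasCapability")
    .impl("org.apache.hadoop.fs.StreamCapabilities", "hasCapability", String.class)
    .impl("com.example.LegacyStream", "hasCapability", String.class)  // hypothetical
    .orNoop()  // degrade to a no-op rather than fail at binding time
    .build();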

parquet-hadoop/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -79,11 +79,11 @@
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
+    <!-- <dependency>
       <groupId>org.apache.hadoop.extensions</groupId>
       <artifactId>hadoop-api-shim</artifactId>
       <scope>provided</scope>
-    </dependency>
+    </dependency>-->
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-jackson</artifactId>

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 13 additions & 12 deletions
@@ -35,7 +35,6 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InterruptedIOException;
 import java.io.SequenceInputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -57,14 +56,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import java.util.zip.CRC32;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.shim.functional.FutureIO;
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
@@ -108,6 +105,7 @@
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.hadoop.util.vectorio.BindingUtils;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
@@ -995,9 +993,6 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
     }
     // actually read all the chunks
     ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
-    for (ConsecutivePartList consecutiveChunks : allParts) {
-      consecutiveChunks.readAll(f, builder);
-    }
     readAllPartsVectoredOrNormal(allParts, builder);
     for (Chunk chunk : builder.build()) {
       readChunkPages(chunk, block, rowGroup);
@@ -1075,14 +1070,16 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
   }
 
   /**
-   * Read data in all parts via either vectored IO or serial IO.
+   * Read data in all parts via either vectored IO or serial IO.
    * @param allParts all parts to be read.
    * @param builder used to build chunk list to read the pages for the different columns.
    * @throws IOException any IOE.
    */
   private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
       throws IOException {
-    boolean isVectoredIO = options.useHadoopVectoredIO() && partsLengthValidForVectoredIO(allParts);
+    boolean isVectoredIO = options.useHadoopVectoredIO()
+        && f.readVectoredAvailable()
+        && partsLengthValidForVectoredIO(allParts);
     if (isVectoredIO) {
       readVectored(allParts, builder);
     } else {
@@ -1114,9 +1111,10 @@ private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts
    * @param builder used to build chunk list to read the pages for the different columns.
    * @throws IOException any IOE.
    */
-  private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
-      throws IOException {
-    List<ParquetFileRange> ranges = new ArrayList<>();
+  private void readVectored(List<ConsecutivePartList> allParts,
+      ChunkListBuilder builder) throws IOException {
+
+    List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
     for (ConsecutivePartList consecutiveChunks : allParts) {
       Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE,
           "Invalid length %s for vectored read operation. It must be less than max integer value.",
@@ -1125,6 +1123,7 @@
     }
     LOG.debug("Doing vectored IO for ranges {}", ranges);
     ByteBufferAllocator allocator = options.getAllocator();
+    // blocking or asynchronous vectored read
     f.readVectored(ranges, allocator::allocate);
     int k = 0;
     for (ConsecutivePartList consecutivePart : allParts) {
@@ -1949,8 +1948,10 @@ public void readFromVectoredRange(ParquetFileRange currRange,
     ByteBuffer buffer;
     try {
       LOG.debug("Waiting for vectored read to finish for range {} ", currRange);
-      buffer = FutureIO.awaitFuture(currRange.getDataReadFuture(),
+      buffer = BindingUtils.awaitFuture(currRange.getDataReadFuture(),
           HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      // report in a counter the data we just scanned
+      BenchmarkCounter.incrementBytesRead(currRange.getLength());
     } catch (TimeoutException e) {
       String error = String.format("Timeout while fetching result for %s", currRange);
       LOG.error(error, e);
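The awaitFuture change above is the consuming half of the asynchronous contract: readVectored() starts the reads and each ParquetFileRange's future completes independently. A hedged sketch of that per-range wait loop; the 300-second timeout is illustrative (not the actual HADOOP_VECTORED_READ_TIMEOUT_SECONDS value), consume() is a hypothetical callback, and exception handling is elided:

for (ParquetFileRange range : ranges) {
  // block until this range's buffer is ready, bounded by a timeout
  ByteBuffer data = BindingUtils.awaitFuture(
      range.getDataReadFuture(), 300, TimeUnit.SECONDS);
  // mirror the production code: account for the bytes scanned
  BenchmarkCounter.incrementBytesRead(range.getLength());
  consume(data); // hypothetical downstream consumer
}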

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java

Lines changed: 2 additions & 1 deletion
@@ -125,7 +125,8 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled";
 
   /**
-   * Key to enable/disable vectored io while reading parquet files.
+   * Key to enable/disable vectored io while reading parquet files:
+   * {@value}.
    */
   public static final String HADOOP_VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled";
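A hedged sketch of enabling the feature programmatically; the default value is not shown in this diff, and per the commit message the same key can also be set in core-site.xml:

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// enable Hadoop vectored IO for Parquet reads via the key defined above
conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, true);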

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java

Lines changed: 7 additions & 3 deletions
@@ -20,6 +20,7 @@
 package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
 import org.apache.parquet.io.DelegatingSeekableInputStream;
 import org.apache.parquet.io.ParquetFileRange;
 
@@ -28,8 +29,6 @@
 import java.util.List;
 import java.util.function.IntFunction;
 
-import static org.apache.parquet.hadoop.util.ParquetVectoredIOUtil.readVectoredAndPopulate;
-
 /**
  * SeekableInputStream implementation that implements read(ByteBuffer) for
  * Hadoop 1 FSDataInputStream.
@@ -63,8 +62,13 @@ public void readFully(byte[] bytes, int start, int len) throws IOException {
     stream.readFully(bytes, start, len);
   }
 
+  @Override
+  public boolean readVectoredAvailable() {
+    return VectorIOBridge.bridgeAvailable();
+  }
+
   @Override
   public void readVectored(List<ParquetFileRange> ranges, IntFunction<ByteBuffer> allocate) throws IOException {
-    readVectoredAndPopulate(ranges, allocate, stream);
+    VectorIOBridge.readVectoredRanges(stream, ranges, allocate);
   }
 }

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java

Lines changed: 7 additions & 3 deletions
@@ -20,6 +20,7 @@
 package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
 import org.apache.parquet.io.DelegatingSeekableInputStream;
 import org.apache.parquet.io.ParquetFileRange;
 
@@ -29,8 +30,6 @@
 import java.util.List;
 import java.util.function.IntFunction;
 
-import static org.apache.parquet.hadoop.util.ParquetVectoredIOUtil.readVectoredAndPopulate;
-
 /**
  * SeekableInputStream implementation for FSDataInputStream that implements
  * ByteBufferReadable in Hadoop 2.
@@ -88,9 +87,14 @@ public int read(ByteBuffer buf) throws IOException {
     }
   }
 
+  @Override
+  public boolean readVectoredAvailable() {
+    return VectorIOBridge.bridgeAvailable();
+  }
+
   @Override
   public void readVectored(List<ParquetFileRange> ranges, IntFunction<ByteBuffer> allocate) throws IOException {
-    readVectoredAndPopulate(ranges, allocate, stream);
+    VectorIOBridge.readVectoredRanges(stream, ranges, allocate);
   }
 
   public static void readFully(Reader reader, ByteBuffer buf) throws IOException {

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ParquetVectoredIOUtil.java

Lines changed: 0 additions & 64 deletions
This file was deleted.
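The deleted helper's duties move to VectorIOBridge, which this page references but does not show. A hedged sketch of what readVectoredRanges() must do, written directly against the Hadoop 3.3.5+ FileRange API for clarity; the real bridge performs the same steps reflectively via DynMethods, and ParquetFileRange#setDataReadFuture is assumed from this commit's API:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.parquet.io.ParquetFileRange;

final class VectorIOSketch {
  static void readVectoredRanges(FSDataInputStream stream,
      List<ParquetFileRange> ranges,
      IntFunction<ByteBuffer> allocate) throws IOException {
    // 1. map each Parquet range onto a Hadoop FileRange
    List<FileRange> hadoopRanges = new ArrayList<>(ranges.size());
    for (ParquetFileRange range : ranges) {
      hadoopRanges.add(FileRange.createFileRange(range.getOffset(), range.getLength()));
    }
    // 2. issue the (possibly asynchronous) vectored read
    stream.readVectored(hadoopRanges, allocate);
    // 3. wire each Hadoop future back to its Parquet range so callers
    //    can await it via ParquetFileRange#getDataReadFuture()
    for (int i = 0; i < ranges.size(); i++) {
      ranges.get(i).setDataReadFuture(hadoopRanges.get(i).getData());
    }
  }
}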
