Skip to content

Commit cc2c9df

Browse files
mukund-thakur authored and steveloughran committed
PARQUET-2171. Support vectored IO in Hadoop stream reads
HADOOP-18103 introduced a high-performance vectored read API in hadoop filesystems, including the local filesystem and S3A. This patch supports the vectored IO API in ParquetFileReader, switching to it when the hadoop version supports it and the parquet library has it explicitly enabled. To use vectored IO, set parquet.hadoop.vectored.io.enabled=true. The API is invoked through reflection so that on Hadoop releases with the feature (3.3.5+) the Vectored IO operations are handed off to the Hadoop input stream. On older versions the feature is not available; the parquet library will compile and run as normal. Some existing tests have been parameterized to verify that behavior is consistent with/without vector IO enabled. On hadoop2 profile test runs, this implicitly verifies that compatibility is maintained.
1 parent 330242b commit cc2c9df

20 files changed

+1869
-19
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.parquet.io;
21+
22+
import java.nio.ByteBuffer;
23+
import java.util.concurrent.CompletableFuture;
24+
25+
/**
26+
* Class to define a file range for a parquet file and to
27+
* hold future data for any ongoing read for that range.
28+
*/
29+
public class ParquetFileRange {
30+
31+
/**
32+
* Start position in file.
33+
*/
34+
private final long offset;
35+
36+
/**
37+
* Length of data to be read from position.
38+
*/
39+
private final int length;
40+
41+
/**
42+
* A future object to hold future for ongoing read.
43+
*/
44+
private CompletableFuture<ByteBuffer> dataReadFuture;
45+
46+
public ParquetFileRange(long offset, int length) {
47+
this.offset = offset;
48+
this.length = length;
49+
}
50+
51+
public long getOffset() {
52+
return offset;
53+
}
54+
55+
public int getLength() {
56+
return length;
57+
}
58+
59+
public CompletableFuture<ByteBuffer> getDataReadFuture() {
60+
return dataReadFuture;
61+
}
62+
63+
public void setDataReadFuture(CompletableFuture<ByteBuffer> dataReadFuture) {
64+
this.dataReadFuture = dataReadFuture;
65+
}
66+
67+
@Override
68+
public String toString() {
69+
return "range[" + this.offset + "," + (this.offset + (long)this.length) + ")";
70+
}
71+
}

parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.io.IOException;
2424
import java.io.InputStream;
2525
import java.nio.ByteBuffer;
26+
import java.util.List;
27+
import java.util.function.IntFunction;
2628

2729
/**
2830
* {@code SeekableInputStream} is an interface with the methods needed by
@@ -104,4 +106,22 @@ public abstract class SeekableInputStream extends InputStream {
104106
* fill the buffer, {@code buf.remaining()}
105107
*/
106108
public abstract void readFully(ByteBuffer buf) throws IOException;
109+
110+
/**
111+
* Read a set of file ranges in a vectored manner.
112+
* @throws UnsupportedOperationException if not available in this class/runtime.
113+
*/
114+
public void readVectored(List<ParquetFileRange> ranges,
115+
IntFunction<ByteBuffer> allocate) throws IOException {
116+
117+
throw new UnsupportedOperationException("Vectored IO is not supported for " + this);
118+
}
119+
120+
/**
121+
* Is the {@link #readVectored(List, IntFunction)} method available.
122+
*/
123+
public boolean readVectoredAvailable() {
124+
return false;
125+
}
126+
107127
}

parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,15 @@
2828
import java.security.PrivilegedAction;
2929
import java.util.Arrays;
3030
import org.apache.parquet.Preconditions;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import static org.apache.parquet.Exceptions.throwIfInstance;
3135

3236
public class DynMethods {
3337

38+
private static final Logger LOG = LoggerFactory.getLogger(DynMethods.class);
39+
3440
/**
3541
* Convenience wrapper class around {@link java.lang.reflect.Method}.
3642
* <p>
@@ -240,7 +246,8 @@ public Builder impl(String className, String methodName, Class<?>... argClasses)
240246
Class<?> targetClass = Class.forName(className, true, loader);
241247
impl(targetClass, methodName, argClasses);
242248
} catch (ClassNotFoundException e) {
243-
// not the right implementation
249+
// class not found on supplied classloader.
250+
LOG.debug("failed to load class {}", className, e);
244251
}
245252
return this;
246253
}
@@ -277,6 +284,7 @@ public Builder impl(Class<?> targetClass, String methodName, Class<?>... argClas
277284
this.method = new UnboundMethod(targetClass.getMethod(methodName, argClasses), name);
278285
} catch (NoSuchMethodException e) {
279286
// not the right implementation
287+
LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
280288
}
281289
return this;
282290
}
@@ -307,6 +315,8 @@ public Builder ctorImpl(Class<?> targetClass, Class<?>... argClasses) {
307315
.buildChecked();
308316
} catch (NoSuchMethodException e) {
309317
// not the right implementation
318+
LOG.debug("failed to load constructor arity {} from class {}",
319+
argClasses.length, targetClass, e);
310320
}
311321
return this;
312322
}
@@ -323,6 +333,9 @@ public Builder ctorImpl(String className, Class<?>... argClasses) {
323333
.buildChecked();
324334
} catch (NoSuchMethodException e) {
325335
// not the right implementation
336+
LOG.debug("failed to load constructor arity {} from class {}",
337+
argClasses.length, className, e);
338+
326339
}
327340
return this;
328341
}
@@ -345,7 +358,8 @@ public Builder hiddenImpl(String className, String methodName, Class<?>... argCl
345358
Class<?> targetClass = Class.forName(className, true, loader);
346359
hiddenImpl(targetClass, methodName, argClasses);
347360
} catch (ClassNotFoundException e) {
348-
// not the right implementation
361+
// class not found on supplied classloader.
362+
LOG.debug("failed to load class {}", className, e);
349363
}
350364
return this;
351365
}
@@ -384,6 +398,7 @@ public Builder hiddenImpl(Class<?> targetClass, String methodName, Class<?>... a
384398
this.method = new UnboundMethod(hidden, name);
385399
} catch (SecurityException | NoSuchMethodException e) {
386400
// unusable or not the right implementation
401+
LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
387402
}
388403
return this;
389404
}

parquet-hadoop/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,3 +501,11 @@ If `false`, key material is stored in separate new files, created in the same fo
501501
**Description:** Length of key encryption keys (KEKs), randomly generated by parquet key management tools. Can be 128, 192 or 256 bits.
502502
**Default value:** `128`
503503

504+
---
505+
506+
**Property:** `parquet.hadoop.vectored.io.enabled`
507+
**Description:** Flag to enable use of the FileSystem Vector IO API on Hadoop releases which support the feature.
508+
If `true` then an attempt will be made to dynamically load the relevant classes;
509+
if not found then the library will use the classic non-vectored reads: it is safe to enable this option on older releases.
510+
**Default value:** `false`
511+

parquet-hadoop/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@
7979
<version>${hadoop.version}</version>
8080
<scope>provided</scope>
8181
</dependency>
82+
<!-- <dependency>
83+
<groupId>org.apache.hadoop.extensions</groupId>
84+
<artifactId>hadoop-api-shim</artifactId>
85+
<scope>provided</scope>
86+
</dependency>-->
8287
<dependency>
8388
<groupId>org.apache.parquet</groupId>
8489
<artifactId>parquet-jackson</artifactId>

parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
3232
import org.apache.parquet.hadoop.ParquetMetricsCallback;
3333

34+
import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
35+
3436
public class HadoopReadOptions extends ParquetReadOptions {
3537
private final Configuration conf;
3638

@@ -43,6 +45,7 @@ private HadoopReadOptions(
4345
boolean usePageChecksumVerification,
4446
boolean useBloomFilter,
4547
boolean useOffHeapDecryptBuffer,
48+
boolean useHadoopVectoredIO,
4649
FilterCompat.Filter recordFilter,
4750
MetadataFilter metadataFilter,
4851
CompressionCodecFactory codecFactory,
@@ -61,6 +64,7 @@ private HadoopReadOptions(
6164
usePageChecksumVerification,
6265
useBloomFilter,
6366
useOffHeapDecryptBuffer,
67+
useHadoopVectoredIO,
6468
recordFilter,
6569
metadataFilter,
6670
codecFactory,

parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class ParquetReadOptions {
5555
private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
5656
private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
5757
private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true;
58+
private static final boolean HADOOP_VECTORED_IO_ENABLED_DEFAULT = false;
5859
private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
5960
private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
6061
private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
@@ -68,6 +69,7 @@ public class ParquetReadOptions {
6869
private final boolean usePageChecksumVerification;
6970
private final boolean useBloomFilter;
7071
private final boolean useOffHeapDecryptBuffer;
72+
private final boolean useHadoopVectoredIO;
7173
private final FilterCompat.Filter recordFilter;
7274
private final ParquetMetadataConverter.MetadataFilter metadataFilter;
7375
private final CompressionCodecFactory codecFactory;
@@ -87,6 +89,7 @@ public class ParquetReadOptions {
8789
boolean usePageChecksumVerification,
8890
boolean useBloomFilter,
8991
boolean useOffHeapDecryptBuffer,
92+
boolean useHadoopVectoredIO,
9093
FilterCompat.Filter recordFilter,
9194
ParquetMetadataConverter.MetadataFilter metadataFilter,
9295
CompressionCodecFactory codecFactory,
@@ -104,6 +107,7 @@ public class ParquetReadOptions {
104107
usePageChecksumVerification,
105108
useBloomFilter,
106109
useOffHeapDecryptBuffer,
110+
useHadoopVectoredIO,
107111
recordFilter,
108112
metadataFilter,
109113
codecFactory,
@@ -124,6 +128,7 @@ public class ParquetReadOptions {
124128
boolean usePageChecksumVerification,
125129
boolean useBloomFilter,
126130
boolean useOffHeapDecryptBuffer,
131+
boolean useHadoopVectoredIO,
127132
FilterCompat.Filter recordFilter,
128133
ParquetMetadataConverter.MetadataFilter metadataFilter,
129134
CompressionCodecFactory codecFactory,
@@ -141,6 +146,7 @@ public class ParquetReadOptions {
141146
this.usePageChecksumVerification = usePageChecksumVerification;
142147
this.useBloomFilter = useBloomFilter;
143148
this.useOffHeapDecryptBuffer = useOffHeapDecryptBuffer;
149+
this.useHadoopVectoredIO = useHadoopVectoredIO;
144150
this.recordFilter = recordFilter;
145151
this.metadataFilter = metadataFilter;
146152
this.codecFactory = codecFactory;
@@ -184,6 +190,10 @@ public boolean usePageChecksumVerification() {
184190
return usePageChecksumVerification;
185191
}
186192

193+
public boolean useHadoopVectoredIO() {
194+
return useHadoopVectoredIO;
195+
}
196+
187197
public FilterCompat.Filter getRecordFilter() {
188198
return recordFilter;
189199
}
@@ -242,6 +252,7 @@ public static class Builder {
242252
protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
243253
protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
244254
protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
255+
protected boolean useHadoopVectoredIo = HADOOP_VECTORED_IO_ENABLED_DEFAULT;
245256
protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT;
246257
protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT;
247258
protected boolean useBloomFilter = BLOOM_FILTER_ENABLED_DEFAULT;
@@ -320,6 +331,11 @@ public Builder useRecordFilter() {
320331
return this;
321332
}
322333

334+
public Builder useHadoopVectoredIo(boolean useHadoopVectoredIo) {
335+
this.useHadoopVectoredIo = useHadoopVectoredIo;
336+
return this;
337+
}
338+
323339
public Builder useColumnIndexFilter(boolean useColumnIndexFilter) {
324340
this.useColumnIndexFilter = useColumnIndexFilter;
325341
return this;
@@ -418,6 +434,7 @@ public Builder copy(ParquetReadOptions options) {
418434
useDictionaryFilter(options.useDictionaryFilter);
419435
useRecordFilter(options.useRecordFilter);
420436
withRecordFilter(options.recordFilter);
437+
useHadoopVectoredIo(options.useHadoopVectoredIO);
421438
withMetadataFilter(options.metadataFilter);
422439
withCodecFactory(options.codecFactory);
423440
withAllocator(options.allocator);
@@ -449,6 +466,7 @@ public ParquetReadOptions build() {
449466
usePageChecksumVerification,
450467
useBloomFilter,
451468
useOffHeapDecryptBuffer,
469+
useHadoopVectoredIo,
452470
recordFilter,
453471
metadataFilter,
454472
codecFactory,

0 commit comments

Comments
 (0)