Commit a8ef548

PARQUET-2171. Vector IO: review changes (comments, logging, spotless)

- Improve doc comments of new methods and parameterized tests.
- Fix build to work after rebasing, which includes spotless:apply.
1 parent a064890 commit a8ef548

File tree

14 files changed: +207 -278 lines changed

parquet-common/src/main/java/org/apache/parquet/io/ParquetFileRange.java

Lines changed: 1 addition & 1 deletion
@@ -66,6 +66,6 @@ public void setDataReadFuture(CompletableFuture<ByteBuffer> dataReadFuture) {

   @Override
   public String toString() {
-    return "range[" + this.offset + "," + (this.offset + (long)this.length) + ")";
+    return "range[" + this.offset + "," + (this.offset + (long) this.length) + ")";
   }
 }

parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java

Lines changed: 1 addition & 3 deletions
@@ -111,8 +111,7 @@ public abstract class SeekableInputStream extends InputStream {
    * Read a set of file ranges in a vectored manner.
    * @throws UnsupportedOperationException if not available in this class/runtime.
    */
-  public void readVectored(List<ParquetFileRange> ranges,
-      IntFunction<ByteBuffer> allocate) throws IOException {
+  public void readVectored(List<ParquetFileRange> ranges, IntFunction<ByteBuffer> allocate) throws IOException {

     throw new UnsupportedOperationException("Vectored IO is not supported for " + this);
   }
@@ -123,5 +122,4 @@ public void readVectored(List<ParquetFileRange> ranges,
   public boolean readVectoredAvailable() {
     return false;
   }
-
 }
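For orientation beyond the diff: the contract here is probe-then-read. A caller checks readVectoredAvailable() and only then issues readVectored(); otherwise it falls back to ordinary positioned reads. A minimal caller sketch, with hypothetical offsets and lengths that are not part of this commit:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.parquet.io.ParquetFileRange;
    import org.apache.parquet.io.SeekableInputStream;

    public class VectoredReadProbe {
      // Probe for vectored support before reading; fall back otherwise.
      static void read(SeekableInputStream in) throws IOException {
        List<ParquetFileRange> ranges = new ArrayList<>();
        ranges.add(new ParquetFileRange(0L, 4096)); // offset, length: hypothetical values
        ranges.add(new ParquetFileRange(1_048_576L, 8192));
        if (in.readVectoredAvailable()) {
          // Each range's CompletableFuture<ByteBuffer> completes as its data arrives.
          in.readVectored(ranges, ByteBuffer::allocate);
        }
        // else: issue one positioned read per range (omitted here).
      }
    }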

parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java

Lines changed: 2 additions & 7 deletions
@@ -31,8 +31,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import static org.apache.parquet.Exceptions.throwIfInstance;
-
 public class DynMethods {

   private static final Logger LOG = LoggerFactory.getLogger(DynMethods.class);
@@ -315,8 +313,7 @@ public Builder ctorImpl(Class<?> targetClass, Class<?>... argClasses) {
           .buildChecked();
     } catch (NoSuchMethodException e) {
       // not the right implementation
-      LOG.debug("failed to load constructor arity {} from class {}",
-          argClasses.length, targetClass, e);
+      LOG.debug("failed to load constructor arity {} from class {}", argClasses.length, targetClass, e);
     }
     return this;
   }
@@ -333,9 +330,7 @@ public Builder ctorImpl(String className, Class<?>... argClasses) {
           .buildChecked();
     } catch (NoSuchMethodException e) {
       // not the right implementation
-      LOG.debug("failed to load constructor arity {} from class {}",
-          argClasses.length, className, e);
-
+      LOG.debug("failed to load constructor arity {} from class {}", argClasses.length, className, e);
     }
     return this;
   }
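For background on the DynMethods.Builder calls these hunks reformat: impl() registers a candidate signature, orNoop() substitutes a no-op binding when nothing matches, and isNoop() then serves as an availability probe. A hedged sketch of that pattern, using a JDK method purely for illustration:

    import org.apache.parquet.util.DynMethods;

    public class DynMethodsProbe {
      static boolean isEmptyAvailable() {
        // Bind String.isEmpty() reflectively; on failure orNoop() yields a
        // no-op binding instead of throwing, so isNoop() reports availability.
        DynMethods.UnboundMethod m = new DynMethods.Builder("isEmpty")
            .impl(String.class, "isEmpty")
            .orNoop()
            .build();
        return !m.isNoop();
      }
    }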

parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java

Lines changed: 1 addition & 2 deletions
@@ -31,8 +31,6 @@
 import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import org.apache.parquet.hadoop.ParquetMetricsCallback;

-import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
-
 public class HadoopReadOptions extends ParquetReadOptions {
   private final Configuration conf;

@@ -127,6 +125,7 @@ public ParquetReadOptions build() {
         usePageChecksumVerification,
         useBloomFilter,
         useOffHeapDecryptBuffer,
+        useHadoopVectoredIo,
         recordFilter,
         metadataFilter,
         codecFactory,
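The builder change above threads the already-parsed useHadoopVectoredIo value into ParquetReadOptions, so the static import of HADOOP_VECTORED_IO_ENABLED is no longer needed here. A sketch of enabling the feature from a Hadoop Configuration; it assumes HadoopReadOptions.builder(Configuration) remains an entry point on this branch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetInputFormat;

    public class EnableVectoredIO {
      static ParquetReadOptions forConf() {
        Configuration conf = new Configuration();
        // The key behind this constant toggles the vectored IO read path.
        conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, true);
        return HadoopReadOptions.builder(conf).build();
      }
    }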

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 56 additions & 26 deletions
@@ -1145,11 +1145,9 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
    * @throws IOException any IOE.
    */
   private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
-      throws IOException {
-    boolean isVectoredIO = options.useHadoopVectoredIO()
-        && f.readVectoredAvailable()
-        && partsLengthValidForVectoredIO(allParts);
-    if (isVectoredIO) {
+      throws IOException {
+
+    if (shouldUseVectoredIO(allParts)) {
       readVectored(allParts, builder);
     } else {
       for (ConsecutivePartList consecutiveChunks : allParts) {
@@ -1158,16 +1156,34 @@ private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, Ch
     }
   }

+  /**
+   * Should the read use vectored IO?
+   * <p>
+   * This returns true if all necessary conditions are met:
+   * <ol>
+   *   <li> The option is enabled</li>
+   *   <li> The Hadoop version supports vectored IO</li>
+   *   <li> The part lengths are all valid for vectored IO</li>
+   * </ol>
+   * @param allParts all parts to read.
+   * @return true or false.
+   */
+  private boolean shouldUseVectoredIO(final List<ConsecutivePartList> allParts) {
+    return options.useHadoopVectoredIO() && f.readVectoredAvailable() && arePartLengthsValidForVectoredIO(allParts);
+  }
+
   /**
    * Vectored IO doesn't support reading ranges of size greater than
    * Integer.MAX_VALUE.
    * @param allParts all parts to read.
    * @return true or false.
    */
-  private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts) {
-    for (ConsecutivePartList consecutivePart : allParts) {
+  private boolean arePartLengthsValidForVectoredIO(List<ConsecutivePartList> allParts) {
+    for (ConsecutivePartList consecutivePart : allParts) {
       if (consecutivePart.length >= Integer.MAX_VALUE) {
-        LOG.debug("Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO", consecutivePart.length);
+        LOG.debug(
+            "Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO",
+            consecutivePart.length);
         return false;
       }
     }
@@ -1176,26 +1192,36 @@ private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts

   /**
    * Read all parts through vectored IO.
+   * <p>
+   * The API is available in recent hadoop builds for all implementations of PositionedReadable;
+   * the default implementation simply does a sequence of reads at different offsets.
+   * <p>
+   * If directly implemented by a Filesystem then it is likely to be a more efficient
+   * operation such as a scatter-gather read (native IO) or set of parallel
+   * GET requests against an object store.
    * @param allParts all parts to be read.
    * @param builder used to build chunk list to read the pages for the different columns.
    * @throws IOException any IOE.
    */
-  private void readVectored(List<ConsecutivePartList> allParts,
-      ChunkListBuilder builder) throws IOException {
+  private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder) throws IOException {

     List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
+    long totalSize = 0;
     for (ConsecutivePartList consecutiveChunks : allParts) {
-      Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE,
-          "Invalid length %s for vectored read operation. It must be less than max integer value.",
-          consecutiveChunks.length);
-      ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) consecutiveChunks.length));
-    }
-    LOG.debug("Doing vectored IO for ranges {}", ranges);
+      final long len = consecutiveChunks.length;
+      Preconditions.checkArgument(
+          len < Integer.MAX_VALUE,
+          "Invalid length %s for vectored read operation. It must be less than max integer value.",
+          len);
+      ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) len));
+      totalSize += len;
+    }
+    LOG.info("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
     ByteBufferAllocator allocator = options.getAllocator();
-    //blocking or asynchronous vectored read.
+    // Request a vectored read;
     f.readVectored(ranges, allocator::allocate);
     int k = 0;
-    for (ConsecutivePartList consecutivePart : allParts) {
+    for (ConsecutivePartList consecutivePart : allParts) {
       ParquetFileRange currRange = ranges.get(k++);
       consecutivePart.readFromVectoredRange(currRange, builder);
     }
@@ -2093,22 +2119,26 @@ private void setReadMetrics(long startNs) {
     }

     /**
-     * Populate data in a parquet file range from vectored range.
+     * Populate data in a parquet file range from a vectored range; will block for up
+     * to {@link #HADOOP_VECTORED_READ_TIMEOUT_SECONDS} seconds.
      * @param currRange range to be populated.
      * @param builder used to build chunk list to read the pages for the different columns.
-     * @throws IOException if there is an error while reading from the stream.
+     * @throws IOException if there is an error while reading from the stream, including a timeout.
      */
-    public void readFromVectoredRange(ParquetFileRange currRange,
-        ChunkListBuilder builder) throws IOException {
+    public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder) throws IOException {
       ByteBuffer buffer;
+      final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
       try {
-        LOG.debug("Waiting for vectored read to finish for range {} ", currRange);
-        buffer = BindingUtils.awaitFuture(currRange.getDataReadFuture(),
-            HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        LOG.debug(
+            "Waiting for vectored read to finish for range {} with timeout {} seconds",
+            currRange,
+            timeoutSeconds);
+        buffer = BindingUtils.awaitFuture(currRange.getDataReadFuture(), timeoutSeconds, TimeUnit.SECONDS);
         // report in a counter the data we just scanned
         BenchmarkCounter.incrementBytesRead(currRange.getLength());
       } catch (TimeoutException e) {
-        String error = String.format("Timeout while fetching result for %s", currRange);
+        String error = String.format(
+            "Timeout while fetching result for %s with time limit %d seconds", currRange, timeoutSeconds);
         LOG.error(error, e);
         throw new IOException(error, e);
       }
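Summarizing the new flow in this file: each ConsecutivePartList becomes one ParquetFileRange, the whole list goes to f.readVectored(), and the reader then blocks on each range's future. A standalone sketch of that await step; the helper class and its timeout parameter are illustrative, not the reader's actual members:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.parquet.io.ParquetFileRange;

    public class AwaitRanges {
      // Block on each range's future, converting timeouts to IOExceptions
      // in the same spirit as readFromVectoredRange above.
      static void awaitAll(List<ParquetFileRange> ranges, long timeoutSeconds) throws IOException {
        for (ParquetFileRange range : ranges) {
          try {
            ByteBuffer data = range.getDataReadFuture().get(timeoutSeconds, TimeUnit.SECONDS);
            // `data` now holds range.getLength() bytes starting at the range offset.
          } catch (TimeoutException | InterruptedException | ExecutionException e) {
            throw new IOException("Failed to read " + range, e);
          }
        }
      }
    }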

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java

Lines changed: 2 additions & 1 deletion
@@ -24,9 +24,10 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.function.IntFunction;
-import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
 import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.ParquetFileRange;

 /**
  * SeekableInputStream implementation for FSDataInputStream that implements

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java

Lines changed: 12 additions & 25 deletions
@@ -26,21 +26,17 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
+import org.apache.parquet.util.DynMethods;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.parquet.util.DynMethods;
-
 /**
  * Binding utils.
  */
 public final class BindingUtils {
   private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class);

-
-  private BindingUtils() {
-  }
+  private BindingUtils() {}

   /**
    * Given a future, evaluate it.
@@ -61,17 +57,13 @@ private BindingUtils() {
    * @throws RuntimeException any nested RTE thrown
    * @throws TimeoutException the future timed out.
    */
-  public static <T> T awaitFuture(final Future<T> future,
-      final long timeout,
-      final TimeUnit unit)
-      throws InterruptedIOException, IOException, RuntimeException,
-      TimeoutException {
+  public static <T> T awaitFuture(final Future<T> future, final long timeout, final TimeUnit unit)
+      throws InterruptedIOException, IOException, RuntimeException, TimeoutException {
     try {
       LOG.debug("Awaiting future");
       return future.get(timeout, unit);
     } catch (InterruptedException e) {
-      throw (InterruptedIOException) new InterruptedIOException(e.toString())
-          .initCause(e);
+      throw (InterruptedIOException) new InterruptedIOException(e.toString()).initCause(e);
     } catch (ExecutionException e) {
       return raiseInnerCause(e);
     }
@@ -93,8 +85,7 @@ public static <T> T awaitFuture(final Future<T> future,
    * any non-Runtime-Exception
    * @throws RuntimeException if that is the inner cause.
    */
-  public static <T> T raiseInnerCause(final ExecutionException e)
-      throws IOException {
+  public static <T> T raiseInnerCause(final ExecutionException e) throws IOException {
     throw unwrapInnerException(e);
   }

@@ -155,16 +146,13 @@ public static IOException unwrapInnerException(final Throwable e) {
    * @return the method or "unavailable"
    */
   static <T> DynMethods.UnboundMethod loadInvocation(
-      Class<?> source,
-      Class<? extends T> returnType, String name,
-      Class<?>... parameterTypes) {
+      Class<?> source, Class<? extends T> returnType, String name, Class<?>... parameterTypes) {

     if (source != null) {
-      final DynMethods.UnboundMethod m = new DynMethods
-          .Builder(name)
-          .impl(source, name, parameterTypes)
-          .orNoop()
-          .build();
+      final DynMethods.UnboundMethod m = new DynMethods.Builder(name)
+          .impl(source, name, parameterTypes)
+          .orNoop()
+          .build();
       if (m.isNoop()) {
         // this is a sign of a mismatch between this class's expected
         // signatures and actual ones.
@@ -187,8 +175,7 @@ static <T> DynMethods.UnboundMethod loadInvocation(
    * @return a no-op method.
    */
   static DynMethods.UnboundMethod noop(final String name) {
-    return new DynMethods.Builder(name)
-        .orNoop().build();
+    return new DynMethods.Builder(name).orNoop().build();
   }

   /**
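Restating the awaitFuture contract these hunks compress: InterruptedException is rethrown as InterruptedIOException (itself an IOException), ExecutionException is unwrapped to its inner cause, and TimeoutException propagates for the caller to translate. A hedged usage sketch with a synthetic future and an arbitrary 30-second limit:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.parquet.hadoop.util.vectorio.BindingUtils;

    public class AwaitFutureExample {
      static ByteBuffer await(CompletableFuture<ByteBuffer> future) throws IOException {
        try {
          // Blocks for up to 30 seconds; ExecutionExceptions surface as their inner cause.
          return BindingUtils.awaitFuture(future, 30, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
          throw new IOException("Timed out awaiting vectored read", e);
        }
      }
    }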
