
Commit ea3eecf
PARQUET-2171. Tests all working

Testing found a couple of bugs with the initial Vector IO PR:

* Fixed ParquetFileReader, and parameterized the tests which had failed, so as to ensure coverage.
* TestVectorIOBridge now uses the checksum filesystem, which does implement vector IO (which implies all file:// paths get the speedup).
* Improved the assertions in the failing tests, which were all reporting mismatches between expected and actual data read.

As well as these changes, I've also run the entire suite locally with a core-site.xml set to turn on vector IO; this is how the regressions were identified.
1 parent 78a3307 commit ea3eecf
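For context, the suite-wide run mentioned in the commit message would rely on a core-site.xml override along these lines. This is a minimal sketch: the property key is an assumption inferred from the ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED constant used in the diffs below, not something this commit confirms.

```xml
<!-- core-site.xml sketch: turn on Parquet vectored IO for every test JVM.
     The key is assumed from ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED. -->
<configuration>
  <property>
    <name>parquet.hadoop.vectored.io.enabled</name>
    <value>true</value>
  </property>
</configuration>
```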

File tree

5 files changed: +60 −18 lines


parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 2 additions & 3 deletions

```diff
@@ -993,9 +993,6 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
     }
     // actually read all the chunks
     ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
-    for (ConsecutivePartList consecutiveChunks : allParts) {
-      consecutiveChunks.readAll(f, builder);
-    }
     readAllPartsVectoredOrNormal(allParts, builder);
     for (Chunk chunk : builder.build()) {
       readChunkPages(chunk, block, rowGroup);
@@ -1953,6 +1950,8 @@ public void readFromVectoredRange(ParquetFileRange currRange,
       LOG.debug("Waiting for vectored read to finish for range {} ", currRange);
       buffer = BindingUtils.awaitFuture(currRange.getDataReadFuture(),
           HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      // report in a counter the data we just scanned
+      BenchmarkCounter.incrementBytesRead(currRange.getLength());
     } catch (TimeoutException e) {
       String error = String.format("Timeout while fetching result for %s", currRange);
       LOG.error(error, e);
```
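The deleted loop always read each ConsecutivePartList sequentially; the new readAllPartsVectoredOrNormal call centralizes the choice of read path. A plausible sketch of that dispatch is below; the helper names (options.useHadoopVectoredIO, f.readVectoredAvailable, readVectored) are assumptions for illustration, not code shown in this commit.

```java
// Hypothetical sketch of the dispatch behind readAllPartsVectoredOrNormal;
// only the call site and signature are visible in this diff.
private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts,
    ChunkListBuilder builder) throws IOException {
  if (options.useHadoopVectoredIO() && f.readVectoredAvailable()) {
    // hand every range to the filesystem in one vectored request
    readVectored(allParts, builder);
  } else {
    // classic path: read each consecutive part sequentially
    for (ConsecutivePartList consecutiveChunks : allParts) {
      consecutiveChunks.readAll(f, builder);
    }
  }
}
```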

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java

Lines changed: 22 additions & 1 deletion

```diff
@@ -42,15 +42,21 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
 
 import static java.lang.Thread.sleep;
 import static org.apache.parquet.schema.OriginalType.UTF8;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 
+@RunWith(Parameterized.class)
 public class TestInputFormatColumnProjection {
   public static final String FILE_CONTENT = "" +
       "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," +
@@ -96,7 +102,19 @@ protected void map(Void key, Group value, Context context)
 
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
+  @Parameterized.Parameters(name = "vectored : {0}")
+  public static List<Boolean> params() {
+    return Arrays.asList(true, false);
+  }
 
+  /**
+   * Read type: true for vectored IO.
+   */
+  private final boolean readType;
+
+  public TestInputFormatColumnProjection(boolean readType) {
+    this.readType = readType;
+  }
   @Test
   public void testProjectionSize() throws Exception {
     Assume.assumeTrue( // only run this test for Hadoop 2
@@ -115,6 +133,8 @@ public void testProjectionSize() throws Exception {
     outputFolder.delete();
 
     Configuration conf = new Configuration();
+    // set the vector IO option
+    conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType);
     // set the projection schema
     conf.set("parquet.read.schema", Types.buildMessage()
         .required(BINARY).as(UTF8).named("char")
@@ -164,7 +184,8 @@ public void testProjectionSize() throws Exception {
       bytesRead = Reader.bytesReadCounter.getValue();
     }
 
-    Assert.assertTrue("Should read less than 10% of the input file size",
+    Assert.assertTrue("Should read (" + bytesRead + " bytes)"
+        + " less than 10% of the input file size (" + bytesWritten + ")",
         bytesRead < (bytesWritten / 10));
   }
 
```
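For readers unfamiliar with JUnit 4 parameterization, the pattern added above makes the runner instantiate the test class once per value returned by params(), so every @Test runs with vectored IO both on and off. A condensed, self-contained sketch of the same pattern (the class and test names here are illustrative):

```java
import static org.junit.Assert.assertNotNull;

import java.util.Arrays;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Illustrative stand-alone version of the pattern used in these tests.
@RunWith(Parameterized.class)
public class VectoredToggleExample {

  @Parameterized.Parameters(name = "vectored : {0}")
  public static List<Boolean> params() {
    return Arrays.asList(true, false);
  }

  private final boolean vectored;

  public VectoredToggleExample(boolean vectored) {
    this.vectored = vectored;
  }

  @Test
  public void runsOncePerParameter() {
    // executes twice, as "vectored : true" and "vectored : false"
    assertNotNull(Boolean.valueOf(vectored));
  }
}
```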

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java

Lines changed: 10 additions & 8 deletions

```diff
@@ -125,25 +125,27 @@ public class TestParquetFileWriter {
 
   private String writeSchema;
 
-  private final String readType;
-
   @Rule
   public final TemporaryFolder temp = new TemporaryFolder();
 
-  @Parameterized.Parameters(name = "Read type : {0}")
-  public static List<String> params() {
-    return Arrays.asList("vectored", "normal");
+  @Parameterized.Parameters(name = "vectored : {0}")
+  public static List<Boolean> params() {
+    return Arrays.asList(true, false);
   }
 
+  /**
+   * Read type: true for vectored IO.
+   */
+  private final boolean readType;
 
-  public TestParquetFileWriter(String readType) {
+  public TestParquetFileWriter(boolean readType) {
     this.readType = readType;
   }
 
   private Configuration getTestConfiguration() {
     Configuration conf = new Configuration();
-    conf.set(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED,
-        String.valueOf(readType.equals("vectored")));
+    // set the vector IO option
+    conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType);
     return conf;
   }
 
```

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java

Lines changed: 23 additions & 2 deletions

```diff
@@ -30,6 +30,7 @@
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -63,9 +64,13 @@
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.schema.MessageTypeParser;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 public class TestInputOutputFormat {
   private static final Logger LOG = LoggerFactory.getLogger(TestInputOutputFormat.class);
 
@@ -82,9 +87,24 @@ public class TestInputOutputFormat {
   private Class<? extends Mapper<?,?,?,?>> readMapperClass;
   private Class<? extends Mapper<?,?,?,?>> writeMapperClass;
 
+  @Parameterized.Parameters(name = "vectored : {0}")
+  public static List<Boolean> params() {
+    return Arrays.asList(true, false);
+  }
+
+  /**
+   * Read type: true for vectored IO.
+   */
+  private final boolean readType;
+
+  public TestInputOutputFormat(boolean readType) {
+    this.readType = readType;
+  }
   @Before
   public void setUp() {
     conf = new Configuration();
+    // set the vector IO option
+    conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType);
     writeSchema = "message example {\n" +
         "required int32 line;\n" +
         "required binary content;\n" +
@@ -335,8 +355,9 @@ public void testReadWriteWithCounter() throws Exception {
 
     assertTrue(value(readJob, "parquet", "bytesread") > 0L);
     assertTrue(value(readJob, "parquet", "bytestotal") > 0L);
-    assertTrue(value(readJob, "parquet", "bytesread")
-        == value(readJob, "parquet", "bytestotal"));
+    assertEquals("bytestotal != bytesread",
+        value(readJob, "parquet", "bytestotal"),
+        value(readJob, "parquet", "bytesread"));
     //not testing the time read counter since it could be zero due to the size of data is too small
   }
 
```

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java

Lines changed: 3 additions & 4 deletions

```diff
@@ -38,7 +38,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
@@ -83,7 +82,7 @@ public class TestVectorIOBridge {
     return pool.getBuffer(false, value);
   };
 
-  private RawLocalFileSystem fileSystem;
+  private FileSystem fileSystem;
   private Path testFilePath;
 
   public TestVectorIOBridge() {
@@ -95,7 +94,7 @@ public void setUp() throws IOException {
     // skip the tests if the FileRangeBridge goes not load.
     assumeTrue("Bridge not available", FileRangeBridge.bridgeAvailable());
 
-    fileSystem = (RawLocalFileSystem) FileSystem.getLocal(new Configuration()).getRaw();
+    fileSystem = FileSystem.getLocal(new Configuration());
     testFilePath = fileSystem.makeQualified(vectoredPath);
     createFile(fileSystem, testFilePath, DATASET);
   }
@@ -107,7 +106,7 @@ public void tearDown() throws IOException {
     }
   }
 
-  public RawLocalFileSystem getFileSystem() {
+  public FileSystem getFileSystem() {
     return fileSystem;
  }
```
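The switch from RawLocalFileSystem to FileSystem.getLocal() matters because the local filesystem is a ChecksumFileSystem, which implements Hadoop's vectored read API, while the raw stream bypasses that support. A minimal sketch of the underlying API the bridge targets (requires Hadoop 3.3.5+; the path and ranges below are illustrative):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    // getLocal() returns LocalFileSystem, a ChecksumFileSystem with
    // vectored-read support; the file below is an illustrative path.
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/vectored-demo.bin");

    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 100),      // first 100 bytes
        FileRange.createFileRange(4096, 512));  // a later, discontiguous range

    try (FSDataInputStream in = fs.open(path)) {
      // one call schedules all ranges; each range completes via its future
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get();  // await completion
        System.out.println("read " + data.remaining() + " bytes @ " + range.getOffset());
      }
    }
  }
}
```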
