Skip to content

Commit

Permalink
PARQUET-2335: Allow the scan subcommand to take multiple files (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
sekikn authored Aug 10, 2023
1 parent d8aaf93 commit ef9929c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Parameters(commandDescription = "Scan all records from a file")
public class ScanCommand extends BaseCommand {

@Parameter(description = "<file>")
String sourceFile;
List<String> sourceFiles;

@Parameter(
names = {"-c", "--column", "--columns"},
Expand All @@ -50,32 +52,44 @@ public ScanCommand(Logger console) {
@Override
public int run() throws IOException {
Preconditions.checkArgument(
sourceFile != null && !sourceFile.isEmpty(),
sourceFiles != null && !sourceFiles.isEmpty(),
"Missing file name");

Schema schema = getAvroSchema(sourceFile);
Schema projection = Expressions.filterSchema(schema, columns);
// Ensure all source files have the columns specified first
Map<String, Schema> schemas = new HashMap<>();
for (String sourceFile : sourceFiles) {
Schema schema = getAvroSchema(sourceFile);
schemas.put(sourceFile, Expressions.filterSchema(schema, columns));
}

long startTime = System.currentTimeMillis();
Iterable<Object> reader = openDataFile(sourceFile, projection);
boolean threw = true;
long count = 0;
try {
for (Object record : reader) {
count += 1;
long totalStartTime = System.currentTimeMillis();
long totalCount = 0;
for (String sourceFile : sourceFiles) {
long startTime = System.currentTimeMillis();
Iterable<Object> reader = openDataFile(sourceFile, schemas.get(sourceFile));
boolean threw = true;
long count = 0;
try {
for (Object record : reader) {
count += 1;
}
threw = false;
} catch (RuntimeException e) {
throw new RuntimeException("Failed on record " + count + " in " + sourceFile, e);
} finally {
if (reader instanceof Closeable) {
Closeables.close((Closeable) reader, threw);
}
}
threw = false;
} catch (RuntimeException e) {
throw new RuntimeException("Failed on record " + count, e);
} finally {
if (reader instanceof Closeable) {
Closeables.close((Closeable) reader, threw);
totalCount += count;
if (1 < sourceFiles.size()) {
long endTime = System.currentTimeMillis();
console.info("Scanned " + count + " records from " + sourceFile + " in " + (endTime - startTime) / 1000.0 + " s");
}
}
long endTime = System.currentTimeMillis();

console.info("Scanned " + count + " records from " + sourceFile);
console.info("Time: " + (endTime - startTime) / 1000.0 + " s");
long totalEndTime = System.currentTimeMillis();
console.info("Scanned " + totalCount + " records from " + sourceFiles.size() + " file(s)");
console.info("Time: " + (totalEndTime - totalStartTime) / 1000.0 + " s");
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,27 @@ public class ScanCommandTest extends ParquetFileTest {
public void testScanCommand() throws IOException {
File file = parquetFile();
ScanCommand command = new ScanCommand(createLogger());
command.sourceFile = file.getAbsolutePath();
command.sourceFiles = Arrays.asList(file.getAbsolutePath());
command.setConf(new Configuration());
Assert.assertEquals(0, command.run());
}

@Test
public void testScanCommandWithMultipleSourceFiles() throws IOException {
File file = parquetFile();
ScanCommand command = new ScanCommand(createLogger());
command.sourceFiles = Arrays.asList(file.getAbsolutePath(), file.getAbsolutePath());
command.setConf(new Configuration());
Assert.assertEquals(0, command.run());
}

@Test(expected = IllegalArgumentException.class)
public void testScanCommandWithInvalidColumnName() throws IOException {
File file = parquetFile();
ScanCommand command = new ScanCommand(createLogger());
command.sourceFiles = Arrays.asList(file.getAbsolutePath());
command.columns = Arrays.asList("invalid_field");
command.setConf(new Configuration());
command.run();
}
}

0 comments on commit ef9929c

Please sign in to comment.