Skip to content

Commit

Permalink
PARQUET-2334: Allow the cat subcommand to take multiple files (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
sekikn authored Aug 10, 2023
1 parent d25e000 commit d8aaf93
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.slf4j.Logger;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.parquet.cli.util.Expressions.select;

Expand Down Expand Up @@ -58,35 +60,38 @@ public int run() throws IOException {
Preconditions.checkArgument(
sourceFiles != null && !sourceFiles.isEmpty(),
"Missing file name");
Preconditions.checkArgument(sourceFiles.size() == 1,
"Only one file can be given");

final String source = sourceFiles.get(0);

Schema schema = getAvroSchema(source);
Schema projection = Expressions.filterSchema(schema, columns);
// Ensure all source files have the columns specified first
Map<String, Schema> schemas = new HashMap<>();
for (String source : sourceFiles) {
Schema schema = getAvroSchema(source);
schemas.put(source, Expressions.filterSchema(schema, columns));
}

Iterable<Object> reader = openDataFile(source, projection);
boolean threw = true;
long count = 0;
try {
for (Object record : reader) {
if (numRecords > 0 && count >= numRecords) {
break;
for (String source : sourceFiles) {
Schema projection = schemas.get(source);
Iterable<Object> reader = openDataFile(source, projection);
boolean threw = true;
long count = 0;
try {
for (Object record : reader) {
if (numRecords > 0 && count >= numRecords) {
break;
}
if (columns == null || columns.size() != 1) {
console.info(String.valueOf(record));
} else {
console.info(String.valueOf(select(projection, record, columns.get(0))));
}
count += 1;
}
if (columns == null || columns.size() != 1) {
console.info(String.valueOf(record));
} else {
console.info(String.valueOf(select(projection, record, columns.get(0))));
threw = false;
} catch (RuntimeException e) {
throw new RuntimeException("Failed on record " + count + " in file " + source, e);
} finally {
if (reader instanceof Closeable) {
Closeables.close((Closeable) reader, threw);
}
count += 1;
}
threw = false;
} catch (RuntimeException e) {
throw new RuntimeException("Failed on record " + count, e);
} finally {
if (reader instanceof Closeable) {
Closeables.close((Closeable) reader, threw);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,33 @@ public void testCatCommand() throws IOException {
command.setConf(new Configuration());
Assert.assertEquals(0, command.run());
}

@Test
public void testCatCommandWithMultipleInput() throws IOException {
File file = parquetFile();
CatCommand command = new CatCommand(createLogger(), 0);
command.sourceFiles = Arrays.asList(file.getAbsolutePath(), file.getAbsolutePath());
command.setConf(new Configuration());
Assert.assertEquals(0, command.run());
}

@Test
public void testCatCommandWithSpecificColumns() throws IOException {
File file = parquetFile();
CatCommand command = new CatCommand(createLogger(), 0);
command.sourceFiles = Arrays.asList(file.getAbsolutePath());
command.columns = Arrays.asList(INT32_FIELD, INT64_FIELD);
command.setConf(new Configuration());
Assert.assertEquals(0, command.run());
}

@Test(expected = IllegalArgumentException.class)
public void testCatCommandWithInvalidColumn() throws IOException {
File file = parquetFile();
CatCommand command = new CatCommand(createLogger(), 0);
command.sourceFiles = Arrays.asList(file.getAbsolutePath());
command.columns = Arrays.asList("invalid_field");
command.setConf(new Configuration());
command.run();
}
}

0 comments on commit d8aaf93

Please sign in to comment.