Skip to content

Commit d25fb79

Browse files
committed
PARQUET-781. Support PathOutputCommitter
If "parquet.path.outputcommitter.enabled" is true then it uses the PathOutputCommitterFactory mechanism to dynamically choose a committer for the output path. Such committers do not generate summary files; a warning about this is printed when appropriate. This significantly simplifies writing to s3/azure/gcs through committers which commit correctly and efficiently to the target stores. Change-Id: Ib443c64bbdf37f808213d46d02a4fcba2f8f1361
1 parent dd9014f commit d25fb79

File tree

2 files changed

+35
-3
lines changed

2 files changed

+35
-3
lines changed

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public class ParquetProperties {
6666

6767
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
6868

69+
public static final boolean DEFAULT_PAGE_PATH_OUTPUT_COMMITTER_ENABLED = false;
70+
6971
public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
7072

7173
private static final int MIN_SLAB_SIZE = 64;

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.apache.parquet.column.ParquetProperties.DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED;
2222
import static org.apache.parquet.column.ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED;
23+
import static org.apache.parquet.column.ParquetProperties.DEFAULT_PAGE_PATH_OUTPUT_COMMITTER_ENABLED;
2324
import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
2425
import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
2526

@@ -98,7 +99,14 @@
9899
* </pre>
99100
* <p>
100101
* if none of those is set the data is uncompressed.
101-
*
102+
* <p>
103+
* This class also generates the committer required to manifest the work in the
104+
* destination directory if and when the job is committed.
105+
* This has historically always created an instance of {@link ParquetOutputCommitter}.
106+
* If {@link #PAGE_PATH_OUTPUT_COMMITTER_ENABLED} is true, the superclass is used
107+
* to create the committer, which on Hadoop 3.1 and later involves the
108+
* {@code PathOutputCommitterFactory} mechanism to dynamically choose a committer
109+
* for the target filesystem. Such committers do not generate summary files.
102110
* @param <T> the type of the materialized records
103111
*/
104112
public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
@@ -158,6 +166,13 @@ public static enum JobSummaryLevel {
158166
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
159167
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
160168

169+
/**
170+
* Use the output committer created by the superclass, rather than a {@link ParquetOutputCommitter}.
171+
* This delivers correctness and scalability on cloud storage, but will not write summary files.
172+
* Value: {@value}.
173+
*/
174+
public static final String PAGE_PATH_OUTPUT_COMMITTER_ENABLED = "parquet.path.outputcommitter.enabled";
175+
161176
public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
162177
String level = conf.get(JOB_SUMMARY_LEVEL);
163178
String deprecatedFlag = conf.get(ENABLE_JOB_SUMMARY);
@@ -390,7 +405,7 @@ public static boolean getPageWriteChecksumEnabled(Configuration conf) {
390405
}
391406

392407
private WriteSupport<T> writeSupport;
393-
private ParquetOutputCommitter committer;
408+
private OutputCommitter committer;
394409

395410
/**
396411
* constructor used when this OutputFormat in wrapped in another one (In Pig for example)
@@ -555,7 +570,22 @@ public WriteSupport<T> getWriteSupport(Configuration configuration) {
555570
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
556571
if (committer == null) {
557572
Path output = getOutputPath(context);
558-
committer = new ParquetOutputCommitter(output, context);
573+
final Configuration conf = context.getConfiguration();
574+
if (conf.getBoolean(PAGE_PATH_OUTPUT_COMMITTER_ENABLED, DEFAULT_PAGE_PATH_OUTPUT_COMMITTER_ENABLED)) {
575+
// hand off creation of a committer to superclass.
576+
// On hadoop 3.1+ this will use a factory mechanism to dynamically
577+
// bind to a filesystem specific committer, an explict override
578+
// or fall back to the classic FileOutputCommitter
579+
committer = super.getOutputCommitter(context);
580+
LOG.debug("Writing to {} with output committer {}", committer, output);
581+
582+
if (ParquetOutputFormat.getJobSummaryLevel(conf) != JobSummaryLevel.NONE) {
583+
// warn if summary file generation has been requested, as they won't be created.
584+
LOG.warn("Committer {} does not support summary files", committer);
585+
}
586+
} else {
587+
committer = new ParquetOutputCommitter(output, context);
588+
}
559589
}
560590
return committer;
561591
}

0 commit comments

Comments
 (0)