Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-2416: Use 'mapreduce.outputcommitter.factory.class' in ParquetOutpuFormat #1244

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Arnaud-Nauwynck
Copy link

see https://issues.apache.org/jira/browse/PARQUET-2416

this PR fix a bug in ParquetOutputFormat, which hard-coded a "new ParquetOutputCommitter(output, context)" with "class ParquetOutputCommitter extends FileOutputCommitter".

The objective is to fix this bug, using the "delegate instead of extends" design pattern. Technically, it would be possible to use "class ParquetOutputCommitter extend PathOutputCommitter" and delegate to underlying "PathOutputCommitter", but it would changes the binary compatibility of the class. Therefore, the class still "extends FileOutputCommitter", and override all public method to delegate.

Indeed, it is necessary for Spark code, because it is hard-coded "if (classOf[FileOutputCommitter].isAssignableFrom(clazz))"

(https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala#L54)

Changing in parquet-mr without changing in spark would give error at runtime:

java.lang.NoSuchMethodException: org.apache.parquet.hadoop.ParquetOutputCommitter.<init>()
        at java.lang.Class.getConstructor0(Unknown Source) ~[?:?]
        at java.lang.Class.getDeclaredConstructor(Unknown Source) ~[?:?]
        at org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol.setupCommitter(SQLHadoopMapReduceCommitProtocol.scala:63) ~[spark-sql_2.12-3.3.3.jar:3.3.3]
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:187) ~[spark-core_2.12-3.3.3.jar:3.3.3]

To cleanup this temporary code in parquet-mr, it is necessary to wait for another PR in Spark code to avoid the "classOf[FileOutputCommitter ].isAssignableFrom"

@wgtmac
Copy link
Member

wgtmac commented Dec 28, 2023

Could you please make CI happy?

public class ParquetOutputCommitter extends FileOutputCommitter {
private static final Logger LOG = LoggerFactory.getLogger(ParquetOutputCommitter.class);

private final Path outputPath;

private final FileOutputCommitter delegate;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

problem here is it's still an FOC.

@steveloughran
Copy link
Contributor

spark already ships with a a workaround for all of this, primarily so we could avoid having to fix parquet up too.

getting something choreographed across spark and parquet would be lovely.

ideally
-spark should just look for a path output committer, and use its factory
-parquet shouldn't insist on its committers always being a subclass of ParquetOutputCommitter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants