Conversation

mbutrovich (Contributor) commented Sep 23, 2025

Which issue does this PR close?

Closes #.

Rationale for this change

We want to add Parquet Modular Encryption (PME) support for the native readers when using a Spark KMS. We use the encryption factory support added in DataFusion 50 to register an encryption factory that uses JNI to obtain decryption keys from Spark.
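For context on the native side: the reader reaches back into the JVM over JNI to resolve key metadata into an actual key. Below is a minimal sketch of that callback using the jni crate directly. The getKey(String, byte[]) -> byte[] signature matches the instance method referenced in the review comments below; fetch_key, the attach-per-call, and the absence of method-ID caching are illustrative simplifications (the PR itself goes through Comet's JVMClasses::get_env and call_method_unchecked).

```rust
use jni::objects::{GlobalRef, JByteArray, JValue};
use jni::JavaVM;

/// Sketch only: ask the JVM-side key unwrapper to resolve Parquet key
/// metadata into a decryption key for one file.
fn fetch_key(
    jvm: &JavaVM,
    unwrapper: &GlobalRef, // pinned reference to the CometFileKeyUnwrapper instance
    file_path: &str,
    key_metadata: &[u8],
) -> jni::errors::Result<Vec<u8>> {
    // The PR uses Comet's cached JVMClasses::get_env(); attaching per call is
    // the plain-jni equivalent.
    let mut env = jvm.attach_current_thread()?;
    let jpath = env.new_string(file_path)?;
    let jmeta = env.byte_array_from_slice(key_metadata)?;
    // Call instance method getKey(String, byte[]) -> byte[].
    let result = env.call_method(
        unwrapper,
        "getKey",
        "(Ljava/lang/String;[B)[B",
        &[JValue::Object(&jpath), JValue::Object(&jmeta)],
    )?;
    // The return value is a Java byte[]; copy it into a Rust Vec<u8>.
    let jbytes: JByteArray = result.l()?.into();
    env.convert_byte_array(&jbytes)
}
```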

What changes are included in this PR?

How are these changes tested?

  • Existing PME tests with new readers added.
  • New tests that exercise PME options such as plaintext footer.

@mbutrovich mbutrovich changed the title feat: Parquet Modular Encryption support for native_datafusion and native_iceberg_compat readers feat: Parquet Modular Encryption with Spark KMS for native_datafusion and native_iceberg_compat readers Sep 23, 2025
@mbutrovich mbutrovich changed the title feat: Parquet Modular Encryption with Spark KMS for native_datafusion and native_iceberg_compat readers feat: Parquet Modular Encryption with Spark KMS for native readers Sep 23, 2025
codecov-commenter commented Sep 23, 2025

Codecov Report

❌ Patch coverage is 31.57895% with 52 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.30%. Comparing base (f09f8af) to head (7d1bf39).
⚠️ Report is 561 commits behind head on main.

Files with missing lines | Patch % | Lines
--- | --- | ---
...rg/apache/comet/parquet/CometFileKeyUnwrapper.java | 0.00% | 17 Missing ⚠️
...a/org/apache/comet/parquet/CometParquetUtils.scala | 0.00% | 15 Missing ⚠️
...ain/scala/org/apache/comet/CometExecIterator.scala | 36.36% | 6 Missing and 1 partial ⚠️
...va/org/apache/comet/parquet/NativeBatchReader.java | 0.00% | 5 Missing ⚠️
...n/scala/org/apache/comet/rules/CometScanRule.scala | 42.85% | 3 Missing and 1 partial ⚠️
...n/scala/org/apache/spark/sql/comet/operators.scala | 77.77% | 3 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2447      +/-   ##
============================================
+ Coverage     56.12%   58.30%   +2.17%     
- Complexity      976     1436     +460     
============================================
  Files           119      147      +28     
  Lines         11743    13564    +1821     
  Branches       2251     2357     +106     
============================================
+ Hits           6591     7908    +1317     
- Misses         4012     4426     +414     
- Partials       1140     1230      +90     



@mbutrovich mbutrovich marked this pull request as ready for review September 26, 2025 20:31
// Each hadoopConf yields a unique DecryptionPropertiesFactory. While it's unlikely that
// this Comet plan contains more than one hadoopConf, we don't want to assume that. So we'll
// provide the ability to cache more than one Factory with a map.
private final ConcurrentHashMap<Configuration, DecryptionPropertiesFactory> factoryCache =

Contributor:
There is only one Hadoop conf in a Spark session, so this may be overkill.

mbutrovich (Contributor, Author) commented Sep 30, 2025:
Session hadoopConf is not what the scans use though. They add all the relation options (Parquet options like encryption keys) to the hadoopConf, so each scan can have a unique hadoopConf. Whether we could have a Comet plan with multiple Parquet scans is the real question.

Contributor:
> Whether we could have a Comet plan with multiple Parquet scans is the real question.

I don't know what you mean by this. What exactly are you calling a Parquet scan?

public class CometFileKeyUnwrapper {

// Each file path gets a unique DecryptionKeyRetriever
private final ConcurrentHashMap<String, DecryptionKeyRetriever> retrieverCache =

Contributor:

Every file path? This can get rather large when the number of files starts to reach 100K or more.

mbutrovich (Contributor, Author):

Spark must also be keeping the same number in memory for its scans. Also, it's whatever subset of files this plan is responsible for.

public void storeDecryptionKeyRetriever(final String filePath, final Configuration hadoopConf) {
// Use DecryptionPropertiesFactory.loadFactory to get the factory and then call
// getFileDecryptionProperties
DecryptionPropertiesFactory factory = factoryCache.get(hadoopConf);

Contributor:

Is this hadoop conf the entire Hadoop configuration (which can have a thousand entries) or just the incremental properties specified for the session? Hashing this can become time-consuming.

mbutrovich (Contributor, Author):

The Parquet library decides which fields it needs. If we want to limit the versions we support, I could start hard-coding the values, but that creates future work for ourselves any time a new config is added.

if (encryptionEnabled) {
// hadoopConf isn't serializable, so we have to do a broadcasted config.
val broadcastedConf =
scan.relation.sparkSession.sparkContext

Contributor:

Can you explain a little what you are doing here? What is the additional hadoop conf information that needs to be broadcast per file path (as opposed to encryption properties that are defined once per table)?

mbutrovich (Contributor, Author):

It's only broadcasting one hadoopConf per relation (table). The per-file part is a mapping from each file path to its relation's hadoopConf.

Contributor:

Why does this need to be broadcast? Won't each executor instance have its own copy of scan.relation.options already?

mbutrovich (Contributor, Author) commented:
Results attached from the benchmark I added to CometReadBenchmark, along with a small chart highlighting the overhead of encryption for the various readers.

[chart: decryption]
[attachment: benchmark_decryption.txt]


// Call instance method FileKeyUnwrapper.getKey(String, byte[]) -> byte[]
let result = unsafe {
env.call_method_unchecked(

Contributor:
This can throw an exception, can it not? Elsewhere we use try_unwrap_or_throw (errors.rs) to report the error.
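For readers unfamiliar with that helper: a Java exception raised inside a JNI call does not surface in Rust by itself; it stays pending on the thread until checked. A minimal sketch of the check with plain jni calls (check_exception is illustrative; Comet's actual try_unwrap_or_throw in errors.rs is the project's helper for reporting these):

```rust
use jni::errors::Error;
use jni::JNIEnv;

// Sketch only: after an unchecked JNI call, surface a pending Java exception
// as a Rust error instead of silently continuing with a bogus result.
fn check_exception(env: &mut JNIEnv) -> Result<(), Error> {
    if env.exception_check()? {
        env.exception_describe()?; // print the Java stack trace to stderr
        env.exception_clear()?; // clear it so later JNI calls stay valid
        return Err(Error::JavaException);
    }
    Ok(())
}
```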

DecryptionPropertiesFactory factory = factoryCache.get(hadoopConf);
if (factory == null) {
factory = DecryptionPropertiesFactory.loadFactory(hadoopConf);
factoryCache.put(hadoopConf, factory);

Member:

Since you use ConcurrentMap, you probably want to use its #computeIfAbsent() method instead of #get() + a null check + #put().

impl CometKeyRetriever {
pub fn new(file_path: &str, key_unwrapper: GlobalRef) -> Result<Self, ExecutionError> {
// Get JNI environment
let mut env = JVMClasses::get_env()?;

Member:

Below you use

let mut env = JVMClasses::get_env()
            .map_err(|e| datafusion::parquet::errors::ParquetError::General(e.to_string()))?;

No need for error mapping here?!

)
};

let result = result.unwrap();

Member:

Can you use result? instead, to return an Err instead of panicking? Or use a pattern match and return a custom error.

A few more unwraps below.
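A sketch of the suggested fix, assuming the surrounding function can return a ParquetError like the get_env mapping quoted above (unwrap_call_result and the exact types are illustrative):

```rust
use datafusion::parquet::errors::ParquetError;
use jni::errors::Error as JniError;
use jni::objects::JValueOwned;

// Propagate the JNI failure instead of panicking. Mapping into
// ParquetError::General mirrors the mapping already applied to
// JVMClasses::get_env() in this file.
fn unwrap_call_result(
    result: Result<JValueOwned<'_>, JniError>,
) -> Result<JValueOwned<'_>, ParquetError> {
    result.map_err(|e| ParquetError::General(e.to_string()))
}
```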
