feat: Parquet Modular Encryption with Spark KMS for native readers #2447
Conversation
Codecov Report
❌ Patch coverage is …
Additional details and impacted files

@@             Coverage Diff              @@
##               main     #2447       +/-  ##
============================================
+ Coverage      56.12%    58.30%    +2.17%
- Complexity       976      1436      +460
============================================
  Files            119       147       +28
  Lines          11743     13564     +1821
  Branches        2251      2357      +106
============================================
+ Hits            6591      7908     +1317
- Misses          4012      4426      +414
- Partials        1140      1230       +90
Also look at https://github.com/apache/parquet-java/blob/master/parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java to see if there are any tests that might be relevant here.
# Conflicts:
#   spark/src/main/scala/org/apache/comet/CometExecIterator.scala
// Each hadoopConf yields a unique DecryptionPropertiesFactory. While it's unlikely that
// this Comet plan contains more than one hadoopConf, we don't want to assume that. So we'll
// provide the ability to cache more than one Factory with a map.
private final ConcurrentHashMap<Configuration, DecryptionPropertiesFactory> factoryCache =
There is only one Hadoop conf in a Spark session, so this may be overkill.
Session hadoopConf is not what the scans use though. They add all the relation options (Parquet options like encryption keys) to the hadoopConf, so each scan can have a unique hadoopConf. Whether we could have a Comet plan with multiple Parquet scans is the real question.
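For context, this is roughly how Spark builds a per-scan conf (an illustrative sketch, not code from this PR; it assumes a HadoopFsRelation named relation is in scope):

import org.apache.hadoop.conf.Configuration

// Spark copies the session conf and layers the relation's options (including Parquet
// encryption properties) on top, so two scans configured with different options end up
// with different Configuration instances.
val scanHadoopConf: Configuration =
  relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)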
Whether we could have a Comet plan with multiple Parquet scans is the real question.
I don't know what you mean by this. What exactly are you calling a Parquet scan?
public class CometFileKeyUnwrapper {

  // Each file path gets a unique DecryptionKeyRetriever
  private final ConcurrentHashMap<String, DecryptionKeyRetriever> retrieverCache =
Every file path? This can get rather large when the number of files starts to reach 100K or more.
Spark must also be keeping the same number in memory for its scans. Also, it's only whatever subset of files this plan is responsible for.
public void storeDecryptionKeyRetriever(final String filePath, final Configuration hadoopConf) {
  // Use DecryptionPropertiesFactory.loadFactory to get the factory and then call
  // getFileDecryptionProperties
  DecryptionPropertiesFactory factory = factoryCache.get(hadoopConf);
Is this hadoopConf the entire Hadoop configuration (which can have a thousand entries) or just the incremental properties specified for the session? Hashing this can become time-consuming.
The Parquet library decides which fields it needs. If we want to limit the versions we support, I could start hard-coding the values, but that creates future work for ourselves any time a new config is added.
if (encryptionEnabled) {
  // hadoopConf isn't serializable, so we have to do a broadcasted config.
  val broadcastedConf =
    scan.relation.sparkSession.sparkContext
Can you explain a little what you are doing here? What is the additional hadoop conf information that needs to be broadcast per file path (as opposed to encryption properties that are defined once per table)?
It's only broadcasting one hadoopConf per relation (table); what gets broadcast per file is a mapping from file path to that hadoopConf.
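A minimal sketch of the broadcast pattern being discussed, assuming Spark's SerializableConfiguration wrapper is usable here and that scan and hadoopConf are in scope (the actual PR code may differ):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.SerializableConfiguration

// Hadoop's Configuration is not serializable, so wrap it and broadcast it once per relation.
val broadcastedConf =
  scan.relation.sparkSession.sparkContext
    .broadcast(new SerializableConfiguration(hadoopConf))

// On the executors, unwrap it to recover the Configuration used for key unwrapping.
val executorConf: Configuration = broadcastedConf.value.value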
Why does this need to be broadcast? Won't each executor instance already have its own copy of scan.relation.options?
// Call instance method FileKeyUnwrapper.getKey(String, byte[]) -> byte[]
let result = unsafe {
    env.call_method_unchecked(
This can throw an exception, can it not? Elsewhere we use try_unwrap_or_throw (errors.rs) to report the error.
DecryptionPropertiesFactory factory = factoryCache.get(hadoopConf);
if (factory == null) {
  factory = DecryptionPropertiesFactory.loadFactory(hadoopConf);
  factoryCache.put(hadoopConf, factory);
Since you use ConcurrentMap, you probably want to use its #computeIfAbsent() method instead of #get() + a null check + #put().
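A minimal sketch of that suggestion, assuming the factoryCache field shown above:

// computeIfAbsent does the lookup and the (at most one) factory load in a single atomic step.
DecryptionPropertiesFactory factory =
    factoryCache.computeIfAbsent(hadoopConf, DecryptionPropertiesFactory::loadFactory);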
impl CometKeyRetriever {
    pub fn new(file_path: &str, key_unwrapper: GlobalRef) -> Result<Self, ExecutionError> {
        // Get JNI environment
        let mut env = JVMClasses::get_env()?;
Below you use
let mut env = JVMClasses::get_env()
    .map_err(|e| datafusion::parquet::errors::ParquetError::General(e.to_string()))?;
Is there no need for error mapping here?
        )
    };

    let result = result.unwrap();
Can you use result? instead, so we return an Err rather than panicking? Or use a pattern match and return a custom error. There are a few more unwraps below.
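For example, something along these lines (a sketch that reuses the error mapping already quoted in this review rather than introducing a custom error type):

// Propagate the JNI failure instead of panicking; this assumes the surrounding function
// returns a parquet Result, matching the map_err pattern shown earlier.
let result = result
    .map_err(|e| datafusion::parquet::errors::ParquetError::General(e.to_string()))?;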
Which issue does this PR close?
Closes #.
Rationale for this change
We want to add Parquet Modular Encryption support for the native readers when using a Spark KMS. We use the encryption factory features added in DataFusion 50 to register an encryption factory that uses JNI to get decryption keys from Spark.
What changes are included in this PR?
How are these changes tested?