Commit c23dc25
feat: Parquet Modular Encryption with Spark KMS for native readers (apache#2447)
1 parent 8381108 commit c23dc25

File tree

23 files changed: +1165 -121 lines changed
common/src/main/java/org/apache/comet/parquet/CometFileKeyUnwrapper.java (new file)

Lines changed: 146 additions & 0 deletions

@@ -0,0 +1,146 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet.parquet;

import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.DecryptionKeyRetriever;
import org.apache.parquet.crypto.DecryptionPropertiesFactory;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;

// spotless:off
/*
 * Architecture Overview:
 *
 *  JVM Side                                 |  Native Side
 * ┌─────────────────────────────────────┐   |   ┌─────────────────────────────────────┐
 * │        CometFileKeyUnwrapper        │   |   │        Parquet File Reading         │
 * │                                     │   |   │                                     │
 * │   ┌─────────────────────────────┐   │   |   │   ┌─────────────────────────────┐   │
 * │   │         hadoopConf          │   │   |   │   │        file1.parquet        │   │
 * │   │       (Configuration)       │   │   |   │   │        file2.parquet        │   │
 * │   └─────────────────────────────┘   │   |   │   │        file3.parquet        │   │
 * │                  │                  │   |   │   └─────────────────────────────┘   │
 * │                  ▼                  │   |   │                  │                  │
 * │   ┌─────────────────────────────┐   │   |   │                  │                  │
 * │   │        factoryCache         │   │   |   │                  ▼                  │
 * │   │    (many-to-one mapping)    │   │   |   │   ┌─────────────────────────────┐   │
 * │   │                             │   │   |   │   │    Parse file metadata &    │   │
 * │   │  file1 ──┐                  │   │   |   │   │     extract keyMetadata     │   │
 * │   │  file2 ──┼─► DecryptionProps│   │   |   │   └─────────────────────────────┘   │
 * │   │  file3 ──┘      Factory     │   │   |   │                  │                  │
 * │   └─────────────────────────────┘   │   |   │                  │                  │
 * │                  │                  │   |   │                  ▼                  │
 * │                  ▼                  │   |   │   ╔═════════════════════════════╗   │
 * │   ┌─────────────────────────────┐   │   |   │   ║          JNI CALL:          ║   │
 * │   │       retrieverCache        │   │   |   │   ║     getKey(filePath,        ║   │
 * │   │  filePath -> KeyRetriever   │◄──┼───┼───┼───║          keyMetadata)       ║   │
 * │   └─────────────────────────────┘   │   |   │   ╚═════════════════════════════╝   │
 * │                  │                  │   |   │                                     │
 * │                  ▼                  │   |   │                                     │
 * │   ┌─────────────────────────────┐   │   |   │                                     │
 * │   │   DecryptionKeyRetriever    │   │   |   │                                     │
 * │   │    .getKey(keyMetadata)     │   │   |   │                                     │
 * │   └─────────────────────────────┘   │   |   │                                     │
 * │                  │                  │   |   │                                     │
 * │                  ▼                  │   |   │                                     │
 * │   ┌─────────────────────────────┐   │   |   │   ┌─────────────────────────────┐   │
 * │   │      return key bytes       │───┼───┼───┼──►│   Use key for decryption    │   │
 * │   └─────────────────────────────┘   │   |   │   │       of parquet data       │   │
 * └─────────────────────────────────────┘   |   │   └─────────────────────────────┘   │
 *                                           |   └─────────────────────────────────────┘
 *                                           |
 *                                     JNI Boundary
 *
 * Setup Phase (storeDecryptionKeyRetriever):
 * 1. hadoopConf → DecryptionPropertiesFactory (cached in factoryCache)
 * 2. Factory + filePath → DecryptionKeyRetriever (cached in retrieverCache)
 *
 * Runtime Phase (getKey):
 * 3. Native code calls getKey(filePath, keyMetadata) ──► JVM
 * 4. Retrieve cached DecryptionKeyRetriever for filePath
 * 5. KeyRetriever.getKey(keyMetadata) → decrypted key bytes
 * 6. Return key bytes ──► Native code for parquet decryption
 */
// spotless:on

/**
 * Helper class to access DecryptionKeyRetriever.getKey from native code via JNI. This class
 * handles the complexity of creating and caching properly configured DecryptionKeyRetriever
 * instances using DecryptionPropertiesFactory. The lifetime of this object is meant to map to a
 * single Comet plan, so it is associated with a CometExecIterator.
 */
public class CometFileKeyUnwrapper {

  // Each file path gets a unique DecryptionKeyRetriever
  private final ConcurrentHashMap<String, DecryptionKeyRetriever> retrieverCache =
      new ConcurrentHashMap<>();

  // Cache the factory since we should be using the same hadoopConf for every file in this scan.
  private DecryptionPropertiesFactory factory = null;
  // Cache the hadoopConf just to assert the assumption above.
  private Configuration conf = null;

  /**
   * Creates and stores a DecryptionKeyRetriever instance for the given file path.
   *
   * @param filePath The path to the Parquet file
   * @param hadoopConf The Hadoop Configuration to use for this file path
   */
  public void storeDecryptionKeyRetriever(final String filePath, final Configuration hadoopConf) {
    // Use DecryptionPropertiesFactory.loadFactory to get the factory and then call
    // getFileDecryptionProperties
    if (factory == null) {
      factory = DecryptionPropertiesFactory.loadFactory(hadoopConf);
      conf = hadoopConf;
    } else {
      // Check the assumption that all files have the same hadoopConf and thus same Factory
      assert (conf == hadoopConf);
    }
    Path path = new Path(filePath);
    FileDecryptionProperties decryptionProperties =
        factory.getFileDecryptionProperties(hadoopConf, path);

    DecryptionKeyRetriever keyRetriever = decryptionProperties.getKeyRetriever();
    retrieverCache.put(filePath, keyRetriever);
  }

  /**
   * Gets the decryption key for the given key metadata using the cached DecryptionKeyRetriever
   * for the specified file path.
   *
   * @param filePath The path to the Parquet file
   * @param keyMetadata The key metadata bytes from the Parquet file
   * @return The decrypted key bytes
   * @throws ParquetCryptoRuntimeException if key unwrapping fails
   */
  public byte[] getKey(final String filePath, final byte[] keyMetadata)
      throws ParquetCryptoRuntimeException {
    DecryptionKeyRetriever keyRetriever = retrieverCache.get(filePath);
    if (keyRetriever == null) {
      throw new ParquetCryptoRuntimeException(
          "Failed to find DecryptionKeyRetriever for path: " + filePath);
    }
    return keyRetriever.getKey(keyMetadata);
  }
}
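
The setup and runtime phases above can be exercised directly from the JVM. Below is a minimal Scala sketch of that lifecycle, assuming the standard parquet-mr property names (parquet.crypto.factory.class, parquet.encryption.kms.client.class); the file path and KMS client class are hypothetical placeholders, and in a real scan NativeBatchReader performs the setup while native code drives getKey through JNI.

import org.apache.hadoop.conf.Configuration
import org.apache.comet.parquet.CometFileKeyUnwrapper

val conf = new Configuration()
// Standard parquet-mr crypto factory plus a hypothetical KMS client implementation.
conf.set("parquet.crypto.factory.class",
  "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
conf.set("parquet.encryption.kms.client.class", "com.example.MyKmsClient")

// Setup phase: cache a DecryptionKeyRetriever for each file in the scan.
val unwrapper = new CometFileKeyUnwrapper()
unwrapper.storeDecryptionKeyRetriever("s3://bucket/data/file1.parquet", conf)

// Runtime phase: native code invokes this through JNI with the keyMetadata bytes
// it parsed from the file footer, and gets back the unwrapped key bytes.
def unwrap(keyMetadata: Array[Byte]): Array[Byte] =
  unwrapper.getKey("s3://bucket/data/file1.parquet", keyMetadata)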

common/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 4 additions & 1 deletion

@@ -267,9 +267,11 @@ public static native long initRecordBatchReader(
       String sessionTimezone,
       int batchSize,
       boolean caseSensitive,
-      Map<String, String> objectStoreOptions);
+      Map<String, String> objectStoreOptions,
+      CometFileKeyUnwrapper keyUnwrapper);
 
   // arrow native version of read batch
+
   /**
    * Read the next batch of data into memory on native side
    *
@@ -280,6 +282,7 @@ public static native long initRecordBatchReader(
 
   // arrow native equivalent of currentBatch. 'columnNum' is number of the column in the record
   // batch
+
   /**
    * Load the column corresponding to columnNum in the currently loaded record batch into JVM
    *

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 12 additions & 2 deletions

@@ -80,7 +80,7 @@
 import org.apache.comet.vector.CometVector;
 import org.apache.comet.vector.NativeUtil;
 
-import static scala.jdk.javaapi.CollectionConverters.*;
+import static scala.jdk.javaapi.CollectionConverters.asJava;
 
 /**
  * A vectorized Parquet reader that reads a Parquet file in a batched fashion.
@@ -410,6 +410,15 @@ public void init() throws Throwable {
       }
     }
 
+    boolean encryptionEnabled = CometParquetUtils.encryptionEnabled(conf);
+
+    // Create keyUnwrapper if encryption is enabled
+    CometFileKeyUnwrapper keyUnwrapper = null;
+    if (encryptionEnabled) {
+      keyUnwrapper = new CometFileKeyUnwrapper();
+      keyUnwrapper.storeDecryptionKeyRetriever(file.filePath().toString(), conf);
+    }
+
     int batchSize =
         conf.getInt(
             CometConf.COMET_BATCH_SIZE().key(),
@@ -426,7 +435,8 @@ public void init() throws Throwable {
           timeZoneId,
           batchSize,
           caseSensitive,
-          objectStoreOptions);
+          objectStoreOptions,
+          keyUnwrapper);
     }
     isInitialized = true;
   }
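
For orientation, here is a hedged Scala sketch of the job-level configuration that turns this code path on; the KMS client class and table path are hypothetical, and spark is assumed to be an active SparkSession. With either property present, CometParquetUtils.encryptionEnabled(conf) returns true and init() hands a populated keyUnwrapper to Native.initRecordBatchReader.

// Hypothetical job-level setup for reading encrypted Parquet through Comet.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("parquet.crypto.factory.class",
  "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
hadoopConf.set("parquet.encryption.kms.client.class", "com.example.MyKmsClient")

// The native reader now receives a CometFileKeyUnwrapper per scanned file.
val df = spark.read.parquet("s3://bucket/encrypted-table")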

common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala

Lines changed: 1 addition & 1 deletion

@@ -58,7 +58,7 @@ object NativeConfig {
   def extractObjectStoreOptions(hadoopConf: Configuration, uri: URI): Map[String, String] = {
     val scheme = uri.getScheme.toLowerCase(Locale.ROOT)
 
-    import scala.collection.JavaConverters._
+    import scala.jdk.CollectionConverters._
     val options = scala.collection.mutable.Map[String, String]()
 
     // The schemes will use libhdfs

common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala

Lines changed: 44 additions & 0 deletions

@@ -20,13 +20,25 @@
 package org.apache.comet.parquet
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.crypto.DecryptionPropertiesFactory
+import org.apache.parquet.crypto.keytools.{KeyToolkit, PropertiesDrivenCryptoFactory}
 import org.apache.spark.sql.internal.SQLConf
 
 object CometParquetUtils {
   private val PARQUET_FIELD_ID_WRITE_ENABLED = "spark.sql.parquet.fieldId.write.enabled"
   private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled"
   private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing"
 
+  // Map of encryption configuration key-value pairs that, if present, are only supported with
+  // these specific values. Generally, these are the default values that won't be present,
+  // but if they are present we want to check them.
+  private val SUPPORTED_ENCRYPTION_CONFIGS: Map[String, Set[String]] = Map(
+    // https://github.com/apache/arrow-rs/blob/main/parquet/src/encryption/ciphers.rs#L21
+    KeyToolkit.DATA_KEY_LENGTH_PROPERTY_NAME -> Set(KeyToolkit.DATA_KEY_LENGTH_DEFAULT.toString),
+    KeyToolkit.KEK_LENGTH_PROPERTY_NAME -> Set(KeyToolkit.KEK_LENGTH_DEFAULT.toString),
+    // https://github.com/apache/arrow-rs/blob/main/parquet/src/file/metadata/parser.rs#L494
+    PropertiesDrivenCryptoFactory.ENCRYPTION_ALGORITHM_PROPERTY_NAME -> Set("AES_GCM_V1"))
+
   def writeFieldId(conf: SQLConf): Boolean =
     conf.getConfString(PARQUET_FIELD_ID_WRITE_ENABLED, "false").toBoolean
 
@@ -38,4 +50,36 @@ object CometParquetUtils {
 
   def ignoreMissingIds(conf: SQLConf): Boolean =
     conf.getConfString(IGNORE_MISSING_PARQUET_FIELD_ID, "false").toBoolean
+
+  /**
+   * Checks if the given Hadoop configuration contains any unsupported encryption settings.
+   *
+   * @param hadoopConf
+   *   The Hadoop configuration to check
+   * @return
+   *   true if all encryption configurations are supported, false if any unsupported config is
+   *   found
+   */
+  def isEncryptionConfigSupported(hadoopConf: Configuration): Boolean = {
+    // Check configurations that, if present, can only have specific allowed values
+    val supportedListCheck = SUPPORTED_ENCRYPTION_CONFIGS.forall {
+      case (configKey, supportedValues) =>
+        val configValue = Option(hadoopConf.get(configKey))
+        configValue match {
+          case Some(value) => supportedValues.contains(value)
+          case None => true // Config not set, so it's supported
+        }
+    }
+
+    supportedListCheck
+  }
+
+  def encryptionEnabled(hadoopConf: Configuration): Boolean = {
+    // TODO: Are there any other properties to check?
+    val encryptionKeys = Seq(
+      DecryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME,
+      KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME)
+
+    encryptionKeys.exists(key => Option(hadoopConf.get(key)).exists(_.nonEmpty))
+  }
 }
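
A short Scala sketch of how the two checks behave, using the literal property names behind the parquet-mr constants referenced above (parquet.crypto.factory.class, parquet.encryption.algorithm); the results shown in comments follow from the logic in this file.

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
CometParquetUtils.encryptionEnabled(conf) // false: no crypto factory or KMS client set

conf.set("parquet.crypto.factory.class",
  "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
CometParquetUtils.encryptionEnabled(conf) // true

// No restricted keys present, so defaults are assumed and the config is supported.
CometParquetUtils.isEncryptionConfigSupported(conf) // true

// AES_GCM_CTR_V1 is not in the supported set (only AES_GCM_V1 is).
conf.set("parquet.encryption.algorithm", "AES_GCM_CTR_V1")
CometParquetUtils.isEncryptionConfigSupported(conf) // false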

native/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default.

native/core/Cargo.toml

Lines changed: 1 addition & 1 deletion

@@ -59,7 +59,7 @@ bytes = { workspace = true }
 tempfile = "3.8.0"
 itertools = "0.14.0"
 paste = "1.0.14"
-datafusion = { workspace = true }
+datafusion = { workspace = true, features = ["parquet_encryption"] }
 datafusion-spark = { workspace = true }
 once_cell = "1.18.0"
 regex = { workspace = true }

native/core/src/errors.rs

Lines changed: 9 additions & 0 deletions

@@ -185,6 +185,15 @@ impl From<CometError> for DataFusionError {
     }
 }
 
+impl From<CometError> for ParquetError {
+    fn from(value: CometError) -> Self {
+        match value {
+            CometError::Parquet { source } => source,
+            _ => ParquetError::General(value.to_string()),
+        }
+    }
+}
+
 impl From<CometError> for ExecutionError {
     fn from(value: CometError) -> Self {
         match value {

native/core/src/execution/jni_api.rs

Lines changed: 13 additions & 0 deletions

@@ -78,6 +78,7 @@ use crate::execution::spark_plan::SparkPlan;
 
 use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};
 
+use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
 use datafusion_comet_proto::spark_operator::operator::OpStruct;
 use log::info;
 use once_cell::sync::Lazy;
@@ -171,6 +172,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     explain_native: jboolean,
     tracing_enabled: jboolean,
     max_temp_directory_size: jlong,
+    key_unwrapper_obj: JObject,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| {
         with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
@@ -247,6 +249,17 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             None
         };
 
+        // Handle key unwrapper for encrypted files
+        if !key_unwrapper_obj.is_null() {
+            let encryption_factory = CometEncryptionFactory {
+                key_unwrapper: jni_new_global_ref!(env, key_unwrapper_obj)?,
+            };
+            session.runtime_env().register_parquet_encryption_factory(
+                ENCRYPTION_FACTORY_ID,
+                Arc::new(encryption_factory),
+            );
+        }
+
         let exec_context = Box::new(ExecutionContext {
             id,
             task_attempt_id,

native/core/src/execution/planner.rs

Lines changed: 2 additions & 0 deletions

@@ -1358,6 +1358,8 @@ impl PhysicalPlanner {
                 default_values,
                 scan.session_timezone.as_str(),
                 scan.case_sensitive,
+                self.session_ctx(),
+                scan.encryption_enabled,
             )?;
             Ok((
                 vec![],
