[GLUTEN-8455][VL] Port encrypted file checks to shim layer (#8501)
ArnavBalyan authored Jan 16, 2025
1 parent ddf7e32 commit 67ebbc8
Showing 8 changed files with 293 additions and 36 deletions.
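In brief: the Velox backend previously probed Parquet footers directly to decide whether a scan must fall back due to encryption; this commit routes that check through the per-Spark-version shim layer. A minimal sketch of the new call pattern, with names taken from the hunks below (the wrapper function itself is illustrative):

import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.LocatedFileStatus

// Illustrative wrapper: ask the active Spark shim (3.2/3.3/...) whether a
// Parquet file is encrypted; each shim supplies its own implementation.
def isEncrypted(fileStatus: LocatedFileStatus, conf: Configuration): Boolean =
  SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, conf)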
@@ -17,15 +17,14 @@
package org.apache.gluten.utils

import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat

import org.apache.spark.util.SerializableConfiguration

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
-import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.hadoop.ParquetFileReader

object ParquetMetadataUtils {

@@ -98,38 +97,9 @@ object ParquetMetadataUtils {
    while (filesIterator.hasNext && checkedFileCount < fileLimit) {
      val fileStatus = filesIterator.next()
      checkedFileCount += 1
-     try {
-       ParquetFileReader.readFooter(conf, fileStatus.getPath).toString
-     } catch {
-       case e: Exception if hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
-         return true
-       case e: Exception =>
-     }
+     if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, conf)) {
+       return true
+     }
    }
    false
  }
-
- /**
-  * Utility to check the exception for the specified type. Parquet 1.12 does not provide direct
-  * utility to check for encryption. Newer versions provide utility to check encryption from read
-  * footer which can be used in the future once Spark brings it in.
-  *
-  * @param throwable
-  *   Exception to check
-  * @param causeType
-  *   Class of the cause to look for
-  * @tparam T
-  *   Type of the cause
-  * @return
-  *   True if the cause is found; false otherwise
-  */
- private def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = {
-   var current = throwable
-   while (current != null) {
-     if (causeType.isInstance(current)) {
-       return true
-     }
-     current = current.getCause
-   }
-   false
- }
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.utils

import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.sql.{GlutenQueryTest, SparkSession}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import org.apache.parquet.crypto.{ColumnEncryptionProperties, FileEncryptionProperties}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.hadoop.example.ExampleParquetWriter
import org.apache.parquet.hadoop.metadata.ColumnPath
import org.apache.parquet.schema.{MessageType, PrimitiveType, Type, Types}
import org.junit.Assert._

import java.nio.charset.StandardCharsets
import java.util.Base64

import scala.collection.JavaConverters._

/**
* This suite attempts to test Parquet encryption detection for fallback of the scan operator. It
* checks the following:
* 1. Plain Parquet File:
* - Writes a Parquet file with no encryption.
- Asserts that the file is not encrypted.
*
* 2. Encrypted Parquet File (with encrypted footer):
* - Writes a Parquet file with column-level encryption and an encrypted footer.
* - Asserts that the file is encrypted.
*
* 3. Encrypted Parquet File (with plaintext footer):
* - Writes a Parquet file with column-level encryption but a plaintext (unencrypted) footer.
* - Ensures the file is still detected as encrypted despite the plaintext footer.
*/

class ParquetEncryptionDetectionSuite extends GlutenQueryTest {

private val masterKey =
Base64.getEncoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
private val columnKey =
Base64.getEncoder.encodeToString("1234567890123456".getBytes(StandardCharsets.UTF_8))

private val schema: MessageType = Types
.buildMessage()
.addField(
Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED).named("id"))
.addField(
Types
.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
.named("name"))
.named("TestSchema")

private var _spark: SparkSession = _

override protected def spark: SparkSession = _spark

private def writeParquet(
path: String,
encryptionProperties: Option[FileEncryptionProperties],
data: Seq[Map[String, Any]]
): Unit = {
val configuration = new Configuration()
val writerBuilder = ExampleParquetWriter
.builder(new Path(path))
.withConf(configuration)
.withType(schema)
.withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)

encryptionProperties.foreach(writerBuilder.withEncryption)

val writer = writerBuilder.build()
try {
data.foreach {
row =>
val group = new SimpleGroup(schema)
row.foreach {
case (key, value) =>
value match {
case i: Int => group.add(key, i)
case s: String => group.add(key, s)
}
}
writer.write(group)
}
} finally {
writer.close()
}
}

private def getLocatedFileStatus(path: String): LocatedFileStatus = {
val conf = new Configuration()
val fs = FileSystem.get(conf)
fs.listFiles(new Path(path), false).next()
}

testWithSpecifiedSparkVersion(
"Detect encrypted Parquet with encrypted footer",
Array("3.2", "3.3", "3.4")) {
withTempDir {
tempDir =>
val filePath = s"${tempDir.getAbsolutePath}/encrypted_footer.parquet"
val encryptionProps = FileEncryptionProperties
.builder(Base64.getDecoder.decode(masterKey))
.withEncryptedColumns(
Map(
ColumnPath.get("name") -> ColumnEncryptionProperties
.builder(ColumnPath.get("name"))
.withKey(Base64.getDecoder.decode(columnKey))
.build()).asJava)
.build()

writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1, "name" -> "Alice")))
val fileStatus = getLocatedFileStatus(filePath)

assertTrue(
SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration()))
}
}

testWithSpecifiedSparkVersion(
"Detect encrypted Parquet without encrypted footer (plaintext footer)",
Array("3.2", "3.3", "3.4")) {
withTempDir {
tempDir =>
val filePath = s"${tempDir.getAbsolutePath}/plaintext_footer.parquet"
val encryptionProps = FileEncryptionProperties
.builder(Base64.getDecoder.decode(masterKey))
.withEncryptedColumns(
Map(
ColumnPath.get("name") -> ColumnEncryptionProperties
.builder(ColumnPath.get("name"))
.withKey(Base64.getDecoder.decode(columnKey))
.build()).asJava)
.withPlaintextFooter()
.build()

writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1, "name" -> "Bob")))
val fileStatus = getLocatedFileStatus(filePath)
assertTrue(
SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration()))
}
}

testWithSpecifiedSparkVersion(
"Detect plain (unencrypted) Parquet file",
Array("3.2", "3.3", "3.4")) {
withTempDir {
tempDir =>
val filePath = s"${tempDir.getAbsolutePath}/plain.parquet"

writeParquet(filePath, None, Seq(Map("id" -> 1, "name" -> "Charlie")))
val fileStatus = getLocatedFileStatus(filePath)

assertFalse(
SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration()))
}
}
}
@@ -48,7 +48,9 @@ import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}

+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.LocatedFileStatus
import org.apache.parquet.schema.MessageType

import java.util.{Map => JMap, Properties}
@@ -285,4 +287,7 @@ trait SparkShims {

/** Shim method for usages from GlutenExplainUtils.scala. */
def unsetOperatorId(plan: QueryPlan[_]): Unit

+  def isParquetFileEncrypted(fileStatus: LocatedFileStatus, conf: Configuration): Boolean

}
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.utils

object ExceptionUtils {

/**
* Utility to check the exception for the specified type.
*
* @param throwable
* Exception to check
* @param causeType
* Class of the cause to look for
* @tparam T
* Type of the cause
* @return
* True if the cause is found; false otherwise
*/
def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = {
var current = throwable
while (current != null) {
if (causeType.isInstance(current)) {
return true
}
current = current.getCause
}
false
}
}
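
For reference, a minimal usage sketch of the helper; the wrapping exception below is illustrative, not part of the diff:

import org.apache.gluten.utils.ExceptionUtils
import org.apache.parquet.crypto.ParquetCryptoRuntimeException

// A crypto failure surfaced through a generic wrapper is still detected,
// because hasCause walks the entire getCause chain.
val wrapped = new RuntimeException(
  "footer read failed",
  new ParquetCryptoRuntimeException("no decryption key"))
assert(ExceptionUtils.hasCause(wrapped, classOf[ParquetCryptoRuntimeException]))
assert(!ExceptionUtils.hasCause(wrapped, classOf[IllegalStateException]))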
@@ -19,6 +19,7 @@ package org.apache.gluten.sql.shims.spark32
import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.{ExpressionNames, Sig}
import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims}
+import org.apache.gluten.utils.ExceptionUtils

import org.apache.spark.{ShuffleUtils, SparkContext, TaskContext, TaskContextUtils}
import org.apache.spark.scheduler.TaskInfo
@@ -51,7 +52,10 @@ import org.apache.spark.sql.types.{DecimalType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}

-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType

import java.util.{HashMap => JHashMap, Map => JMap, Properties}
@@ -296,4 +300,19 @@ class Spark32Shims extends SparkShims {
override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
plan.unsetTagValue(QueryPlan.OP_ID_TAG)
}

override def isParquetFileEncrypted(
fileStatus: LocatedFileStatus,
conf: Configuration): Boolean = {
    try {
      // Probe with the supplied Hadoop conf rather than a fresh one. Reading the
      // footer of an encrypted-footer file throws right away; for plaintext-footer
      // files, rendering the metadata via toString touches the encrypted column
      // metadata and throws as well.
      ParquetFileReader.readFooter(conf, fileStatus.getPath).toString
      false
    } catch {
      case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
        true
      case e: Throwable =>
        e.printStackTrace()
        false
    }
}
}
@@ -20,6 +20,7 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.{ExpressionNames, Sig}
import org.apache.gluten.expression.ExpressionNames.{CEIL, FLOOR, KNOWN_NULLABLE, TIMESTAMP_ADD}
import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims}
+import org.apache.gluten.utils.ExceptionUtils

import org.apache.spark._
import org.apache.spark.scheduler.TaskInfo
@@ -53,7 +54,10 @@ import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}

-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType

import java.time.ZoneOffset
@@ -377,4 +381,19 @@ class Spark33Shims extends SparkShims {
override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
plan.unsetTagValue(QueryPlan.OP_ID_TAG)
}
override def isParquetFileEncrypted(
fileStatus: LocatedFileStatus,
conf: Configuration): Boolean = {
    try {
      // Probe with the supplied Hadoop conf rather than a fresh one. Reading the
      // footer of an encrypted-footer file throws right away; for plaintext-footer
      // files, rendering the metadata via toString touches the encrypted column
      // metadata and throws as well.
      ParquetFileReader.readFooter(conf, fileStatus.getPath).toString
      false
    } catch {
      case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
        true
      case e: Throwable =>
        e.printStackTrace()
        false
    }
}

}