diff --git a/Cargo.lock b/Cargo.lock index 641ff87f5..440e5977f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2962,8 +2962,7 @@ dependencies = [ [[package]] name = "object_store" version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a0c4b3a0e31f8b66f71ad8064521efa773910196e2cde791436f13409f3b45" +source = "git+https://github.com/apache/arrow-rs.git?rev=23b6ff9f432e8e29c08d47a315ba0b7cb8758225#23b6ff9f432e8e29c08d47a315ba0b7cb8758225" dependencies = [ "async-trait", "base64 0.22.0", @@ -3448,7 +3447,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.13.0", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -3468,7 +3467,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.79", diff --git a/Cargo.toml b/Cargo.toml index 156820ec5..0e263dc4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,10 @@ [workspace] members = ["server"] resolver = "2" + +[patch.crates-io] +# object_store added support for SSE-C headers in: +# - https://github.com/apache/arrow-rs/pull/6230 +# - https://github.com/apache/arrow-rs/pull/6260 +# But a new version hasn't been published to crates.io for this yet. So, we are using this patch temporarily. +object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "23b6ff9f432e8e29c08d47a315ba0b7cb8758225" } diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 0627100a2..0c9eb982e 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -32,8 +32,10 @@ use object_store::{ClientOptions, ObjectStore, PutPayload}; use relative_path::{RelativePath, RelativePathBuf}; use std::collections::BTreeMap; +use std::fmt::Display; use std::iter::Iterator; use std::path::Path as StdPath; +use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -84,6 +86,16 @@ pub struct S3Config { #[arg(long, env = "P_S3_BUCKET", value_name = "bucket-name", required = true)] pub bucket_name: String, + /// Server side encryption to use for operations with objects. + /// Currently, this only supports SSE-C. Value should be + /// like SSE-C:AES256:. + #[arg( + long, + env = "P_S3_SSEC_ENCRYPTION_KEY", + value_name = "ssec-encryption-key" + )] + pub ssec_encryption_key: Option, + /// Set client to send checksum header on every put request #[arg( long, @@ -130,6 +142,72 @@ pub struct S3Config { pub metadata_endpoint: Option, } +/// This represents the server side encryption to be +/// used when working with S3 objects. +#[derive(Debug, Clone)] +pub enum SSECEncryptionKey { + /// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ServerSideEncryptionCustomerKeys.html + SseC { + // algorithm unused but being tracked separately to maintain + // consistent interface via CLI if AWS adds any new algorithms + // in future. + _algorithm: ObjectEncryptionAlgorithm, + base64_encryption_key: String, + }, +} + +impl FromStr for SSECEncryptionKey { + type Err = String; + + fn from_str(s: &str) -> Result { + let parts = s.split(':').collect::>(); + if parts.len() == 3 { + let sse_type = parts[0]; + if sse_type != "SSE-C" { + return Err("Only SSE-C is supported for object encryption for now".into()); + } + + let algorithm = parts[1]; + let encryption_key = parts[2]; + + let alg = ObjectEncryptionAlgorithm::from_str(algorithm)?; + + Ok(match alg { + ObjectEncryptionAlgorithm::Aes256 => SSECEncryptionKey::SseC { + _algorithm: alg, + base64_encryption_key: encryption_key.to_owned(), + }, + }) + } else { + Err("Expected SSE-C:AES256:".into()) + } + } +} + +#[derive(Debug, Clone, Copy)] +pub enum ObjectEncryptionAlgorithm { + Aes256, +} + +impl FromStr for ObjectEncryptionAlgorithm { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "AES256" => Ok(ObjectEncryptionAlgorithm::Aes256), + _ => Err("Invalid SSE algorithm. Following are supported: AES256".into()), + } + } +} + +impl Display for ObjectEncryptionAlgorithm { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ObjectEncryptionAlgorithm::Aes256 => write!(f, "AES256"), + } + } +} + impl S3Config { fn get_default_builder(&self) -> AmazonS3Builder { let mut client_options = ClientOptions::default() @@ -160,6 +238,17 @@ impl S3Config { .with_secret_access_key(secret_key); } + if let Some(ssec_encryption_key) = &self.ssec_encryption_key { + match ssec_encryption_key { + SSECEncryptionKey::SseC { + _algorithm, + base64_encryption_key, + } => { + builder = builder.with_ssec_encryption(base64_encryption_key); + } + } + } + if let Ok(relative_uri) = std::env::var(AWS_CONTAINER_CREDENTIALS_RELATIVE_URI) { builder = builder.with_config( AmazonS3ConfigKey::ContainerCredentialsRelativeUri,