diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs
index 52f4e68ed3..454abe2807 100644
--- a/crates/storage/opendal/src/lib.rs
+++ b/crates/storage/opendal/src/lib.rs
@@ -100,6 +100,15 @@ cfg_if! {
 mod resolving;
 pub use resolving::{OpenDalResolvingStorage, OpenDalResolvingStorageFactory};
 
+/// Returns `true` when the property map contains S3-style configuration,
+/// indicating that `oss://` paths should be routed through the S3 backend
+/// (e.g. REST catalog vended credentials).
+#[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))]
+fn has_s3_config(props: &HashMap<String, String>) -> bool {
+    props.contains_key(iceberg::io::S3_ENDPOINT)
+        || props.contains_key(iceberg::io::S3_ACCESS_KEY_ID)
+}
+
 /// OpenDAL-based storage factory.
 ///
 /// Maps scheme to the corresponding OpenDalStorage storage variant.
@@ -155,7 +164,23 @@ impl StorageFactory for OpenDalStorageFactory {
             OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
                 config: gcs_config_parse(config.props().clone())?.into(),
             })),
-            #[cfg(feature = "opendal-oss")]
+            // OSS: if `s3.*` props are present (REST catalog vended credentials),
+            // route through S3 backend (OSS is S3-API-compatible).
+            // Otherwise fall back to native OSS backend (RAM/OIDC auth).
+            #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))]
+            OpenDalStorageFactory::Oss => {
+                if has_s3_config(config.props()) {
+                    Ok(Arc::new(OpenDalStorage::S3 {
+                        config: s3_config_parse(config.props().clone())?.into(),
+                        customized_credential_load: None,
+                    }))
+                } else {
+                    Ok(Arc::new(OpenDalStorage::Oss {
+                        config: oss_config_parse(config.props().clone())?.into(),
+                    }))
+                }
+            }
+            #[cfg(all(feature = "opendal-oss", not(feature = "opendal-s3")))]
             OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
                 config: oss_config_parse(config.props().clone())?.into(),
             })),
@@ -217,7 +242,10 @@ pub enum OpenDalStorage {
         /// GCS configuration.
         config: Arc<GcsConfig>,
     },
-    /// OSS storage variant.
+    /// OSS storage variant (native OpenDAL OSS backend).
+    ///
+    /// Used when no `s3.*` configuration is present, allowing native
+    /// Aliyun authentication (RAM/OIDC/assume-role).
     #[cfg(feature = "opendal-oss")]
     Oss {
         /// OSS configuration.
@@ -736,20 +764,6 @@ mod tests {
         );
     }
 
-    #[cfg(feature = "opendal-oss")]
-    #[test]
-    fn test_relativize_path_oss_invalid_scheme() {
-        let storage = OpenDalStorage::Oss {
-            config: Arc::new(OssConfig::default()),
-        };
-
-        assert!(
-            storage
-                .relativize_path("s3://my-bucket/path/to/file.parquet")
-                .is_err()
-        );
-    }
-
     #[cfg(feature = "opendal-azdls")]
     #[test]
     fn test_relativize_path_azdls() {
diff --git a/crates/storage/opendal/src/oss.rs b/crates/storage/opendal/src/oss.rs
index add8b7a0f7..46bc258f54 100644
--- a/crates/storage/opendal/src/oss.rs
+++ b/crates/storage/opendal/src/oss.rs
@@ -15,9 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
-
-use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT};
 use iceberg::{Error, ErrorKind, Result};
 use opendal::services::OssConfig;
 use opendal::{Configurator, Operator};
@@ -26,7 +23,11 @@ use url::Url;
 use crate::utils::from_opendal_error;
 
 /// Parse iceberg props to oss config.
-pub(crate) fn oss_config_parse(mut m: HashMap<String, String>) -> Result<OssConfig> {
+pub(crate) fn oss_config_parse(
+    mut m: std::collections::HashMap<String, String>,
+) -> Result<OssConfig> {
+    use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT};
+
     let mut cfg: OssConfig = OssConfig::default();
     if let Some(endpoint) = m.remove(OSS_ENDPOINT) {
         cfg.endpoint = Some(endpoint);
diff --git a/crates/storage/opendal/src/resolving.rs b/crates/storage/opendal/src/resolving.rs
index 86993220a8..69131e0d9f 100644
--- a/crates/storage/opendal/src/resolving.rs
+++ b/crates/storage/opendal/src/resolving.rs
@@ -102,7 +102,25 @@ fn build_storage_for_scheme(
                 config: Arc::new(config),
             })
         }
-        #[cfg(feature = "opendal-oss")]
+        // OSS: if `s3.*` props are present (REST catalog vended credentials),
+        // route through S3 backend (OSS is S3-API-compatible).
+        // Otherwise fall back to native OSS backend (RAM/OIDC auth).
+        #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))]
+        "oss" => {
+            if crate::has_s3_config(props) {
+                let config = crate::s3::s3_config_parse(props.clone())?;
+                Ok(OpenDalStorage::S3 {
+                    config: Arc::new(config),
+                    customized_credential_load: customized_credential_load.clone(),
+                })
+            } else {
+                let config = crate::oss::oss_config_parse(props.clone())?;
+                Ok(OpenDalStorage::Oss {
+                    config: Arc::new(config),
+                })
+            }
+        }
+        #[cfg(all(feature = "opendal-oss", not(feature = "opendal-s3")))]
         "oss" => {
             let config = crate::oss::oss_config_parse(props.clone())?;
             Ok(OpenDalStorage::Oss {
@@ -371,4 +389,62 @@ mod tests {
             "abfss and abfs should share one instance"
         );
     }
+
+    #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))]
+    #[test]
+    fn oss_routes_through_s3_when_s3_config_present() {
+        use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT};
+
+        let mut props = HashMap::new();
+        props.insert(S3_ACCESS_KEY_ID.to_string(), "AKIA_TEST".to_string());
+        props.insert(
+            S3_ENDPOINT.to_string(),
+            "https://oss-cn-shanghai.aliyuncs.com".to_string(),
+        );
+
+        let storage = build_storage_for_scheme("oss", &props, &None).unwrap();
+        assert!(
+            matches!(storage, OpenDalStorage::S3 { .. }),
+            "OSS with s3.* config should route through S3 backend"
+        );
+    }
+
+    #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))]
+    #[test]
+    fn oss_falls_back_to_native_without_s3_config() {
+        let props = HashMap::new();
+
+        let storage = build_storage_for_scheme("oss", &props, &None).unwrap();
+        assert!(
+            matches!(storage, OpenDalStorage::Oss { .. }),
+            "OSS without s3.* config should use native OSS backend"
+        );
+    }
+
+    #[cfg(all(feature = "opendal-oss", feature = "opendal-s3"))]
+    #[test]
+    fn oss_resolve_uses_s3_backend_and_caches() {
+        use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT};
+
+        let mut props = HashMap::new();
+        props.insert(S3_ACCESS_KEY_ID.to_string(), "AKIA_TEST".to_string());
+        props.insert(
+            S3_ENDPOINT.to_string(),
+            "https://oss-cn-shanghai.aliyuncs.com".to_string(),
+        );
+
+        let resolving = OpenDalResolvingStorage {
+            props,
+            storages: RwLock::new(HashMap::new()),
+            #[cfg(feature = "opendal-s3")]
+            customized_credential_load: None,
+        };
+
+        let storage = resolving.resolve("oss://my-bucket/path/to/file").unwrap();
+        assert!(matches!(storage.as_ref(), OpenDalStorage::S3 { .. }));
+
+        // Second call should hit the cache.
+        let cached = resolving.resolve("oss://my-bucket/other").unwrap();
+        assert!(Arc::ptr_eq(&storage, &cached));
+    }
 }