Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 131 additions & 15 deletions datafusion/datasource/src/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,10 @@ impl ListingTableUrl {
.boxed(),
// If the head command fails, it is likely that object doesn't exist.
// Retry as though it were a prefix (aka a collection)
Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
Err(object_store::Error::NotFound { .. }) => {
list_with_cache(ctx, store, &self.prefix).await?
}
Err(e) => return Err(e.into()),
}
};

Expand Down Expand Up @@ -405,6 +408,8 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_common::config::TableOptions;
use datafusion_common::DFSchema;
use datafusion_execution::config::SessionConfig;
Expand All @@ -414,9 +419,13 @@ mod tests {
use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;
use object_store::PutPayload;
use object_store::{
GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions,
PutPayload,
};
use std::any::Any;
use std::collections::HashMap;
use std::ops::Range;
use tempfile::tempdir;

#[test]
Expand Down Expand Up @@ -632,48 +641,68 @@ mod tests {
}

#[tokio::test]
async fn test_list_files() {
let store = object_store::memory::InMemory::new();
async fn test_list_files() -> Result<()> {
let store = MockObjectStore {
in_mem: object_store::memory::InMemory::new(),
forbidden_paths: vec!["forbidden/e.parquet".into()],
};

// Create some files:
create_file(&store, "a.parquet").await;
create_file(&store, "/t/b.parquet").await;
create_file(&store, "/t/c.csv").await;
create_file(&store, "/t/d.csv").await;

// Calling `head` on this file returns a permission error.
create_file(&store, "/forbidden/e.parquet").await;

assert_eq!(
list_all_files("/", &store, "parquet").await,
list_all_files("/", &store, "parquet").await?,
vec!["a.parquet"],
);

// test with and without trailing slash
assert_eq!(
list_all_files("/t/", &store, "parquet").await,
list_all_files("/t/", &store, "parquet").await?,
vec!["t/b.parquet"],
);
assert_eq!(
list_all_files("/t", &store, "parquet").await,
list_all_files("/t", &store, "parquet").await?,
vec!["t/b.parquet"],
);

// test with and without trailing slash
assert_eq!(
list_all_files("/t", &store, "csv").await,
list_all_files("/t", &store, "csv").await?,
vec!["t/c.csv", "t/d.csv"],
);
assert_eq!(
list_all_files("/t/", &store, "csv").await,
list_all_files("/t/", &store, "csv").await?,
vec!["t/c.csv", "t/d.csv"],
);

// Test a non existing prefix
assert_eq!(
list_all_files("/NonExisting", &store, "csv").await,
list_all_files("/NonExisting", &store, "csv").await?,
vec![] as Vec<String>
);
assert_eq!(
list_all_files("/NonExisting/", &store, "csv").await,
list_all_files("/NonExisting/", &store, "csv").await?,
vec![] as Vec<String>
);

// Including forbidden/e.parquet generates an error.
let Err(DataFusionError::ObjectStore(err)) =
list_all_files("/forbidden/e.parquet", &store, "parquet").await
else {
panic!("Expected ObjectStore error");
};

let object_store::Error::PermissionDenied { .. } = &*err else {
panic!("Expected PermissionDenied error");
};

Ok(())
}

/// Creates a file with "hello world" content at the specified path
Expand All @@ -691,10 +720,8 @@ mod tests {
url: &str,
store: &dyn ObjectStore,
file_extension: &str,
) -> Vec<String> {
try_list_all_files(url, store, file_extension)
.await
.unwrap()
) -> Result<Vec<String>> {
try_list_all_files(url, store, file_extension).await
}

/// Runs "list_all_files" and returns the paths of the listed files
Expand All @@ -716,6 +743,95 @@ mod tests {
Ok(files)
}

/// Test double for an object store: wraps an in-memory store and carries a
/// list of paths for which `head` reports a permission error.
#[derive(Debug)]
struct MockObjectStore {
/// Backing in-memory store that operations are forwarded to.
in_mem: object_store::memory::InMemory,
/// Paths for which `head` returns `PermissionDenied` instead of metadata.
forbidden_paths: Vec<Path>,
}

/// Formats the mock by delegating to the wrapped in-memory store's `fmt`.
impl std::fmt::Display for MockObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Forward to `in_mem`; the mock adds nothing of its own to the output.
self.in_mem.fmt(f)
}
}

/// [`ObjectStore`] implementation that forwards every operation to the wrapped
/// in-memory store, except that `head` rejects the configured forbidden paths.
#[async_trait]
impl ObjectStore for MockObjectStore {
    /// Forwards the write to the in-memory store.
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: object_store::PutOptions,
    ) -> object_store::Result<object_store::PutResult> {
        self.in_mem.put_opts(location, payload, opts).await
    }

    /// Forwards the multipart upload request to the in-memory store.
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOptions,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        self.in_mem.put_multipart_opts(location, opts).await
    }

    /// Forwards the read to the in-memory store.
    ///
    /// Note that forbidden paths are only enforced by `head`, not here.
    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> object_store::Result<GetResult> {
        self.in_mem.get_opts(location, options).await
    }

    /// Forwards the ranged read to the in-memory store.
    async fn get_ranges(
        &self,
        location: &Path,
        ranges: &[Range<u64>],
    ) -> object_store::Result<Vec<Bytes>> {
        self.in_mem.get_ranges(location, ranges).await
    }

    /// Returns the object's metadata from the in-memory store, unless
    /// `location` is one of the forbidden paths, in which case a
    /// `PermissionDenied` error is returned without consulting the store.
    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
        let is_forbidden = self.forbidden_paths.iter().any(|p| p == location);
        if !is_forbidden {
            return self.in_mem.head(location).await;
        }

        Err(object_store::Error::PermissionDenied {
            path: location.to_string(),
            source: "forbidden".into(),
        })
    }

    /// Forwards the delete to the in-memory store.
    async fn delete(&self, location: &Path) -> object_store::Result<()> {
        self.in_mem.delete(location).await
    }

    /// Forwards the listing to the in-memory store.
    fn list(
        &self,
        prefix: Option<&Path>,
    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        self.in_mem.list(prefix)
    }

    /// Forwards the delimited listing to the in-memory store.
    async fn list_with_delimiter(
        &self,
        prefix: Option<&Path>,
    ) -> object_store::Result<ListResult> {
        self.in_mem.list_with_delimiter(prefix).await
    }

    /// Forwards the copy to the in-memory store.
    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.in_mem.copy(from, to).await
    }

    /// Forwards the conditional copy to the in-memory store.
    async fn copy_if_not_exists(
        &self,
        from: &Path,
        to: &Path,
    ) -> object_store::Result<()> {
        self.in_mem.copy_if_not_exists(from, to).await
    }
}

struct MockSession {
config: SessionConfig,
runtime_env: Arc<RuntimeEnv>,
Expand Down