Skip to content

Commit 9d0f0bc

Browse files
committed
fix: only fall back to listing prefixes on 404 errors
This logic would previously swallow all errors, including things like DNS resolution failures. If the path was indeed a path and not a prefix, that would result in the path being dropped, since the prefix wouldn't include any further files. Fixes #18242.
1 parent 408e1e4 commit 9d0f0bc

File tree

1 file changed

+131
-15
lines changed
  • datafusion/datasource/src

1 file changed

+131
-15
lines changed

datafusion/datasource/src/url.rs

Lines changed: 131 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,10 @@ impl ListingTableUrl {
252252
.boxed(),
253253
// If the head command fails, it is likely that object doesn't exist.
254254
// Retry as though it were a prefix (aka a collection)
255-
Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
255+
Err(object_store::Error::NotFound { .. }) => {
256+
list_with_cache(ctx, store, &self.prefix).await?
257+
}
258+
Err(e) => return Err(e.into()),
256259
}
257260
};
258261

@@ -405,6 +408,8 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
405408
#[cfg(test)]
406409
mod tests {
407410
use super::*;
411+
use async_trait::async_trait;
412+
use bytes::Bytes;
408413
use datafusion_common::config::TableOptions;
409414
use datafusion_common::DFSchema;
410415
use datafusion_execution::config::SessionConfig;
@@ -414,9 +419,13 @@ mod tests {
414419
use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
415420
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
416421
use datafusion_physical_plan::ExecutionPlan;
417-
use object_store::PutPayload;
422+
use object_store::{
423+
GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions,
424+
PutPayload,
425+
};
418426
use std::any::Any;
419427
use std::collections::HashMap;
428+
use std::ops::Range;
420429
use tempfile::tempdir;
421430

422431
#[test]
@@ -632,48 +641,68 @@ mod tests {
632641
}
633642

634643
#[tokio::test]
635-
async fn test_list_files() {
636-
let store = object_store::memory::InMemory::new();
644+
async fn test_list_files() -> Result<()> {
645+
let store = MockObjectStore {
646+
in_mem: object_store::memory::InMemory::new(),
647+
forbidden_paths: vec!["forbidden/e.parquet".into()],
648+
};
649+
637650
// Create some files:
638651
create_file(&store, "a.parquet").await;
639652
create_file(&store, "/t/b.parquet").await;
640653
create_file(&store, "/t/c.csv").await;
641654
create_file(&store, "/t/d.csv").await;
642655

656+
// This file returns a permission error.
657+
create_file(&store, "/forbidden/e.parquet").await;
658+
643659
assert_eq!(
644-
list_all_files("/", &store, "parquet").await,
660+
list_all_files("/", &store, "parquet").await?,
645661
vec!["a.parquet"],
646662
);
647663

648664
// test with and without trailing slash
649665
assert_eq!(
650-
list_all_files("/t/", &store, "parquet").await,
666+
list_all_files("/t/", &store, "parquet").await?,
651667
vec!["t/b.parquet"],
652668
);
653669
assert_eq!(
654-
list_all_files("/t", &store, "parquet").await,
670+
list_all_files("/t", &store, "parquet").await?,
655671
vec!["t/b.parquet"],
656672
);
657673

658674
// test with and without trailing slash
659675
assert_eq!(
660-
list_all_files("/t", &store, "csv").await,
676+
list_all_files("/t", &store, "csv").await?,
661677
vec!["t/c.csv", "t/d.csv"],
662678
);
663679
assert_eq!(
664-
list_all_files("/t/", &store, "csv").await,
680+
list_all_files("/t/", &store, "csv").await?,
665681
vec!["t/c.csv", "t/d.csv"],
666682
);
667683

668684
// Test a non existing prefix
669685
assert_eq!(
670-
list_all_files("/NonExisting", &store, "csv").await,
686+
list_all_files("/NonExisting", &store, "csv").await?,
671687
vec![] as Vec<String>
672688
);
673689
assert_eq!(
674-
list_all_files("/NonExisting/", &store, "csv").await,
690+
list_all_files("/NonExisting/", &store, "csv").await?,
675691
vec![] as Vec<String>
676692
);
693+
694+
// Including forbidden.parquet generates an error.
695+
let Err(DataFusionError::ObjectStore(err)) =
696+
list_all_files("/forbidden/e.parquet", &store, "parquet").await
697+
else {
698+
panic!("Expected ObjectStore error");
699+
};
700+
701+
let object_store::Error::PermissionDenied { .. } = &*err else {
702+
panic!("Expected PermissionDenied error");
703+
};
704+
705+
Ok(())
677706
}
678707

679708
/// Creates a file with "hello world" content at the specified path
@@ -691,10 +720,8 @@ mod tests {
691720
url: &str,
692721
store: &dyn ObjectStore,
693722
file_extension: &str,
694-
) -> Vec<String> {
695-
try_list_all_files(url, store, file_extension)
696-
.await
697-
.unwrap()
723+
) -> Result<Vec<String>> {
724+
try_list_all_files(url, store, file_extension).await
698725
}
699726

700727
/// Runs "list_all_files" and returns their paths
@@ -716,6 +743,95 @@ mod tests {
716743
Ok(files)
717744
}
718745

746+
#[derive(Debug)]
747+
struct MockObjectStore {
748+
in_mem: object_store::memory::InMemory,
749+
forbidden_paths: Vec<Path>,
750+
}
751+
752+
impl std::fmt::Display for MockObjectStore {
753+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
754+
self.in_mem.fmt(f)
755+
}
756+
}
757+
758+
#[async_trait]
759+
impl ObjectStore for MockObjectStore {
760+
async fn put_opts(
761+
&self,
762+
location: &Path,
763+
payload: PutPayload,
764+
opts: object_store::PutOptions,
765+
) -> object_store::Result<object_store::PutResult> {
766+
self.in_mem.put_opts(location, payload, opts).await
767+
}
768+
769+
async fn put_multipart_opts(
770+
&self,
771+
location: &Path,
772+
opts: PutMultipartOptions,
773+
) -> object_store::Result<Box<dyn MultipartUpload>> {
774+
self.in_mem.put_multipart_opts(location, opts).await
775+
}
776+
777+
async fn get_opts(
778+
&self,
779+
location: &Path,
780+
options: GetOptions,
781+
) -> object_store::Result<GetResult> {
782+
self.in_mem.get_opts(location, options).await
783+
}
784+
785+
async fn get_ranges(
786+
&self,
787+
location: &Path,
788+
ranges: &[Range<u64>],
789+
) -> object_store::Result<Vec<Bytes>> {
790+
self.in_mem.get_ranges(location, ranges).await
791+
}
792+
793+
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
794+
if self.forbidden_paths.contains(location) {
795+
Err(object_store::Error::PermissionDenied {
796+
path: location.to_string(),
797+
source: "forbidden".into(),
798+
})
799+
} else {
800+
self.in_mem.head(location).await
801+
}
802+
}
803+
804+
async fn delete(&self, location: &Path) -> object_store::Result<()> {
805+
self.in_mem.delete(location).await
806+
}
807+
808+
fn list(
809+
&self,
810+
prefix: Option<&Path>,
811+
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
812+
self.in_mem.list(prefix)
813+
}
814+
815+
async fn list_with_delimiter(
816+
&self,
817+
prefix: Option<&Path>,
818+
) -> object_store::Result<ListResult> {
819+
self.in_mem.list_with_delimiter(prefix).await
820+
}
821+
822+
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
823+
self.in_mem.copy(from, to).await
824+
}
825+
826+
async fn copy_if_not_exists(
827+
&self,
828+
from: &Path,
829+
to: &Path,
830+
) -> object_store::Result<()> {
831+
self.in_mem.copy_if_not_exists(from, to).await
832+
}
833+
}
834+
719835
struct MockSession {
720836
config: SessionConfig,
721837
runtime_env: Arc<RuntimeEnv>,

0 commit comments

Comments
 (0)