Skip to content

Commit

Permalink
feat(core): make list return path itself
Browse files Browse the repository at this point in the history
  • Loading branch information
meteorgan committed Aug 5, 2024
1 parent fac74d4 commit d40e03b
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 93 deletions.
12 changes: 4 additions & 8 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,7 @@ impl<A: Access> CompleteAccessor<A> {
if path.ends_with('/') && capability.list_with_recursive {
let (_, mut l) = self
.inner
.list(
path.trim_end_matches('/'),
OpList::default().with_recursive(true).with_limit(1),
)
.list(path, OpList::default().with_recursive(true).with_limit(1))
.await?;

return if oio::List::next(&mut l).await?.is_some() {
Expand Down Expand Up @@ -246,10 +243,9 @@ impl<A: Access> CompleteAccessor<A> {

// Otherwise, we can simulate stat a dir path via `list`.
if path.ends_with('/') && capability.list_with_recursive {
let (_, mut l) = self.inner.blocking_list(
path.trim_end_matches('/'),
OpList::default().with_recursive(true).with_limit(1),
)?;
let (_, mut l) = self
.inner
.blocking_list(path, OpList::default().with_recursive(true).with_limit(1))?;

return if oio::BlockingList::next(&mut l)?.is_some() {
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl HttpClient {
// Get content length from header so that we can check it.
//
// - If the request method is HEAD, we will ignore content length.
// - If response contains content_encoding, we should omit it's content length.
// - If response contains content_encoding, we should omit its content length.
let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
None
} else {
Expand Down
59 changes: 25 additions & 34 deletions core/src/raw/oio/list/flat_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ use crate::*;
/// always output directly while listing.
pub struct FlatLister<A: Access, L> {
acc: A,
root: String,

next_dir: Option<oio::Entry>,
active_lister: Vec<(Option<oio::Entry>, L)>,
Expand All @@ -79,7 +78,6 @@ where
pub fn new(acc: A, path: &str) -> FlatLister<A, L> {
FlatLister {
acc,
root: path.to_string(),
next_dir: Some(oio::Entry::new(path, Metadata::new(EntryMode::DIR))),
active_lister: vec![],
}
Expand All @@ -105,25 +103,22 @@ where

match lister.next().await? {
Some(v) if v.mode().is_dir() => {
self.next_dir = Some(v);
continue;
// The lister may return the directory itself; don't descend into it again.
if v.path() != de.as_ref().expect("de should not be none here").path() {
self.next_dir = Some(v);
continue;
}
}
Some(v) => return Ok(Some(v)),
None => {
match de.take() {
Some(de) => {
// Only push entry if it's not root dir
if de.path() != self.root {
return Ok(Some(de));
}
continue;
}
None => {
let _ = self.active_lister.pop();
continue;
}
None => match de.take() {
Some(de) => {
return Ok(Some(de));
}
}
None => {
let _ = self.active_lister.pop();
continue;
}
},
}
}
}
Expand All @@ -149,25 +144,21 @@ where

match lister.next()? {
Some(v) if v.mode().is_dir() => {
self.next_dir = Some(v);
continue;
if v.path() != de.as_ref().expect("de should not be none here").path() {
self.next_dir = Some(v);
continue;
}
}
Some(v) => return Ok(Some(v)),
None => {
match de.take() {
Some(de) => {
// Only push entry if it's not root dir
if de.path() != self.root {
return Ok(Some(de));
}
continue;
}
None => {
let _ = self.active_lister.pop();
continue;
}
None => match de.take() {
Some(de) => {
return Ok(Some(de));
}
}
None => {
let _ = self.active_lister.pop();
continue;
}
},
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions core/src/raw/oio/list/page_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ pub struct PageContext {
pub done: bool,
/// token is used by underlying storage services to fetch next page.
pub token: String,
/// entries is used to store entries fetched from underlying storage.
/// entries are used to store entries fetched from underlying storage.
///
/// Please always reuse the same `VecDeque` to avoid unnecessary memory allocation.
/// PageLister makes sure that entries is reset before calling `next_page`. Implementer
/// can calling `push_back` on `entries` directly.
/// can call `push_back` on `entries` directly.
pub entries: VecDeque<oio::Entry>,

/// Whether the entry for the listed path itself has already been added.
pub path_added: bool,
}

/// PageLister implements [`oio::List`] based on [`PageList`].
Expand All @@ -80,6 +83,7 @@ where
done: false,
token: "".to_string(),
entries: VecDeque::new(),
path_added: false,
},
}
}
Expand Down
13 changes: 2 additions & 11 deletions core/src/raw/oio/list/prefix_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,14 @@ impl<L> PrefixLister<L> {
}
}

/// Reports whether `entry`'s path begins with `prefix` and continues past it.
///
/// Returns `false` both when the path does not start with `prefix` and when
/// the path is exactly equal to `prefix`.
#[inline]
fn starts_with_not_eq(entry: &oio::Entry, prefix: &str) -> bool {
    // `strip_prefix` yields the remainder only on a prefix match; an empty
    // remainder means the path equals the prefix, which is rejected.
    matches!(entry.path().strip_prefix(prefix), Some(rest) if !rest.is_empty())
}

impl<L> oio::List for PrefixLister<L>
where
L: oio::List,
{
async fn next(&mut self) -> Result<Option<oio::Entry>> {
loop {
match self.lister.next().await {
Ok(Some(e)) if !starts_with_not_eq(&e, &self.prefix) => continue,
Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue,
v => return v,
}
}
Expand All @@ -79,7 +70,7 @@ where
fn next(&mut self) -> Result<Option<oio::Entry>> {
loop {
match self.lister.next() {
Ok(Some(e)) if !starts_with_not_eq(&e, &self.prefix) => continue,
Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue,
v => return v,
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl Access for FsBackend {
}
};

let rd = FsLister::new(&self.core.root, f);
let rd = FsLister::new(&self.core.root, path, f);

Ok((RpList::default(), Some(rd)))
}
Expand Down Expand Up @@ -536,7 +536,7 @@ impl Access for FsBackend {
}
};

let rd = FsLister::new(&self.core.root, f);
let rd = FsLister::new(&self.core.root, path, f);

Ok((RpList::default(), Some(rd)))
}
Expand Down
22 changes: 18 additions & 4 deletions core/src/services/fs/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,26 @@
// specific language governing permissions and limitations
// under the License.

use std::path::Path;
use std::path::PathBuf;

use crate::raw::*;
use crate::EntryMode;
use crate::Metadata;
use crate::Result;
use std::path::Path;
use std::path::PathBuf;

/// Lister for the fs service, generic over the directory reader `P`.
pub struct FsLister<P> {
/// Root path, copied from the `root` passed to `new`.
root: PathBuf,

/// The listed path itself. `next` takes it on the first call and returns it
/// as a DIR entry before reading anything from `rd`; afterwards it is `None`.
current_path: Option<String>,

/// The underlying directory reader that the remaining entries are pulled from.
rd: P,
}

impl<P> FsLister<P> {
pub fn new(root: &Path, rd: P) -> Self {
pub fn new(root: &Path, path: &str, rd: P) -> Self {
Self {
root: root.to_owned(),
current_path: Some(path.to_string()),
rd,
}
}
Expand All @@ -45,6 +47,12 @@ unsafe impl<P> Sync for FsLister<P> {}

impl oio::List for FsLister<tokio::fs::ReadDir> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
// Since list should return the path itself, we return it first.
if let Some(path) = self.current_path.take() {
let e = oio::Entry::new(path.as_str(), Metadata::new(EntryMode::DIR));
return Ok(Some(e));
}

let Some(de) = self.rd.next_entry().await.map_err(new_std_io_error)? else {
return Ok(None);
};
Expand Down Expand Up @@ -75,6 +83,12 @@ impl oio::List for FsLister<tokio::fs::ReadDir> {

impl oio::BlockingList for FsLister<std::fs::ReadDir> {
fn next(&mut self) -> Result<Option<oio::Entry>> {
// Since list should return the path itself, we return it first.
if let Some(path) = self.current_path.take() {
let e = oio::Entry::new(path.as_str(), Metadata::new(EntryMode::DIR));
return Ok(Some(e));
}

let de = match self.rd.next() {
Some(de) => de.map_err(new_std_io_error)?,
None => return Ok(None),
Expand Down
24 changes: 19 additions & 5 deletions core/src/services/s3/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl oio::PageList for S3Lister {
&ctx.token,
self.delimiter,
self.limit,
// State after should only be set for the first page.
// start after should only be set for the first page.
if ctx.token.is_empty() {
self.start_after.clone()
} else {
Expand All @@ -90,7 +90,7 @@ impl oio::PageList for S3Lister {
//
// - Check `is_truncated`
// - Check `next_continuation_token`
// - Check the length of `common_prefixes` and `contents` (very rarely case)
// - Check the length of `common_prefixes` and `contents` (very rare case)
ctx.done = if let Some(is_truncated) = output.is_truncated {
!is_truncated
} else if let Some(next_continuation_token) = output.next_continuation_token.as_ref() {
Expand All @@ -100,20 +100,21 @@ impl oio::PageList for S3Lister {
};
ctx.token = output.next_continuation_token.clone().unwrap_or_default();

let mut prefix_existing = false;
for prefix in output.common_prefixes {
let de = oio::Entry::new(
&build_rel_path(&self.core.root, &prefix.prefix),
Metadata::new(EntryMode::DIR),
);

ctx.entries.push_back(de);
prefix_existing = true;
}

for object in output.contents {
let path = build_rel_path(&self.core.root, &object.key);

// s3 could return the dir itself in contents.
if path == self.path || path.is_empty() {
if path.is_empty() {
continue;
}

Expand All @@ -125,12 +126,25 @@ impl oio::PageList for S3Lister {
}
meta.set_content_length(object.size);

// object.last_modified provides more precious time that contains
// object.last_modified provides more precise time that contains
// nanosecond, let's trim them.
meta.set_last_modified(parse_datetime_from_rfc3339(object.last_modified.as_str())?);

if path == self.path {
ctx.path_added = true;
}
let de = oio::Entry::with(path, meta);
ctx.entries.push_back(de);

prefix_existing = true;
}

// If the path is a dir that hasn't been created yet and exists only as the prefix of a key,
// add it manually.
if prefix_existing && self.path.ends_with("/") && !ctx.path_added {
let entry = oio::Entry::new(self.path.as_str(), Metadata::new(EntryMode::DIR));
ctx.entries.push_front(entry);
ctx.path_added = true;
}

Ok(())
Expand Down
Loading

0 comments on commit d40e03b

Please sign in to comment.