Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/gdrive): List shows modified timestamp gdrive #5226

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/services/gdrive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ impl Access for GdriveBackend {
Ok(RpDelete::default())
}

async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let path = build_abs_path(&self.core.root, path);
let l = GdriveLister::new(path, self.core.clone());
let l = GdriveLister::new(path, self.core.clone(), args);
Ok((RpList::default(), oio::PageLister::new(l)))
}

Expand Down
80 changes: 73 additions & 7 deletions core/src/services/gdrive/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,38 @@ use std::sync::Arc;
use http::StatusCode;

use super::core::GdriveCore;
use super::core::GdriveFile;
use super::core::GdriveFileList;
use super::error::parse_error;
use crate::raw::*;
use crate::*;
use bytes::Buf;
use chrono::Utc;

pub struct GdriveLister {
path: String,
core: Arc<GdriveCore>,
op: OpList,
}

async fn stat_file(core: Arc<GdriveCore>, path: &str) -> Result<GdriveFile, Error> {
// reuse gdrive_stat which resolves `file_id` by path via core's `path_cache`.
let resp = core.gdrive_stat(path).await?;

if resp.status() != StatusCode::OK {
return Err(parse_error(resp));
}

let bs = resp.into_body();
let gdrive_file: GdriveFile =
serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;

Ok(gdrive_file)
}

impl GdriveLister {
pub fn new(path: String, core: Arc<GdriveCore>) -> Self {
Self { path, core }
pub fn new(path: String, core: Arc<GdriveCore>, op: OpList) -> Self {
Self { path, core, op }
}
}

Expand Down Expand Up @@ -64,10 +83,32 @@ impl oio::PageList for GdriveLister {
return Ok(());
}

let stat_file_metadata = !self
.op
.metakey()
.is_disjoint(Metakey::ContentLength | Metakey::LastModified);

// Return self at the first page.
if ctx.token.is_empty() && !ctx.done {
let path = build_rel_path(&self.core.root, &self.path);
let e = oio::Entry::new(&path, Metadata::new(EntryMode::DIR));
let mut metadata = Metadata::new(EntryMode::DIR);
if stat_file_metadata {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, sorry for not making the Metakey behavior clearer. (I'm working on this.)

Metakey represents a best effort hint and is not processed server-side. The service merely needs to supply the most comprehensive metadata available during listing.

The Operator will call stat as required, for example:

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Returns `None` if we have errored.
if self.errored {
return Poll::Ready(None);
}
// Trying to pull more tasks if there are more space.
if self.tasks.has_remaining() {
// Building future if we have a lister available.
if let Some(mut lister) = self.lister.take() {
let fut = async move {
let res = lister.next_dyn().await;
(lister, res)
};
self.fut = Some(Box::pin(fut));
}
if let Some(fut) = self.fut.as_mut() {
if let Poll::Ready((lister, entry)) = fut.as_mut().poll(cx) {
self.lister = Some(lister);
self.fut = None;
match entry {
Ok(Some(oe)) => {
let (path, metadata) = oe.into_entry().into_parts();
if metadata.contains_metakey(self.required_metakey) {
self.tasks
.push_back(StatTask::Known(Some((path, metadata))));
} else {
let acc = self.acc.clone();
let fut = async move {
let res = acc.stat(&path, OpStat::default()).await;
(path, res.map(|rp| rp.into_metadata()))
};
self.tasks.push_back(StatTask::Stating(Box::pin(fut)));
}
}
Ok(None) => {
self.lister = None;
}
Err(err) => {
self.errored = true;
return Poll::Ready(Some(Err(err)));
}
}
}
}
}
// Try to poll tasks
if let Some((path, rp)) = ready!(self.tasks.poll_next_unpin(cx)) {
let metadata = rp?;
return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
}
if self.lister.is_some() || self.fut.is_some() {
Poll::Pending
} else {
Poll::Ready(None)
}
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. I get a weird Metadata instance problem, that Gdrivebackend::stat returns metadata with a timestamp but Lister::poll_next gets a result of Metadata without the timestamp.

I will debug it a bit.

let gdrive_file = stat_file(self.core.clone(), &path).await?;
if let Some(v) = gdrive_file.size {
metadata.set_content_length(v.parse::<u64>().map_err(|e| {
Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
})?);
}
if let Some(v) = gdrive_file.modified_time {
metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(
|e| {
Error::new(ErrorKind::Unexpected, "parse last modified time")
.set_source(e)
},
)?);
}
}
let e = oio::Entry::new(&path, metadata);
ctx.entries.push_back(e);
}

Expand All @@ -90,14 +131,39 @@ impl oio::PageList for GdriveLister {
EntryMode::FILE
};

let root = &self.core.root;
let path = format!("{}{}", &self.path, file.name);
let normalized_path = build_rel_path(root, &path);

// Update path cache with list result.
self.core.path_cache.insert(&path, &file.id).await;
//
// Only cache non-existent entry. When Google Drive converts a format,
// for example, Microsoft Powerpoint, they will be two entries.
// These two entries have the same file id.
if let Ok(None) = self.core.path_cache.get(&path).await {
self.core.path_cache.insert(&path, &file.id).await;
}

let root = &self.core.root;
let normalized_path = build_rel_path(root, &path);

let mut metadata = Metadata::new(file_type);
if stat_file_metadata {
let gdrive_file = stat_file(self.core.clone(), &normalized_path).await?;
if let Some(v) = gdrive_file.size {
metadata.set_content_length(v.parse::<u64>().map_err(|e| {
Error::new(ErrorKind::Unexpected, "parse content length").set_source(e)
})?);
}
if let Some(v) = gdrive_file.modified_time {
metadata.set_last_modified(v.parse::<chrono::DateTime<Utc>>().map_err(
|e| {
Error::new(ErrorKind::Unexpected, "parse last modified time")
.set_source(e)
},
)?);
}
}

let entry = oio::Entry::new(&normalized_path, Metadata::new(file_type));
let entry = oio::Entry::new(&normalized_path, metadata);
ctx.entries.push_back(entry);
}

Expand Down
Loading