Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice committed Oct 23, 2024
1 parent c6979e0 commit dc2c4f3
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 22 deletions.
31 changes: 19 additions & 12 deletions core/src/raw/adapters/kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,37 @@ use crate::Capability;
use crate::Scheme;
use crate::*;

/// ScanIter is the async iterator returned by `Adapter::scan`.
/// Scan is the async iterator returned by `Adapter::scan`.
pub trait Scan: Send + Sync + Unpin {
/// Fetch the next key in the current key prefix
///
/// `None` means no further key will be returned
fn next(&mut self) -> impl Future<Output = Option<Result<String>>> + MaybeSend;
/// `Ok(None)` means no further key will be returned
fn next(&mut self) -> impl Future<Output = Result<Option<String>>> + MaybeSend;
}

/// A noop implementation of Scan
impl Scan for () {
async fn next(&mut self) -> Option<Result<String>> {
None
async fn next(&mut self) -> Result<Option<String>> {
Ok(None)
}
}

/// A ScanIterator implementation for all trivial non-async iterators
/// A Scan implementation for all trivial non-async iterators
pub struct ScanStdIter<I>(I);

#[cfg(any(
feature = "services-cloudflare-kv",
feature = "services-etcd",
feature = "services-nebula-graph",
feature = "services-rocksdb",
feature = "services-sled"
))]
impl<I> ScanStdIter<I>
where
I: Iterator<Item = Result<String>> + Unpin + Send + Sync,
{
/// Create a new ScanStdIter from an Iterator
pub fn new(inner: I) -> Self {
pub(crate) fn new(inner: I) -> Self {
Self(inner)
}
}
Expand All @@ -58,26 +65,26 @@ impl<I> Scan for ScanStdIter<I>
where
I: Iterator<Item = Result<String>> + Unpin + Send + Sync,
{
async fn next(&mut self) -> Option<Result<String>> {
self.0.next()
async fn next(&mut self) -> Result<Option<String>> {
self.0.next().transpose()
}
}

/// A type-erased wrapper of Scan
pub type Scanner = Box<dyn ScanDyn>;

pub trait ScanDyn: Unpin + Send + Sync {
fn next_dyn(&mut self) -> BoxedFuture<Option<Result<String>>>;
fn next_dyn(&mut self) -> BoxedFuture<Result<Option<String>>>;
}

impl<T: Scan + ?Sized> ScanDyn for T {
fn next_dyn(&mut self) -> BoxedFuture<Option<Result<String>>> {
fn next_dyn(&mut self) -> BoxedFuture<Result<Option<String>>> {
Box::pin(self.next())
}
}

impl<T: ScanDyn + ?Sized> Scan for Box<T> {
async fn next(&mut self) -> Option<Result<String>> {
async fn next(&mut self) -> Result<Option<String>> {
self.deref_mut().next_dyn().await
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ where
}

async fn inner_next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner.next().await.transpose()?.map(|v| {
Ok(self.inner.next().await?.map(|v| {
let mode = if v.ends_with('/') {
EntryMode::DIR
} else {
Expand Down
9 changes: 8 additions & 1 deletion core/src/raw/adapters/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ mod api;
pub use api::Adapter;
pub use api::Metadata;
pub use api::Scan;
pub use api::ScanStdIter;
#[cfg(any(
feature = "services-cloudflare-kv",
feature = "services-etcd",
feature = "services-nebula-graph",
feature = "services-rocksdb",
feature = "services-sled"
))]
pub(crate) use api::ScanStdIter;
pub use api::Scanner;

mod backend;
Expand Down
16 changes: 8 additions & 8 deletions core/src/services/sqlite/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Adapter {
}

#[self_referencing]
pub struct SqlStream {
pub struct SqliteScanner {
pool: SqlitePool,
query: String,

Expand All @@ -205,24 +205,24 @@ pub struct SqlStream {
stream: BoxStream<'this, Result<String>>,
}

impl Stream for SqlStream {
impl Stream for SqliteScanner {
type Item = Result<String>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.with_stream_mut(|s| s.poll_next_unpin(cx))
}
}

unsafe impl Sync for SqlStream {}
unsafe impl Sync for SqliteScanner {}

impl kv::Scan for SqlStream {
async fn next(&mut self) -> Option<Result<String>> {
<Self as StreamExt>::next(self).await
impl kv::Scan for SqliteScanner {
async fn next(&mut self) -> Result<Option<String>> {
<Self as StreamExt>::next(self).await.transpose()
}
}

impl kv::Adapter for Adapter {
type Scanner = SqlStream;
type Scanner = SqliteScanner;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Expand Down Expand Up @@ -286,7 +286,7 @@ impl kv::Adapter for Adapter {

async fn scan(&self, path: &str) -> Result<Self::Scanner> {
let pool = self.get_client().await?;
let stream = SqlStreamBuilder {
let stream = SqliteScannerBuilder {
pool: pool.clone(),
query: format!(
"SELECT `{}` FROM `{}` WHERE `{}` LIKE $1",
Expand Down

0 comments on commit dc2c4f3

Please sign in to comment.