From 84aa1764db6c968c7f7fd63c9c8467a0e9a30616 Mon Sep 17 00:00:00 2001 From: Jan Kolena Date: Sun, 8 Nov 2020 18:23:22 +0100 Subject: [PATCH] Made AIO backend API available --- lib/src/aio/b2.rs | 8 +++--- lib/src/aio/backend.rs | 6 ++-- lib/src/aio/local.rs | 10 +++---- lib/src/aio/mod.rs | 14 ++++++---- lib/src/lib.rs | 63 ++++++++++++++++++++++++++++++++++++++---- 5 files changed, 78 insertions(+), 23 deletions(-) diff --git a/lib/src/aio/b2.rs b/lib/src/aio/b2.rs index b8986163..4e923e40 100644 --- a/lib/src/aio/b2.rs +++ b/lib/src/aio/b2.rs @@ -22,7 +22,7 @@ use crate::config; // TODO: make a thread, that keeps updating // a timestamp file on the backend -struct Lock { +pub struct Lock { path: PathBuf, } @@ -35,17 +35,17 @@ impl Lock { impl aio::Lock for Lock {} #[derive(Debug)] -pub(crate) struct B2 { +pub struct B2 { cred: B2Credentials, bucket: String, } -struct Auth { +pub struct Auth { auth: B2Authorization, upload_auth: UploadAuthorization, } -struct B2Thread { +pub struct B2Thread { cred: B2Credentials, auth: RefCell>, client: Client, diff --git a/lib/src/aio/backend.rs b/lib/src/aio/backend.rs index 5284a9b3..d9cce8ef 100644 --- a/lib/src/aio/backend.rs +++ b/lib/src/aio/backend.rs @@ -7,13 +7,13 @@ use sgdata::SGData; /// A lock held on the backend /// /// It doesn't do much, except unlock on `drop`. -pub(crate) trait Lock {} +pub trait Lock {} /// Backend API /// /// Backend is thread-safe, and the actual work /// is implemented by per-thread instances of it. -pub(crate) trait Backend: Send + Sync { +pub trait Backend: Send + Sync { /// Lock the repository exclusively /// /// Use to protect operations that are potentially destructive, @@ -29,7 +29,7 @@ pub(crate) trait Backend: Send + Sync { fn new_thread(&self) -> io::Result>; } -pub(crate) trait BackendThread: Send { +pub trait BackendThread: Send { fn remove_dir_all(&mut self, path: PathBuf) -> io::Result<()>; fn rename( diff --git a/lib/src/aio/local.rs b/lib/src/aio/local.rs index 559103cf..04d29922 100644 --- a/lib/src/aio/local.rs +++ b/lib/src/aio/local.rs @@ -23,12 +23,12 @@ pub(crate) fn lock_file_path(path: &Path) -> PathBuf { } #[derive(Debug)] -pub(crate) struct Local { +pub struct Local { path: PathBuf, } #[derive(Debug)] -struct LocalThread { +pub struct LocalThread { path: PathBuf, rand_ext: String, } @@ -64,7 +64,7 @@ impl Backend for Local { } impl Local { - pub(crate) fn new(path: PathBuf) -> Self { + pub fn new(path: PathBuf) -> Self { Local { path } } } @@ -151,8 +151,8 @@ impl BackendThread for LocalThread { let path = self.path.join(path); let md = fs::metadata(&path)?; Ok(Metadata { - _len: md.len(), - _is_file: md.is_file(), + len: md.len(), + is_file: md.is_file(), }) } diff --git a/lib/src/aio/mod.rs b/lib/src/aio/mod.rs index 60872863..841b16e0 100644 --- a/lib/src/aio/mod.rs +++ b/lib/src/aio/mod.rs @@ -7,18 +7,19 @@ use std::sync::{Arc, Mutex}; use std::{io, thread}; use dangerous_option::DangerousOption as AutoOption; +use serde::{Deserialize, Serialize}; use sgdata::SGData; use slog::{o, trace}; use slog::{Level, Logger}; use slog_perf::TimeReporter; use url::Url; -mod local; +pub(crate) mod local; pub(crate) use self::local::Local; -mod b2; +pub(crate) mod b2; pub(crate) use self::b2::B2; -mod backend; +pub(crate) mod backend; use self::backend::*; // {{{ Misc @@ -29,9 +30,10 @@ struct WriteArgs { complete_tx: Option>>, } -pub(crate) struct Metadata { - _len: u64, - _is_file: bool, +#[derive(Debug, Serialize, Deserialize)] +pub struct Metadata { + pub len: u64, + pub is_file: bool, } /// A result of async io operation diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 56e052c1..281a70a5 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -56,6 +56,20 @@ mod misc; use self::misc::*; // }}} +// Fancy reexport of backends API and particular backends structs +pub mod backends { + pub use crate::aio::backend::{Backend, BackendThread, Lock}; + pub use crate::aio::Metadata; + + pub mod local { + pub use crate::aio::local::{Local, LocalThread}; + } + + pub mod b2 { + pub use crate::aio::b2::{Auth, B2Thread, Lock, B2}; + } +} + type ArcDecrypter = Arc; type ArcEncrypter = Arc; @@ -63,7 +77,12 @@ const INGRESS_BUFFER_SIZE: usize = 128 * 1024; const DIGEST_SIZE: usize = 32; /// Type of user provided closure that will ask user for a passphrase is needed -type PassphraseFn<'a> = &'a dyn Fn() -> io::Result; +pub type PassphraseFn<'a> = &'a dyn Fn() -> io::Result; + +/// Type of user provided closure that will find backend based on URL +pub type BackendSelectFn = &'static (dyn Fn(&Url) -> io::Result> + + Send + + Sync); #[derive(Copy, Clone, Debug, PartialEq, Eq)] /// Data type (index/data) @@ -111,6 +130,7 @@ pub struct EncryptHandle { #[derive(Clone)] pub struct Repo { url: Url, + backend_select: BackendSelectFn, config: config::Repo, compression: compression::ArcCompression, @@ -168,6 +188,33 @@ impl Repo { settings: settings::Repo, log: L, ) -> Result + where + L: Into>, + { + Self::init_custom( + url, + &aio::backend_from_url, + passphrase, + settings, + log, + ) + } + + pub fn open(url: &Url, log: L) -> Result + where + L: Into>, + { + Self::open_custom(url, &aio::backend_from_url, log) + } + + /// Create new rdedup repository + pub fn init_custom( + url: &Url, + backend_select: BackendSelectFn, + passphrase: PassphraseFn<'_>, + settings: settings::Repo, + log: L, + ) -> Result where L: Into>, { @@ -175,7 +222,7 @@ impl Repo { .into() .unwrap_or_else(|| Logger::root(slog::Discard, o!())); - let backend = aio::backend_from_url(url)?; + let backend = backend_select(&url)?; let aio = aio::AsyncIO::new(backend, log.clone())?; Repo::ensure_repo_empty_or_new(&aio)?; @@ -187,6 +234,7 @@ impl Repo { Ok(Repo { url: url.clone(), + backend_select, config, compression, hasher, @@ -195,7 +243,11 @@ impl Repo { }) } - pub fn open(url: &Url, log: L) -> Result + pub fn open_custom( + url: &Url, + backend_select: BackendSelectFn, + log: L, + ) -> Result where L: Into>, { @@ -203,7 +255,7 @@ impl Repo { .into() .unwrap_or_else(|| Logger::root(slog::Discard, o!())); - let backend = aio::backend_from_url(url)?; + let backend = backend_select(url)?; let aio = aio::AsyncIO::new(backend, log.clone())?; let config = config::Repo::read(&aio)?; @@ -212,6 +264,7 @@ impl Repo { let hasher = config.hashing.to_hasher(); Ok(Repo { url: url.clone(), + backend_select, config, compression, hasher, @@ -786,7 +839,7 @@ impl Repo { let (chunker_tx, chunker_rx) = mpsc::sync_channel(self.write_cpu_thread_num()); - let backend = backend_from_url(&self.url)?; + let backend = (self.backend_select)(&self.url)?; let aio = aio::AsyncIO::new(backend, self.log.clone())?; let stats = aio.stats();