
Commit

Made AIO backend API available
Jan Kolena committed Nov 8, 2020
1 parent a5941b0 commit 84aa176
Showing 5 changed files with 78 additions and 23 deletions.
8 changes: 4 additions & 4 deletions lib/src/aio/b2.rs
@@ -22,7 +22,7 @@ use crate::config;

// TODO: make a thread, that keeps updating
// a timestamp file on the backend
-struct Lock {
+pub struct Lock {
path: PathBuf,
}

@@ -35,17 +35,17 @@ impl Lock {
impl aio::Lock for Lock {}

#[derive(Debug)]
-pub(crate) struct B2 {
+pub struct B2 {
cred: B2Credentials,
bucket: String,
}

-struct Auth {
+pub struct Auth {
auth: B2Authorization,
upload_auth: UploadAuthorization,
}

-struct B2Thread {
+pub struct B2Thread {
cred: B2Credentials,
auth: RefCell<Option<Auth>>,
client: Client,
6 changes: 3 additions & 3 deletions lib/src/aio/backend.rs
@@ -7,13 +7,13 @@ use sgdata::SGData;
/// A lock held on the backend
///
/// It doesn't do much, except unlock on `drop`.
-pub(crate) trait Lock {}
+pub trait Lock {}

/// Backend API
///
/// Backend is thread-safe, and the actual work
/// is implemented by per-thread instances of it.
-pub(crate) trait Backend: Send + Sync {
+pub trait Backend: Send + Sync {
/// Lock the repository exclusively
///
/// Use to protect operations that are potentially destructive,
@@ -29,7 +29,7 @@ pub(crate) trait Backend: Send + Sync {
fn new_thread(&self) -> io::Result<Box<dyn BackendThread>>;
}

-pub(crate) trait BackendThread: Send {
+pub trait BackendThread: Send {
fn remove_dir_all(&mut self, path: PathBuf) -> io::Result<()>;

fn rename(
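
With Backend, BackendThread and Lock now public, a consumer can drive the per-thread model the doc comments above describe: the shared Backend hands out one BackendThread per worker. A minimal sketch, assuming the library is consumed as the rdedup_lib crate (crate name, the worker function, and the "tmp" path are illustrative, not part of this commit):

    use std::io;
    use std::path::PathBuf;

    use rdedup_lib::backends::{Backend, BackendThread};

    // The shared Backend is Send + Sync; each worker asks it for its own
    // BackendThread handle, which performs the actual I/O.
    fn worker(backend: &dyn Backend) -> io::Result<()> {
        let mut thread: Box<dyn BackendThread> = backend.new_thread()?;
        // BackendThread methods take owned paths relative to the repository
        // root; "tmp" here is just an illustrative directory name.
        thread.remove_dir_all(PathBuf::from("tmp"))?;
        Ok(())
    }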
10 changes: 5 additions & 5 deletions lib/src/aio/local.rs
@@ -23,12 +23,12 @@ pub(crate) fn lock_file_path(path: &Path) -> PathBuf {
}

#[derive(Debug)]
-pub(crate) struct Local {
+pub struct Local {
path: PathBuf,
}

#[derive(Debug)]
-struct LocalThread {
+pub struct LocalThread {
path: PathBuf,
rand_ext: String,
}
@@ -64,7 +64,7 @@ impl Backend for Local {
}

impl Local {
-pub(crate) fn new(path: PathBuf) -> Self {
+pub fn new(path: PathBuf) -> Self {
Local { path }
}
}
@@ -151,8 +151,8 @@ impl BackendThread for LocalThread {
let path = self.path.join(path);
let md = fs::metadata(&path)?;
Ok(Metadata {
-_len: md.len(),
-_is_file: md.is_file(),
+len: md.len(),
+is_file: md.is_file(),
})
}

14 changes: 8 additions & 6 deletions lib/src/aio/mod.rs
@@ -7,18 +7,19 @@ use std::sync::{Arc, Mutex};
use std::{io, thread};

use dangerous_option::DangerousOption as AutoOption;
+use serde::{Deserialize, Serialize};
use sgdata::SGData;
use slog::{o, trace};
use slog::{Level, Logger};
use slog_perf::TimeReporter;
use url::Url;

-mod local;
+pub(crate) mod local;
pub(crate) use self::local::Local;
-mod b2;
+pub(crate) mod b2;
pub(crate) use self::b2::B2;

-mod backend;
+pub(crate) mod backend;
use self::backend::*;

// {{{ Misc
@@ -29,9 +29,10 @@ struct WriteArgs {
complete_tx: Option<mpsc::Sender<io::Result<()>>>,
}

-pub(crate) struct Metadata {
-_len: u64,
-_is_file: bool,
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Metadata {
+pub len: u64,
+pub is_file: bool,
}

/// A result of async io operation
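
Metadata is now public, its fields are exposed, and it derives Serialize/Deserialize, so callers can inspect or persist it through the backends re-export added in lib.rs below. A small sketch, assuming the rdedup_lib crate name and a serde_json dependency (both assumptions for illustration):

    use rdedup_lib::backends::Metadata;

    fn describe(md: &Metadata) -> String {
        // Fields are public now, so callers can read them directly...
        let kind = if md.is_file { "file" } else { "non-file" };
        // ...and the serde derives allow round-tripping, e.g. to JSON.
        let json = serde_json::to_string(md).unwrap_or_default();
        format!("{} of {} bytes ({})", kind, md.len, json)
    }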
63 changes: 58 additions & 5 deletions lib/src/lib.rs
@@ -56,14 +56,33 @@ mod misc;
use self::misc::*;
// }}}

+// Fancy reexport of backends API and particular backends structs
+pub mod backends {
+pub use crate::aio::backend::{Backend, BackendThread, Lock};
+pub use crate::aio::Metadata;
+
+pub mod local {
+pub use crate::aio::local::{Local, LocalThread};
+}
+
+pub mod b2 {
+pub use crate::aio::b2::{Auth, B2Thread, Lock, B2};
+}
+}

type ArcDecrypter = Arc<dyn encryption::Decrypter + Send + Sync + 'static>;
type ArcEncrypter = Arc<dyn encryption::Encrypter + Send + Sync + 'static>;

const INGRESS_BUFFER_SIZE: usize = 128 * 1024;
const DIGEST_SIZE: usize = 32;

/// Type of user provided closure that will ask user for a passphrase is needed
-type PassphraseFn<'a> = &'a dyn Fn() -> io::Result<String>;
+pub type PassphraseFn<'a> = &'a dyn Fn() -> io::Result<String>;

+/// Type of user provided closure that will find backend based on URL
+pub type BackendSelectFn = &'static (dyn Fn(&Url) -> io::Result<Box<dyn backends::Backend + Send + Sync>>
+    + Send
+    + Sync);

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Data type (index/data)
@@ -111,6 +130,7 @@ pub struct EncryptHandle {
#[derive(Clone)]
pub struct Repo {
url: Url,
+backend_select: BackendSelectFn,
config: config::Repo,

compression: compression::ArcCompression,
@@ -168,14 +188,41 @@ impl Repo {
settings: settings::Repo,
log: L,
) -> Result<Repo>
where
L: Into<Option<Logger>>,
{
+Self::init_custom(
+url,
+&aio::backend_from_url,
+passphrase,
+settings,
+log,
+)
+}
+
+pub fn open<L>(url: &Url, log: L) -> Result<Repo>
+where
+L: Into<Option<Logger>>,
+{
+Self::open_custom(url, &aio::backend_from_url, log)
+}
+
+/// Create new rdedup repository
+pub fn init_custom<L>(
+url: &Url,
+backend_select: BackendSelectFn,
+passphrase: PassphraseFn<'_>,
+settings: settings::Repo,
+log: L,
+) -> Result<Repo>
+where
+L: Into<Option<Logger>>,
+{
let log = log
.into()
.unwrap_or_else(|| Logger::root(slog::Discard, o!()));

-let backend = aio::backend_from_url(url)?;
+let backend = backend_select(&url)?;
let aio = aio::AsyncIO::new(backend, log.clone())?;

Repo::ensure_repo_empty_or_new(&aio)?;
@@ -187,6 +234,7 @@

Ok(Repo {
url: url.clone(),
+backend_select,
config,
compression,
hasher,
@@ -195,15 +243,19 @@ impl Repo {
})
}

-pub fn open<L>(url: &Url, log: L) -> Result<Repo>
+pub fn open_custom<L>(
+url: &Url,
+backend_select: BackendSelectFn,
+log: L,
+) -> Result<Repo>
where
L: Into<Option<Logger>>,
{
let log = log
.into()
.unwrap_or_else(|| Logger::root(slog::Discard, o!()));

-let backend = aio::backend_from_url(url)?;
+let backend = backend_select(url)?;
let aio = aio::AsyncIO::new(backend, log.clone())?;

let config = config::Repo::read(&aio)?;
@@ -212,6 +264,7 @@ impl Repo {
let hasher = config.hashing.to_hasher();
Ok(Repo {
url: url.clone(),
+backend_select,
config,
compression,
hasher,
@@ -786,7 +839,7 @@ impl Repo {
let (chunker_tx, chunker_rx) =
mpsc::sync_channel(self.write_cpu_thread_num());

-let backend = backend_from_url(&self.url)?;
+let backend = (self.backend_select)(&self.url)?;
let aio = aio::AsyncIO::new(backend, self.log.clone())?;

let stats = aio.stats();
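
Taken together, the new public surface lets a caller swap in its own backend selection instead of the built-in aio::backend_from_url. A minimal sketch of using Repo::open_custom with a BackendSelectFn-compatible function (the rdedup_lib crate name, the file:// URL, and the scheme handling are illustrative assumptions, not part of this commit):

    use std::io;
    use std::path::PathBuf;

    use slog::Logger;
    use url::Url;

    use rdedup_lib::backends::local::Local;
    use rdedup_lib::{backends, Repo};

    // Map file:// URLs to the built-in Local backend and reject anything else.
    // A real selector could return a custom Backend implementation instead.
    fn select_backend(
        url: &Url,
    ) -> io::Result<Box<dyn backends::Backend + Send + Sync>> {
        match url.scheme() {
            "file" => Ok(Box::new(Local::new(PathBuf::from(url.path())))),
            other => Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("unsupported backend scheme: {}", other),
            )),
        }
    }

    fn main() {
        let url = Url::parse("file:///tmp/my-rdedup-repo").unwrap();
        // `open_custom` mirrors `open` but routes backend construction through
        // the selector; passing `&select_backend` matches how the commit itself
        // passes `&aio::backend_from_url` in the default `open`.
        if let Ok(_repo) = Repo::open_custom(&url, &select_backend, None::<Logger>) {
            println!("repository opened through the custom selector");
        }
    }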
