Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Pluggable gc with exemptions #17

Merged
merged 9 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 66 additions & 53 deletions src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
#![allow(missing_docs)]

use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet},
fmt::Debug,
ops::DerefMut,
sync::{Arc, OnceLock},
};

use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Result};
use futures_lite::future::Boxed as BoxedFuture;
use futures_util::future::BoxFuture;
use iroh::{endpoint::Connecting, protocol::ProtocolHandler, Endpoint, NodeAddr};
use iroh_base::hash::{BlobFormat, Hash};
use serde::{Deserialize, Serialize};
Expand All @@ -23,27 +25,32 @@ use crate::{
Stats,
},
provider::EventSender,
store::GcConfig,
util::{
local_pool::LocalPoolHandle,
local_pool::{self, LocalPoolHandle},
progress::{AsyncChannelProgressSender, ProgressSender},
SetTagOption,
},
HashAndFormat, TempTag,
};

// pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
//
// #[derive(derive_more::Debug)]
// enum GcState {
// Initial(#[debug(skip)] Vec<ProtectCb>),
// Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
// }
//
// impl Default for GcState {
// fn default() -> Self {
// Self::Initial(Vec::new())
// }
// }
/// A callback that blobs can ask about a set of hashes that should not be garbage collected.
pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;

/// The state of the gc loop.
#[derive(derive_more::Debug)]
enum GcState {
// Gc loop is not yet running. Other protocols can add protect callbacks
Initial(#[debug(skip)] Vec<ProtectCb>),
// Gc loop is running. No more protect callbacks can be added.
Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
}

impl Default for GcState {
fn default() -> Self {
Self::Initial(Vec::new())
}
}

#[derive(Debug)]
pub struct Blobs<S> {
Expand All @@ -53,6 +60,7 @@ pub struct Blobs<S> {
downloader: Downloader,
batches: tokio::sync::Mutex<BlobBatches>,
endpoint: Endpoint,
gc_state: Arc<std::sync::Mutex<GcState>>,
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<OnceLock<crate::rpc::RpcHandler>>,
}
Expand Down Expand Up @@ -184,6 +192,7 @@ impl<S: crate::store::Store> Blobs<S> {
downloader,
endpoint,
batches: Default::default(),
gc_state: Default::default(),
#[cfg(feature = "rpc")]
rpc_handler: Arc::new(OnceLock::new()),
}
Expand All @@ -205,43 +214,47 @@ impl<S: crate::store::Store> Blobs<S> {
&self.endpoint
}

// pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
// let mut state = self.gc_state.lock().unwrap();
// match &mut *state {
// GcState::Initial(cbs) => {
// cbs.push(cb);
// }
// GcState::Started(_) => {
// anyhow::bail!("cannot add protected blobs after gc has started");
// }
// }
// Ok(())
// }
//
// pub fn start_gc(&self, config: GcConfig) -> Result<()> {
// let mut state = self.gc_state.lock().unwrap();
// let protected = match state.deref_mut() {
// GcState::Initial(items) => std::mem::take(items),
// GcState::Started(_) => anyhow::bail!("gc already started"),
// };
// let protected = Arc::new(protected);
// let protected_cb = move || {
// let protected = protected.clone();
// async move {
// let mut set = BTreeSet::new();
// for cb in protected.iter() {
// cb(&mut set).await;
// }
// set
// }
// };
// let store = self.store.clone();
// let run = self
// .rt
// .spawn(move || async move { store.gc_run(config, protected_cb).await });
// *state = GcState::Started(Some(run));
// Ok(())
// }
/// Add a callback that will be called before the garbage collector runs.
///
/// This can only be called before the garbage collector has started, otherwise it will return an error.
pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
let mut state = self.gc_state.lock().unwrap();
match &mut *state {
GcState::Initial(cbs) => {
cbs.push(cb);
}
GcState::Started(_) => {
anyhow::bail!("cannot add protected blobs after gc has started");
}
}
Ok(())
}

/// Start garbage collection with the given settings.
pub fn start_gc(&self, config: GcConfig) -> Result<()> {
let mut state = self.gc_state.lock().unwrap();
let protected = match state.deref_mut() {
GcState::Initial(items) => std::mem::take(items),
GcState::Started(_) => bail!("gc already started"),
};
let protected = Arc::new(protected);
let protected_cb = move || {
let protected = protected.clone();
async move {
let mut set = BTreeSet::new();
for cb in protected.iter() {
cb(&mut set).await;
}
set
}
};
let store = self.store.clone();
let run = self
.rt
.spawn(move || async move { store.gc_run(config, protected_cb).await });
*state = GcState::Started(Some(run));
Ok(())
}

pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
self.batches.lock().await
Expand Down
72 changes: 72 additions & 0 deletions tests/blobs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#![cfg(feature = "net_protocol")]
use std::{
sync::{Arc, Mutex},
time::Duration,
};

use iroh::Endpoint;
use iroh_blobs::{net_protocol::Blobs, store::GcConfig, util::local_pool::LocalPool};
use testresult::TestResult;

#[tokio::test]
async fn blobs_gc_smoke() -> TestResult<()> {
let pool = LocalPool::default();
let endpoint = Endpoint::builder().bind().await?;
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
let client = blobs.clone().client();
blobs.start_gc(GcConfig {
period: Duration::from_millis(1),
done_callback: None,
})?;
let h1 = client.add_bytes(b"test".to_vec()).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(client.has(h1.hash).await?);
client.tags().delete(h1.tag).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(!client.has(h1.hash).await?);
Ok(())
}

#[tokio::test]
async fn blobs_gc_protected() -> TestResult<()> {
let pool = LocalPool::default();
let endpoint = Endpoint::builder().bind().await?;
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
let client: iroh_blobs::rpc::client::blobs::Client<
quic_rpc::transport::flume::FlumeConnector<
iroh_blobs::rpc::proto::Response,
iroh_blobs::rpc::proto::Request,
>,
> = blobs.clone().client();
let h1 = client.add_bytes(b"test".to_vec()).await?;
let protected = Arc::new(Mutex::new(Vec::new()));
blobs.add_protected(Box::new({
let protected = protected.clone();
move |x| {
let protected = protected.clone();
Box::pin(async move {
let protected = protected.lock().unwrap();
for h in protected.as_slice() {
x.insert(*h);
}
})
}
}))?;
blobs.start_gc(GcConfig {
period: Duration::from_millis(1),
done_callback: None,
})?;
tokio::time::sleep(Duration::from_millis(100)).await;
// protected from gc due to tag
assert!(client.has(h1.hash).await?);
protected.lock().unwrap().push(h1.hash);
client.tags().delete(h1.tag).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
// protected from gc due to being in protected set
assert!(client.has(h1.hash).await?);
protected.lock().unwrap().clear();
tokio::time::sleep(Duration::from_millis(100)).await;
// not protected, must be gone
assert!(!client.has(h1.hash).await?);
Ok(())
}
Loading