Skip to content

docs: Expiring tags example #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions examples/expiring-tags.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
//! This example shows how to create tags that expire after a certain time.
//!
//! We use a prefix so we can distinguish between expiring and normal tags, and
//! then encode the expiry date in the tag name after the prefix, in a format
//! that sorts in the same order as the expiry date.
//!
//! The example creates a number of blobs and protects them directly or indirectly
//! with expiring tags. Watch as the expired tags are deleted and the blobs
//! are removed from the store.
use std::{
ops::Deref,
time::{Duration, SystemTime},
};

use chrono::Utc;
use futures_lite::StreamExt;
use iroh_blobs::{
api::{blobs::AddBytesOptions, Store, Tag},
hashseq::HashSeq,
store::fs::options::{BatchOptions, GcConfig, InlineOptions, Options, PathOptions},
BlobFormat, Hash,
};
use tokio::signal::ctrl_c;

/// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes.
///
/// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`).
async fn create_expiring_tag(
store: &Store,
hashes: &[Hash],
prefix: &str,
expiry: SystemTime,
) -> anyhow::Result<()> {
let expiry = chrono::DateTime::<chrono::Utc>::from(expiry);
let expiry = expiry.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let tagname = format!("{prefix}-{expiry}");
if hashes.is_empty() {
return Ok(());
} else if hashes.len() == 1 {
let hash = hashes[0];
store.tags().set(&tagname, hash).await?;
} else {
let hs = hashes.iter().copied().collect::<HashSeq>();
store
.add_bytes_with_opts(AddBytesOptions {
data: hs.into(),
format: BlobFormat::HashSeq,
})
.with_named_tag(&tagname)
.await?;
};
println!("Created tag {tagname}");
Ok(())
}

async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow::Result<()> {
let prefix = format!("{prefix}-");
let now = chrono::Utc::now();
let end = format!(
"{}-{}",
prefix,
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
);
if bulk {
// delete all tags with the prefix and an expiry date before now
//
// this should be very efficient, since it is just a single database operation
blobs
.tags()
.delete_range(Tag::from(prefix.clone())..Tag::from(end))
.await?;
} else {
// find tags to delete one by one and then delete them
//
// this allows us to print the tags before deleting them
let mut tags = blobs.tags().list().await?;
let mut to_delete = Vec::new();
while let Some(tag) = tags.next().await {
let tag = tag?.name;
if let Some(rest) = tag.0.strip_prefix(prefix.as_bytes()) {
let Ok(expiry) = std::str::from_utf8(rest) else {
tracing::warn!("Tag {} does have non utf8 expiry", tag);
continue;
};
let Ok(expiry) = chrono::DateTime::parse_from_rfc3339(expiry) else {
tracing::warn!("Tag {} does have invalid expiry date", tag);
continue;
};
let expiry = expiry.with_timezone(&Utc);
if expiry < now {
to_delete.push(tag);
}
}
}
for tag in to_delete {
println!("Deleting expired tag {tag}\n");
blobs.tags().delete(tag).await?;
}
}
Ok(())
}

async fn print_store_info(store: &Store) -> anyhow::Result<()> {
let now = chrono::Utc::now();
let mut tags = store.tags().list().await?;
println!(
"Current time: {}",
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
);
println!("Tags:");
while let Some(tag) = tags.next().await {
let tag = tag?;
println!(" {tag:?}");
}
let mut blobs = store.list().stream().await?;
println!("Blobs:");
while let Some(item) = blobs.next().await {
println!(" {}", item?);
}
println!();
Ok(())
}

async fn info_task(store: Store) -> anyhow::Result<()> {
tokio::time::sleep(Duration::from_secs(1)).await;
loop {
print_store_info(&store).await?;
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

async fn delete_expired_tags_task(store: Store, prefix: &str) -> anyhow::Result<()> {
loop {
delete_expired_tags(&store, prefix, false).await?;
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let path = std::env::current_dir()?.join("blobs");
let options = Options {
path: PathOptions::new(&path),
gc: Some(GcConfig {
interval: Duration::from_secs(10),
}),
inline: InlineOptions::default(),
batch: BatchOptions::default(),
};
let store =
iroh_blobs::store::fs::FsStore::load_with_opts(path.join("blobs.db"), options).await?;

// setup: add some data and tag it
{
// add several blobs and tag them with an expiry date 10 seconds in the future
let batch = store.batch().await?;
let a = batch.add_bytes("blob 1".as_bytes()).await?;
let b = batch.add_bytes("blob 2".as_bytes()).await?;

let expires_at = SystemTime::now()
.checked_add(Duration::from_secs(10))
.unwrap();
create_expiring_tag(&store, &[*a.hash(), *b.hash()], "expiring", expires_at).await?;

// add a single blob and tag it with an expiry date 60 seconds in the future
let c = batch.add_bytes("blob 3".as_bytes()).await?;
let expires_at = SystemTime::now()
.checked_add(Duration::from_secs(60))
.unwrap();
create_expiring_tag(&store, &[*c.hash()], "expiring", expires_at).await?;
// batch goes out of scope, so data is only protected by the tags we created
}

// delete expired tags every 5 seconds
let delete_task = tokio::spawn(delete_expired_tags_task(store.deref().clone(), "expiring"));
// print all tags and blobs every 5 seconds
let info_task = tokio::spawn(info_task(store.deref().clone()));

ctrl_c().await?;
delete_task.abort();
info_task.abort();
store.shutdown().await?;
Ok(())
}
3 changes: 2 additions & 1 deletion src/store/fs/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::{
time::Duration,
};

use super::{gc::GcConfig, meta::raw_outboard_size, temp_name};
pub use super::gc::GcConfig;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing export!

use super::{meta::raw_outboard_size, temp_name};
use crate::Hash;

/// Options for directories used by the file store.
Expand Down
Loading