Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

Commit

Permalink
fetch: only send changed wants and haves
Browse files Browse the repository at this point in the history
This change limits the amount of `wants` and `haves` data that is sent
to the serving side of a fetch.

When a `RefsAt` announcement is sent, the fetching peer can calculate
the difference between the `rad/sigrefs` they're aware of -- if it
exists -- and the newly advertised `rad/sigrefs`. This means they can
efficiently ask for the `wants` and `haves` of references that have
changed -- saving some data sent over to the serving side.

Signed-off-by: Fintan Halpenny <[email protected]>
X-Clacks-Overhead: GNU Terry Pratchett
  • Loading branch information
FintanH authored and cloudhead committed Mar 12, 2024
1 parent 0f0f9ff commit 3ad2b44
Show file tree
Hide file tree
Showing 10 changed files with 417 additions and 65 deletions.
4 changes: 2 additions & 2 deletions radicle-fetch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use state::{FetchLimit, FetchResult};
pub use transport::Transport;

use radicle::crypto::PublicKey;
use radicle::storage::refs::RefsAt;
use radicle::storage::refs::SignedRefsUpdate;
use radicle::storage::ReadRepository as _;
use state::FetchState;
use thiserror::Error;
Expand Down Expand Up @@ -54,7 +54,7 @@ pub fn pull<S>(
handle: &mut Handle<S>,
limit: FetchLimit,
remote: PublicKey,
refs_at: Option<Vec<RefsAt>>,
refs_at: Option<Vec<SignedRefsUpdate>>,
) -> Result<FetchResult, Error>
where
S: transport::ConnectionStream,
Expand Down
67 changes: 65 additions & 2 deletions radicle-fetch/src/sigrefs.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::collections::{BTreeMap, BTreeSet};
use std::ops::{Deref, Not as _};
use std::ops::{Deref, DerefMut, Not as _};

pub use radicle::storage::refs::SignedRefsAt;
pub use radicle::storage::refs::{DiffedRefs, SignedRefsAt};
pub use radicle::storage::{git::Validation, Validations};

use radicle::storage::refs;
use radicle::{crypto::PublicKey, storage::ValidateRepository};

use crate::state::Cached;
Expand Down Expand Up @@ -150,6 +152,67 @@ impl<'a> IntoIterator for &'a RemoteRefs {
}
}

/// A set of [`DiffedRefs`] per remote `PublicKey`.
///
/// To construct use [`RemoteDiffedRefs::load`].
#[derive(Clone, Debug, Default)]
pub struct RemoteDiffedRefs(BTreeMap<PublicKey, DiffedRefs>);

impl RemoteDiffedRefs {
    /// Build the per-remote diff set from a list of
    /// [`refs::SignedRefsUpdate`]s.
    ///
    /// Every update is resolved to its [`DiffedRefs`] through
    /// `cached.load_diffed_refs` and stored under the update's `remote`
    /// key. If several updates share a `remote`, the last one loaded
    /// replaces the earlier entry.
    ///
    /// A failed load is handled according to the [`Select`] sets:
    ///
    /// * `remote` is in `must`: the error is returned, even when the
    ///   remote is also listed in `may`.
    /// * `remote` is in `may` (and not in `must`): the error is dropped
    ///   and no entry is inserted for that remote.
    /// * `remote` is in neither set: the error is returned.
    ///
    /// # Errors
    ///
    /// Returns the first load error that is not tolerated by the rules
    /// above; updates after it are not processed.
    pub(crate) fn load<S>(
        cached: &Cached<S>,
        updates: Vec<refs::SignedRefsUpdate>,
        Select { must, may }: Select,
    ) -> Result<Self, error::Load> {
        let mut loaded = Self::default();

        for update in updates {
            match cached.load_diffed_refs(&update) {
                Ok(diff) => {
                    loaded.insert(update.remote, diff);
                }
                // Tolerated only for optional remotes; `must` always wins.
                Err(_) if !must.contains(&update.remote) && may.contains(&update.remote) => {}
                Err(err) => return Err(err),
            }
        }

        Ok(loaded)
    }
}

impl Deref for RemoteDiffedRefs {
type Target = BTreeMap<PublicKey, DiffedRefs>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for RemoteDiffedRefs {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl<'a> IntoIterator for &'a RemoteDiffedRefs {
type Item = <&'a BTreeMap<PublicKey, DiffedRefs> as IntoIterator>::Item;
type IntoIter = <&'a BTreeMap<PublicKey, DiffedRefs> as IntoIterator>::IntoIter;

fn into_iter(self) -> Self::IntoIter {
self.0.iter()
}
}

pub struct Select<'a> {
pub must: &'a BTreeSet<PublicKey>,
pub may: &'a BTreeSet<PublicKey>,
Expand Down
119 changes: 113 additions & 6 deletions radicle-fetch/src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
//! `rad/sigrefs`, for each configured namespace, i.e. followed
//! and delegate peers if the scope is "followed" and all peers if the
//! scope is all.
//! 3. [`DataRefs`]: fetches the `Oid`s for each reference listed in
//! the `rad/sigrefs` for each fetched peer in the
//! [`SpecialRefs`] stage. Additionally, any references that have
//! been removed from `rad/sigrefs` are marked for deletion.
//! 3. [`DataRefs`]/[`DiffedRefs`]: fetches the `Oid`s for each
//! reference listed in the `rad/sigrefs` for each fetched peer
//! in the [`SpecialRefs`] stage. Additionally, any references
//! that have been removed from `rad/sigrefs` are marked for
//! deletion.
//!
//! ### Pull
//!
//! A `pull` is split into two stages:
//!
//! 1. [`SpecialRefs`]: see above.
//! 2. [`DataRefs`]: see above.
//! 2. [`DataRefs`]/[`DiffedRefs`]: see above.
use std::collections::{BTreeMap, BTreeSet, HashSet};

Expand Down Expand Up @@ -380,7 +381,7 @@ impl ProtocolStage for SigrefsAt {
}

/// The [`ProtocolStage`] for fetching data refs from the set of
/// remotes in `trusted`.
/// `remotes`.
///
/// All refs that are listed in the `remotes` sigrefs are checked
/// against our refdb/odb to build a set of `wants` and `haves`. The
Expand Down Expand Up @@ -493,6 +494,112 @@ impl ProtocolStage for DataRefs {
}
}

/// The [`ProtocolStage`] that is similar to [`DataRefs`], however it
/// is aware that it is an update of `rad/sigrefs`. This means that it
/// will only compute `wants` and `haves` based on any modified
/// `Oid`s.
///
/// All refs and objects are prepared for updating as per usual, since
/// we keep track of in-memory references for validation.
#[derive(Debug)]
pub struct DiffedRefs {
/// The node that is being fetched from.
pub remote: PublicKey,
/// The diffed signed references, keyed by the remote they belong
/// to.
pub remotes: sigrefs::RemoteDiffedRefs,
/// The data limit for this stage of fetching.
// NOTE(review): unit is not visible here (presumably bytes) — confirm.
pub limit: u64,
}

impl ProtocolStage for DiffedRefs {
fn ls_refs(&self) -> Option<NonEmpty<BString>> {
None
}

fn ref_filter(&self, _r: Ref) -> Option<ReceivedRef> {
None
}

fn pre_validate(&self, _refs: &[ReceivedRef]) -> Result<(), error::Layout> {
Ok(())
}

fn wants_haves(
&self,
refdb: &Repository,
_refs: &[ReceivedRef],
) -> Result<WantsHaves, error::WantsHaves> {
let mut wants_haves = WantsHaves::default();

for (remote, refs) in &self.remotes {
wants_haves.add(
refdb,
refs.iter().filter_map(|(refname, up)| {
let refname = Qualified::from_refstr(refname)
.map(|refname| refname.with_namespace(Component::from(remote)))?;
let tip = up.modified()?;
Some((refname, *tip))
}),
)?;
}

Ok(wants_haves)
}

fn prepare_updates<'a>(
&self,
_s: &FetchState,
_repo: &Repository,
_refs: &'a [ReceivedRef],
) -> Result<Updates<'a>, error::Prepare> {
use radicle::storage::refs::Update::{Added, Changed, Deleted, Same};

let mut updates = Updates::default();
let prefix_rad = refname!("refs/rad");

for (remote, refs) in &self.remotes {
for (name, up) in refs.iter() {
let is_refs_rad = name.starts_with(prefix_rad.as_str());
let tracking: Namespaced<'_> = Qualified::from_refstr(name)
.and_then(|q| refs::ReceivedRefname::remote(*remote, q).to_namespaced())
.expect("we checked sigrefs well-formedness in wants_refs already");
match up {
Added { oid } | Changed { oid } => updates.add(
*remote,
Update::Direct {
name: tracking,
target: *oid,
no_ff: Policy::Allow,
},
),
Deleted { oid } if !is_refs_rad => updates.add(
*remote,
Update::Prune {
name: tracking,
prev: either::Left(*oid),
},
),
// N.b. create an update for this reference so
// that the in-memory refdb is updated.
Same { oid } => updates.add(
*remote,
Update::Direct {
name: tracking,
target: *oid,
no_ff: Policy::Allow,
},
),
// N.b. `refs/rad` is not subject to pruning.
Deleted { .. } => continue,
}
}
}

Ok(updates)
}
}

// N.b. the `delegates` are the delegates of the repository, with the
// potential removal of the local peer in the case of a `pull`.
fn special_refs_updates<'a>(
Expand Down
Loading

0 comments on commit 3ad2b44

Please sign in to comment.