This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

node: Ensure dequeued fetches are correct
Dequeued fetches were always fetching all refs, instead of fetching the
specific refs that were initially requested. We fix that by storing the
refs to fetch in the fetch queue.
cloudhead committed Jan 31, 2024
1 parent f38d2b2 commit 15d1709
Showing 6 changed files with 231 additions and 104 deletions.
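
In outline, the fix threads the originally requested refs through the queue: each QueuedFetch now remembers its refs_at, and dequeue_fetch re-checks those refs against local storage before fetching, instead of always falling back to a full fetch. The sketch below summarizes the new flow using the names from the diff; it is simplified and illustrative (the seeding-policy lookup and logging are elided, and `scope` stands in for the policy's scope), not the exact implementation.

struct QueuedFetch {
    /// Repo being fetched.
    rid: RepoId,
    /// Peer being fetched from.
    from: NodeId,
    /// Refs being fetched (the new field).
    refs_at: Vec<RefsAt>,
    /// Result channel.
    channel: Option<chan::Sender<FetchResult>>,
}

fn dequeue_fetch(&mut self) {
    while let Some(QueuedFetch { rid, from, refs_at, channel }) = self.queue.pop_front() {
        if refs_at.is_empty() {
            // No specific refs were queued: do a full fetch, as before.
            self.fetch(rid, from, FETCH_TIMEOUT, channel);
            return;
        }
        // Only fetch the queued refs that are still missing from local storage.
        match self.refs_status_of(rid, refs_at, &scope) {
            Ok(status) => match NonEmpty::from_vec(status.fresh) {
                Some(fresh) => {
                    self.fetch_refs_at(rid, from, fresh, FETCH_TIMEOUT, channel);
                    return;
                }
                // Everything is already in storage; try the next queued fetch.
                None => continue,
            },
            Err(_) => return,
        }
    }
}
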
78 changes: 64 additions & 14 deletions radicle-node/src/service.rs
@@ -264,6 +264,8 @@ struct QueuedFetch {
rid: RepoId,
/// Peer being fetched from.
from: NodeId,
/// Refs being fetched.
refs_at: Vec<RefsAt>,
/// Result channel.
channel: Option<chan::Sender<FetchResult>>,
}
@@ -791,8 +793,9 @@ where
from: NodeId,
refs: NonEmpty<RefsAt>,
timeout: time::Duration,
channel: Option<chan::Sender<FetchResult>>,
) {
self._fetch(rid, from, refs.into(), timeout, None)
self._fetch(rid, from, refs.into(), timeout, channel)
}

/// Initiate an outgoing fetch for some repository.
@@ -814,7 +817,7 @@
timeout: time::Duration,
channel: Option<chan::Sender<FetchResult>>,
) {
match self.try_fetch(rid, &from, refs_at, timeout) {
match self.try_fetch(rid, &from, refs_at.clone(), timeout) {
Ok(fetching) => {
if let Some(c) = channel {
fetching.subscribe(c);
@@ -832,12 +835,22 @@
}
} else {
debug!(target: "service", "Queueing fetch for {rid} with {from}..");
self.queue.push_back(QueuedFetch { rid, from, channel });
self.queue.push_back(QueuedFetch {
rid,
refs_at,
from,
channel,
});
}
}
Err(TryFetchError::SessionCapacityReached) => {
debug!(target: "service", "Fetch capacity reached for {from}, queueing {rid}..");
self.queue.push_back(QueuedFetch { rid, from, channel });
self.queue.push_back(QueuedFetch {
rid,
refs_at,
from,
channel,
});
}
Err(e) => {
if let Some(c) = channel {
@@ -850,6 +863,7 @@
}
}

// TODO: Buffer/throttle fetches.
fn try_fetch(
&mut self,
rid: RepoId,
@@ -863,6 +877,8 @@
};
let fetching = self.fetching.entry(rid);

trace!(target: "service", "Trying to fetch {refs_at:?} for {rid}..");

if let Entry::Occupied(fetching) = fetching {
// We're already fetching this repo from some peer.
return Err(TryFetchError::AlreadyFetching(fetching.into_mut()));
@@ -991,10 +1007,40 @@ where
/// 1. The RID was already being fetched.
/// 2. The session was already at fetch capacity.
pub fn dequeue_fetch(&mut self) {
if let Some(QueuedFetch { rid, from, channel }) = self.queue.pop_front() {
while let Some(QueuedFetch {
rid,
from,
refs_at,
channel,
}) = self.queue.pop_front()
{
debug!(target: "service", "Dequeued fetch for {rid} from session {from}..");

self.fetch(rid, from, FETCH_TIMEOUT, channel);
// If no refs are specified, always do a full fetch.
if refs_at.is_empty() {
self.fetch(rid, from, FETCH_TIMEOUT, channel);
return;
}

let repo_entry = self
.policies
.seed_policy(&rid)
.expect("Service::dequeue_fetch: error accessing repo seeding configuration");

match self.refs_status_of(rid, refs_at, &repo_entry.scope) {
Ok(status) => {
if let Some(refs) = NonEmpty::from_vec(status.fresh) {
self.fetch_refs_at(rid, from, refs, FETCH_TIMEOUT, channel);
return;
} else {
debug!(target: "service", "Skipping dequeued fetch for {rid}, all refs are already in local storage");
}
}
Err(e) => {
error!(target: "service", "Error getting the refs status of {rid}: {e}");
return;
}
}
}
}

@@ -1330,13 +1376,15 @@ where
}
}

// TODO: Buffer/throttle fetches.
let repo_entry = self.policies.seed_policy(&message.rid).expect(
"Service::handle_announcement: error accessing repo seeding configuration",
);

if repo_entry.policy == Policy::Allow {
let (fresh, stale) = match self.refs_status_of(message, &repo_entry.scope) {
let (fresh, stale) = match self.refs_status_of(
message.rid,
message.refs.clone().into(),
&repo_entry.scope,
) {
Ok(RefsStatus { fresh, stale }) => (fresh, stale),
Err(e) => {
error!(target: "service", "Failed to check refs status: {e}");
@@ -1392,7 +1440,9 @@ where
// Finally, if there's anything to fetch, we fetch it from the
// remote.
if let Some(fresh) = NonEmpty::from_vec(fresh) {
self.fetch_refs_at(message.rid, remote.id, fresh, FETCH_TIMEOUT);
self.fetch_refs_at(message.rid, remote.id, fresh, FETCH_TIMEOUT, None);
} else {
debug!(target: "service", "Skipping fetch, all refs of {} are already in local storage", message.rid);
}
} else {
trace!(
@@ -1479,22 +1529,22 @@ where
/// A convenient method to check if we should fetch from a `RefsAnnouncement` with `scope`.
fn refs_status_of(
&self,
message: &RefsAnnouncement,
rid: RepoId,
refs: Vec<RefsAt>,
scope: &policy::Scope,
) -> Result<RefsStatus, Error> {
let mut refs = message.refs_status(&self.storage)?;
let mut refs = RefsStatus::new(rid, refs, &self.storage)?;

// First, check the freshness.
if refs.fresh.is_empty() {
debug!(target: "service", "All refs of {} are already in local storage", &message.rid);
return Ok(refs);
}

// Second, check the scope.
match scope {
policy::Scope::All => Ok(refs),
policy::Scope::Followed => {
match self.policies.namespaces_for(&self.storage, &message.rid) {
match self.policies.namespaces_for(&self.storage, &rid) {
Ok(Namespaces::All) => Ok(refs),
Ok(Namespaces::Followed(mut followed)) => {
// Get the set of followed nodes except self.
54 changes: 28 additions & 26 deletions radicle-node/src/service/message.rs
@@ -176,6 +176,34 @@ pub struct RefsStatus {
}

impl RefsStatus {
/// Get the set of `fresh` and `stale` `RefsAt`'s for the given
/// announcement.
pub fn new<S: ReadStorage>(
rid: RepoId,
refs: Vec<RefsAt>,
storage: S,
) -> Result<RefsStatus, storage::Error> {
let repo = match storage.repository(rid) {
// If the repo doesn't exist, we consider this
// announcement "fresh", since we obviously don't
// have the refs.
Err(e) if e.is_not_found() => {
return Ok(RefsStatus {
fresh: refs.clone(),
stale: Vec::new(),
})
}
Err(e) => return Err(e),
Ok(r) => r,
};

let mut status = RefsStatus::default();
for theirs in refs.iter() {
status.insert(*theirs, &repo)?;
}
Ok(status)
}

fn insert<S: ReadRepository>(
&mut self,
theirs: RefsAt,
@@ -216,32 +244,6 @@ impl RefsStatus {
}
}

impl RefsAnnouncement {
/// Get the set of `fresh` and `stale` `RefsAt`'s for the given
/// announcement.
pub fn refs_status<S: ReadStorage>(&self, storage: S) -> Result<RefsStatus, storage::Error> {
let repo = match storage.repository(self.rid) {
// If the repo doesn't exist, we consider this
// announcement "fresh", since we obviously don't
// have the refs.
Err(e) if e.is_not_found() => {
return Ok(RefsStatus {
fresh: self.refs.clone().into(),
stale: Vec::new(),
})
}
Err(e) => return Err(e),
Ok(r) => r,
};

let mut status = RefsStatus::default();
for theirs in self.refs.iter() {
status.insert(*theirs, &repo)?;
}
Ok(status)
}
}

/// Node announcing its inventory to the network.
/// This should be the whole inventory every time.
#[derive(Debug, Clone, PartialEq, Eq)]
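
The refactor above turns the freshness check into a standalone RefsStatus::new constructor that takes a RepoId and any Vec<RefsAt>, rather than a whole RefsAnnouncement, which is what lets dequeue_fetch re-check queued refs. Roughly, a caller inside the service uses it as in this simplified sketch (`rid`, `refs_at`, `from` and the storage handle are assumed to be in scope, as in the diff):

// Classify the given refs against local storage: roughly, `fresh` are refs
// still worth fetching, `stale` are refs we already have locally.
let status = RefsStatus::new(rid, refs_at, &self.storage)?;
if let Some(fresh) = NonEmpty::from_vec(status.fresh) {
    // Fetch only what is missing, from the announcing (or queued) peer.
    self.fetch_refs_at(rid, from, fresh, FETCH_TIMEOUT, None);
}
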
18 changes: 13 additions & 5 deletions radicle-node/src/test/peer.rs
@@ -11,7 +11,7 @@ use radicle::node::address::Store as _;
use radicle::node::Database;
use radicle::node::{address, Alias, ConnectOptions};
use radicle::rad;
use radicle::storage::refs::RefsAt;
use radicle::storage::refs::{RefsAt, SignedRefsAt};
use radicle::storage::{ReadRepository, RemoteRepository};
use radicle::Storage;

@@ -319,14 +319,22 @@
}
}

let ann = AnnouncementMessage::from(RefsAnnouncement {
self.announcement(RefsAnnouncement {
rid,
refs,
timestamp: self.timestamp(),
});
let msg = ann.signed(self.signer());
})
}

msg.into()
pub fn announcement(&self, ann: impl Into<AnnouncementMessage>) -> Message {
ann.into().signed(self.signer()).into()
}

pub fn signed_refs_at(&self, refs: Refs, at: radicle::git::Oid) -> SignedRefsAt {
SignedRefsAt {
sigrefs: refs.signed(self.signer()).unwrap(),
at,
}
}

pub fn connect_from(&mut self, peer: &Self) {