Skip to content

Commit

Permalink
prioritize recent versions during sync
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Nov 1, 2024
1 parent 6fb772b commit 6b0ac51
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 17 deletions.
13 changes: 10 additions & 3 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,15 +863,22 @@ pub async fn handle_changes(
continue;
}

// drop items when the queue is full.
// drop old items when the queue is full.
if queue.len() > max_queue_len {
let change = queue.pop_front();
if let Some(change) = change {
for v in change.0.versions() {
let _ = seen.remove(&(change.0.actor_id, v));
}
}

drop_log_count += 1;
if is_pow_10(drop_log_count) {
if drop_log_count == 1 {
warn!("dropping a change because changes queue is full");
warn!("dropped an old change because changes queue is full");
} else {
warn!(
"dropping {} changes because changes queue is full",
"droppped {} old changes because changes queue is full",
drop_log_count
);
}
Expand Down
25 changes: 12 additions & 13 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use futures::{Future, Stream, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use metrics::counter;
use quinn::{RecvStream, SendStream};
use rand::seq::SliceRandom;
use rangemap::{RangeInclusiveMap, RangeInclusiveSet};
use rusqlite::{named_params, Connection};
use speedy::Writable;
Expand Down Expand Up @@ -405,6 +404,7 @@ fn handle_need(
-- [:start]---[end_version]---[:end]
( end_version BETWEEN :start AND :end )
)
ORDER BY start_version DESC
",
)?;

Expand Down Expand Up @@ -1179,7 +1179,6 @@ pub async fn parallel_sync(
let len = syncers.len();

let (readers, mut servers) = {
let mut rng = rand::thread_rng();
syncers.into_iter().fold(
(Vec::with_capacity(len), Vec::with_capacity(len)),
|(mut readers, mut servers), (actor_id, addr, needs, tx, read)| {
Expand All @@ -1191,6 +1190,7 @@ pub async fn parallel_sync(

trace!(%actor_id, "needs: {needs:?}");


debug!(%actor_id, %addr, "needs len: {}", needs.values().map(|needs| needs.iter().map(|need| match need {
SyncNeedV1::Full {versions} => (versions.end().0 - versions.start().0) as usize + 1,
SyncNeedV1::Partial {..} => 0,
Expand All @@ -1200,7 +1200,7 @@ pub async fn parallel_sync(
let actor_needs = needs
.into_iter()
.flat_map(|(actor_id, needs)| {
let mut needs: Vec<_> = needs
let needs: Vec<_> = needs
.into_iter()
.flat_map(|need| match need {
// chunk the versions, sometimes it's 0..=1000000 and that's far too big for a chunk!
Expand All @@ -1213,7 +1213,6 @@ pub async fn parallel_sync(
.collect();

// NOTE: IMPORTANT! shuffle the vec so we don't keep looping over the same later on
needs.shuffle(&mut rng);

needs
.into_iter()
Expand Down Expand Up @@ -1265,7 +1264,7 @@ pub async fn parallel_sync(
let mut drained = 0;

while drained < 10 {
let (actor_id, need) = match needs.pop_front() {
let (actor_id, need) = match needs.pop_back() {
Some(popped) => popped,
None => {
break;
Expand Down Expand Up @@ -2018,8 +2017,8 @@ mod tests {
SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 {
actor_id,
changeset: Changeset::Full {
version: Version(2),
changes: vec![change2],
version: Version(3),
changes: vec![change3.clone()],
seqs: CrsqlSeq(0)..=CrsqlSeq(0),
last_seq: CrsqlSeq(0),
ts,
Expand All @@ -2033,8 +2032,8 @@ mod tests {
SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 {
actor_id,
changeset: Changeset::Full {
version: Version(3),
changes: vec![change3.clone()],
version: Version(2),
changes: vec![change2.clone()],
seqs: CrsqlSeq(0)..=CrsqlSeq(0),
last_seq: CrsqlSeq(0),
ts,
Expand Down Expand Up @@ -2186,8 +2185,8 @@ mod tests {
SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 {
actor_id,
changeset: Changeset::Full {
version: Version(3),
changes: vec![change3],
version: Version(4),
changes: vec![change4],
seqs: CrsqlSeq(0)..=CrsqlSeq(0),
last_seq: CrsqlSeq(0),
ts,
Expand All @@ -2201,8 +2200,8 @@ mod tests {
SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 {
actor_id,
changeset: Changeset::Full {
version: Version(4),
changes: vec![change4],
version: Version(3),
changes: vec![change3],
seqs: CrsqlSeq(0)..=CrsqlSeq(0),
last_seq: CrsqlSeq(0),
ts,
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ pub fn runtime_loop(
}
}

for mut pending in to_broadcast.drain(..) {
for mut pending in to_broadcast.drain(..).rev() {
trace!("{} to broadcast: {pending:?}", actor_id);

let (member_count, max_transmissions) = {
Expand Down

0 comments on commit 6b0ac51

Please sign in to comment.