Chunk changes by byte size instead of number of changes (#57)
* chunk changes by size instead of number of changes

* smaller chunk sizes when syncing

* try larger ones I guess

* I think this is going to be better than send. feed items into the sink and flush (close) at the end only

* need to flush when writing the initial sink messages

* sync speed tweaks again, remove sink just send raw into stream

* bring back chunk sent metric
jeromegn authored Sep 19, 2023
1 parent f10aca8 commit ec62f80
Showing 4 changed files with 126 additions and 125 deletions.
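The heart of the change: ChunkedChanges now closes a chunk once the accumulated changes reach a byte budget (MAX_CHANGES_BYTES_PER_MESSAGE, 64 KiB) rather than after a fixed number of changes, so a run of large changes can no longer balloon a single sync message. The chunker itself lives in crates/corro-agent/src/api/public.rs, which is not part of the portion of the diff shown here, so the following is only a rough, self-contained sketch of byte-budget chunking; the function name, item type, and budget constant are stand-ins, not the crate's actual API.

// Sketch only: not the real ChunkedChanges implementation from api/public.rs.
const MAX_CHUNK_BYTES: usize = 64 * 1024; // mirrors MAX_CHANGES_BYTES_PER_MESSAGE

/// Groups serialized changes into chunks, closing the current chunk once
/// adding the next item would exceed the byte budget.
fn chunk_by_bytes(items: Vec<Vec<u8>>, max_bytes: usize) -> Vec<Vec<Vec<u8>>> {
    let mut chunks = Vec::new();
    let mut current: Vec<Vec<u8>> = Vec::new();
    let mut current_bytes = 0usize;

    for item in items {
        if !current.is_empty() && current_bytes + item.len() > max_bytes {
            chunks.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += item.len();
        current.push(item);
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}

fn main() {
    // Three 40 KiB payloads against a 64 KiB budget come out as three chunks,
    // where a count-based limit would happily have packed them into one message.
    let items = vec![vec![0u8; 40 * 1024]; 3];
    assert_eq!(chunk_by_bytes(items, MAX_CHUNK_BYTES).len(), 3);
}

Keeping every chunk near a fixed byte budget also makes the per-chunk send timeout below (bumped from 2 s to 5 s) more predictable, since each message now carries a roughly comparable amount of data.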
4 changes: 2 additions & 2 deletions crates/corro-agent/src/agent.rs
@@ -1607,7 +1607,7 @@ async fn process_fully_buffered_changes(
     Ok(inserted)
 }
 
-pub async fn process_single_version(
+pub async fn process_single_change(
     agent: &Agent,
     change: ChangeV1,
 ) -> Result<Option<Changeset>, ChangeError> {
@@ -1900,7 +1900,7 @@ async fn process_msg(
     Ok(match bcast {
         BroadcastV1::Change(change) => {
             let actor_id = change.actor_id;
-            let changeset = process_single_version(agent, change).await?;
+            let changeset = process_single_change(agent, change).await?;
 
             changeset.map(|changeset| {
                 BroadcastV1::Change(ChangeV1 {
186 changes: 75 additions & 111 deletions crates/corro-agent/src/api/peer.rs
@@ -5,28 +5,27 @@ use std::ops::RangeInclusive
 use std::sync::Arc;
 use std::time::Duration;
 
-use bytes::{BufMut, Bytes, BytesMut};
+use bytes::{Buf, BufMut, BytesMut};
 use corro_types::agent::{Agent, KnownDbVersion, SplitPool};
 use corro_types::broadcast::{ChangeV1, Changeset};
 use corro_types::change::row_to_change;
 use corro_types::config::{GossipConfig, TlsClientConfig};
 use corro_types::sync::{SyncMessage, SyncMessageEncodeError, SyncMessageV1, SyncStateV1};
-use futures::{Sink, SinkExt, Stream, TryFutureExt};
+use futures::{Stream, TryFutureExt};
 use metrics::counter;
 use quinn::{RecvStream, SendStream};
 use rusqlite::params;
 use speedy::Writable;
-use tokio::sync::mpsc::{self, channel, Sender};
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::sync::mpsc::{channel, Sender};
 use tokio::task::block_in_place;
-use tokio::time::interval;
-use tokio_stream::wrappers::ReceiverStream;
 use tokio_stream::StreamExt;
-use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
+use tokio_util::codec::{Encoder, FramedRead, LengthDelimitedCodec};
 use tracing::{debug, error, info, trace, warn};
 use tripwire::{Outcome, TimeoutFutureExt};
 
-use crate::agent::{process_single_version, SyncRecvError};
-use crate::api::public::{ChunkedChanges, MAX_CHANGES_PER_MESSAGE};
+use crate::agent::{process_single_change, SyncRecvError};
+use crate::api::public::ChunkedChanges;
 
 use corro_types::{
     actor::ActorId,
@@ -320,6 +319,8 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification {
     }
 }
 
+const MAX_CHANGES_BYTES_PER_MESSAGE: usize = 64 * 1024;
+
 async fn process_range(
     booked: &Booked,
     pool: &SplitPool,
@@ -422,8 +423,15 @@ async fn process_version(
                 row_to_change,
             )?;
 
-            let chunked =
-                ChunkedChanges::new(rows, *start_seq, *end_seq, MAX_CHANGES_PER_MESSAGE);
+            // FIXME: at this point, we don't need to lock anymore, I don't think!
+
+            let chunked = ChunkedChanges::new(
+                rows,
+                *start_seq,
+                *end_seq,
+                // TODO: make this adaptive based on how long it takes to send changes
+                MAX_CHANGES_BYTES_PER_MESSAGE,
+            );
             for changes_seqs in chunked {
                 match changes_seqs {
                     Ok((changes, seqs)) => {
@@ -446,7 +454,7 @@ async fn process_version(
                         },
                     },
                 )))
-                .with_timeout(Duration::from_secs(2))
+                .with_timeout(Duration::from_secs(5))
                 .await
                 {
                     error!("timed out sending chunk of changes");
@@ -513,7 +521,7 @@ async fn process_version(
                 rows,
                 *start_seq,
                 *end_seq,
-                MAX_CHANGES_PER_MESSAGE,
+                MAX_CHANGES_BYTES_PER_MESSAGE,
             );
             for changes_seqs in chunked {
                 match changes_seqs {
@@ -659,29 +667,24 @@ async fn process_sync(
     Ok(())
 }
 
-fn append_write_buf(buf: &mut BytesMut, msg: SyncMessage) -> Result<(), SyncSendError> {
-    msg.write_to_stream(buf.writer())
-        .map_err(SyncMessageEncodeError::from)?;
-    Ok(())
-}
-
-async fn write_sync_msg<W: Sink<Bytes, Error = std::io::Error> + Unpin>(
+async fn write_sync_msg<W: AsyncWrite + Unpin>(
+    codec: &mut LengthDelimitedCodec,
     buf: &mut BytesMut,
     msg: SyncMessage,
     write: &mut W,
 ) -> Result<(), SyncSendError> {
-    append_write_buf(buf, msg)?;
-    send_sync_write_buffer(buf, write).await
-}
-
-async fn send_sync_write_buffer<W: Sink<Bytes, Error = std::io::Error> + Unpin>(
-    buf: &mut BytesMut,
-    write: &mut W,
-) -> Result<(), SyncSendError> {
-    let buf_len = buf.len();
-    write.send(buf.split().freeze()).await?;
+    msg.write_to_stream(buf.writer())
+        .map_err(SyncMessageEncodeError::from)?;
+
+    codec.encode(buf.split().freeze(), buf)?;
 
-    counter!("corro.sync.chunk.sent.bytes", buf_len as u64);
+    while buf.has_remaining() {
+        let n = write.write_buf(buf).await?;
+        if n == 0 {
+            break;
+        }
+        counter!("corro.sync.chunk.sent.bytes", n as u64);
+    }
 
     Ok(())
 }
@@ -691,10 +694,13 @@ pub async fn read_sync_msg<R: Stream<Item = std::io::Result<BytesMut>> + Unpin>(
 ) -> Result<Option<SyncMessage>, SyncRecvError> {
     match tokio::time::timeout(Duration::from_secs(5), read.next()).await {
         Ok(Some(buf_res)) => match buf_res {
-            Ok(mut buf) => match SyncMessage::from_buf(&mut buf) {
-                Ok(msg) => Ok(Some(msg)),
-                Err(e) => Err(SyncRecvError::from(e)),
-            },
+            Ok(mut buf) => {
+                counter!("corro.sync.chunk.recv.bytes", buf.len() as u64);
+                match SyncMessage::from_buf(&mut buf) {
+                    Ok(msg) => Ok(Some(msg)),
+                    Err(e) => Err(SyncRecvError::from(e)),
+                }
+            }
             Err(e) => Err(SyncRecvError::from(e)),
         },
         Ok(None) => Ok(None),
@@ -707,21 +713,23 @@ pub async fn bidirectional_sync(
     our_sync_state: SyncStateV1,
     their_sync_state: Option<SyncStateV1>,
     read: RecvStream,
-    write: SendStream,
+    mut write: SendStream,
 ) -> Result<usize, SyncError> {
-    let (tx, mut rx) = channel::<SyncMessage>(256);
+    let (tx, mut rx) = channel::<SyncMessage>(128);
 
     let mut read = FramedRead::new(read, LengthDelimitedCodec::new());
-    let mut write = FramedWrite::new(write, LengthDelimitedCodec::new());
 
+    let mut codec = LengthDelimitedCodec::new();
     let mut send_buf = BytesMut::new();
 
     write_sync_msg(
+        &mut codec,
         &mut send_buf,
         SyncMessage::V1(SyncMessageV1::State(our_sync_state)),
         &mut write,
     )
     .await?;
+    write.flush().await.map_err(SyncSendError::from)?;
 
     let their_sync_state = match their_sync_state {
         Some(state) => state,
@@ -735,11 +743,13 @@ pub async fn bidirectional_sync(
     let their_actor_id = their_sync_state.actor_id;
 
     write_sync_msg(
+        &mut codec,
         &mut send_buf,
         SyncMessage::V1(SyncMessageV1::Clock(agent.clock().new_timestamp().into())),
         &mut write,
     )
     .await?;
+    write.flush().await.map_err(SyncSendError::from)?;
 
     match read_sync_msg(&mut read).await? {
         Some(SyncMessage::V1(SyncMessageV1::Clock(ts))) => {
@@ -769,42 +779,14 @@ pub async fn bidirectional_sync(
         async move {
             let mut count = 0;
 
-            let mut flush_interval = interval(Duration::from_millis(200));
-
-            loop {
-                tokio::select! {
-                    maybe_msg = rx.recv() => match maybe_msg {
-                        Some(msg) => {
-                            if let SyncMessage::V1(SyncMessageV1::Changeset(change)) = &msg {
-                                count += change.len();
-                            }
-                            append_write_buf(&mut send_buf, msg)?;
-                        },
-                        None => break,
-                    },
-                    _ = flush_interval.tick(), if !send_buf.is_empty() => {
-                        let buf_len = send_buf.len();
-                        send_sync_write_buffer(&mut send_buf, &mut write).await?;
-                        debug!("sent {buf_len} bytes after interval");
-                        continue;
-                    }
-                }
-
-                if send_buf.len() >= 2 * 1024 {
-                    let buf_len = send_buf.len();
-                    send_sync_write_buffer(&mut send_buf, &mut write).await?;
-                    debug!("sent {buf_len} bytes during loop");
-                }
-            }
-
-            if !send_buf.is_empty() {
-                let buf_len = send_buf.len();
-                send_sync_write_buffer(&mut send_buf, &mut write).await?;
-                debug!("sent {buf_len} bytes after loop");
-            }
-
-            let mut send = write.into_inner();
-            if let Err(e) = send.finish().await {
+            while let Some(msg) = rx.recv().await {
+                if let SyncMessage::V1(SyncMessageV1::Changeset(change)) = &msg {
+                    count += change.len();
+                }
+                write_sync_msg(&mut codec, &mut send_buf, msg, &mut write).await?;
+            }
+
+            if let Err(e) = write.finish().await {
                 warn!("could not properly finish QUIC send stream: {e}");
             }
 
@@ -817,50 +799,32 @@ pub async fn bidirectional_sync(
         async move {
             let mut count = 0;
 
-            let (msg_tx, msg_rx) = mpsc::channel(100);
-
-            tokio::spawn(async move {
-                loop {
-                    match read_sync_msg(&mut read).await {
-                        Ok(None) => {
-                            break;
-                        }
-                        Err(e) => {
-                            error!("sync recv error: {e}");
-                            break;
-                        }
-                        Ok(Some(msg)) => match msg {
-                            SyncMessage::V1(SyncMessageV1::Changeset(change)) => {
-                                if let Err(e) = msg_tx.send(change).await {
-                                    error!("could not send into msg channel: {e}");
-                                    break;
-                                }
-                            }
-                            SyncMessage::V1(SyncMessageV1::State(_)) => {
-                                warn!("received sync state message more than once, ignoring");
-                                continue;
-                            }
-                            SyncMessage::V1(SyncMessageV1::Clock(_)) => {
-                                warn!("received sync clock message more than once, ignoring");
-                                continue;
-                            }
-                        },
-                    }
-                }
-            });
-
-            let chunker =
-                ReceiverStream::new(msg_rx).chunks_timeout(10, Duration::from_millis(500));
-            tokio::pin!(chunker);
-
-            while let Some(changes) = chunker.next().await {
-                for change in changes {
-                    let len = change.len();
-                    // TODO: make a "process many versions" function that only needs 1 db conn
-                    process_single_version(agent, change)
-                        .await
-                        .map_err(SyncRecvError::from)?;
-                    count += len;
+            loop {
+                match read_sync_msg(&mut read).await {
+                    Ok(None) => {
+                        break;
+                    }
+                    Err(e) => {
+                        error!("sync recv error: {e}");
+                        break;
+                    }
+                    Ok(Some(msg)) => match msg {
+                        SyncMessage::V1(SyncMessageV1::Changeset(change)) => {
+                            let len = change.len();
+                            process_single_change(agent, change)
+                                .await
+                                .map_err(SyncRecvError::from)?;
+                            count += len;
+                        }
+                        SyncMessage::V1(SyncMessageV1::State(_)) => {
+                            warn!("received sync state message more than once, ignoring");
+                            continue;
+                        }
+                        SyncMessage::V1(SyncMessageV1::Clock(_)) => {
+                            warn!("received sync clock message more than once, ignoring");
+                            continue;
+                        }
+                    },
                 }
             }
 
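With the FramedWrite sink gone, bidirectional_sync depends on both ends agreeing on plain length-delimited framing: write_sync_msg serializes a SyncMessage, frames it with a LengthDelimitedCodec, and writes the raw bytes into the QUIC send stream, while the receiver's FramedRead reassembles whole frames before decoding. Below is a minimal round-trip sketch of that pairing, using an in-memory duplex pipe in place of the QUIC streams and assuming tokio (full features), tokio-util (codec), tokio-stream, and bytes as dependencies; it is an illustration, not the crate's code.

use bytes::{Bytes, BytesMut};
use tokio::io::{duplex, AsyncWriteExt};
use tokio_stream::StreamExt;
use tokio_util::codec::{Encoder, FramedRead, LengthDelimitedCodec};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-in for the QUIC send/recv stream pair.
    let (mut tx, rx) = duplex(1024);

    // Sender side: length-delimit the serialized payload, then write the raw bytes.
    let mut codec = LengthDelimitedCodec::new();
    let mut framed = BytesMut::new();
    codec.encode(Bytes::from_static(b"hello sync"), &mut framed)?;
    tx.write_all(&framed).await?;
    drop(tx); // closing the writer lets the reader observe EOF

    // Receiver side: FramedRead yields one complete frame per encoded message.
    let mut read = FramedRead::new(rx, LengthDelimitedCodec::new());
    while let Some(frame) = read.next().await {
        println!("received frame: {:?}", frame?);
    }
    Ok(())
}

This pairing is also why read_sync_msg can count corro.sync.chunk.recv.bytes per decoded buffer: each frame handed out by FramedRead corresponds to exactly one message written by write_sync_msg.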