From ec62f8013686c9194b2ab2af23a495c56c1add34 Mon Sep 17 00:00:00 2001
From: Jerome Gravel-Niquet
Date: Tue, 19 Sep 2023 13:51:42 -0400
Subject: [PATCH] Chunk changes by byte size instead of number of changes (#57)

* chunk changes by size instead of number of changes
* smaller chunk sizes when syncing
* try larger ones I guess
* I think this is going to be better than send. feed items into the sink and flush (close) at the end only
* need to flush when writing the initial sink messages
* sync speed tweaks again, remove sink just send raw into stream
* bring back chunk sent metric
---
 crates/corro-agent/src/agent.rs          |   4 +-
 crates/corro-agent/src/api/peer.rs       | 186 +++++++++--------------
 crates/corro-agent/src/api/public/mod.rs |  33 ++--
 crates/corro-api-types/src/lib.rs        |  28 ++++
 4 files changed, 126 insertions(+), 125 deletions(-)

diff --git a/crates/corro-agent/src/agent.rs b/crates/corro-agent/src/agent.rs
index 2285fb3e..0e75b9e8 100644
--- a/crates/corro-agent/src/agent.rs
+++ b/crates/corro-agent/src/agent.rs
@@ -1607,7 +1607,7 @@ async fn process_fully_buffered_changes(
     Ok(inserted)
 }
 
-pub async fn process_single_version(
+pub async fn process_single_change(
     agent: &Agent,
     change: ChangeV1,
 ) -> Result<Option<Changeset>, ChangeError> {
@@ -1900,7 +1900,7 @@ async fn process_msg(
     Ok(match bcast {
         BroadcastV1::Change(change) => {
             let actor_id = change.actor_id;
-            let changeset = process_single_version(agent, change).await?;
+            let changeset = process_single_change(agent, change).await?;
 
             changeset.map(|changeset| {
                 BroadcastV1::Change(ChangeV1 {
diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs
index 26a662cc..e6ea3df4 100644
--- a/crates/corro-agent/src/api/peer.rs
+++ b/crates/corro-agent/src/api/peer.rs
@@ -5,28 +5,27 @@ use std::ops::RangeInclusive;
 use std::sync::Arc;
 use std::time::Duration;
 
-use bytes::{BufMut, Bytes, BytesMut};
+use bytes::{Buf, BufMut, BytesMut};
 use corro_types::agent::{Agent, KnownDbVersion, SplitPool};
 use corro_types::broadcast::{ChangeV1, Changeset};
 use corro_types::change::row_to_change;
 use corro_types::config::{GossipConfig, TlsClientConfig};
 use corro_types::sync::{SyncMessage, SyncMessageEncodeError, SyncMessageV1, SyncStateV1};
-use futures::{Sink, SinkExt, Stream, TryFutureExt};
+use futures::{Stream, TryFutureExt};
 use metrics::counter;
 use quinn::{RecvStream, SendStream};
 use rusqlite::params;
 use speedy::Writable;
-use tokio::sync::mpsc::{self, channel, Sender};
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::sync::mpsc::{channel, Sender};
 use tokio::task::block_in_place;
-use tokio::time::interval;
-use tokio_stream::wrappers::ReceiverStream;
 use tokio_stream::StreamExt;
-use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
+use tokio_util::codec::{Encoder, FramedRead, LengthDelimitedCodec};
 use tracing::{debug, error, info, trace, warn};
 use tripwire::{Outcome, TimeoutFutureExt};
 
-use crate::agent::{process_single_version, SyncRecvError};
-use crate::api::public::{ChunkedChanges, MAX_CHANGES_PER_MESSAGE};
+use crate::agent::{process_single_change, SyncRecvError};
+use crate::api::public::ChunkedChanges;
 
 use corro_types::{
     actor::ActorId,
@@ -320,6 +319,8 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification {
     }
 }
 
+const MAX_CHANGES_BYTES_PER_MESSAGE: usize = 64 * 1024;
+
 async fn process_range(
     booked: &Booked,
     pool: &SplitPool,
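
The core idea of the patch: instead of cutting a sync message after a fixed number
of changes (the deleted MAX_CHANGES_PER_MESSAGE was 50), the sender accumulates
changes until their estimated wire size reaches a byte budget: 64 KiB per message
for sync, 8 KiB for broadcast. A standalone sketch of that greedy policy follows;
chunk_by_size and its names are illustrative, not part of the patch:

    // Greedy size-based chunking: emit a batch once the summed estimated
    // byte size of its items reaches the cap, then start a new batch.
    fn chunk_by_size<T>(
        items: impl IntoIterator<Item = T>,
        size_of: impl Fn(&T) -> usize,
        max_bytes: usize,
    ) -> Vec<Vec<T>> {
        let mut batches = Vec::new();
        let (mut cur, mut cur_size) = (Vec::new(), 0usize);
        for item in items {
            cur_size += size_of(&item);
            cur.push(item);
            if cur_size >= max_bytes {
                batches.push(std::mem::take(&mut cur));
                cur_size = 0;
            }
        }
        if !cur.is_empty() {
            batches.push(cur);
        }
        batches
    }

    fn main() {
        let batches = chunk_by_size(
            vec!["hello".to_string(), "world".into(), "!".into()],
            |s: &String| s.len(),
            8, // "hello" + "world" = 10 bytes >= 8, so the batch is cut there
        );
        assert_eq!(
            batches,
            vec![vec!["hello".to_string(), "world".into()], vec!["!".into()]]
        );
    }

Unlike a count-based cap, a byte budget keeps messages roughly uniform on the wire
even when individual changes vary wildly in size (e.g. large TEXT or BLOB values).
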
@@ -422,8 +423,15 @@ async fn process_version(
         row_to_change,
     )?;
 
-    let chunked =
-        ChunkedChanges::new(rows, *start_seq, *end_seq, MAX_CHANGES_PER_MESSAGE);
+    // FIXME: at this point, we don't need to lock anymore, I don't think!
+
+    let chunked = ChunkedChanges::new(
+        rows,
+        *start_seq,
+        *end_seq,
+        // TODO: make this adaptive based on how long it takes to send changes
+        MAX_CHANGES_BYTES_PER_MESSAGE,
+    );
     for changes_seqs in chunked {
         match changes_seqs {
             Ok((changes, seqs)) => {
@@ -446,7 +454,7 @@ async fn process_version(
                     },
                 },
             )))
-            .with_timeout(Duration::from_secs(2))
+            .with_timeout(Duration::from_secs(5))
             .await
             {
                 error!("timed out sending chunk of changes");
@@ -513,7 +521,7 @@ async fn process_version(
                 rows,
                 *start_seq,
                 *end_seq,
-                MAX_CHANGES_PER_MESSAGE,
+                MAX_CHANGES_BYTES_PER_MESSAGE,
             );
             for changes_seqs in chunked {
                 match changes_seqs {
@@ -659,29 +667,24 @@ async fn process_sync(
     Ok(())
 }
 
-fn append_write_buf(buf: &mut BytesMut, msg: SyncMessage) -> Result<(), SyncSendError> {
-    msg.write_to_stream(buf.writer())
-        .map_err(SyncMessageEncodeError::from)?;
-    Ok(())
-}
-
-async fn write_sync_msg<W: Sink<Bytes, Error = std::io::Error> + Unpin>(
+async fn write_sync_msg<W: AsyncWrite + Unpin>(
+    codec: &mut LengthDelimitedCodec,
     buf: &mut BytesMut,
     msg: SyncMessage,
     write: &mut W,
 ) -> Result<(), SyncSendError> {
-    append_write_buf(buf, msg)?;
-    send_sync_write_buffer(buf, write).await
-}
+    msg.write_to_stream(buf.writer())
+        .map_err(SyncMessageEncodeError::from)?;
 
-async fn send_sync_write_buffer<W: Sink<Bytes, Error = std::io::Error> + Unpin>(
-    buf: &mut BytesMut,
-    write: &mut W,
-) -> Result<(), SyncSendError> {
-    let buf_len = buf.len();
-    write.send(buf.split().freeze()).await?;
+    codec.encode(buf.split().freeze(), buf)?;
 
-    counter!("corro.sync.chunk.sent.bytes", buf_len as u64);
+    while buf.has_remaining() {
+        let n = write.write_buf(buf).await?;
+        if n == 0 {
+            break;
+        }
+        counter!("corro.sync.chunk.sent.bytes", n as u64);
+    }
 
     Ok(())
 }
@@ -691,10 +694,13 @@ pub async fn read_sync_msg<R: Stream<Item = std::io::Result<BytesMut>> + Unpin>(
 ) -> Result<Option<SyncMessage>, SyncRecvError> {
     match tokio::time::timeout(Duration::from_secs(5), read.next()).await {
         Ok(Some(buf_res)) => match buf_res {
-            Ok(mut buf) => match SyncMessage::from_buf(&mut buf) {
-                Ok(msg) => Ok(Some(msg)),
-                Err(e) => Err(SyncRecvError::from(e)),
-            },
+            Ok(mut buf) => {
+                counter!("corro.sync.chunk.recv.bytes", buf.len() as u64);
+                match SyncMessage::from_buf(&mut buf) {
+                    Ok(msg) => Ok(Some(msg)),
+                    Err(e) => Err(SyncRecvError::from(e)),
+                }
+            }
             Err(e) => Err(SyncRecvError::from(e)),
         },
         Ok(None) => Ok(None),
@@ -707,21 +713,23 @@ pub async fn bidirectional_sync(
     our_sync_state: SyncStateV1,
     their_sync_state: Option<SyncStateV1>,
     read: RecvStream,
-    write: SendStream,
+    mut write: SendStream,
 ) -> Result<usize, SyncError> {
-    let (tx, mut rx) = channel::<SyncMessage>(256);
+    let (tx, mut rx) = channel::<SyncMessage>(128);
 
     let mut read = FramedRead::new(read, LengthDelimitedCodec::new());
-    let mut write = FramedWrite::new(write, LengthDelimitedCodec::new());
+    let mut codec = LengthDelimitedCodec::new();
     let mut send_buf = BytesMut::new();
 
     write_sync_msg(
+        &mut codec,
         &mut send_buf,
         SyncMessage::V1(SyncMessageV1::State(our_sync_state)),
         &mut write,
     )
     .await?;
+    write.flush().await.map_err(SyncSendError::from)?;
 
     let their_sync_state = match their_sync_state {
         Some(state) => state,
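
Two changes meet in write_sync_msg above: the FramedWrite sink is gone, and framing
is now done by hand. Speedy serializes the SyncMessage into a scratch BytesMut, the
LengthDelimitedCodec encoder prepends the length header, and the frame is drained
into the QUIC stream with write_buf, counting the bytes actually written; flushes
become explicit (the write.flush() calls after the handshake messages). A
self-contained sketch of that framing pattern, with a Vec<u8> standing in for the
SendStream (dependencies: tokio, tokio-util with the codec feature, bytes):

    use bytes::{Buf, Bytes, BytesMut};
    use tokio::io::AsyncWriteExt;
    use tokio_util::codec::{Encoder, LengthDelimitedCodec};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let mut codec = LengthDelimitedCodec::new();
        let mut buf = BytesMut::new();

        // Encode one frame: the codec prepends a 4-byte big-endian length.
        codec.encode(Bytes::from_static(b"hello"), &mut buf)?;

        let mut out: Vec<u8> = Vec::new(); // stands in for the QUIC SendStream
        let mut sent = 0usize;
        while buf.has_remaining() {
            let n = out.write_buf(&mut buf).await?;
            if n == 0 {
                break; // writer accepted no more bytes; avoid spinning
            }
            sent += n; // the patch feeds this into a metrics counter
        }
        assert_eq!(sent, 4 + 5); // length prefix + payload
        Ok(())
    }

Driving the codec manually lets the caller decide exactly when bytes hit the stream,
which is what the commit bullet "remove sink just send raw into stream" refers to.
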
@@ -735,11 +743,13 @@ pub async fn bidirectional_sync(
     let their_actor_id = their_sync_state.actor_id;
 
     write_sync_msg(
+        &mut codec,
         &mut send_buf,
         SyncMessage::V1(SyncMessageV1::Clock(agent.clock().new_timestamp().into())),
         &mut write,
     )
     .await?;
+    write.flush().await.map_err(SyncSendError::from)?;
 
     match read_sync_msg(&mut read).await? {
         Some(SyncMessage::V1(SyncMessageV1::Clock(ts))) => {
@@ -769,42 +779,14 @@ pub async fn bidirectional_sync(
         async move {
             let mut count = 0;
 
-            let mut flush_interval = interval(Duration::from_millis(200));
-
-            loop {
-                tokio::select! {
-                    maybe_msg = rx.recv() => match maybe_msg {
-                        Some(msg) => {
-                            if let SyncMessage::V1(SyncMessageV1::Changeset(change)) = &msg {
-                                count += change.len();
-                            }
-                            append_write_buf(&mut send_buf, msg)?;
-                        },
-                        None => break,
-                    },
-                    _ = flush_interval.tick(), if !send_buf.is_empty() => {
-                        let buf_len = send_buf.len();
-                        send_sync_write_buffer(&mut send_buf, &mut write).await?;
-                        debug!("sent {buf_len} bytes after interval");
-                        continue;
-                    }
-                }
-
-                if send_buf.len() >= 2 * 1024 {
-                    let buf_len = send_buf.len();
-                    send_sync_write_buffer(&mut send_buf, &mut write).await?;
-                    debug!("sent {buf_len} bytes during loop");
+            while let Some(msg) = rx.recv().await {
+                if let SyncMessage::V1(SyncMessageV1::Changeset(change)) = &msg {
+                    count += change.len();
                 }
+                write_sync_msg(&mut codec, &mut send_buf, msg, &mut write).await?;
             }
 
-            if !send_buf.is_empty() {
-                let buf_len = send_buf.len();
-                send_sync_write_buffer(&mut send_buf, &mut write).await?;
-                debug!("sent {buf_len} bytes after loop");
-            }
-
-            let mut send = write.into_inner();
-            if let Err(e) = send.finish().await {
+            if let Err(e) = write.finish().await {
                 warn!("could not properly finish QUIC send stream: {e}");
             }
@@ -817,50 +799,32 @@ pub async fn bidirectional_sync(
         async move {
             let mut count = 0;
 
-            let (msg_tx, msg_rx) = mpsc::channel(100);
-
-            tokio::spawn(async move {
-                loop {
-                    match read_sync_msg(&mut read).await {
-                        Ok(None) => {
-                            break;
+            loop {
+                match read_sync_msg(&mut read).await {
+                    Ok(None) => {
+                        break;
+                    }
+                    Err(e) => {
+                        error!("sync recv error: {e}");
+                        break;
+                    }
+                    Ok(Some(msg)) => match msg {
+                        SyncMessage::V1(SyncMessageV1::Changeset(change)) => {
+                            let len = change.len();
+                            process_single_change(agent, change)
+                                .await
+                                .map_err(SyncRecvError::from)?;
+                            count += len;
                         }
-                        Err(e) => {
-                            error!("sync recv error: {e}");
-                            break;
+                        SyncMessage::V1(SyncMessageV1::State(_)) => {
+                            warn!("received sync state message more than once, ignoring");
+                            continue;
                         }
-                        Ok(Some(msg)) => match msg {
-                            SyncMessage::V1(SyncMessageV1::Changeset(change)) => {
-                                if let Err(e) = msg_tx.send(change).await {
-                                    error!("could not send into msg channel: {e}");
-                                    break;
-                                }
-                            }
-                            SyncMessage::V1(SyncMessageV1::State(_)) => {
-                                warn!("received sync state message more than once, ignoring");
-                                continue;
-                            }
-                            SyncMessage::V1(SyncMessageV1::Clock(_)) => {
-                                warn!("received sync clock message more than once, ignoring");
-                                continue;
-                            }
-                        },
-                    }
-                }
-            });
-
-            let chunker =
-                ReceiverStream::new(msg_rx).chunks_timeout(10, Duration::from_millis(500));
-            tokio::pin!(chunker);
-
-            while let Some(changes) = chunker.next().await {
-                for change in changes {
-                    let len = change.len();
-                    // TODO: make a "process many versions" function that only needs 1 db conn
-                    process_single_version(agent, change)
-                        .await
-                        .map_err(SyncRecvError::from)?;
-                    count += len;
+                        SyncMessage::V1(SyncMessageV1::Clock(_)) => {
+                            warn!("received sync clock message more than once, ignoring");
+                            continue;
+                        }
+                    },
                 }
             }
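
On the receive side, the intermediate mpsc channel, the spawned reader task, and the
chunks_timeout batching are all gone: frames are read and each changeset is applied
inline via process_single_change, so a slow database naturally backpressures the
QUIC stream instead of letting decoded changes pile up in memory. A self-contained
sketch of the read half of the framing (FramedRead reassembles length-delimited
frames, and each read is guarded by a timeout, as in read_sync_msg):

    use tokio_stream::StreamExt;
    use tokio_util::codec::{FramedRead, LengthDelimitedCodec};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Two frames on the "wire": 4-byte big-endian lengths, then payloads.
        let wire: Vec<u8> =
            [&[0u8, 0, 0, 2][..], &b"hi"[..], &[0, 0, 0, 3][..], &b"abc"[..]].concat();
        let mut read = FramedRead::new(&wire[..], LengthDelimitedCodec::new());

        while let Ok(Some(frame)) =
            tokio::time::timeout(std::time::Duration::from_secs(5), read.next()).await
        {
            let buf = frame?; // one whole message, as a BytesMut
            // The real code decodes a SyncMessage here and bumps the new
            // corro.sync.chunk.recv.bytes counter; we just report the size.
            println!("received frame of {} bytes", buf.len());
        }
        Ok(())
    }
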
diff --git a/crates/corro-agent/src/api/public/mod.rs b/crates/corro-agent/src/api/public/mod.rs
index 0f9f80c1..2c256d8e 100644
--- a/crates/corro-agent/src/api/public/mod.rs
+++ b/crates/corro-agent/src/api/public/mod.rs
@@ -38,15 +38,14 @@ use crate::agent::process_subs;
 
 pub mod pubsub;
 
-pub const MAX_CHANGES_PER_MESSAGE: usize = 50;
-
 pub struct ChunkedChanges<I: Iterator> {
     iter: Peekable<I>,
     changes: Vec<Change>,
     last_pushed_seq: i64,
     last_start_seq: i64,
     last_seq: i64,
-    chunk_size: usize,
+    max_byte_size: usize,
+    buffered_size: usize,
     done: bool,
 }
@@ -54,14 +53,15 @@ impl<I> ChunkedChanges<I>
 where
     I: Iterator,
 {
-    pub fn new(iter: I, start_seq: i64, last_seq: i64, chunk_size: usize) -> Self {
+    pub fn new(iter: I, start_seq: i64, last_seq: i64, max_byte_size: usize) -> Self {
         Self {
             iter: iter.peekable(),
             changes: vec![],
             last_pushed_seq: 0,
             last_start_seq: start_seq,
             last_seq,
-            chunk_size,
+            max_byte_size,
+            buffered_size: 0,
             done: false,
         }
     }
@@ -79,6 +79,11 @@ where
             return None;
         }
 
+        debug_assert!(self.changes.is_empty());
+
+        // reset the buffered size
+        self.buffered_size = 0;
+
         loop {
             trace!("chunking through the rows iterator");
             match self.iter.next() {
@@ -87,6 +92,8 @@ where
 
                     self.last_pushed_seq = change.seq;
 
+                    self.buffered_size += change.estimated_byte_size();
+
                     self.changes.push(change);
 
                     if self.last_pushed_seq == self.last_seq {
@@ -94,7 +101,7 @@ where
                         break;
                     }
 
-                    if self.changes.len() >= self.chunk_size {
+                    if self.buffered_size >= self.max_byte_size {
                         // chunking it up
                         let start_seq = self.last_start_seq;
 
@@ -131,6 +138,8 @@ where
     }
 }
 
+const MAX_CHANGES_BYTE_SIZE: usize = 8 * 1024;
+
 pub async fn make_broadcastable_changes<F, T>(
     agent: &Agent,
     f: F,
@@ -223,7 +232,7 @@ where
                 ORDER BY seq ASC
             "#)?;
             let rows = prepped.query_map([db_version], row_to_change)?;
-            let chunked = ChunkedChanges::new(rows, 0, last_seq, MAX_CHANGES_PER_MESSAGE);
+            let chunked = ChunkedChanges::new(rows, 0, last_seq, MAX_CHANGES_BYTE_SIZE);
             for changes_seqs in chunked {
                 match changes_seqs {
                     Ok((changes, seqs)) => {
@@ -1070,7 +1079,7 @@ mod tests {
             .into_iter(),
             0,
             100,
-            2,
+            changes[0].estimated_byte_size() + changes[1].estimated_byte_size(),
         );
 
         assert_eq!(
@@ -1087,7 +1096,7 @@ mod tests {
             vec![Ok(changes[0].clone()), Ok(changes[1].clone())].into_iter(),
             0,
             0,
-            1,
+            changes[0].estimated_byte_size(),
         );
 
         assert_eq!(chunker.next(), Some(Ok((vec![changes[0].clone()], 0..=0))));
@@ -1098,7 +1107,7 @@ mod tests {
             vec![Ok(changes[0].clone()), Ok(changes[2].clone())].into_iter(),
             0,
             100,
-            2,
+            changes[0].estimated_byte_size() + changes[2].estimated_byte_size(),
        );
 
         assert_eq!(
@@ -1119,7 +1128,7 @@ mod tests {
             .into_iter(),
             0,
             100,
-            50,
+            100000, // just send them all!
         );
 
         assert_eq!(
@@ -1148,7 +1157,7 @@ mod tests {
             .into_iter(),
             0,
             10,
-            2,
+            changes[2].estimated_byte_size() + changes[4].estimated_byte_size(),
         );
 
         assert_eq!(
diff --git a/crates/corro-api-types/src/lib.rs b/crates/corro-api-types/src/lib.rs
index c30c38b9..53d41f18 100644
--- a/crates/corro-api-types/src/lib.rs
+++ b/crates/corro-api-types/src/lib.rs
@@ -72,6 +72,24 @@ pub struct Change {
     pub cl: i64,
 }
 
+impl Change {
+    // this is an ESTIMATE, it should give a rough idea of how many bytes will
+    // be required on the wire
+    pub fn estimated_byte_size(&self) -> usize {
+        self.table.len() + self.pk.len() + self.cid.len() + self.val.estimated_byte_size() +
+            // col_version
+            8 +
+            // db_version
+            8 +
+            // seq
+            8 +
+            // site_id
+            16 +
+            // cl
+            8
+    }
+}
+
 pub fn row_to_change(row: &Row) -> Result<Change, rusqlite::Error> {
     Ok(Change {
         table: row.get(0)?,
@@ -293,6 +311,16 @@ impl SqliteValue {
             SqliteValue::Blob(v) => SqliteValueRef::Blob(v.as_slice()),
         }
     }
+
+    pub fn estimated_byte_size(&self) -> usize {
+        1 + match self {
+            SqliteValue::Null => 1,
+            SqliteValue::Integer(_) => 8,
+            SqliteValue::Real(_) => 8,
+            SqliteValue::Text(t) => 4 + t.len(),
+            SqliteValue::Blob(v) => 4 + v.len(),
+        }
+    }
 }
 
 impl From<&str> for SqliteValue {
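
The byte estimate that drives all of this is deliberately crude: each value costs a
1-byte type tag plus either a fixed width (8 for integers and reals, 1 for null) or
a 4-byte length prefix plus the payload (text and blobs), and
Change::estimated_byte_size then adds the dynamic field lengths (table, pk, cid)
plus 48 bytes of fixed per-row overhead (8 + 8 + 8 + 16 + 8 for col_version,
db_version, seq, site_id, and cl). A worked example on a simplified mirror of
SqliteValue (illustrative, not the patch's actual type):

    // Simplified mirror of corro-api-types' SqliteValue, for the arithmetic only.
    #[allow(dead_code)]
    enum Value {
        Null,
        Integer(i64),
        Real(f64),
        Text(String),
        Blob(Vec<u8>),
    }

    impl Value {
        fn estimated_byte_size(&self) -> usize {
            1 + match self {
                Value::Null => 1,
                Value::Integer(_) => 8,
                Value::Real(_) => 8,
                Value::Text(t) => 4 + t.len(),
                Value::Blob(v) => 4 + v.len(),
            }
        }
    }

    fn main() {
        // "hello": 1 (tag) + 4 (length prefix) + 5 (payload bytes) = 10.
        assert_eq!(Value::Text("hello".into()).estimated_byte_size(), 10);
        // An integer always costs 1 (tag) + 8 = 9, whatever its magnitude.
        assert_eq!(Value::Integer(1_000_000).estimated_byte_size(), 9);
    }

Since the estimate only has to decide when a chunk is "full", being off by a few
bytes per row is harmless: the 8 KiB and 64 KiB caps are soft limits that a chunk
can overshoot by at most the size of its final change, because ChunkedChanges
pushes a change before checking the budget.
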