diff --git a/Cargo.lock b/Cargo.lock index ac19d1e2..34ff8c08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -379,6 +379,70 @@ dependencies = [ "serde", ] +[[package]] +name = "build-info" +version = "0.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b301350c1c448e35b896f32b68c49c8ecd969a71978fbafc4ebd09ec3f4eee2" +dependencies = [ + "build-info-common", + "build-info-proc", + "once_cell", +] + +[[package]] +name = "build-info-build" +version = "0.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b314717755dd6a06fc11ad3f7909ba4c0ae2ab516f5cb0404fe924c71bfc7d0" +dependencies = [ + "anyhow", + "base64 0.21.0", + "bincode", + "build-info-common", + "cargo_metadata", + "chrono", + "glob", + "once_cell", + "pretty_assertions", + "rustc_version", + "serde_json", + "xz2", +] + +[[package]] +name = "build-info-common" +version = "0.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e040d36472d40ec9424c36a7b54be589072e605596b6f20b0c56c5230b460cc" +dependencies = [ + "chrono", + "derive_more", + "semver", + "serde", +] + +[[package]] +name = "build-info-proc" +version = "0.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffd5f241ddd417436c48d35da9869480891449ddd1ae3fd483bbcfbae741a422" +dependencies = [ + "anyhow", + "base64 0.21.0", + "bincode", + "build-info-common", + "chrono", + "num-bigint", + "num-traits", + "proc-macro-error", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.28", + "xz2", +] + [[package]] name = "bumpalo" version = "3.12.0" @@ -406,6 +470,29 @@ dependencies = [ "serde", ] +[[package]] +name = "cargo-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cfa25e60aea747ec7e1124f238816749faa93759c6ff5b31f1ccdda137f4479" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eee4243f1f26fc7a42710e7439c149e2b10b05472f88090acce52632f231a73a" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "castaway" version = "0.2.2" @@ -575,6 +662,12 @@ dependencies = [ "webpki", ] +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + [[package]] name = "core-foundation" version = "0.9.3" @@ -629,8 +722,10 @@ dependencies = [ "eyre", "foca", "futures", + "hex", "http-body", "hyper", + "itertools", "metrics", "parking_lot", "quinn", @@ -641,6 +736,7 @@ dependencies = [ "rusqlite", "rustls", "rustls-pemfile", + "seahash", "serde", "serde_json", "spawn", @@ -674,6 +770,7 @@ dependencies = [ "rusqlite", "serde", "smallvec", + "strum", "thiserror", "tokio", ] @@ -793,6 +890,8 @@ dependencies = [ name = "corrosion" version = "0.1.0" dependencies = [ + "build-info", + "build-info-build", "bytes", "camino", "clap", @@ -1053,6 +1152,25 @@ dependencies = [ "rusticata-macros", ] +[[package]] +name = "derive_more" +version = "0.99.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", +] + +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "difflib" version = "0.4.0" @@ -1061,9 +1179,9 @@ checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" [[package]] name = "digest" -version = "0.10.6" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", @@ -1369,6 +1487,12 @@ version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0a93d233ebf96623465aad4046a8d3aa4da22d4f4beba5388838c8a434bbb4" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.17" @@ -1878,6 +2002,17 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "lzma-sys" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "mach2" version = "0.4.1" @@ -2361,6 +2496,16 @@ dependencies = [ "termtree", ] +[[package]] +name = "pretty_assertions" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66" +dependencies = [ + "diff", + "yansi", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2860,6 +3005,9 @@ name = "semver" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -4153,6 +4301,15 @@ dependencies = [ "time", ] +[[package]] +name = "xz2" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" +dependencies = [ + "lzma-sys", +] + [[package]] name = "yansi" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 8d0ef53e..4f4c6cc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,8 @@ async-trait = "0.1.68" axum = { version = "0.6.15", features = ["http2", "ws", "tracing", "headers"] } bb8 = "0.8.0" bincode = "1.3.3" +build-info = "0.0.31" +build-info-build = { version = "0.0.31", default-features = false } bytes = "1.4.0" camino = {version = "1.1.4", features = ["serde1"] } clap = { version = "4.2.4", features = ["derive"] } diff --git a/crates/corro-agent/Cargo.toml b/crates/corro-agent/Cargo.toml index 8b4687f9..4e76a60d 100644 --- a/crates/corro-agent/Cargo.toml +++ b/crates/corro-agent/Cargo.toml @@ -16,7 +16,9 @@ corro-types = { path = "../corro-types" } eyre = { workspace = true } foca = { workspace = true } futures = { workspace = true } +hex = { workspace = true } hyper = { workspace = true } +itertools = { workspace = true } metrics = { workspace = true } parking_lot = { workspace = true } quinn = { workspace = true } @@ -27,6 +29,7 @@ rangemap = { workspace = true } rusqlite = { workspace = true } rustls = { workspace = true } rustls-pemfile = "*" +seahash = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } spawn = { path = "../spawn" } diff --git a/crates/corro-agent/src/agent.rs b/crates/corro-agent/src/agent.rs index 9092718b..100c6fd7 100644 --- a/crates/corro-agent/src/agent.rs +++ b/crates/corro-agent/src/agent.rs @@ -1,6 +1,7 @@ use std::{ - collections::{BTreeSet, HashMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashMap, HashSet}, convert::Infallible, + hash::{Hash, Hasher}, net::SocketAddr, ops::RangeInclusive, sync::{atomic::AtomicI64, Arc}, @@ -27,7 +28,7 @@ use corro_types::{ BiPayload, BiPayloadV1, BroadcastInput, BroadcastV1, ChangeV1, Changeset, FocaInput, Timestamp, UniPayload, UniPayloadV1, }, - change::Change, + change::{Change, SqliteValue}, config::{AuthzConfig, Config, DEFAULT_GOSSIP_PORT}, members::{MemberEvent, Members}, schema::init_schema, @@ -74,6 +75,7 @@ use trust_dns_resolver::{ const MAX_SYNC_BACKOFF: Duration = Duration::from_secs(60); // 1 minute oughta be enough, we're constantly getting broadcasts randomly + targetted const RANDOM_NODES_CHOICES: usize = 10; const COMPACT_BOOKED_INTERVAL: Duration = Duration::from_secs(300); +const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300); pub struct AgentOptions { actor_id: ActorId, @@ -573,10 +575,14 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { let agent = agent.clone(); let foca_tx = foca_tx.clone(); async move { - let mut interval = tokio::time::interval(Duration::from_secs(300)); + let mut boff = backoff::Backoff::new(10) + .timeout_range(Duration::from_secs(5), Duration::from_secs(120)) + .iter(); + let timer = tokio::time::sleep(Duration::new(0, 0)); + tokio::pin!(timer); loop { - interval.tick().await; + timer.as_mut().await; match generate_bootstrap( agent.config().gossip.bootstrap.as_slice(), @@ -600,6 +606,9 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { error!("could not find nodes to announce ourselves to: {e}"); } } + + let dur = boff.next().unwrap_or(ANNOUNCE_INTERVAL); + timer.as_mut().reset(tokio::time::Instant::now() + dur); } } }); @@ -647,12 +656,13 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { let res = block_in_place(|| { let tx = conn.transaction()?; + let db_versions = versions.keys().copied().collect(); + let to_clear = { - match compact_booked_for_actor(&tx, &versions.keys().copied().collect()) - { + match find_cleared_db_versions_for_actor(&tx, &db_versions) { Ok(to_clear) => { if to_clear.is_empty() { - return Ok(None); + return Ok(()); } to_clear } @@ -671,8 +681,8 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { let cleared_len = to_clear.len(); - for db_version in to_clear { - if let Some(version) = versions.get(&db_version) { + for db_version in to_clear.iter() { + if let Some(version) = versions.get(db_version) { new_copy.insert(*version..=*version, KnownDbVersion::Cleared); } } @@ -699,18 +709,14 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { debug!("compacted in-db version state for actor {actor_id}, deleted: {deleted}, inserted: {inserted}"); - Ok::<_, eyre::Report>(Some((new_copy, cleared_len))) + **bookedw.inner_mut() = new_copy; + debug!("compacted in-memory cache by clearing {cleared_len} db versions for actor {actor_id}, new total: {}", bookedw.inner().len()); + + Ok::<_, eyre::Report>(()) }); - match res { - Ok(Some((new_booked, cleared_len))) => { - **bookedw.inner_mut() = new_booked; - debug!("compacted in-memory cache by clearing {cleared_len} db versions for actor {actor_id}, new total: {}", bookedw.inner().len()); - } - Ok(None) => {} - Err(e) => { - error!("could not compact versions for actor {actor_id}: {e}"); - } + if let Err(e) = res { + error!("could not compact versions for actor {actor_id}: {e}"); } } @@ -870,7 +876,6 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { .inspect(|_| info!("corrosion agent sync loop is done")), ); - let mut metrics_interval = tokio::time::interval(Duration::from_secs(10)); let mut db_cleanup_interval = tokio::time::interval(Duration::from_secs(60 * 15)); tokio::spawn(handle_gossip_to_send(transport, to_send_rx)); @@ -880,6 +885,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { foca_tx.clone(), member_events_tx, )); + tokio::spawn(metrics_loop(agent.clone())); let gossip_chunker = ReceiverStream::new(bcast_rx).chunks_timeout(10, Duration::from_millis(500)); @@ -906,10 +912,6 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> { _ = db_cleanup_interval.tick() => { tokio::spawn(handle_db_cleanup(agent.pool().clone()).preemptible(tripwire.clone())); }, - _ = metrics_interval.tick() => { - let agent = agent.clone(); - tokio::spawn(async move { block_in_place(move || collect_metrics(agent)) }); - }, _ = &mut tripwire => { debug!("tripped corrosion"); break; @@ -943,16 +945,27 @@ async fn require_authz( Ok(next.run(request).await) } -fn collect_metrics(agent: Agent) { +const CHECKSUM_SEEDS: [u64; 4] = [ + 0x16f11fe89b0d677c, + 0xb480a793d8e6c86c, + 0x6fe2e5aaf078ebc9, + 0x14f994a4c5259381, +]; + +async fn metrics_loop(agent: Agent) { + let mut metrics_interval = tokio::time::interval(Duration::from_secs(10)); + + loop { + metrics_interval.tick().await; + + block_in_place(|| collect_metrics(&agent)); + } +} + +fn collect_metrics(agent: &Agent) { agent.pool().emit_metrics(); - let tables = agent - .schema() - .read() - .tables - .keys() - .cloned() - .collect::>(); + let schema = agent.schema().read(); let conn = match agent.pool().read_blocking() { Ok(conn) => conn, @@ -962,12 +975,13 @@ fn collect_metrics(agent: Agent) { } }; - for table in tables { - match conn.query_row(&format!("SELECT count(*) FROM {table}"), [], |row| { - row.get::<_, i64>(0) - }) { + for table in schema.tables.keys() { + match conn + .prepare_cached(&format!("SELECT count(*) FROM {table}")) + .and_then(|mut prepped| prepped.query_row([], |row| row.get::<_, i64>(0))) + { Ok(count) => { - gauge!("corro.db.table.rows.total", count as f64, "table" => table); + gauge!("corro.db.table.rows.total", count as f64, "table" => table.clone()); } Err(e) => { error!("could not query count for table {table}: {e}"); @@ -975,6 +989,57 @@ fn collect_metrics(agent: Agent) { } } } + + match conn + .prepare_cached("SELECT actor_id, count(site_id) FROM __corro_members LEFT JOIN __corro_buffered_changes ON site_id = actor_id GROUP BY actor_id") + .and_then(|mut prepped| { + prepped + .query_map((), |row| { + Ok((row.get::<_, ActorId>(0)?, row.get::<_, i64>(1)?)) + }) + .and_then(|mapped| mapped.collect::, _>>()) + }) { + Ok(mapped) => { + for (actor_id, count) in mapped { + gauge!("corro.db.buffered.changes.rows.total", count as f64, "actor_id" => actor_id.to_string()) + } + } + Err(e) => { + error!("could not query count for buffered changes: {e}"); + } + } + + for (name, table) in schema.tables.iter() { + let pks = table.pk.iter().cloned().collect::>().join(","); + + match conn + .prepare_cached(&format!("SELECT * FROM {name} ORDER BY {pks}")) + .and_then(|mut prepped| { + let col_count = prepped.column_count(); + prepped.query(()).and_then(|mut rows| { + let mut hasher = seahash::SeaHasher::with_seeds( + CHECKSUM_SEEDS[0], + CHECKSUM_SEEDS[1], + CHECKSUM_SEEDS[2], + CHECKSUM_SEEDS[3], + ); + while let Ok(Some(row)) = rows.next() { + for idx in 0..col_count { + let v: SqliteValue = row.get(idx)?; + v.hash(&mut hasher); + } + } + Ok(hasher.finish()) + }) + }) { + Ok(hash) => { + gauge!("corro.db.table.checksum", hash as f64, "table" => name.clone()); + } + Err(e) => { + error!("could not query clock table values for hashing {table}: {e}"); + } + } + } } pub async fn handle_change( @@ -1007,7 +1072,7 @@ pub async fn handle_change( } } -fn compact_booked_for_actor( +fn find_cleared_db_versions_for_actor( tx: &Transaction, versions: &BTreeSet, ) -> eyre::Result> { @@ -1396,26 +1461,6 @@ async fn process_fully_buffered_changes( let tx = conn.transaction()?; - let max_db_version: Option = tx - .prepare_cached( - " - SELECT MAX(db_version) - FROM __corro_buffered_changes - WHERE site_id = ? - AND version = ? - ", - )? - .query_row(params![actor_id.as_bytes(), version], |row| row.get(0))?; - debug!(actor = %agent.actor_id(), "max db_version from buffered rows: {max_db_version:?}"); - - let max_db_version = match max_db_version { - None => { - warn!("zero rows to move, aborting!"); - return Ok(false); - } - Some(v) => v, - }; - info!(actor = %agent.actor_id(), "moving buffered changes to crsql_changes (actor: {actor_id}, version: {version}, last_seq: {last_seq})"); let start = Instant::now(); @@ -1457,11 +1502,8 @@ async fn process_fully_buffered_changes( info!(actor = %agent.actor_id(), "rows impacted by changes: {rows_impacted}"); let known_version = if rows_impacted > 0 { - let db_version: i64 = tx.query_row( - "SELECT MAX(?, crsql_db_version() + 1)", - [max_db_version], - |row| row.get(0), - )?; + let db_version: i64 = + tx.query_row("SELECT crsql_next_db_version()", [], |row| row.get(0))?; debug!("db version: {db_version}"); tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES (?, ?, ?, ?, ?);")?.execute(params![actor_id, version, db_version, last_seq, ts])?; @@ -1662,13 +1704,14 @@ pub async fn process_single_version( return Ok((changeset, None)); } - let mut db_version: i64 = - tx.query_row("SELECT crsql_db_version() + 1", (), |row| row.get(0))?; - let mut impactful_changeset = vec![]; let mut last_rows_impacted = 0; + let changes_len = changes.len(); + + let mut changes_per_table = BTreeMap::new(); + for change in changes { trace!("inserting change! {change:?}"); tx.prepare_cached( @@ -1696,24 +1739,33 @@ pub async fn process_single_version( if rows_impacted > last_rows_impacted { debug!(actor = %agent.actor_id(), "inserted a the change into crsql_changes"); - db_version = std::cmp::max(change.db_version, db_version); impactful_changeset.push(change); + if let Some(c) = impactful_changeset.last() { + if let Some(counter) = changes_per_table.get_mut(&c.table) { + *counter = *counter + 1; + } else { + changes_per_table.insert(c.table.clone(), 1); + } + } } last_rows_impacted = rows_impacted; } - debug!( - "inserting bookkeeping row for actor {}, version: {}, db_version: {:?}, ts: {:?}", - actor_id, version, db_version, ts - ); - - tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES (?, ?, ?, ?, ?);")?.execute(params![actor_id, version, db_version, last_seq, ts])?; - - debug!("inserted bookkeeping row"); + let db_version: i64 = tx + .prepare_cached("SELECT crsql_next_db_version()")? + .query_row((), |row| row.get(0))?; let (known_version, new_changeset, db_version) = if impactful_changeset.is_empty() { + debug!( + "inserting CLEARED bookkeeping row for actor {actor_id}, version: {version}, db_version: {db_version}, ts: {ts:?} (recv changes: {changes_len}, rows impacted: {last_rows_impacted})", + ); + tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version) VALUES (?, ?, ?);")?.execute(params![actor_id, version, version])?; (KnownDbVersion::Cleared, Changeset::Empty { versions }, None) } else { + debug!( + "inserting bookkeeping row for actor {actor_id}, version: {version}, db_version: {db_version}, ts: {ts:?} (recv changes: {changes_len}, rows impacted: {last_rows_impacted})", + ); + tx.prepare_cached("INSERT INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES (?, ?, ?, ?, ?);")?.execute(params![actor_id, version, db_version, last_seq, ts])?; ( KnownDbVersion::Current { db_version, @@ -1731,8 +1783,14 @@ pub async fn process_single_version( ) }; + debug!("inserted bookkeeping row"); + tx.commit()?; + for (table_name, count) in changes_per_table { + counter!("corro.changes.committed", count, "table" => table_name.to_string(), "source" => "remote"); + } + debug!("committed transaction"); booked_write.insert_many(new_changeset.versions(), known_version); @@ -1829,10 +1887,14 @@ pub enum SyncRecvError { Change(#[from] ChangeError), #[error(transparent)] Io(#[from] std::io::Error), - #[error("unexpected sync message")] - UnexpectedSyncMessage, + #[error("expected sync state message, received something else")] + ExpectedSyncState, #[error("unexpected end of stream")] UnexpectedEndOfStream, + #[error("expected sync clock message, received something else")] + ExpectedClockMessage, + #[error("timed out waiting for sync message")] + TimedOut, } async fn handle_sync(agent: &Agent, transport: &Transport) -> Result<(), SyncClientError> { @@ -1844,79 +1906,77 @@ async fn handle_sync(agent: &Agent, transport: &Transport) -> Result<(), SyncCli gauge!("corro.sync.client.head", *version as f64, "actor_id" => actor_id.to_string()); } - loop { - let (actor_id, addr) = { - let candidates = { - let members = agent.members().read(); - - members - .states - .iter() - .filter(|(id, _state)| **id != agent.actor_id()) - .map(|(id, state)| (*id, state.addr)) - .collect::>() - }; + let (actor_id, addr) = { + let candidates = { + let members = agent.members().read(); - if candidates.is_empty() { - warn!("could not find any good candidate for sync"); - return Err(SyncClientError::NoGoodCandidate); - } + members + .states + .iter() + .filter(|(id, _state)| **id != agent.actor_id()) + .map(|(id, state)| (*id, state.addr)) + .collect::>() + }; - let mut rng = StdRng::from_entropy(); + if candidates.is_empty() { + warn!("could not find any good candidate for sync"); + return Err(SyncClientError::NoGoodCandidate); + } - let mut choices = candidates.into_iter().choose_multiple(&mut rng, 2); + let mut rng = StdRng::from_entropy(); - choices.sort_by(|a, b| { - sync_state - .need_len_for_actor(&b.0) - .cmp(&sync_state.need_len_for_actor(&a.0)) - }); + let mut choices = candidates.into_iter().choose_multiple(&mut rng, 2); - if let Some(chosen) = choices.get(0).cloned() { - chosen - } else { - return Err(SyncClientError::NoGoodCandidate); - } - }; + choices.sort_by(|a, b| { + sync_state + .need_len_for_actor(&b.0) + .cmp(&sync_state.need_len_for_actor(&a.0)) + }); - debug!( - actor_id = %agent.actor_id(), "syncing with: {}, need len: {}", - actor_id, - sync_state.need_len(), - ); + if let Some(chosen) = choices.get(0).cloned() { + chosen + } else { + return Err(SyncClientError::NoGoodCandidate); + } + }; - debug!(actor = %agent.actor_id(), "sync message: {sync_state:?}"); + debug!( + actor_id = %agent.actor_id(), "syncing with: {}, need len: {}", + actor_id, + sync_state.need_len(), + ); - increment_counter!("corro.sync.client.member", "id" => actor_id.0.to_string(), "addr" => addr.to_string()); + debug!(actor = %agent.actor_id(), "sync message: {sync_state:?}"); - histogram!( - "corro.sync.client.request.operations.need.count", - sync_state.need.len() as f64 - ); + increment_counter!("corro.sync.client.member", "id" => actor_id.0.to_string(), "addr" => addr.to_string()); - let (tx, rx) = transport - .open_bi(addr) - .await - .map_err(crate::transport::ConnectError::from)?; + histogram!( + "corro.sync.client.request.operations.need.count", + sync_state.need.len() as f64 + ); - increment_counter!("corro.sync.attempts.count", "id" => actor_id.0.to_string(), "addr" => addr.to_string()); + let (tx, rx) = transport + .open_bi(addr) + .await + .map_err(crate::transport::ConnectError::from)?; - // FIXME: check if it's ok to sync (don't overload host) + increment_counter!("corro.sync.attempts.count", "id" => actor_id.0.to_string(), "addr" => addr.to_string()); - let start = Instant::now(); - let n = bidirectional_sync(&agent, sync_state, None, rx, tx).await?; - - let elapsed = start.elapsed(); - if n > 0 { - info!( - "synced {n} changes w/ {} in {}s @ {} changes/s", - actor_id, - elapsed.as_secs_f64(), - n as f64 / elapsed.as_secs_f64() - ); - } - return Ok(()); + // FIXME: check if it's ok to sync (don't overload host) + + let start = Instant::now(); + let n = bidirectional_sync(agent, sync_state, None, rx, tx).await?; + + let elapsed = start.elapsed(); + if n > 0 { + info!( + "synced {n} changes w/ {} in {}s @ {} changes/s", + actor_id, + elapsed.as_secs_f64(), + n as f64 / elapsed.as_secs_f64() + ); } + Ok(()) } async fn sync_loop( @@ -2059,10 +2119,21 @@ fn init_migration(tx: &Transaction) -> rusqlite::Result<()> { address TEXT NOT NULL, state TEXT NOT NULL DEFAULT 'down', - foca_state JSON ) WITHOUT ROWID; + -- RTT for members + CREATE TABLE __corro_member_rtts ( + actor_id BLOB PRIMARY KEY NOT NULL, + + rtt_min REAL, + rtt_mean REAL, + rtt_max REAL, + + last_recorded TEXT NOT NULL DEFAULT '[]' -- JSON + + ) WITHOUT ROWID; + -- tracked corrosion schema CREATE TABLE __corro_schema ( tbl_name TEXT NOT NULL, @@ -2552,32 +2623,32 @@ pub mod tests { let tx = conn.transaction()?; - let to_clear = compact_booked_for_actor(&tx, &[1].into())?; + let to_clear = find_cleared_db_versions_for_actor(&tx, &[1].into())?; assert!(to_clear.contains(&1)); assert!(!to_clear.contains(&2)); - let to_clear = compact_booked_for_actor(&tx, &[2].into())?; + let to_clear = find_cleared_db_versions_for_actor(&tx, &[2].into())?; assert!(to_clear.is_empty()); tx.execute("INSERT INTO foo2 (a) VALUES (2)", ())?; tx.commit()?; let tx = conn.transaction()?; - let to_clear = compact_booked_for_actor(&tx, &[2, 3].into())?; + let to_clear = find_cleared_db_versions_for_actor(&tx, &[2, 3].into())?; assert!(to_clear.is_empty()); tx.execute("INSERT INTO foo (a) VALUES (1)", ())?; tx.commit()?; let tx = conn.transaction()?; - let to_clear = compact_booked_for_actor(&tx, &[2, 3, 4].into())?; + let to_clear = find_cleared_db_versions_for_actor(&tx, &[2, 3, 4].into())?; assert!(to_clear.contains(&2)); assert!(!to_clear.contains(&3)); assert!(!to_clear.contains(&4)); - let to_clear = compact_booked_for_actor(&tx, &[3, 4].into())?; + let to_clear = find_cleared_db_versions_for_actor(&tx, &[3, 4].into())?; assert!(to_clear.is_empty()); @@ -2649,7 +2720,7 @@ pub mod tests { ) .await?; - sleep(Duration::from_secs(20)).await; + sleep(Duration::from_secs(10)).await; { let conn = ta2.agent.pool().read().await?; diff --git a/crates/corro-agent/src/api/client.rs b/crates/corro-agent/src/api/client.rs index 6f6cdf8a..2ad79743 100644 --- a/crates/corro-agent/src/api/client.rs +++ b/crates/corro-agent/src/api/client.rs @@ -1,4 +1,5 @@ use std::{ + iter::Peekable, ops::RangeInclusive, time::{Duration, Instant}, }; @@ -15,6 +16,8 @@ use corro_types::{ sqlite::SqlitePoolError, }; use hyper::StatusCode; +use itertools::Itertools; +use metrics::counter; use rusqlite::{params, params_from_iter, ToSql, Transaction}; use spawn::spawn_counted; use tokio::{ @@ -35,8 +38,8 @@ use crate::agent::process_subs; pub const MAX_CHANGES_PER_MESSAGE: usize = 50; -pub struct ChunkedChanges { - iter: I, +pub struct ChunkedChanges { + iter: Peekable, changes: Vec, last_pushed_seq: i64, last_start_seq: i64, @@ -45,10 +48,13 @@ pub struct ChunkedChanges { done: bool, } -impl ChunkedChanges { +impl ChunkedChanges +where + I: Iterator, +{ pub fn new(iter: I, start_seq: i64, last_seq: i64, chunk_size: usize) -> Self { Self { - iter, + iter: iter.peekable(), changes: vec![], last_pushed_seq: 0, last_start_seq: start_seq, @@ -76,19 +82,28 @@ where match self.iter.next() { Some(Ok(change)) => { trace!("got change: {change:?}"); + self.last_pushed_seq = change.seq; + self.changes.push(change); if self.last_pushed_seq == self.last_seq { // this was the last seq! break early - self.done = true; break; } if self.changes.len() >= self.chunk_size { // chunking it up let start_seq = self.last_start_seq; + + if self.iter.peek().is_none() { + // no more rows, break early + break; + } + + // prepare for next round! we're not done... self.last_start_seq = self.last_pushed_seq + 1; + return Some(Ok(( self.changes.drain(..).collect(), start_seq..=self.last_pushed_seq, @@ -96,8 +111,7 @@ where } } None => { - // marking as done for the next `next()` call - self.done = true; + // probably not going to happen since we peek at the next and end early // break out of the loop, don't return, there might be buffered changes break; } @@ -105,6 +119,8 @@ where } } + self.done = true; + // return buffered changes return Some(Ok(( self.changes.clone(), // no need to drain here like before @@ -141,7 +157,7 @@ where let ts = Timestamp::from(agent.clock().new_timestamp()); let db_version: i64 = tx - .prepare_cached("SELECT crsql_db_version() + 1")? + .prepare_cached("SELECT crsql_next_db_version()")? .query_row((), |row| row.get(0))?; let has_changes: bool = tx @@ -197,12 +213,24 @@ where let conn = agent.pool().read().await?; block_in_place(|| { - let mut prepped = conn.prepare_cached(r#"SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl FROM crsql_changes WHERE site_id IS NULL AND db_version = ? ORDER BY seq ASC"#)?; + // TODO: make this more generic so both sync and local changes can use it. + let mut prepped = conn.prepare_cached(r#" + SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl + FROM crsql_changes + WHERE site_id IS NULL + AND db_version = ? + ORDER BY seq ASC + "#)?; let rows = prepped.query_map([db_version], row_to_change)?; let mut chunked = ChunkedChanges::new(rows, 0, last_seq, MAX_CHANGES_PER_MESSAGE); while let Some(changes_seqs) = chunked.next() { match changes_seqs { Ok((changes, seqs)) => { + for (table_name, count) in + changes.iter().counts_by(|change| &change.table) + { + counter!("corro.changes.committed", count as u64, "table" => table_name.to_string(), "source" => "local"); + } process_subs(&agent, &changes); trace!("broadcasting changes: {changes:?} for seq: {seqs:?}"); @@ -247,7 +275,7 @@ where fn execute_statement(tx: &Transaction, stmt: &Statement) -> rusqlite::Result { match stmt { Statement::Simple(q) => tx.execute(&q, []), - Statement::WithParams(q, params) => tx.execute(&q, params_from_iter(params.into_iter())), + Statement::WithParams(q, params) => tx.execute(&q, params_from_iter(params)), Statement::WithNamedParams(q, params) => tx.execute( &q, params @@ -362,7 +390,7 @@ async fn build_query_rows_response( } }; - let prepped_res = block_in_place(|| match stmt { + let prepped_res = block_in_place(|| match &stmt { Statement::Simple(q) => conn.prepare(q.as_str()), Statement::WithParams(q, _) => conn.prepare(q.as_str()), Statement::WithNamedParams(q, _) => conn.prepare(q.as_str()), @@ -397,7 +425,19 @@ async fn build_query_rows_response( let start = Instant::now(); - let mut rows = match prepped.query(()) { + let query = match stmt { + Statement::Simple(_) => prepped.query(()), + Statement::WithParams(_, params) => prepped.query(params_from_iter(params)), + Statement::WithNamedParams(_, params) => prepped.query( + params + .iter() + .map(|(k, v)| (k.as_str(), v as &dyn ToSql)) + .collect::>() + .as_slice(), + ), + }; + + let mut rows = match query { Ok(rows) => rows, Err(e) => { _ = res_tx.send(Err(( @@ -989,22 +1029,21 @@ mod tests { assert_eq!(chunker.next(), Some(Ok((vec![], 0..=100)))); assert_eq!(chunker.next(), None); - let seq_0 = Change { - seq: 0, - ..Default::default() - }; - let seq_1 = Change { - seq: 1, - ..Default::default() - }; - let seq_2 = Change { - seq: 2, - ..Default::default() - }; + let changes: Vec = (0..100) + .map(|seq| Change { + seq, + ..Default::default() + }) + .collect(); // 2 iterations let mut chunker = ChunkedChanges::new( - vec![Ok(seq_0.clone()), Ok(seq_1.clone()), Ok(seq_2.clone())].into_iter(), + vec![ + Ok(changes[0].clone()), + Ok(changes[1].clone()), + Ok(changes[2].clone()), + ] + .into_iter(), 0, 100, 2, @@ -1012,19 +1051,92 @@ mod tests { assert_eq!( chunker.next(), - Some(Ok((vec![seq_0.clone(), seq_1.clone()], 0..=1))) + Some(Ok((vec![changes[0].clone(), changes[1].clone()], 0..=1))) + ); + assert_eq!( + chunker.next(), + Some(Ok((vec![changes[2].clone()], 2..=100))) ); - assert_eq!(chunker.next(), Some(Ok((vec![seq_2.clone()], 2..=100)))); assert_eq!(chunker.next(), None); let mut chunker = ChunkedChanges::new( - vec![Ok(seq_0.clone()), Ok(seq_1.clone())].into_iter(), + vec![Ok(changes[0].clone()), Ok(changes[1].clone())].into_iter(), 0, 0, 1, ); - assert_eq!(chunker.next(), Some(Ok((vec![seq_0.clone()], 0..=0)))); + assert_eq!(chunker.next(), Some(Ok((vec![changes[0].clone()], 0..=0)))); + assert_eq!(chunker.next(), None); + + // gaps + let mut chunker = ChunkedChanges::new( + vec![Ok(changes[0].clone()), Ok(changes[2].clone())].into_iter(), + 0, + 100, + 2, + ); + + assert_eq!( + chunker.next(), + Some(Ok((vec![changes[0].clone(), changes[2].clone()], 0..=100))) + ); + + assert_eq!(chunker.next(), None); + + // gaps + let mut chunker = ChunkedChanges::new( + vec![ + Ok(changes[2].clone()), + Ok(changes[4].clone()), + Ok(changes[7].clone()), + Ok(changes[8].clone()), + ] + .into_iter(), + 0, + 100, + 50, + ); + + assert_eq!( + chunker.next(), + Some(Ok(( + vec![ + changes[2].clone(), + changes[4].clone(), + changes[7].clone(), + changes[8].clone() + ], + 0..=100 + ))) + ); + + assert_eq!(chunker.next(), None); + + // gaps + let mut chunker = ChunkedChanges::new( + vec![ + Ok(changes[2].clone()), + Ok(changes[4].clone()), + Ok(changes[7].clone()), + Ok(changes[8].clone()), + ] + .into_iter(), + 0, + 10, + 2, + ); + + assert_eq!( + chunker.next(), + Some(Ok((vec![changes[2].clone(), changes[4].clone(),], 0..=4))) + ); + + assert_eq!( + chunker.next(), + Some(Ok((vec![changes[7].clone(), changes[8].clone(),], 5..=10))) + ); + assert_eq!(chunker.next(), None); } } diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index d3ff78e5..dc8757e0 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1,16 +1,17 @@ +use std::cmp; use std::collections::HashMap; use std::net::SocketAddr; use std::ops::RangeInclusive; use std::sync::Arc; use std::time::Duration; -use bytes::{BufMut, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use corro_types::agent::{Agent, KnownDbVersion, SplitPool}; use corro_types::broadcast::{ChangeV1, Changeset}; use corro_types::change::row_to_change; use corro_types::config::{GossipConfig, TlsClientConfig}; use corro_types::sync::{SyncMessage, SyncMessageEncodeError, SyncMessageV1, SyncStateV1}; -use futures::{SinkExt, StreamExt, TryFutureExt}; +use futures::{Sink, SinkExt, Stream, StreamExt, TryFutureExt}; use metrics::counter; use quinn::{RecvStream, SendStream}; use rusqlite::params; @@ -336,6 +337,7 @@ async fn process_range( }; for (versions, known_version) in overlapping { + // optimization, cleared versions can't be revived... sending a single batch! if let KnownDbVersion::Cleared = &known_version { sender .send(SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { @@ -347,16 +349,19 @@ async fn process_range( } for version in versions { - process_version( - pool, - actor_id, - is_local, - version, - &known_version, - vec![], - &sender, - ) - .await?; + let bw = booked.write().await; + if let Some(known_version) = bw.get(&version) { + process_version( + pool, + actor_id, + is_local, + version, + known_version, + vec![], + &sender, + ) + .await?; + } } } @@ -389,7 +394,14 @@ async fn process_version( let start_seq = range_needed.start(); let end_seq = range_needed.end(); - let mut prepped = conn.prepare_cached(r#"SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl FROM crsql_changes WHERE site_id IS ? AND db_version = ? AND seq >= ? AND seq <= ? ORDER BY seq ASC"#)?; + let mut prepped = conn.prepare_cached(r#" + SELECT "table", pk, cid, val, col_version, db_version, seq, COALESCE(site_id, crsql_site_id()), cl + FROM crsql_changes + WHERE site_id IS ? + AND db_version = ? + AND seq >= ? AND seq <= ? + ORDER BY seq ASC + "#)?; let site_id: Option<[u8; 16]> = (!is_local) .then_some(actor_id) .map(|actor_id| actor_id.to_bytes()); @@ -429,72 +441,76 @@ async fn process_version( } // TODO: find a way to make this safe... // FIXME: there's a race here between getting this cached "known db version" and processing+clearing the rows in buffered changes - // KnownDbVersion::Partial { seqs, last_seq, ts } => { - - // debug!("seqs needed: {seqs_needed:?}"); - // debug!("seqs we got: {seqs:?}"); - // if seqs_needed.is_empty() { - // seqs_needed = vec![(0..=*last_seq)]; - // } - - // for range_needed in seqs_needed { - // for range in seqs.overlapping(&range_needed) { - // let start_seq = cmp::max(range.start(), range_needed.start()); - // debug!("start seq: {start_seq}"); - // let end_seq = cmp::min(range.end(), range_needed.end()); - // debug!("end seq: {end_seq}"); - - // let mut prepped = conn.prepare_cached( - // r#" - // SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl - // FROM __corro_buffered_changes - // WHERE site_id = ? - // AND version = ? - // AND seq >= ? AND seq <= ?"#, - // )?; - - // let site_id: [u8; 16] = actor_id.to_bytes(); - - // let rows = prepped.query_map( - // params![site_id, version, start_seq, end_seq], - // row_to_change, - // )?; - - // let mut chunked = ChunkedChanges::new( - // rows, - // *start_seq, - // *end_seq, - // MAX_CHANGES_PER_MESSAGE, - // ); - // while let Some(changes_seqs) = chunked.next() { - // match changes_seqs { - // Ok((changes, seqs)) => { - // if let Err(_e) = sender.blocking_send(SyncMessage::V1( - // SyncMessageV1::Changeset(ChangeV1 { - // actor_id, - // changeset: Changeset::Full { - // version, - // changes, - // seqs, - // last_seq: *last_seq, - // ts: *ts, - // }, - // }), - // )) { - // eyre::bail!("sync message sender channel is closed"); - // } - // } - // Err(e) => { - // error!("could not process buffered crsql change (version: {version}) for broadcast: {e}"); - // break; - // } - // } - // } - // } - // } - // } - _ => { - // warn!("not supposed to happen"); + KnownDbVersion::Partial { seqs, last_seq, ts } => { + debug!("seqs needed: {seqs_needed:?}"); + debug!("seqs we got: {seqs:?}"); + if seqs_needed.is_empty() { + seqs_needed = vec![(0..=*last_seq)]; + } + + for range_needed in seqs_needed { + for range in seqs.overlapping(&range_needed) { + let start_seq = cmp::max(range.start(), range_needed.start()); + debug!("start seq: {start_seq}"); + let end_seq = cmp::min(range.end(), range_needed.end()); + debug!("end seq: {end_seq}"); + + let mut prepped = conn.prepare_cached( + r#" + SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl + FROM __corro_buffered_changes + WHERE site_id = ? + AND version = ? + AND seq >= ? AND seq <= ?"#, + )?; + + let site_id: [u8; 16] = actor_id.to_bytes(); + + let rows = prepped.query_map( + params![site_id, version, start_seq, end_seq], + row_to_change, + )?; + + let mut chunked = ChunkedChanges::new( + rows, + *start_seq, + *end_seq, + MAX_CHANGES_PER_MESSAGE, + ); + while let Some(changes_seqs) = chunked.next() { + match changes_seqs { + Ok((changes, seqs)) => { + if let Err(_e) = sender.blocking_send(SyncMessage::V1( + SyncMessageV1::Changeset(ChangeV1 { + actor_id, + changeset: Changeset::Full { + version, + changes, + seqs, + last_seq: *last_seq, + ts: *ts, + }, + }), + )) { + eyre::bail!("sync message sender channel is closed"); + } + } + Err(e) => { + error!("could not process buffered crsql change (version: {version}) for broadcast: {e}"); + break; + } + } + } + } + } + } + KnownDbVersion::Cleared => { + sender.blocking_send(SyncMessage::V1(SyncMessageV1::Changeset(ChangeV1 { + actor_id, + changeset: Changeset::Empty { + versions: version..=version, + }, + })))?; } } Ok(()) @@ -541,14 +557,15 @@ async fn process_sync( // 2. process partial needs if let Some(partially_needed) = sync_state.partial_need.get(&actor_id) { for (version, seqs_needed) in partially_needed.iter() { - let known = { booked.read().await.get(version).cloned() }; + let bw = booked.write().await; + let known = bw.get(version); if let Some(known) = known { process_version( &pool, actor_id, is_local, *version, - &known, + known, seqs_needed.clone(), &sender, ) @@ -584,6 +601,38 @@ async fn process_sync( Ok(()) } +async fn write_sync_msg + Unpin>( + buf: &mut BytesMut, + msg: SyncMessage, + write: &mut W, +) -> Result<(), SyncSendError> { + msg.write_to_stream(buf.writer()) + .map_err(SyncMessageEncodeError::from)?; + + let buf_len = buf.len(); + write.send(buf.split().freeze()).await?; + + counter!("corro.sync.chunk.sent.bytes", buf_len as u64); + + Ok(()) +} + +pub async fn read_sync_msg> + Unpin>( + read: &mut R, +) -> Result, SyncRecvError> { + match tokio::time::timeout(Duration::from_secs(5), read.next()).await { + Ok(Some(buf_res)) => match buf_res { + Ok(mut buf) => match SyncMessage::from_buf(&mut buf) { + Ok(msg) => Ok(Some(msg)), + Err(e) => Err(SyncRecvError::from(e)), + }, + Err(e) => Err(SyncRecvError::from(e)), + }, + Ok(None) => Ok(None), + Err(_e) => Err(SyncRecvError::TimedOut), + } +} + pub async fn bidirectional_sync( agent: &Agent, our_sync_state: SyncStateV1, @@ -596,28 +645,66 @@ pub async fn bidirectional_sync( let mut read = FramedRead::new(read, LengthDelimitedCodec::new()); let mut write = FramedWrite::new(write, LengthDelimitedCodec::new()); - tx.send(SyncMessage::V1(SyncMessageV1::State(our_sync_state))) - .await - .map_err(|_| SyncSendError::ChannelClosed)?; + let mut send_buf = BytesMut::new(); + + write_sync_msg( + &mut send_buf, + SyncMessage::V1(SyncMessageV1::State(our_sync_state)), + &mut write, + ) + .await?; + + let their_sync_state = match their_sync_state { + Some(state) => state, + None => match read_sync_msg(&mut read).await? { + Some(SyncMessage::V1(SyncMessageV1::State(state))) => state, + Some(_) => return Err(SyncRecvError::ExpectedSyncState.into()), + None => return Err(SyncRecvError::UnexpectedEndOfStream.into()), + }, + }; + + let their_actor_id = their_sync_state.actor_id; + + write_sync_msg( + &mut send_buf, + SyncMessage::V1(SyncMessageV1::Clock(agent.clock().new_timestamp().into())), + &mut write, + ) + .await?; + + match read_sync_msg(&mut read).await? { + Some(SyncMessage::V1(SyncMessageV1::Clock(ts))) => { + if let Err(e) = agent + .clock() + .update_with_timestamp(&uhlc::Timestamp::new(ts.to_ntp64(), their_actor_id.into())) + { + warn!("could not update clock from actor {their_actor_id}: {e}"); + } + } + Some(_) => return Err(SyncRecvError::ExpectedClockMessage.into()), + None => return Err(SyncRecvError::UnexpectedEndOfStream.into()), + } + + tokio::spawn( + process_sync( + agent.actor_id(), + agent.pool().clone(), + agent.bookie().clone(), + their_sync_state, + tx, + ) + .inspect_err(|e| error!("could not process sync request: {e}")), + ); let (_sent_count, recv_count) = tokio::try_join!( async move { let mut count = 0; - let mut buf = BytesMut::new(); while let Some(msg) = rx.recv().await { - msg.write_to_stream((&mut buf).writer()) - .map_err(SyncMessageEncodeError::from) - .map_err(SyncSendError::from)?; - - let buf_len = buf.len(); - write - .send(buf.split().freeze()) - .await - .map_err(SyncSendError::from)?; - - count += 1; + if let SyncMessage::V1(SyncMessageV1::Changeset(change)) = &msg { + count += change.len(); + } - counter!("corro.sync.chunk.sent.bytes", buf_len as u64); + write_sync_msg(&mut send_buf, msg, &mut write).await?; } let mut send = write.into_inner(); @@ -627,56 +714,23 @@ pub async fn bidirectional_sync( debug!(actor_id = %agent.actor_id(), "done writing sync messages (count: {count})"); + counter!("corro.sync.changes.sent", count as u64, "actor_id" => their_actor_id.to_string()); + Ok::<_, SyncError>(count) }, async move { - let their_sync_state = match their_sync_state { - Some(state) => state, - None => { - if let Some(buf_res) = read.next().await { - let mut buf = buf_res.map_err(SyncRecvError::from)?; - let msg = SyncMessage::from_buf(&mut buf).map_err(SyncRecvError::from)?; - if let SyncMessage::V1(SyncMessageV1::State(state)) = msg { - state - } else { - return Err(SyncRecvError::UnexpectedSyncMessage.into()); - } - } else { - return Err(SyncRecvError::UnexpectedEndOfStream.into()); - } - } - }; - - let their_actor_id = their_sync_state.actor_id; - - tx.send(SyncMessage::V1(SyncMessageV1::Clock( - agent.clock().new_timestamp().into(), - ))) - .await - .map_err(|_| SyncSendError::ChannelClosed)?; - - tokio::spawn( - process_sync( - agent.actor_id(), - agent.pool().clone(), - agent.bookie().clone(), - their_sync_state, - tx, - ) - .inspect_err(|e| error!("could not process sync request: {e}")), - ); - let mut count = 0; - while let Ok(Some(buf_res)) = - tokio::time::timeout(Duration::from_secs(5), read.next()).await - { - let mut buf = buf_res.map_err(SyncRecvError::from)?; - - counter!("corro.sync.chunk.recv.bytes", buf.len() as u64); - - match SyncMessage::from_buf(&mut buf) { - Ok(msg) => { + loop { + match read_sync_msg(&mut read).await { + Ok(None) => { + break; + } + Err(e) => { + error!("sync recv error: {e}"); + break; + } + Ok(Some(msg)) => { let len = match msg { SyncMessage::V1(SyncMessageV1::Changeset(change)) => { let len = change.len(); @@ -689,24 +743,20 @@ pub async fn bidirectional_sync( warn!("received sync state message more than once, ignoring"); continue; } - SyncMessage::V1(SyncMessageV1::Clock(ts)) => { - if let Err(e) = agent.clock().update_with_timestamp( - &uhlc::Timestamp::new(ts.to_ntp64(), their_actor_id.into()), - ) { - warn!( - "could not update clock from actor {their_actor_id}: {e}" - ); - } + SyncMessage::V1(SyncMessageV1::Clock(_)) => { + warn!("received sync clock message more than once, ignoring"); continue; } }; count += len; } - Err(e) => return Err(SyncRecvError::from(e).into()), } } + debug!(actor_id = %agent.actor_id(), "done reading sync messages"); + counter!("corro.sync.changes.recv", count as u64, "actor_id" => their_actor_id.to_string()); + Ok(count) } )?; diff --git a/crates/corro-agent/src/api/pubsub.rs b/crates/corro-agent/src/api/pubsub.rs index a684b951..3ce1b8d2 100644 --- a/crates/corro-agent/src/api/pubsub.rs +++ b/crates/corro-agent/src/api/pubsub.rs @@ -216,9 +216,10 @@ async fn process_sub_channel( .await; return; } + // NOTE: I think that's infaillible... writer - .write(b"\n") + .write_all(b"\n") .expect("could not write new line to BytesMut Writer"); // accumulate up to ~64KB @@ -643,24 +644,23 @@ mod tests { return None; } loop { - loop { - match self.codec.decode(&mut self.buf) { - Ok(Some(line)) => match serde_json::from_str(&line) { - Ok(res) => return Some(Ok(res)), - Err(e) => { - self.done = true; - return Some(Err(e.into())); - } - }, - Ok(None) => { - break; - } + match self.codec.decode(&mut self.buf) { + Ok(Some(line)) => match serde_json::from_str(&line) { + Ok(res) => return Some(Ok(res)), Err(e) => { self.done = true; return Some(Err(e.into())); } + }, + Ok(None) => { + // fall through + } + Err(e) => { + self.done = true; + return Some(Err(e.into())); } } + let bytes_res = self.body.data().await; match bytes_res { Some(Ok(b)) => { diff --git a/crates/corro-api-types/Cargo.toml b/crates/corro-api-types/Cargo.toml index 03ca4d43..bd1690bf 100644 --- a/crates/corro-api-types/Cargo.toml +++ b/crates/corro-api-types/Cargo.toml @@ -15,5 +15,6 @@ rusqlite = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } speedy = { workspace = true } +strum = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } \ No newline at end of file diff --git a/crates/corro-api-types/src/lib.rs b/crates/corro-api-types/src/lib.rs index 2e39b657..aa006d80 100644 --- a/crates/corro-api-types/src/lib.rs +++ b/crates/corro-api-types/src/lib.rs @@ -1,6 +1,8 @@ use std::{ collections::HashMap, fmt::{self, Write}, + hash::Hash, + ops::Deref, }; use compact_str::{CompactString, ToCompactString}; @@ -69,9 +71,9 @@ pub enum RqliteResult { #[derive(Debug, Default, Clone, Serialize, Deserialize, Readable, Writable, PartialEq)] pub struct Change { - pub table: String, + pub table: TableName, pub pk: Vec, - pub cid: String, + pub cid: ColumnName, pub val: SqliteValue, pub col_version: i64, pub db_version: i64, @@ -141,7 +143,7 @@ impl<'a> SqliteValueRef<'a> { match self { SqliteValueRef::Null => SqliteValue::Null, SqliteValueRef::Integer(v) => SqliteValue::Integer(*v), - SqliteValueRef::Real(v) => SqliteValue::Real(*v), + SqliteValueRef::Real(v) => SqliteValue::Real(Real(*v)), SqliteValueRef::Text(v) => SqliteValue::Text((*v).to_compact_string()), SqliteValueRef::Blob(v) => SqliteValue::Blob(v.to_smallvec()), } @@ -197,17 +199,49 @@ impl FromSql for ColumnType { } } -#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Hash)] #[serde(untagged)] pub enum SqliteValue { #[default] Null, Integer(i64), - Real(f64), + Real(Real), Text(CompactString), Blob(SmallVec<[u8; 512]>), } +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq)] +#[serde(transparent)] +pub struct Real(pub f64); + +impl Deref for Real { + type Target = f64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Hash for Real { + fn hash(&self, state: &mut H) { + integer_decode(self.0).hash(state) + } +} + +fn integer_decode(val: f64) -> (u64, i16, i8) { + let bits: u64 = unsafe { std::mem::transmute(val) }; + let sign: i8 = if bits >> 63 == 0 { 1 } else { -1 }; + let mut exponent: i16 = ((bits >> 52) & 0x7ff) as i16; + let mantissa = if exponent == 0 { + (bits & 0xfffffffffffff) << 1 + } else { + (bits & 0xfffffffffffff) | 0x10000000000000 + }; + + exponent -= 1023 + 52; + (mantissa, exponent, sign) +} + impl SqliteValue { pub fn column_type(&self) -> ColumnType { match self { @@ -263,7 +297,7 @@ impl SqliteValue { match self { SqliteValue::Null => SqliteValueRef::Null, SqliteValue::Integer(i) => SqliteValueRef::Integer(*i), - SqliteValue::Real(f) => SqliteValueRef::Real(*f), + SqliteValue::Real(r) => SqliteValueRef::Real(r.0), SqliteValue::Text(s) => SqliteValueRef::Text(s.as_str()), SqliteValue::Blob(v) => SqliteValueRef::Blob(v.as_slice()), } @@ -305,7 +339,7 @@ impl FromSql for SqliteValue { Ok(match value { ValueRef::Null => SqliteValue::Null, ValueRef::Integer(i) => SqliteValue::Integer(i), - ValueRef::Real(f) => SqliteValue::Real(f), + ValueRef::Real(f) => SqliteValue::Real(Real(f)), ValueRef::Text(t) => SqliteValue::Text( std::str::from_utf8(t.into()) .map_err(|e| FromSqlError::Other(Box::new(e)))? @@ -321,7 +355,7 @@ impl ToSql for SqliteValue { Ok(match self { SqliteValue::Null => ToSqlOutput::Owned(Value::Null), SqliteValue::Integer(i) => ToSqlOutput::Owned(Value::Integer(*i)), - SqliteValue::Real(f) => ToSqlOutput::Owned(Value::Real(*f)), + SqliteValue::Real(f) => ToSqlOutput::Owned(Value::Real(f.0)), SqliteValue::Text(t) => ToSqlOutput::Borrowed(ValueRef::Text(t.as_bytes())), SqliteValue::Blob(b) => ToSqlOutput::Borrowed(ValueRef::Blob(b.as_slice())), }) @@ -353,7 +387,7 @@ where Ok(match u8::read_from(reader)? { 0 => SqliteValue::Null, 1 => SqliteValue::Integer(i64::read_from(reader)?), - 2 => SqliteValue::Real(f64::read_from(reader)?), + 2 => SqliteValue::Real(Real(f64::read_from(reader)?)), 3 => { let len = reader.read_u32()? as usize; @@ -417,3 +451,103 @@ where }) } } + +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[serde(transparent)] +pub struct TableName(pub CompactString); + +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[serde(transparent)] +pub struct ColumnName(pub CompactString); + +impl Deref for TableName { + type Target = CompactString; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Writable for TableName +where + C: Context, +{ + #[inline] + fn write_to>(&self, writer: &mut T) -> Result<(), ::Error> { + self.0.as_str().write_to(writer) + } + + #[inline] + fn bytes_needed(&self) -> Result::Error> { + Writable::::bytes_needed(self.0.as_str()) + } +} + +impl<'a, C> Readable<'a, C> for TableName +where + C: Context, +{ + #[inline] + fn read_from>(reader: &mut R) -> Result::Error> { + let s: &'a str = Readable::<'a, C>::read_from(reader)?; + Ok(Self(CompactString::new(s))) + } +} + +impl FromSql for TableName { + fn column_result(value: ValueRef<'_>) -> rusqlite::types::FromSqlResult { + Ok(Self(CompactString::new(value.as_str()?))) + } +} + +impl ToSql for TableName { + fn to_sql(&self) -> rusqlite::Result> { + self.0.as_str().to_sql() + } +} + +impl Deref for ColumnName { + type Target = CompactString; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Writable for ColumnName +where + C: Context, +{ + #[inline] + fn write_to>(&self, writer: &mut T) -> Result<(), ::Error> { + self.0.as_str().write_to(writer) + } + + #[inline] + fn bytes_needed(&self) -> Result::Error> { + Writable::::bytes_needed(self.0.as_str()) + } +} + +impl<'a, C> Readable<'a, C> for ColumnName +where + C: Context, +{ + #[inline] + fn read_from>(reader: &mut R) -> Result::Error> { + let s: &'a str = Readable::<'a, C>::read_from(reader)?; + Ok(Self(CompactString::new(s))) + } +} + +impl FromSql for ColumnName { + fn column_result(value: ValueRef<'_>) -> rusqlite::types::FromSqlResult { + Ok(Self(CompactString::new(value.as_str()?))) + } +} + +impl ToSql for ColumnName { + fn to_sql(&self) -> rusqlite::Result> { + self.0.as_str().to_sql() + } +} diff --git a/crates/corro-tpl/src/lib.rs b/crates/corro-tpl/src/lib.rs index 9d2255f3..306e8c71 100644 --- a/crates/corro-tpl/src/lib.rs +++ b/crates/corro-tpl/src/lib.rs @@ -498,10 +498,10 @@ impl Engine { v.as_int() .map_err(|_e| Box::new(EvalAltResult::from("could not cast to i64")))?, ), - "f64" => SqliteValue::Real( + "f64" => SqliteValue::Real(corro_types::api::Real( v.as_float() .map_err(|_e| Box::new(EvalAltResult::from("could not cast to f64")))?, - ), + )), "bool" => { if v.as_bool() .map_err(|_e| Box::new(EvalAltResult::from("could not cast to bool")))? diff --git a/crates/corro-types/crsqlite-darwin-aarch64.dylib b/crates/corro-types/crsqlite-darwin-aarch64.dylib index 93f1eb93..53f483bb 100644 Binary files a/crates/corro-types/crsqlite-darwin-aarch64.dylib and b/crates/corro-types/crsqlite-darwin-aarch64.dylib differ diff --git a/crates/corro-types/crsqlite-darwin-x86_64.dylib b/crates/corro-types/crsqlite-darwin-x86_64.dylib index e89e2455..48fb4843 100644 Binary files a/crates/corro-types/crsqlite-darwin-x86_64.dylib and b/crates/corro-types/crsqlite-darwin-x86_64.dylib differ diff --git a/crates/corro-types/crsqlite-linux-aarch64.so b/crates/corro-types/crsqlite-linux-aarch64.so index 7fd5b7e3..30bfdf79 100644 Binary files a/crates/corro-types/crsqlite-linux-aarch64.so and b/crates/corro-types/crsqlite-linux-aarch64.so differ diff --git a/crates/corro-types/crsqlite-linux-x86_64.so b/crates/corro-types/crsqlite-linux-x86_64.so index 035947f1..8935360e 100644 Binary files a/crates/corro-types/crsqlite-linux-x86_64.so and b/crates/corro-types/crsqlite-linux-x86_64.so differ diff --git a/crates/corro-types/src/api.rs b/crates/corro-types/src/api.rs index e6d1cbe6..75b613b7 100644 --- a/crates/corro-types/src/api.rs +++ b/crates/corro-types/src/api.rs @@ -1,5 +1,5 @@ use compact_str::CompactString; -use corro_api_types::SqliteValue; +pub use corro_api_types::{Real, SqliteValue}; pub use corro_api_types::{QueryEvent, RqliteResponse, RqliteResult, Statement}; diff --git a/crates/corro-types/src/pubsub.rs b/crates/corro-types/src/pubsub.rs index 007a9b2f..216937db 100644 --- a/crates/corro-types/src/pubsub.rs +++ b/crates/corro-types/src/pubsub.rs @@ -679,7 +679,7 @@ impl Matcher { let delete_prepped = tx.prepare_cached(&sql)?; - for (change_type, mut prepped) in [ + for (mut change_type, mut prepped) in [ (None, insert_prepped), (Some(ChangeType::Delete), delete_prepped), ] { @@ -690,7 +690,7 @@ impl Matcher { while let Ok(Some(row)) = rows.next() { let rowid: i64 = row.get(0)?; - let change_type = change_type.clone().take().unwrap_or_else(|| { + let change_type = change_type.take().unwrap_or({ if rowid > last_rowid { ChangeType::Insert } else { diff --git a/crates/corro-types/src/schema.rs b/crates/corro-types/src/schema.rs index ff9c17f2..1a43e562 100644 --- a/crates/corro-types/src/schema.rs +++ b/crates/corro-types/src/schema.rs @@ -164,12 +164,12 @@ pub fn make_schema_inner( schema: &NormalizedSchema, new_schema: &NormalizedSchema, ) -> Result<(), SchemaError> { - // iterate over dropped tables - for name in schema + if let Some(name) = schema .tables .keys() .collect::>() .difference(&new_schema.tables.keys().collect::>()) + .next() { // TODO: add options and check flag return Err(SchemaError::DropTableWithoutDestructiveFlag( @@ -254,8 +254,7 @@ pub fn make_schema_inner( debug!("dropped cols: {dropped_cols:?}"); - for col_name in dropped_cols { - // TODO: add options and check flag + if let Some(col_name) = dropped_cols.into_iter().next() { return Err(SchemaError::DropColumnWithoutDestructiveFlag( name.clone(), col_name.clone(), diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 5a0954ad..10926fc9 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -37,11 +37,18 @@ impl SyncStateV1 { .values() .flat_map(|v| v.iter().map(|range| (range.end() - range.start()) + 1)) .sum::() - + self - .partial_need - .values() - .map(|partials| partials.len() as i64) - .sum::() + + ( + self.partial_need + .values() + .flat_map(|partials| { + partials.values().flat_map(|ranges| { + ranges.iter().map(|range| (range.end() - range.start()) + 1) + }) + }) + .sum::() + / 50 + // this is how many chunks we're looking at, kind of random... + ) } pub fn need_len_for_actor(&self, actor_id: &ActorId) -> i64 { diff --git a/crates/corrosion/Cargo.toml b/crates/corrosion/Cargo.toml index 5172ac75..335b034a 100644 --- a/crates/corrosion/Cargo.toml +++ b/crates/corrosion/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +build-info = { workspace = true } bytes = { workspace = true } camino = { workspace = true } clap = { workspace = true } @@ -44,5 +45,8 @@ tracing-subscriber = { workspace = true } tripwire = { path = "../tripwire" } uuid = { workspace = true } +[build-dependencies] +build-info-build = { workspace = true } + [dev-dependencies] corro-tests = { path = "../corro-tests" } diff --git a/crates/corrosion/build.rs b/crates/corrosion/build.rs new file mode 100644 index 00000000..d36778f6 --- /dev/null +++ b/crates/corrosion/build.rs @@ -0,0 +1,3 @@ +fn main() { + build_info_build::build_script(); +} diff --git a/crates/corrosion/src/command/agent.rs b/crates/corrosion/src/command/agent.rs index e8ba2e2a..b79bf83b 100644 --- a/crates/corrosion/src/command/agent.rs +++ b/crates/corrosion/src/command/agent.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use camino::Utf8PathBuf; use corro_admin::AdminConfig; use corro_types::config::{Config, TelemetryConfig}; +use metrics::gauge; use metrics_exporter_prometheus::PrometheusBuilder; use spawn::wait_for_all_pending_handles; use tracing::{error, info}; @@ -14,6 +15,8 @@ pub async fn run(config: Config, config_path: &Utf8PathBuf) -> eyre::Result<()> if let Some(TelemetryConfig::Prometheus { bind_addr }) = config.telemetry { setup_prometheus(bind_addr).expect("could not setup prometheus"); + let info = crate::version(); + gauge!("corro.build.info", 1.0, "version" => info.crate_info.version.to_string(), "ts" => info.timestamp.to_string(), "rustc_version" => info.compiler.version.to_string()); } let (tripwire, tripwire_worker) = tripwire::Tripwire::new_signals(); diff --git a/crates/corrosion/src/main.rs b/crates/corrosion/src/main.rs index 7863a2ef..4c1efd00 100644 --- a/crates/corrosion/src/main.rs +++ b/crates/corrosion/src/main.rs @@ -12,6 +12,7 @@ use command::{ tls::{generate_ca, generate_client_cert, generate_server_cert}, tpl::TemplateFlags, }; +use corro_api_types::SqliteValue; use corro_client::CorrosionApiClient; use corro_types::{ api::{QueryEvent, RqliteResult, Statement}, @@ -35,6 +36,8 @@ pub const VERSION: &str = env!("CARGO_PKG_VERSION"); pub const CONFIG: OnceCell = OnceCell::new(); pub const API_CLIENT: OnceCell = OnceCell::new(); +build_info::build_info!(pub fn version); + #[global_allocator] static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; @@ -160,11 +163,21 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> { query, columns: show_columns, timer, + param, } => { - let mut body = cli - .api_client()? - .query(&Statement::Simple(query.clone())) - .await?; + let stmt = if param.is_empty() { + Statement::Simple(query.clone()) + } else { + Statement::WithParams( + query.clone(), + param + .into_iter() + .map(|p| SqliteValue::Text(p.into())) + .collect(), + ) + }; + + let mut body = cli.api_client()?.query(&stmt).await?; let mut lines = LinesCodec::new(); @@ -365,6 +378,9 @@ enum Command { columns: bool, #[arg(long, default_value = "false")] timer: bool, + + #[arg(long)] + param: Vec, }, /// Execute a SQL statement that mutates the state of Corrosion diff --git a/doc/SUMMARY.md b/doc/SUMMARY.md index 010bfadd..20b735ca 100644 --- a/doc/SUMMARY.md +++ b/doc/SUMMARY.md @@ -5,12 +5,15 @@ # User guide - [Authorization]() - [Backup / Restore]() +- [Consistency]() - [CRDTs](crdts.md) - [Encryption]() - [Gossip]() - [Schema](schema.md) - [Subscriptions]() - [Synchronization]() +- [Telemetry](telemetry/README.md) + - [Prometheus](telemetry/prometheus.md) - [Templates]() # Reference diff --git a/doc/telemetry/README.md b/doc/telemetry/README.md new file mode 100644 index 00000000..cf53d631 --- /dev/null +++ b/doc/telemetry/README.md @@ -0,0 +1 @@ +# Telemetry \ No newline at end of file diff --git a/doc/telemetry/prometheus.md b/doc/telemetry/prometheus.md new file mode 100644 index 00000000..60830553 --- /dev/null +++ b/doc/telemetry/prometheus.md @@ -0,0 +1,43 @@ +# Prometheus metrics + +## TYPE corro_broadcast_buffer_capacity gauge +## TYPE corro_broadcast_pending_count gauge +## TYPE corro_broadcast_recv_count counter +## TYPE corro_broadcast_serialization_buffer_capacity gauge +## TYPE corro_build_info gauge +## TYPE corro_changes_committed counter +## TYPE corro_db_buffered_changes_rows_total gauge +## TYPE corro_db_table_checksum gauge +## TYPE corro_db_table_rows_total gauge +## TYPE corro_db_wal_truncate_seconds histogram +## TYPE corro_gossip_broadcast_channel_capacity gauge +## TYPE corro_gossip_cluster_size gauge +## TYPE corro_gossip_config_max_transmissions gauge +## TYPE corro_gossip_config_num_indirect_probes gauge +## TYPE corro_gossip_member_added counter +## TYPE corro_gossip_member_removed counter +## TYPE corro_gossip_members gauge +## TYPE corro_gossip_updates_backlog gauge +## TYPE corro_peer_connection_accept_total counter +## TYPE corro_peer_datagram_bytes_recv_total counter +## TYPE corro_peer_datagram_bytes_sent_total counter +## TYPE corro_peer_datagram_recv_total counter +## TYPE corro_peer_datagram_sent_total counter +## TYPE corro_peer_stream_accept_total counter +## TYPE corro_peer_stream_bytes_recv_total counter +## TYPE corro_peer_stream_bytes_sent_total counter +## TYPE corro_peer_streams_accept_total counter +## TYPE corro_sqlite_pool_execution_seconds histogram +## TYPE corro_sqlite_pool_queue_seconds histogram +## TYPE corro_sqlite_pool_read_connections gauge +## TYPE corro_sqlite_pool_read_connections_idle gauge +## TYPE corro_sqlite_pool_write_connections gauge +## TYPE corro_sqlite_pool_write_connections_idle gauge +## TYPE corro_sync_attempts_count counter +## TYPE corro_sync_changes_recv counter +## TYPE corro_sync_changes_sent counter +## TYPE corro_sync_chunk_sent_bytes counter +## TYPE corro_sync_client_head gauge +## TYPE corro_sync_client_member counter +## TYPE corro_sync_client_needed gauge +## TYPE corro_sync_client_request_operations_need_count histogram \ No newline at end of file diff --git a/integration-tests/tests/cli_test.rs b/integration-tests/tests/cli_test.rs index a5e024fe..ab234689 100644 --- a/integration-tests/tests/cli_test.rs +++ b/integration-tests/tests/cli_test.rs @@ -9,7 +9,6 @@ static CORROSION_BIN: Lazy = Lazy::new(|| { escargot::CargoBuild::new() .bin("corrosion") .manifest_path("../Cargo.toml") - .target_dir("../target/escargot") .run() .unwrap() }); @@ -39,7 +38,9 @@ async fn test_query() { .arg("--api-addr") .arg(api_addr.to_string()) .arg("query") - .arg("SELECT hex(site_id) FROM crsql_site_id WHERE ordinal = 0") + .arg("--param") + .arg("0") + .arg("SELECT hex(site_id) FROM crsql_site_id WHERE ordinal = ?") .assert(); assert.success().stdout(format!("{expected}\n"));