From 25b7bbfc9982c1fbb7d05666829427e75153ac01 Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Sat, 6 Jul 2024 16:38:53 -0400 Subject: [PATCH] Add function to handle subscriptions when applying buffered changes (#214) * add function to handle subscriptions when applying buffered changes * add tests for receiving subscription updates from buffered changes * fix bad merge * manually implement hash for Timestamp so it behaves the same as PartialEq * fix various clippy warnings --------- Co-authored-by: Somtochi Onyekwere --- Cargo.lock | 274 +----------------- crates/corro-agent/src/agent/handlers.rs | 15 +- crates/corro-agent/src/agent/tests.rs | 4 +- crates/corro-agent/src/agent/util.rs | 299 +++++++++++--------- crates/corro-agent/src/api/peer.rs | 7 +- crates/corro-agent/src/api/public/pubsub.rs | 210 +++++++++++++- crates/corro-types/src/agent.rs | 6 +- crates/corro-types/src/broadcast.rs | 19 +- crates/corro-types/src/pubsub.rs | 84 ++++++ crates/corro-types/src/sync.rs | 2 +- crates/corrosion/Cargo.toml | 2 +- 11 files changed, 502 insertions(+), 420 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb4b64d4..7126aa9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,7 +161,7 @@ dependencies = [ "proc-macro2", "quote", "syn 1.0.109", - "synstructure 0.12.6", + "synstructure", ] [[package]] @@ -1013,7 +1013,7 @@ dependencies = [ [[package]] name = "corrosion" -version = "0.1.0" +version = "0.2.0-beta.0" dependencies = [ "build-info", "build-info-build", @@ -1555,9 +1555,9 @@ dependencies = [ [[package]] name = "form_urlencoded" -version = "1.2.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" dependencies = [ "percent-encoding", ] @@ -1963,124 +1963,6 @@ dependencies = [ "cxx-build", ] -[[package]] -name = "icu_collections" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" -dependencies = [ - "displaydoc", - "yoke", - "zerofrom", - "zerovec", -] - -[[package]] -name = "icu_locid" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" -dependencies = [ - "displaydoc", - "litemap", - "tinystr", - "writeable", - "zerovec", -] - -[[package]] -name = "icu_locid_transform" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_locid_transform_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_locid_transform_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" - -[[package]] -name = "icu_normalizer" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" -dependencies = [ - "displaydoc", - "icu_collections", - "icu_normalizer_data", - "icu_properties", - "icu_provider", - "smallvec", - "utf16_iter", - "utf8_iter", - "write16", - "zerovec", -] - -[[package]] -name = "icu_normalizer_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" - -[[package]] -name = "icu_properties" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f8ac670d7422d7f76b32e17a5db556510825b29ec9154f235977c9caba61036" -dependencies = [ - "displaydoc", - "icu_collections", - "icu_locid_transform", - "icu_properties_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_properties_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" - -[[package]] -name = "icu_provider" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_provider_macros", - "stable_deref_trait", - "tinystr", - "writeable", - "yoke", - "zerofrom", - "zerovec", -] - -[[package]] -name = "icu_provider_macros" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", -] - [[package]] name = "ident_case" version = "1.0.1" @@ -2100,14 +1982,12 @@ dependencies = [ [[package]] name = "idna" -version = "1.0.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4716a3a0933a1d01c2f72450e89596eb51dd34ef3c211ccd875acdf1f8fe47ed" +checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" dependencies = [ - "icu_normalizer", - "icu_properties", - "smallvec", - "utf8_iter", + "unicode-bidi", + "unicode-normalization", ] [[package]] @@ -2349,12 +2229,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d59d8c75012853d2e872fb56bc8a2e53718e2cafe1a4c823143141c6d90c322f" -[[package]] -name = "litemap" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" - [[package]] name = "lock_api" version = "0.4.11" @@ -2892,9 +2766,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" [[package]] name = "pgwire" @@ -3794,9 +3668,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.13.2" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" dependencies = [ "serde", ] @@ -3937,12 +3811,6 @@ dependencies = [ "log", ] -[[package]] -name = "stable_deref_trait" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" - [[package]] name = "static_assertions" version = "1.1.0" @@ -4062,17 +3930,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "synstructure" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", -] - [[package]] name = "tempfile" version = "3.5.0" @@ -4208,16 +4065,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "tinystr" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" -dependencies = [ - "displaydoc", - "zerovec", -] - [[package]] name = "tinyvec" version = "1.6.0" @@ -4737,12 +4584,12 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] name = "url" -version = "2.5.1" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c25da092f0a868cdf09e8674cd3b7ef3a7d92a24253e663a2fb85e2496de56" +checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" dependencies = [ "form_urlencoded", - "idna 1.0.0", + "idna 0.3.0", "percent-encoding", ] @@ -4758,18 +4605,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" -[[package]] -name = "utf16_iter" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" - -[[package]] -name = "utf8_iter" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" - [[package]] name = "utf8parse" version = "0.2.1" @@ -5125,18 +4960,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "write16" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" - -[[package]] -name = "writeable" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" - [[package]] name = "x509-certificate" version = "0.21.0" @@ -5188,30 +5011,6 @@ dependencies = [ "time", ] -[[package]] -name = "yoke" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" -dependencies = [ - "serde", - "stable_deref_trait", - "yoke-derive", - "zerofrom", -] - -[[package]] -name = "yoke-derive" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", - "synstructure 0.13.1", -] - [[package]] name = "zerocopy" version = "0.7.26" @@ -5232,55 +5031,12 @@ dependencies = [ "syn 2.0.32", ] -[[package]] -name = "zerofrom" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" -dependencies = [ - "zerofrom-derive", -] - -[[package]] -name = "zerofrom-derive" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", - "synstructure 0.13.1", -] - [[package]] name = "zeroize" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" -[[package]] -name = "zerovec" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2cc8827d6c0994478a15c53f374f46fbd41bea663d809b14744bc42e6b109c" -dependencies = [ - "yoke", - "zerofrom", - "zerovec-derive", -] - -[[package]] -name = "zerovec-derive" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97cf56601ee5052b4417d90c8755c6683473c926039908196cf35d99f893ebe7" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", -] - [[package]] name = "zstd" version = "0.13.0" diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 6f9ba695..862fb0a9 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -452,14 +452,22 @@ pub async fn handle_emptyset( } if process { - for (actor, changes) in &mut buf { while !changes.is_empty() { let change = changes.pop_front().unwrap(); match process_emptyset(agent.clone(), bookie.clone(), *actor, &change).await { Ok(()) => { // cost -= change.0.len(); - cost -= change.0.iter().map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)).sum::(); + cost -= change + .0 + .iter() + .map(|versions| { + cmp::min( + (versions.end().0 - versions.start().0) as usize + 1, + 20, + ) + }) + .sum::(); } Err(e) => { warn!("encountered error when processing emptyset - {e}"); @@ -470,7 +478,6 @@ pub async fn handle_emptyset( } } } - } println!("shutting down handle empties loop"); @@ -849,7 +856,7 @@ pub async fn handle_sync( let mut last_cleared: HashMap> = HashMap::new(); for (actor_id, _) in chosen.clone() { - last_cleared.insert(actor_id, get_last_cleared_ts(&bookie, &actor_id).await); + last_cleared.insert(actor_id, get_last_cleared_ts(bookie, &actor_id).await); } let start = Instant::now(); diff --git a/crates/corro-agent/src/agent/tests.rs b/crates/corro-agent/src/agent/tests.rs index f3585e65..df6d7623 100644 --- a/crates/corro-agent/src/agent/tests.rs +++ b/crates/corro-agent/src/agent/tests.rs @@ -508,7 +508,7 @@ pub async fn configurable_stress_test( } if v.len() == agents.len() && v.iter() - .all(|(n, needed)| *n == changes_count as i64 && *needed == 0) + .all(|(n, needed)| *n == changes_count && *needed == 0) { break; } @@ -1164,7 +1164,7 @@ async fn get_rows( let changes: Vec; let seqs = if let Some(seq) = versions.1.clone() { let seq_query = " and seq >= ? and seq <= ?"; - query = query + seq_query; + query += seq_query; let mut prepped = conn.prepare(&query)?; changes = prepped .query_map((version, seq.start(), seq.end()), row_to_change)? diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index ca91c622..dbb48e6e 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -507,62 +507,63 @@ pub async fn process_fully_buffered_changes( actor_id: ActorId, version: Version, ) -> Result { - let mut conn = agent.pool().write_normal().await?; - debug!(%actor_id, %version, "acquired write (normal) connection to process fully buffered changes"); + let db_version = { + let mut conn = agent.pool().write_normal().await?; + debug!(%actor_id, %version, "acquired write (normal) connection to process fully buffered changes"); + + let booked = { + bookie + .write(format!( + "process_fully_buffered(ensure):{}", + actor_id.as_simple() + )) + .await + .ensure(actor_id) + }; - let booked = { - bookie + let mut bookedw = booked .write(format!( - "process_fully_buffered(ensure):{}", + "process_fully_buffered(booked writer):{}", actor_id.as_simple() )) - .await - .ensure(actor_id) - }; - - let mut bookedw = booked - .write(format!( - "process_fully_buffered(booked writer):{}", - actor_id.as_simple() - )) - .await; - debug!(%actor_id, %version, "acquired Booked write lock to process fully buffered changes"); - - let inserted = block_in_place(|| { - let (last_seq, ts) = { - match bookedw.partials.get(&version) { - Some(PartialVersion { seqs, last_seq, ts }) => { - if seqs.gaps(&(CrsqlSeq(0)..=*last_seq)).count() != 0 { - error!(%actor_id, %version, "found sequence gaps: {:?}, aborting!", seqs.gaps(&(CrsqlSeq(0)..=*last_seq)).collect::>()); - // TODO: return an error here - return Ok(false); + .await; + debug!(%actor_id, %version, "acquired Booked write lock to process fully buffered changes"); + + block_in_place(|| { + let (last_seq, ts) = { + match bookedw.partials.get(&version) { + Some(PartialVersion { seqs, last_seq, ts }) => { + if seqs.gaps(&(CrsqlSeq(0)..=*last_seq)).count() != 0 { + error!(%actor_id, %version, "found sequence gaps: {:?}, aborting!", seqs.gaps(&(CrsqlSeq(0)..=*last_seq)).collect::>()); + // TODO: return an error here + return Ok(None); + } + (*last_seq, *ts) + } + None => { + warn!(%actor_id, %version, "version not found in cache, returning"); + return Ok(None); } - (*last_seq, *ts) - } - None => { - warn!(%actor_id, %version, "version not found in cache, returning"); - return Ok(false); } - } - }; + }; - let tx = conn - .immediate_transaction() - .map_err(|source| ChangeError::Rusqlite { - source, - actor_id: Some(actor_id), - version: Some(version), - })?; + let tx = conn + .immediate_transaction() + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; - info!(%actor_id, %version, "Processing buffered changes to crsql_changes (actor: {actor_id}, version: {version}, last_seq: {last_seq})"); + info!(%actor_id, %version, "Processing buffered changes to crsql_changes (actor: {actor_id}, version: {version}, last_seq: {last_seq})"); - let max_db_version: Option> = tx.prepare_cached("SELECT MAX(db_version) FROM __corro_buffered_changes WHERE site_id = ? AND version = ?").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?.query_row(params![actor_id.as_bytes(), version], |row| row.get(0)).optional().map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?; + let max_db_version: Option> = tx.prepare_cached("SELECT MAX(db_version) FROM __corro_buffered_changes WHERE site_id = ? AND version = ?").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?.query_row(params![actor_id.as_bytes(), version], |row| row.get(0)).optional().map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?; - let start = Instant::now(); + let start = Instant::now(); - if let Some(max_db_version) = max_db_version.flatten() { - // insert all buffered changes into crsql_changes directly from the buffered changes table - let count = tx + if let Some(max_db_version) = max_db_version.flatten() { + // insert all buffered changes into crsql_changes directly from the buffered changes table + let count = tx .prepare_cached( r#" INSERT INTO crsql_changes ("table", pk, cid, val, col_version, db_version, site_id, cl, seq) @@ -574,42 +575,42 @@ pub async fn process_fully_buffered_changes( "#, ).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})? .execute(params![max_db_version, actor_id.as_bytes(), version]).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?; - info!(%actor_id, %version, "Inserted {count} rows from buffered into crsql_changes in {:?}", start.elapsed()); - } else { - info!(%actor_id, %version, "No buffered rows, skipped insertion into crsql_changes"); - } - - if let Err(e) = agent.tx_clear_buf().try_send((actor_id, version..=version)) { - error!("could not schedule buffered data clear: {e}"); - } - - let rows_impacted: i64 = tx - .prepare_cached("SELECT crsql_rows_impacted()") - .map_err(|source| ChangeError::Rusqlite { - source, - actor_id: Some(actor_id), - version: Some(version), - })? - .query_row((), |row| row.get(0)) - .map_err(|source| ChangeError::Rusqlite { - source, - actor_id: Some(actor_id), - version: Some(version), - })?; + info!(%actor_id, %version, "Inserted {count} rows from buffered into crsql_changes in {:?}", start.elapsed()); + } else { + info!(%actor_id, %version, "No buffered rows, skipped insertion into crsql_changes"); + } - debug!(%actor_id, %version, "rows impacted by buffered changes insertion: {rows_impacted}"); + if let Err(e) = agent.tx_clear_buf().try_send((actor_id, version..=version)) { + error!("could not schedule buffered data clear: {e}"); + } - if rows_impacted > 0 { - let db_version: CrsqlDbVersion = tx - .query_row("SELECT crsql_next_db_version()", [], |row| row.get(0)) + let rows_impacted: i64 = tx + .prepare_cached("SELECT crsql_rows_impacted()") + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })? + .query_row((), |row| row.get(0)) .map_err(|source| ChangeError::Rusqlite { source, actor_id: Some(actor_id), version: Some(version), })?; - debug!("db version: {db_version}"); - tx.prepare_cached( + debug!(%actor_id, %version, "rows impacted by buffered changes insertion: {rows_impacted}"); + + let db_version = if rows_impacted > 0 { + let db_version: CrsqlDbVersion = tx + .query_row("SELECT crsql_next_db_version()", [], |row| row.get(0)) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; + debug!("db version: {db_version}"); + + tx.prepare_cached( " INSERT OR IGNORE INTO __corro_bookkeeping (actor_id, start_version, db_version, last_seq, ts) VALUES ( @@ -619,85 +620,101 @@ pub async fn process_fully_buffered_changes( :last_seq, :ts );", - ).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})? - .execute(named_params! { - ":actor_id": actor_id, - ":version": version, - ":db_version": db_version, - ":last_seq": last_seq, - ":ts": ts - }).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?; - - debug!(%actor_id, %version, "inserted bookkeeping row after buffered insert"); - } else { - store_empty_changeset( - &tx, - actor_id, - version..=version, - Timestamp::from(agent.clock().new_timestamp()), - )?; - debug!(%actor_id, %version, "inserted CLEARED bookkeeping row after buffered insert"); - }; + ).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})? + .execute(named_params! { + ":actor_id": actor_id, + ":version": version, + ":db_version": db_version, + ":last_seq": last_seq, + ":ts": ts + }).map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?; + + debug!(%actor_id, %version, "inserted bookkeeping row after buffered insert"); + + Some(db_version) + } else { + store_empty_changeset( + &tx, + actor_id, + version..=version, + Timestamp::from(agent.clock().new_timestamp()), + )?; + debug!(%actor_id, %version, "inserted CLEARED bookkeeping row after buffered insert"); + None + }; - let mut snap = bookedw.snapshot(); - snap.insert_db(&tx, [version..=version].into()) - .map_err(|source| ChangeError::Rusqlite { - source, - actor_id: Some(actor_id), - version: Some(version), - })?; + let mut snap = bookedw.snapshot(); + snap.insert_db(&tx, [version..=version].into()) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; - let overwritten = - find_overwritten_versions(&tx).map_err(|source| ChangeError::Rusqlite { - source, - actor_id: Some(actor_id), - version: Some(version), - })?; + let overwritten = + find_overwritten_versions(&tx).map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; - let mut last_cleared: Option = None; - for (actor_id, versions_set) in overwritten { - if actor_id != agent.actor_id() { - warn!("clearing empties for another actor: {actor_id}") - } - let ts = Timestamp::from(agent.clock().new_timestamp()); - for versions in versions_set { - let inserted = store_empty_changeset(&tx, actor_id, versions, ts)?; - if inserted > 0 { - last_cleared = Some(ts); + let mut last_cleared: Option = None; + for (actor_id, versions_set) in overwritten { + if actor_id != agent.actor_id() { + warn!("clearing empties for another actor: {actor_id}") + } + let ts = Timestamp::from(agent.clock().new_timestamp()); + for versions in versions_set { + let inserted = store_empty_changeset(&tx, actor_id, versions, ts)?; + if inserted > 0 { + last_cleared = Some(ts); + } } } - } - let mut agent_booked = { - agent - .booked() - .blocking_write("process_fully_buffered_changes(get snapshot)") - }; + let mut agent_booked = { + agent + .booked() + .blocking_write("process_fully_buffered_changes(get snapshot)") + }; - let mut agent_snap = agent_booked.snapshot(); - if let Some(ts) = last_cleared { - agent_snap - .update_cleared_ts(&tx, ts) - .map_err(|source| ChangeError::Rusqlite { - source, - actor_id: Some(actor_id), - version: Some(version), - })?; - } + let mut agent_snap = agent_booked.snapshot(); + if let Some(ts) = last_cleared { + agent_snap + .update_cleared_ts(&tx, ts) + .map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; + } - tx.commit().map_err(|source| ChangeError::Rusqlite { - source, - actor_id: Some(actor_id), - version: Some(version), - })?; + tx.commit().map_err(|source| ChangeError::Rusqlite { + source, + actor_id: Some(actor_id), + version: Some(version), + })?; - bookedw.commit_snapshot(snap); - agent_booked.commit_snapshot(agent_snap); + bookedw.commit_snapshot(snap); + agent_booked.commit_snapshot(agent_snap); - Ok::<_, ChangeError>(true) - })?; + Ok::<_, ChangeError>(db_version) + }) + }?; + + if let Some(db_version) = db_version { + let conn = agent.pool().read().await?; + block_in_place(|| { + if let Err(e) = agent + .subs_manager() + .match_changes_from_db_version(&conn, db_version) + { + error!(%db_version, "could not match changes from db version: {e}"); + } + }); + } - Ok(inserted) + Ok(db_version.is_some()) } #[tracing::instrument(skip(agent, bookie, changes), err)] diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 9c4983a4..256983bd 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1448,11 +1448,14 @@ pub async fn parallel_sync( for res in counts.iter() { match res { Err(e) => error!("could not properly recv from peer: {e}"), - Ok((actor_id, _)) => members.update_sync_ts(&actor_id, ts), + Ok((actor_id, _)) => members.update_sync_ts(actor_id, ts), }; } - Ok(counts.into_iter().map(|res| res.map(|i| i.1)).flatten().sum::()) + Ok(counts + .into_iter() + .flat_map(|res| res.map(|i| i.1)) + .sum::()) } #[tracing::instrument(skip(agent, bookie, their_actor_id, read, write), fields(actor_id = %their_actor_id), err)] diff --git a/crates/corro-agent/src/api/public/pubsub.rs b/crates/corro-agent/src/api/public/pubsub.rs index 717c9021..73b7f6b2 100644 --- a/crates/corro-agent/src/api/public/pubsub.rs +++ b/crates/corro-agent/src/api/public/pubsub.rs @@ -868,21 +868,32 @@ async fn forward_bytes_to_body_sender( #[cfg(test)] mod tests { + use corro_types::actor::ActorId; + use corro_types::api::{Change, ColumnName, TableName}; + use corro_types::base::{CrsqlDbVersion, CrsqlSeq, Version}; + use corro_types::broadcast::{ChangeSource, ChangeV1, Changeset}; + use corro_types::pubsub::pack_columns; use corro_types::{ api::{ChangeId, RowId}, config::Config, pubsub::ChangeType, }; use http_body::Body; + use spawn::wait_for_all_pending_handles; + use std::ops::RangeInclusive; + use std::time::Instant; + use tokio::time::timeout; use tokio_util::codec::{Decoder, LinesCodec}; use tripwire::Tripwire; + use super::*; + use crate::agent::process_multiple_changes; use crate::{ agent::setup, api::public::{api_v1_db_schema, api_v1_transactions}, }; - - use super::*; + use corro_tests::launch_test_agent; + use corro_types::api::SqliteValue::Integer; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_api_v1_subs() -> eyre::Result<()> { @@ -1290,6 +1301,201 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn match_buffered_changes() -> eyre::Result<()> { + _ = tracing_subscriber::fmt::try_init(); + let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple(); + + let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?; + + let schema = "CREATE TABLE buftests ( + pk int NOT NULL PRIMARY KEY, + col1 text, + col2 text + );"; + + let (status_code, _body) = api_v1_db_schema( + Extension(ta1.agent.clone()), + axum::Json(vec![schema.into()]), + ) + .await; + assert_eq!(status_code, StatusCode::OK); + + let actor_id = ActorId(uuid::Uuid::new_v4()); + + let change1 = Change { + table: TableName("buftests".into()), + pk: pack_columns(&vec![1i64.into()])?, + cid: ColumnName("col1".into()), + val: "one".into(), + col_version: 1, + db_version: CrsqlDbVersion(1), + seq: CrsqlSeq(0), + site_id: actor_id.to_bytes(), + cl: 1, + }; + + let change2 = Change { + table: TableName("buftests".into()), + pk: pack_columns(&vec![1i64.into()])?, + cid: ColumnName("col2".into()), + val: "one line".into(), + col_version: 1, + db_version: CrsqlDbVersion(1), + seq: CrsqlSeq(1), + site_id: actor_id.to_bytes(), + cl: 1, + }; + + let changes = ChangeV1 { + actor_id, + changeset: Changeset::Full { + version: Version(1), + changes: vec![change1, change2], + seqs: RangeInclusive::new(CrsqlSeq(0), CrsqlSeq(1)), + last_seq: CrsqlSeq(1), + ts: Default::default(), + }, + }; + + process_multiple_changes( + ta1.agent.clone(), + ta1.bookie.clone(), + vec![(changes, ChangeSource::Sync, Instant::now())], + ) + .await?; + + let bcast_cache: SharedMatcherBroadcastCache = Default::default(); + let mut res = api_v1_subs( + Extension(ta1.agent.clone()), + Extension(bcast_cache.clone()), + Extension(tripwire.clone()), + axum::extract::Query(SubParams::default()), + axum::Json(Statement::Simple("select * from buftests".into())), + ) + .await + .into_response(); + + if !res.status().is_success() { + let b = res.body_mut().data().await.unwrap().unwrap(); + println!("body: {}", String::from_utf8_lossy(&b)); + } + + assert_eq!(res.status(), StatusCode::OK); + + let mut rows = RowsIter { + body: res.into_body(), + codec: LinesCodec::new(), + buf: BytesMut::new(), + done: false, + }; + + assert_eq!( + rows.recv().await.unwrap().unwrap(), + QueryEvent::Columns(vec!["pk".into(), "col1".into(), "col2".into()]) + ); + + assert_eq!( + rows.recv().await.unwrap().unwrap(), + QueryEvent::Row(RowId(1), vec![Integer(1), "one".into(), "one line".into()]) + ); + + assert!(matches!( + rows.recv().await.unwrap().unwrap(), + QueryEvent::EndOfQuery { .. } + )); + + // send partial change so it is buffered + let change3 = Change { + table: TableName("buftests".into()), + pk: pack_columns(&vec![2i64.into()])?, + cid: ColumnName("col1".into()), + val: "two".into(), + col_version: 1, + db_version: CrsqlDbVersion(1), + seq: CrsqlSeq(0), + site_id: actor_id.to_bytes(), + cl: 1, + }; + + let changes = ChangeV1 { + actor_id, + changeset: Changeset::Full { + version: Version(2), + changes: vec![change3], + seqs: RangeInclusive::new(CrsqlSeq(0), CrsqlSeq(0)), + last_seq: CrsqlSeq(1), + ts: Default::default(), + }, + }; + + process_multiple_changes( + ta1.agent.clone(), + ta1.bookie.clone(), + vec![(changes, ChangeSource::Sync, Instant::now())], + ) + .await?; + + // confirm that change is buffered in db + { + let conn = ta1.agent.pool().read().await?; + let end = conn.query_row( + "SELECT end_seq FROM __corro_seq_bookkeeping WHERE site_id = ? AND version = ?", + (actor_id, 2), + |row| row.get::<_, CrsqlSeq>(0), + )?; + assert_eq!(end, CrsqlSeq(0)); + } + + let change4 = Change { + table: TableName("buftests".into()), + pk: pack_columns(&vec![2i64.into()])?, + cid: ColumnName("col2".into()), + val: "two line".into(), + col_version: 1, + db_version: CrsqlDbVersion(1), + seq: CrsqlSeq(1), + site_id: actor_id.to_bytes(), + cl: 1, + }; + + let changes = ChangeV1 { + actor_id, + changeset: Changeset::Full { + version: Version(2), + changes: vec![change4], + seqs: RangeInclusive::new(CrsqlSeq(1), CrsqlSeq(1)), + last_seq: CrsqlSeq(1), + ts: Default::default(), + }, + }; + + process_multiple_changes( + ta1.agent.clone(), + ta1.bookie.clone(), + vec![(changes, ChangeSource::Sync, Instant::now())], + ) + .await?; + + let res = timeout(Duration::from_secs(5), rows.recv()).await?; + + assert_eq!( + res.unwrap().unwrap(), + QueryEvent::Change( + ChangeType::Insert, + RowId(2), + vec![Integer(2), "two".into(), "two line".into()], + ChangeId(1) + ) + ); + + tripwire_tx.send(()).await.ok(); + tripwire_worker.await; + wait_for_all_pending_handles().await; + + Ok(()) + } + struct RowsIter { body: axum::body::BoxBody, codec: LinesCodec, diff --git a/crates/corro-types/src/agent.rs b/crates/corro-types/src/agent.rs index d07ccfe7..018f249f 100644 --- a/crates/corro-types/src/agent.rs +++ b/crates/corro-types/src/agent.rs @@ -527,8 +527,10 @@ pub enum PoolError { #[derive(Debug, thiserror::Error)] pub enum ChangeError { - #[error("could not acquire pooled connection: {0}")] + #[error("could not acquire pooled write connection: {0}")] Pool(#[from] PoolError), + #[error("could not acquire pooled read connection: {0}")] + SqlitePool(#[from] SqlitePoolError), #[error("rusqlite: {source} (actor_id: {actor_id:?}, version: {version:?})")] Rusqlite { source: rusqlite::Error, @@ -1927,7 +1929,7 @@ mod tests { all_versions: &mut RangeInclusiveSet, versions: RangeInclusiveSet, ) -> rusqlite::Result<()> { - all_versions.extend(versions.clone().into_iter()); + all_versions.extend(versions.clone()); let mut snap = bv.snapshot(); snap.insert_db(conn, versions)?; bv.commit_snapshot(snap); diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs index 382bfce5..655ff3fb 100644 --- a/crates/corro-types/src/broadcast.rs +++ b/crates/corro-types/src/broadcast.rs @@ -264,7 +264,7 @@ pub enum TimestampParseError { Parse(ParseNTP64Error), } -#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd, Ord, Hash)] +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialOrd, Ord)] #[serde(transparent)] pub struct Timestamp(pub NTP64); @@ -290,6 +290,13 @@ impl PartialEq for Timestamp { } } +impl std::hash::Hash for Timestamp { + fn hash(&self, state: &mut H) { + self.0.as_secs().hash(state); + self.0.subsec_nanos().hash(state); + } +} + impl Deref for Timestamp { type Target = NTP64; @@ -480,11 +487,11 @@ pub async fn broadcast_changes( // TODO: make this more generic so both sync and local changes can use it. let mut prepped = conn.prepare_cached( r#" - SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl - FROM crsql_changes - WHERE db_version = ? - ORDER BY seq ASC - "#, + SELECT "table", pk, cid, val, col_version, db_version, seq, site_id, cl + FROM crsql_changes + WHERE db_version = ? + ORDER BY seq ASC + "#, )?; let rows = prepped.query_map([db_version], row_to_change)?; let chunked = ChunkedChanges::new(rows, CrsqlSeq(0), last_seq, MAX_CHANGES_BYTE_SIZE); diff --git a/crates/corro-types/src/pubsub.rs b/crates/corro-types/src/pubsub.rs index 99afaff2..0ff5c7d0 100644 --- a/crates/corro-types/src/pubsub.rs +++ b/crates/corro-types/src/pubsub.rs @@ -159,6 +159,90 @@ impl SubsManager { inner.remove(id) } + pub fn match_changes_from_db_version( + &self, + conn: &Connection, + db_version: CrsqlDbVersion, + ) -> rusqlite::Result<()> { + let handles = { + let inner = self.0.read(); + if inner.handles.is_empty() { + return Ok(()); + } + inner.handles.clone() + }; + + let mut candidates = handles + .iter() + .map(|(id, handle)| (id, (MatchCandidates::new(), handle))) + .collect::>(); + + { + let mut prepped = conn.prepare_cached( + r#" + SELECT "table", pk, cid + FROM crsql_changes + WHERE db_version = ? + ORDER BY seq ASC + "#, + )?; + + let rows = prepped.query_map([db_version], |row| { + Ok(( + row.get::<_, TableName>(0)?, + row.get::<_, Vec>(1)?, + row.get::<_, ColumnName>(2)?, + )) + })?; + + for change_res in rows { + let (table, pk, column) = change_res?; + + for (_id, (candidates, handle)) in candidates.iter_mut() { + let change = MatchableChange { + table: &table, + pk: &pk, + column: &column, + }; + handle.filter_matchable_change(candidates, change); + } + } + } + + // metrics... + for (id, (candidates, handle)) in candidates { + let mut match_count = 0; + + for (table, pks) in candidates.iter() { + let count = pks.len(); + match_count += count; + counter!("corro.subs.changes.matched.count", "sql_hash" => handle.inner.hash.clone(), "table" => table.to_string()).increment(count as u64); + } + + trace!(sub_id = %id, %db_version, "found {match_count} candidates"); + + if let Err(e) = handle.inner.changes_tx.try_send((candidates, db_version)) { + error!(sub_id = %id, "could not send change candidates to subscription handler: {e}"); + match e { + mpsc::error::TrySendError::Full(item) => { + warn!("channel is full, falling back to async send"); + let changes_tx = handle.inner.changes_tx.clone(); + tokio::spawn(async move { + _ = changes_tx.send(item).await; + }); + } + mpsc::error::TrySendError::Closed(_) => { + if let Some(handle) = self.remove(id) { + tokio::spawn(handle.cleanup()); + } + } + } + } + } + + Ok(()) + } + pub fn match_changes(&self, changes: &[Change], db_version: CrsqlDbVersion) { trace!( %db_version, diff --git a/crates/corro-types/src/sync.rs b/crates/corro-types/src/sync.rs index 43d21aa6..7c6a18fa 100644 --- a/crates/corro-types/src/sync.rs +++ b/crates/corro-types/src/sync.rs @@ -289,7 +289,7 @@ pub async fn get_last_cleared_ts(bookie: &Bookie, actor_id: &ActorId) -> Option< let booked_reader = booked.read("get_last_cleared_ts").await; return booked_reader.last_cleared_ts(); } - return None; + None } // generates a `SyncMessage` to tell another node what versions we're missing diff --git a/crates/corrosion/Cargo.toml b/crates/corrosion/Cargo.toml index 2c51f1ee..c8905d34 100644 --- a/crates/corrosion/Cargo.toml +++ b/crates/corrosion/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "corrosion" -version = "0.1.0" +version = "0.2.0-beta.0" edition = "2021" [dependencies]