From 329ff594ab1befe2d67c764e5e2b1155307b31c5 Mon Sep 17 00:00:00 2001
From: Jerome Gravel-Niquet
Date: Fri, 22 Sep 2023 13:46:23 -0400
Subject: [PATCH] don't hash tables with > 500K rows, until primary key lookaside is done... (#65)

---
 crates/corro-agent/src/agent.rs | 62 +++++++++++++++++++--------------
 1 file changed, 35 insertions(+), 27 deletions(-)

diff --git a/crates/corro-agent/src/agent.rs b/crates/corro-agent/src/agent.rs
index 76b3a880..7163f5d2 100644
--- a/crates/corro-agent/src/agent.rs
+++ b/crates/corro-agent/src/agent.rs
@@ -1015,6 +1015,8 @@ async fn metrics_loop(agent: Agent) {
     }
 }
 
+const MAX_COUNT_TO_HASH: i64 = 500_000;
+
 fn collect_metrics(agent: &Agent) {
     agent.pool().emit_metrics();
 
@@ -1028,6 +1030,8 @@ fn collect_metrics(agent: &Agent) {
         }
     };
 
+    let mut low_count_tables = vec![];
+
     for table in schema.tables.keys() {
         match conn
             .prepare_cached(&format!("SELECT count(*) FROM {table}"))
@@ -1035,6 +1039,9 @@ fn collect_metrics(agent: &Agent) {
         {
             Ok(count) => {
                 gauge!("corro.db.table.rows.total", count as f64, "table" => table.clone());
+                if count <= MAX_COUNT_TO_HASH {
+                    low_count_tables.push(table);
+                }
             }
             Err(e) => {
                 error!("could not query count for table {table}: {e}");
@@ -1062,34 +1069,35 @@ fn collect_metrics(agent: &Agent) {
         }
     }
 
-    for (name, table) in schema.tables.iter() {
-        let pks = table.pk.iter().cloned().collect::<Vec<_>>().join(",");
-
-        match conn
-            .prepare_cached(&format!("SELECT * FROM {name} ORDER BY {pks}"))
-            .and_then(|mut prepped| {
-                let col_count = prepped.column_count();
-                prepped.query(()).and_then(|mut rows| {
-                    let mut hasher = seahash::SeaHasher::with_seeds(
-                        CHECKSUM_SEEDS[0],
-                        CHECKSUM_SEEDS[1],
-                        CHECKSUM_SEEDS[2],
-                        CHECKSUM_SEEDS[3],
-                    );
-                    while let Ok(Some(row)) = rows.next() {
-                        for idx in 0..col_count {
-                            let v: SqliteValue = row.get(idx)?;
-                            v.hash(&mut hasher);
+    for name in low_count_tables {
+        if let Some(table) = schema.tables.get(name) {
+            let pks = table.pk.iter().cloned().collect::<Vec<_>>().join(",");
+            match conn
+                .prepare_cached(&format!("SELECT * FROM {name} ORDER BY {pks}"))
+                .and_then(|mut prepped| {
+                    let col_count = prepped.column_count();
+                    prepped.query(()).and_then(|mut rows| {
+                        let mut hasher = seahash::SeaHasher::with_seeds(
+                            CHECKSUM_SEEDS[0],
+                            CHECKSUM_SEEDS[1],
+                            CHECKSUM_SEEDS[2],
+                            CHECKSUM_SEEDS[3],
+                        );
+                        while let Ok(Some(row)) = rows.next() {
+                            for idx in 0..col_count {
+                                let v: SqliteValue = row.get(idx)?;
+                                v.hash(&mut hasher);
+                            }
                         }
-                    }
-                    Ok(hasher.finish())
-                })
-            }) {
-            Ok(hash) => {
-                gauge!("corro.db.table.checksum", hash as f64, "table" => name.clone());
-            }
-            Err(e) => {
-                error!("could not query clock table values for hashing {table}: {e}");
+                        Ok(hasher.finish())
+                    })
+                }) {
+                Ok(hash) => {
+                    gauge!("corro.db.table.checksum", hash as f64, "table" => name.clone());
+                }
+                Err(e) => {
+                    error!("could not query clock table values for hashing {table}: {e}");
+                }
             }
         }
     }