Skip to content

Commit

Permalink
don't hash tables with > 500K rows, until primary key lookaside is done... (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn authored Sep 22, 2023
1 parent c98f7bd commit 329ff59
Showing 1 changed file with 35 additions and 27 deletions.
62 changes: 35 additions & 27 deletions crates/corro-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,8 @@ async fn metrics_loop(agent: Agent) {
}
}

/// Largest row count (inclusive) a table may have and still get a checksum
/// gauge from `collect_metrics`.
///
/// Checksumming runs `SELECT * ... ORDER BY <pks>` and hashes every column of
/// every row, so tables whose `count(*)` exceeds this value are left out of
/// the hashing pass; their row-count gauge is still emitted.
const MAX_COUNT_TO_HASH: i64 = 500_000;

fn collect_metrics(agent: &Agent) {
agent.pool().emit_metrics();

Expand All @@ -1028,13 +1030,18 @@ fn collect_metrics(agent: &Agent) {
}
};

let mut low_count_tables = vec![];

for table in schema.tables.keys() {
match conn
.prepare_cached(&format!("SELECT count(*) FROM {table}"))
.and_then(|mut prepped| prepped.query_row([], |row| row.get::<_, i64>(0)))
{
Ok(count) => {
gauge!("corro.db.table.rows.total", count as f64, "table" => table.clone());
if count <= MAX_COUNT_TO_HASH {
low_count_tables.push(table);
}
}
Err(e) => {
error!("could not query count for table {table}: {e}");
Expand Down Expand Up @@ -1062,34 +1069,35 @@ fn collect_metrics(agent: &Agent) {
}
}

for (name, table) in schema.tables.iter() {
let pks = table.pk.iter().cloned().collect::<Vec<String>>().join(",");

match conn
.prepare_cached(&format!("SELECT * FROM {name} ORDER BY {pks}"))
.and_then(|mut prepped| {
let col_count = prepped.column_count();
prepped.query(()).and_then(|mut rows| {
let mut hasher = seahash::SeaHasher::with_seeds(
CHECKSUM_SEEDS[0],
CHECKSUM_SEEDS[1],
CHECKSUM_SEEDS[2],
CHECKSUM_SEEDS[3],
);
while let Ok(Some(row)) = rows.next() {
for idx in 0..col_count {
let v: SqliteValue = row.get(idx)?;
v.hash(&mut hasher);
for name in low_count_tables {
if let Some(table) = schema.tables.get(name) {
let pks = table.pk.iter().cloned().collect::<Vec<String>>().join(",");
match conn
.prepare_cached(&format!("SELECT * FROM {name} ORDER BY {pks}"))
.and_then(|mut prepped| {
let col_count = prepped.column_count();
prepped.query(()).and_then(|mut rows| {
let mut hasher = seahash::SeaHasher::with_seeds(
CHECKSUM_SEEDS[0],
CHECKSUM_SEEDS[1],
CHECKSUM_SEEDS[2],
CHECKSUM_SEEDS[3],
);
while let Ok(Some(row)) = rows.next() {
for idx in 0..col_count {
let v: SqliteValue = row.get(idx)?;
v.hash(&mut hasher);
}
}
}
Ok(hasher.finish())
})
}) {
Ok(hash) => {
gauge!("corro.db.table.checksum", hash as f64, "table" => name.clone());
}
Err(e) => {
error!("could not query clock table values for hashing {table}: {e}");
Ok(hasher.finish())
})
}) {
Ok(hash) => {
gauge!("corro.db.table.checksum", hash as f64, "table" => name.clone());
}
Err(e) => {
error!("could not query clock table values for hashing {table}: {e}");
}
}
}
}
Expand Down

0 comments on commit 329ff59

Please sign in to comment.