Skip to content

Commit

Permalink
gather processing time per table
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Aug 29, 2024
1 parent cffbeeb commit 000a6da
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1209,10 +1209,13 @@ impl Matcher {
break;
}
let elapsed = start.elapsed();
debug!(sub_id = %self.id, "processed {buf_count} changes for subscription in {elapsed:?}");

histogram!("corro.subs.changes.processing.duration.seconds", "sql_hash" => self.hash.clone()).record(elapsed);

if elapsed >= PROCESSING_WARN_THRESHOLD {
warn!(sub_id = %self.id, "processed {buf_count} changes (very slowly) for subscription in {elapsed:?}");
} else {
debug!(sub_id = %self.id, "processed {buf_count} changes for subscription in {elapsed:?}");
}
buf_count = 0;

Expand Down Expand Up @@ -1572,6 +1575,7 @@ impl Matcher {
))?;

for table in tables.iter() {
let start = Instant::now();
let stmt = match self.cached_statements.get(table.as_str()) {
Some(stmt) => stmt,
None => {
Expand Down Expand Up @@ -1723,6 +1727,9 @@ impl Matcher {
}
// clean that up
tx.execute_batch("DELETE FROM state_results")?;

let elapsed = start.elapsed();
histogram!("corro.subs.changes.processing.table.duration.seconds", "sql_hash" => self.hash.clone(), "table" => table.0.to_string()).record(elapsed);
}

// clean up temporary tables immediately
Expand Down

0 comments on commit 000a6da

Please sign in to comment.