Skip to content

Commit

Permalink
corro-agent: add API endpoint to query row count for set of tables
Browse files Browse the repository at this point in the history
  • Loading branch information
spacekookie committed Jan 25, 2024
1 parent da29b07 commit 6bfa2d5
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 3 deletions.
16 changes: 15 additions & 1 deletion crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
MAX_SYNC_BACKOFF, TO_CLEAR_COUNT,
},
api::public::{
api_v1_db_schema, api_v1_queries, api_v1_transactions,
api_v1_db_schema, api_v1_queries, api_v1_table_stats, api_v1_transactions,
pubsub::{api_v1_sub_by_id, api_v1_subs},
},
transport::Transport,
Expand Down Expand Up @@ -347,6 +347,20 @@ pub async fn setup_http_api_handler(
.layer(ConcurrencyLimitLayer::new(4)),
),
)
.route(
"/v1/table_stats",
post(api_v1_table_stats).route_layer(
tower::ServiceBuilder::new()
.layer(HandleErrorLayer::new(|_error: BoxError| async {
Ok::<_, Infallible>((
StatusCode::SERVICE_UNAVAILABLE,
"max concurrency limit reached".to_string(),
))
}))
.layer(LoadShedLayer::new())
.layer(ConcurrencyLimitLayer::new(4)),
),
)
.layer(axum::middleware::from_fn(require_authz))
.layer(
tower::ServiceBuilder::new()
Expand Down
72 changes: 70 additions & 2 deletions crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use std::time::{Duration, Instant};
use std::{
collections::BTreeSet,
time::{Duration, Instant},
};

use axum::{response::IntoResponse, Extension};
use bytes::{BufMut, BytesMut};
use compact_str::ToCompactString;
use corro_types::{
agent::{Agent, ChangeError, CurrentVersion, KnownDbVersion},
api::{row_to_change, ColumnName, ExecResponse, ExecResult, QueryEvent, Statement},
api::{
row_to_change, ColumnName, ExecResponse, ExecResult, QueryEvent, Statement,
TableStatRequest, TableStatResponse,
},
base::{CrsqlDbVersion, CrsqlSeq},
broadcast::{ChangeV1, Changeset, Timestamp},
change::{ChunkedChanges, SqliteValue, MAX_CHANGES_BYTE_SIZE},
Expand Down Expand Up @@ -656,6 +662,68 @@ pub async fn api_v1_db_schema(
)
}

/// Query the table status of the current node
///
/// Currently this endpoint only supports querying the row count for a
/// selection of provided tables. Table names are checked for
/// existence before querying
pub async fn api_v1_table_stats(
Extension(agent): Extension<Agent>,
axum::extract::Json(ts_req): axum::extract::Json<TableStatRequest>,
) -> (StatusCode, axum::Json<TableStatResponse>) {
async fn count_table_lengths(
agent: &Agent,
ts_req: TableStatRequest,
) -> eyre::Result<(i64, Vec<String>)> {
debug!("Querying row count for {} tables", ts_req.tables.len());
let conn = agent.pool().read().await?;

block_in_place(move || -> eyre::Result<(i64, Vec<String>)> {
let valid_tables: BTreeSet<String> = conn
.prepare_cached("select name from sqlite_schema where type = 'table'")?
.query_map([], |row| row.get(0))?
.filter_map(|name| name.ok())
.collect();

let mut invalid_tables = vec![];
let mut total_count = 0;
for table in ts_req.tables.into_iter() {
if !valid_tables.contains(&table) {
error!("Table name {} doesn't exist!", &table);
invalid_tables.push(table);
continue;
}

let count: i64 = conn
.prepare_cached(&format!("SELECT COUNT(*) FROM {}", &table))?
.query_row((), |row| row.get(0))?;

total_count += count;
}
Ok((total_count, invalid_tables))
})
}

match count_table_lengths(&agent, ts_req).await {
Ok((count, invalid_tables)) => (
StatusCode::OK,
axum::Json(TableStatResponse {
total_row_count: count,
invalid_tables,
}),
),
Err(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
axum::Json(TableStatResponse {
total_row_count: 0,
// Since we don't know what error occured or if any
// tables were valid, we just return an empty list
invalid_tables: vec![],
}),
),
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
Expand Down
11 changes: 11 additions & 0 deletions crates/corro-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,17 @@ pub enum ExecResult {
Execute { rows_affected: usize, time: f64 },
Error { error: String },
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TableStatRequest {
pub tables: Vec<String>,
}

/// Contain node and sync status information
#[derive(Debug, Serialize, Deserialize)]
pub struct TableStatResponse {
pub total_row_count: i64,
pub invalid_tables: Vec<String>,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize, Readable, Writable, PartialEq)]
pub struct Change {
Expand Down

0 comments on commit 6bfa2d5

Please sign in to comment.