diff --git a/crates/common/src/types/index.rs b/crates/common/src/types/index.rs index 77515d47..fef9e91d 100644 --- a/crates/common/src/types/index.rs +++ b/crates/common/src/types/index.rs @@ -112,7 +112,7 @@ pub type TabletIndexName = GenericIndexName; /// Like TabletIndexName in that it refers to a stable underlying index, /// but it works for virtual tables too. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum StableIndexName { Physical(TabletIndexName), Virtual(IndexName, TabletIndexName), diff --git a/crates/database/src/bootstrap_model/user_facing.rs b/crates/database/src/bootstrap_model/user_facing.rs index d4efc987..df44b021 100644 --- a/crates/database/src/bootstrap_model/user_facing.rs +++ b/crates/database/src/bootstrap_model/user_facing.rs @@ -10,11 +10,7 @@ use common::{ ResolvedDocument, }, index::IndexKeyBytes, - interval::Interval, - query::{ - CursorPosition, - Order, - }, + query::CursorPosition, runtime::Runtime, types::{ StableIndexName, @@ -41,7 +37,10 @@ use crate::{ log_virtual_table_get, log_virtual_table_query, }, - transaction::MAX_PAGE_SIZE, + transaction::{ + IndexRangeRequest, + MAX_PAGE_SIZE, + }, unauthorized_error, virtual_tables::VirtualTable, PatchValue, @@ -320,72 +319,106 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> { ) } - /// NOTE: returns a page of results. Callers must call record_read_document - /// for all documents returned from the index stream. - #[convex_macro::instrument_future] - pub async fn index_range( + async fn start_index_range( &mut self, - stable_index_name: &StableIndexName, - interval: &Interval, - order: Order, - mut max_rows: usize, - version: Option, - ) -> anyhow::Result<( - Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>, - CursorPosition, - )> { - if interval.is_empty() { - return Ok((vec![], CursorPosition::End)); + request: IndexRangeRequest, + ) -> anyhow::Result< + Result< + ( + Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>, + CursorPosition, + ), + RangeRequest, + >, + > { + if request.interval.is_empty() { + return Ok(Ok((vec![], CursorPosition::End))); } - max_rows = cmp::min(max_rows, MAX_PAGE_SIZE); + let max_rows = cmp::min(request.max_rows, MAX_PAGE_SIZE); - let tablet_index_name = match stable_index_name { + let tablet_index_name = match request.stable_index_name { StableIndexName::Physical(tablet_index_name) => tablet_index_name, StableIndexName::Virtual(index_name, tablet_index_name) => { log_virtual_table_query(); - return VirtualTable::new(self.tx) + // TODO(lee) batch virtual table queryStreamNext + let virtual_result = VirtualTable::new(self.tx) .index_range( RangeRequest { index_name: tablet_index_name.clone(), printable_index_name: index_name.clone(), - interval: interval.clone(), - order, + interval: request.interval.clone(), + order: request.order, max_size: max_rows, }, - version, + request.version, ) - .await; + .await?; + return Ok(Ok(virtual_result)); }, StableIndexName::Missing => { - return Ok((vec![], CursorPosition::End)); + return Ok(Ok((vec![], CursorPosition::End))); }, }; let index_name = tablet_index_name .clone() .map_table(&self.tx.table_mapping().tablet_to_name())?; + Ok(Err(RangeRequest { + index_name: tablet_index_name.clone(), + printable_index_name: index_name, + interval: request.interval.clone(), + order: request.order, + max_size: max_rows, + })) + } + + /// NOTE: returns a page of results. Callers must call record_read_document + /// for all documents returned from the index stream. + #[convex_macro::instrument_future] + pub async fn index_range_batch( + &mut self, + requests: BTreeMap, + ) -> BTreeMap< + BatchKey, + anyhow::Result<( + Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>, + CursorPosition, + )>, + > { + let batch_size = requests.len(); + let mut results = BTreeMap::new(); + let mut fetch_requests = BTreeMap::new(); + for (batch_key, request) in requests { + match self.start_index_range(request).await { + Err(e) => { + results.insert(batch_key, Err(e)); + }, + Ok(Ok(result)) => { + results.insert(batch_key, Ok(result)); + }, + Ok(Err(request)) => { + fetch_requests.insert(batch_key, request); + }, + } + } - let (results, cursor) = self + let fetch_results = self .tx .index - .range( - &mut self.tx.reads, - RangeRequest { - index_name: tablet_index_name.clone(), - printable_index_name: index_name, - interval: interval.clone(), - order, - max_size: max_rows, - }, - ) - .await?; - let developer_results = results - .into_iter() - .map(|(key, doc, ts)| { - let doc = doc.to_developer(); - anyhow::Ok((key, doc, ts)) - }) - .try_collect()?; - Ok((developer_results, cursor)) + .range_batch(&mut self.tx.reads, fetch_requests) + .await; + + for (batch_key, fetch_result) in fetch_results { + let result = fetch_result.map(|(resolved_results, cursor)| { + let developer_results = resolved_results + .into_iter() + .map(|(key, doc, ts)| (key, doc.to_developer(), ts)) + .collect(); + (developer_results, cursor) + }); + results.insert(batch_key, result); + } + assert_eq!(results.len(), batch_size); + results } } diff --git a/crates/database/src/query/index_range.rs b/crates/database/src/query/index_range.rs index a7721906..79f9919d 100644 --- a/crates/database/src/query/index_range.rs +++ b/crates/database/src/query/index_range.rs @@ -3,6 +3,7 @@ use std::{ collections::VecDeque, }; +use anyhow::Context; use async_trait::async_trait; use common::{ document::GenericDocument, @@ -24,6 +25,7 @@ use common::{ }, version::Version, }; +use maplit::btreemap; use super::{ query_scanned_too_many_documents_error, @@ -35,6 +37,7 @@ use super::{ }; use crate::{ metrics, + transaction::IndexRangeRequest, Transaction, }; @@ -138,80 +141,94 @@ impl IndexRange { } } - #[convex_macro::instrument_future] - async fn _next( + fn start_next( &mut self, tx: &mut Transaction, prefetch_hint: Option, - ) -> anyhow::Result, WriteTimestamp)>> { - loop { - // If we have an end cursor, for correctness we need to process - // the entire interval, so ignore `maximum_rows_read` and `maximum_bytes_read`. - let enforce_limits = self.cursor_interval.end_inclusive.is_none(); + ) -> anyhow::Result, WriteTimestamp)>, IndexRangeRequest>> + { + // If we have an end cursor, for correctness we need to process + // the entire interval, so ignore `maximum_rows_read` and `maximum_bytes_read`. + let enforce_limits = self.cursor_interval.end_inclusive.is_none(); - if enforce_limits - && let Some(maximum_bytes_read) = self.maximum_bytes_read - && self.returned_bytes >= maximum_bytes_read - { - // If we're over our data budget, throw an error. - // We do this after we've already exceeded the limit to ensure that - // paginated queries always scan at least one item so they can - // make progress. - return Err(query_scanned_too_much_data(self.returned_bytes).into()); - } + if enforce_limits + && let Some(maximum_bytes_read) = self.maximum_bytes_read + && self.returned_bytes >= maximum_bytes_read + { + // If we're over our data budget, throw an error. + // We do this after we've already exceeded the limit to ensure that + // paginated queries always scan at least one item so they can + // make progress. + return Err(query_scanned_too_much_data(self.returned_bytes).into()); + } - if let Some((index_position, v, timestamp)) = self.page.pop_front() { - let index_bytes = index_position.len(); - if let Some(intermediate_cursors) = &mut self.intermediate_cursors { - intermediate_cursors.push(CursorPosition::After(index_position.clone())); - } - self.cursor_interval.curr_exclusive = Some(CursorPosition::After(index_position)); - self.returned_results += 1; - T::record_read_document(tx, &v, self.printable_index_name.table())?; - // Database bandwidth for index reads - tx.usage_tracker.track_database_egress_size( - self.printable_index_name.table().to_string(), - index_bytes as u64, - self.printable_index_name.is_system_owned(), - ); - self.returned_bytes += v.size(); - return Ok(Some((v, timestamp))); - } - if let Some(CursorPosition::End) = self.cursor_interval.curr_exclusive { - return Ok(None); - } - if self.unfetched_interval.is_empty() { - // We're out of results. If we have an end cursor then we must - // have reached it. Otherwise we're at the end of the entire - // query. - self.cursor_interval.curr_exclusive = Some( - self.cursor_interval - .end_inclusive - .clone() - .unwrap_or(CursorPosition::End), - ); - return Ok(None); + if let Some((index_position, v, timestamp)) = self.page.pop_front() { + let index_bytes = index_position.len(); + if let Some(intermediate_cursors) = &mut self.intermediate_cursors { + intermediate_cursors.push(CursorPosition::After(index_position.clone())); } + self.cursor_interval.curr_exclusive = Some(CursorPosition::After(index_position)); + self.returned_results += 1; + T::record_read_document(tx, &v, self.printable_index_name.table())?; + // Database bandwidth for index reads + tx.usage_tracker.track_database_egress_size( + self.printable_index_name.table().to_string(), + index_bytes as u64, + self.printable_index_name.is_system_owned(), + ); + self.returned_bytes += v.size(); + return Ok(Ok(Some((v, timestamp)))); + } + if let Some(CursorPosition::End) = self.cursor_interval.curr_exclusive { + return Ok(Ok(None)); + } + if self.unfetched_interval.is_empty() { + // We're out of results. If we have an end cursor then we must + // have reached it. Otherwise we're at the end of the entire + // query. + self.cursor_interval.curr_exclusive = Some( + self.cursor_interval + .end_inclusive + .clone() + .unwrap_or(CursorPosition::End), + ); + return Ok(Ok(None)); + } - let mut max_rows = prefetch_hint - .unwrap_or(DEFAULT_QUERY_PREFETCH) - .clamp(1, MAX_QUERY_FETCH); + let mut max_rows = prefetch_hint + .unwrap_or(DEFAULT_QUERY_PREFETCH) + .clamp(1, MAX_QUERY_FETCH); - if enforce_limits && let Some(maximum_rows_read) = self.maximum_rows_read { - if self.rows_read >= maximum_rows_read { - return Err(query_scanned_too_many_documents_error(self.rows_read).into()); - } - max_rows = cmp::min(max_rows, maximum_rows_read - self.rows_read); + if enforce_limits && let Some(maximum_rows_read) = self.maximum_rows_read { + if self.rows_read >= maximum_rows_read { + return Err(query_scanned_too_many_documents_error(self.rows_read).into()); } - let (page, fetch_cursor) = T::index_range( - tx, - &self.stable_index_name, - &self.unfetched_interval, - self.order, - max_rows, - self.version.clone(), - ) - .await?; + max_rows = cmp::min(max_rows, maximum_rows_read - self.rows_read); + } + Ok(Err(IndexRangeRequest { + stable_index_name: self.stable_index_name.clone(), + interval: self.unfetched_interval.clone(), + order: self.order, + max_rows, + version: self.version.clone(), + })) + } + + #[convex_macro::instrument_future] + async fn _next( + &mut self, + tx: &mut Transaction, + prefetch_hint: Option, + ) -> anyhow::Result, WriteTimestamp)>> { + loop { + let request = match self.start_next(tx, prefetch_hint)? { + Ok(result) => return Ok(result), + Err(request) => request, + }; + let (page, fetch_cursor) = T::index_range_batch(tx, btreemap! {0 => request}) + .await + .remove(&0) + .context("batch_key missing")??; let (_, new_unfetched_interval) = self.unfetched_interval.split(fetch_cursor, self.order); anyhow::ensure!(self.unfetched_interval != new_unfetched_interval); diff --git a/crates/database/src/query/mod.rs b/crates/database/src/query/mod.rs index 60aa1e26..10875268 100644 --- a/crates/database/src/query/mod.rs +++ b/crates/database/src/query/mod.rs @@ -1,4 +1,7 @@ -use std::marker::PhantomData; +use std::{ + collections::BTreeMap, + marker::PhantomData, +}; use async_trait::async_trait; use common::{ @@ -12,7 +15,6 @@ use common::{ query::{ Cursor, CursorPosition, - Order, Query, QueryFingerprint, QueryOperator, @@ -21,12 +23,12 @@ use common::{ runtime::Runtime, types::{ IndexName, - StableIndexName, WriteTimestamp, }, version::Version, }; use errors::ErrorMetadata; +use indexing::backend_in_memory_indexes::BatchKey; use value::{ GenericDocumentId, TableIdAndTableNumber, @@ -45,6 +47,7 @@ use self::{ search_query::SearchQuery, }; use crate::{ + transaction::IndexRangeRequest, IndexModel, Transaction, UserFacingModel, @@ -104,17 +107,16 @@ trait QueryStream: Send { pub trait QueryType { type T: TableIdentifier; - async fn index_range( + async fn index_range_batch( tx: &mut Transaction, - stable_index_name: &StableIndexName, - interval: &Interval, - order: Order, - max_rows: usize, - version: Option, - ) -> anyhow::Result<( - Vec<(IndexKeyBytes, GenericDocument, WriteTimestamp)>, - CursorPosition, - )>; + requests: BTreeMap, + ) -> BTreeMap< + BatchKey, + anyhow::Result<( + Vec<(IndexKeyBytes, GenericDocument, WriteTimestamp)>, + CursorPosition, + )>, + >; async fn get_with_ts( tx: &mut Transaction, @@ -141,19 +143,17 @@ pub enum Developer {} impl QueryType for Resolved { type T = TableIdAndTableNumber; - async fn index_range( + async fn index_range_batch( tx: &mut Transaction, - stable_index_name: &StableIndexName, - interval: &Interval, - order: Order, - max_rows: usize, - _version: Option, - ) -> anyhow::Result<( - Vec<(IndexKeyBytes, GenericDocument, WriteTimestamp)>, - CursorPosition, - )> { - tx.index_range(stable_index_name, interval, order, max_rows) - .await + requests: BTreeMap, + ) -> BTreeMap< + BatchKey, + anyhow::Result<( + Vec<(IndexKeyBytes, GenericDocument, WriteTimestamp)>, + CursorPosition, + )>, + > { + tx.index_range_batch(requests).await } async fn get_with_ts( @@ -184,20 +184,17 @@ impl QueryType for Resolved { impl QueryType for Developer { type T = TableNumber; - async fn index_range( + async fn index_range_batch( tx: &mut Transaction, - stable_index_name: &StableIndexName, - interval: &Interval, - order: Order, - max_rows: usize, - version: Option, - ) -> anyhow::Result<( - Vec<(IndexKeyBytes, GenericDocument, WriteTimestamp)>, - CursorPosition, - )> { - UserFacingModel::new(tx) - .index_range(stable_index_name, interval, order, max_rows, version) - .await + requests: BTreeMap, + ) -> BTreeMap< + BatchKey, + anyhow::Result<( + Vec<(IndexKeyBytes, GenericDocument, WriteTimestamp)>, + CursorPosition, + )>, + > { + UserFacingModel::new(tx).index_range_batch(requests).await } async fn get_with_ts( diff --git a/crates/database/src/transaction.rs b/crates/database/src/transaction.rs index d2dc4751..0d7b66a9 100644 --- a/crates/database/src/transaction.rs +++ b/crates/database/src/transaction.rs @@ -70,6 +70,7 @@ use common::{ TableMapping, VirtualTableMapping, }, + version::Version, }; use errors::ErrorMetadata; use indexing::backend_in_memory_indexes::{ @@ -961,23 +962,22 @@ impl Transaction { .await } - /// NOTE: returns a page of results. Callers must call record_read_document - /// for all documents returned from the index stream. - #[convex_macro::instrument_future] - pub async fn index_range( + fn start_index_range( &mut self, - stable_index_name: &StableIndexName, - interval: &Interval, - order: Order, - mut max_rows: usize, - ) -> anyhow::Result<( - Vec<(IndexKeyBytes, ResolvedDocument, WriteTimestamp)>, - CursorPosition, - )> { - if interval.is_empty() { - return Ok((vec![], CursorPosition::End)); + request: IndexRangeRequest, + ) -> anyhow::Result< + Result< + ( + Vec<(IndexKeyBytes, ResolvedDocument, WriteTimestamp)>, + CursorPosition, + ), + RangeRequest, + >, + > { + if request.interval.is_empty() { + return Ok(Ok((vec![], CursorPosition::End))); } - let tablet_index_name = match stable_index_name { + let tablet_index_name = match request.stable_index_name { StableIndexName::Physical(tablet_index_name) => tablet_index_name, StableIndexName::Virtual(..) => { anyhow::bail!( @@ -986,26 +986,60 @@ impl Transaction { ); }, StableIndexName::Missing => { - return Ok((vec![], CursorPosition::End)); + return Ok(Ok((vec![], CursorPosition::End))); }, }; let index_name = tablet_index_name .clone() .map_table(&self.table_mapping().tablet_to_name())?; - max_rows = cmp::min(max_rows, MAX_PAGE_SIZE); + let max_rows = cmp::min(request.max_rows, MAX_PAGE_SIZE); + Ok(Err(RangeRequest { + index_name: tablet_index_name, + printable_index_name: index_name, + interval: request.interval, + order: request.order, + max_size: max_rows, + })) + } - self.index - .range( - &mut self.reads, - RangeRequest { - index_name: tablet_index_name.clone(), - printable_index_name: index_name, - interval: interval.clone(), - order, - max_size: max_rows, + /// NOTE: returns a page of results. Callers must call record_read_document + /// for all documents returned from the index stream. + #[convex_macro::instrument_future] + pub async fn index_range_batch( + &mut self, + requests: BTreeMap, + ) -> BTreeMap< + BatchKey, + anyhow::Result<( + Vec<(IndexKeyBytes, ResolvedDocument, WriteTimestamp)>, + CursorPosition, + )>, + > { + let batch_size = requests.len(); + let mut results = BTreeMap::new(); + let mut fetch_requests = BTreeMap::new(); + for (batch_key, request) in requests { + match self.start_index_range(request) { + Err(e) => { + results.insert(batch_key, Err(e)); }, - ) - .await + Ok(Ok(result)) => { + results.insert(batch_key, Ok(result)); + }, + Ok(Err(fetch_request)) => { + fetch_requests.insert(batch_key, fetch_request); + }, + } + } + let fetch_results = self + .index + .range_batch(&mut self.reads, fetch_requests) + .await; + for (batch_key, fetch_result) in fetch_results { + results.insert(batch_key, fetch_result); + } + assert_eq!(results.len(), batch_size); + results } /// Used when a system table is served from cache - to manually add a read @@ -1033,6 +1067,14 @@ impl Transaction { } } +pub struct IndexRangeRequest { + pub stable_index_name: StableIndexName, + pub interval: Interval, + pub order: Order, + pub max_rows: usize, + pub version: Option, +} + /// FinalTransaction is a finalized Transaction. /// After all persistence reads have been performed and validated, and all /// writes have been staged, a FinalTransaction stores the transaction until it