[Transaction] batch index_range (#23706)
refactor, not changing behavior yet.
we thread the batching up, from TransactionIndex::index_range up through UserFacingModel::index_range.

the Query layer is still unbatched, but this refactor unblocks that.

GitOrigin-RevId: 9704db01e647a72f519b28a6e92de2da7ed33523
ldanilek authored and Convex, Inc. committed Mar 20, 2024
1 parent 0ee1d31 commit f6764b0
Showing 5 changed files with 270 additions and 181 deletions.
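
To make the commit message concrete before the diffs, here is a minimal, self-contained sketch (not code from this commit; the simplified Request and Page types stand in for the real IndexRangeRequest and page/cursor types) of the signature change being threaded through the layers: a single-request method becomes a batch method keyed by BatchKey, and each entry carries its own Result so one failing range cannot fail the whole batch.

use std::collections::BTreeMap;

type BatchKey = usize;

struct Request {
    max_rows: usize,
}

#[derive(Debug)]
struct Page(Vec<String>);

// Before: one request in, one result out.
fn index_range(request: Request) -> Result<Page, String> {
    Ok(Page(vec![format!("a page of up to {} rows", request.max_rows)]))
}

// After: many keyed requests in, the same keys out. Each entry carries its
// own Result so one failing range does not fail the whole batch.
fn index_range_batch(
    requests: BTreeMap<BatchKey, Request>,
) -> BTreeMap<BatchKey, Result<Page, String>> {
    requests
        .into_iter()
        .map(|(key, request)| (key, index_range(request)))
        .collect()
}

fn main() {
    // An unbatched caller becomes a "batch of one" under key 0, mirroring
    // the `btreemap! {0 => request}` bridge in the index_range.rs diff below.
    let requests = BTreeMap::from([(0, Request { max_rows: 8 })]);
    let mut results = index_range_batch(requests);
    let page = results.remove(&0).expect("batch_key missing");
    println!("{page:?}");
}
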
crates/common/src/types/index.rs: 2 changes (1 addition & 1 deletion)
@@ -112,7 +112,7 @@ pub type TabletIndexName = GenericIndexName<TableId>;

 /// Like TabletIndexName in that it refers to a stable underlying index,
 /// but it works for virtual tables too.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum StableIndexName {
     Physical(TabletIndexName),
     Virtual(IndexName, TabletIndexName),
crates/database/src/bootstrap_model/user_facing.rs: 131 changes (82 additions & 49 deletions)
@@ -10,11 +10,7 @@ use common::{
         ResolvedDocument,
     },
     index::IndexKeyBytes,
-    interval::Interval,
-    query::{
-        CursorPosition,
-        Order,
-    },
+    query::CursorPosition,
     runtime::Runtime,
     types::{
         StableIndexName,
@@ -41,7 +37,10 @@ use crate::{
         log_virtual_table_get,
         log_virtual_table_query,
     },
-    transaction::MAX_PAGE_SIZE,
+    transaction::{
+        IndexRangeRequest,
+        MAX_PAGE_SIZE,
+    },
     unauthorized_error,
     virtual_tables::VirtualTable,
     PatchValue,
@@ -320,72 +319,106 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
         )
     }

-    /// NOTE: returns a page of results. Callers must call record_read_document
-    /// for all documents returned from the index stream.
-    #[convex_macro::instrument_future]
-    pub async fn index_range(
+    async fn start_index_range(
         &mut self,
-        stable_index_name: &StableIndexName,
-        interval: &Interval,
-        order: Order,
-        mut max_rows: usize,
-        version: Option<Version>,
-    ) -> anyhow::Result<(
-        Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>,
-        CursorPosition,
-    )> {
-        if interval.is_empty() {
-            return Ok((vec![], CursorPosition::End));
+        request: IndexRangeRequest,
+    ) -> anyhow::Result<
+        Result<
+            (
+                Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>,
+                CursorPosition,
+            ),
+            RangeRequest,
+        >,
+    > {
+        if request.interval.is_empty() {
+            return Ok(Ok((vec![], CursorPosition::End)));
         }

-        max_rows = cmp::min(max_rows, MAX_PAGE_SIZE);
+        let max_rows = cmp::min(request.max_rows, MAX_PAGE_SIZE);

-        let tablet_index_name = match stable_index_name {
+        let tablet_index_name = match request.stable_index_name {
             StableIndexName::Physical(tablet_index_name) => tablet_index_name,
             StableIndexName::Virtual(index_name, tablet_index_name) => {
                 log_virtual_table_query();
-                return VirtualTable::new(self.tx)
+                // TODO(lee) batch virtual table queryStreamNext
+                let virtual_result = VirtualTable::new(self.tx)
                     .index_range(
                         RangeRequest {
                             index_name: tablet_index_name.clone(),
                             printable_index_name: index_name.clone(),
-                            interval: interval.clone(),
-                            order,
+                            interval: request.interval.clone(),
+                            order: request.order,
                             max_size: max_rows,
                         },
-                        version,
+                        request.version,
                     )
-                    .await;
+                    .await?;
+                return Ok(Ok(virtual_result));
             },
             StableIndexName::Missing => {
-                return Ok((vec![], CursorPosition::End));
+                return Ok(Ok((vec![], CursorPosition::End)));
             },
         };
         let index_name = tablet_index_name
             .clone()
             .map_table(&self.tx.table_mapping().tablet_to_name())?;
+        Ok(Err(RangeRequest {
+            index_name: tablet_index_name.clone(),
+            printable_index_name: index_name,
+            interval: request.interval.clone(),
+            order: request.order,
+            max_size: max_rows,
+        }))
+    }

+    /// NOTE: returns a page of results. Callers must call record_read_document
+    /// for all documents returned from the index stream.
+    #[convex_macro::instrument_future]
+    pub async fn index_range_batch(
+        &mut self,
+        requests: BTreeMap<BatchKey, IndexRangeRequest>,
+    ) -> BTreeMap<
+        BatchKey,
+        anyhow::Result<(
+            Vec<(IndexKeyBytes, DeveloperDocument, WriteTimestamp)>,
+            CursorPosition,
+        )>,
+    > {
+        let batch_size = requests.len();
+        let mut results = BTreeMap::new();
+        let mut fetch_requests = BTreeMap::new();
+        for (batch_key, request) in requests {
+            match self.start_index_range(request).await {
+                Err(e) => {
+                    results.insert(batch_key, Err(e));
+                },
+                Ok(Ok(result)) => {
+                    results.insert(batch_key, Ok(result));
+                },
+                Ok(Err(request)) => {
+                    fetch_requests.insert(batch_key, request);
+                },
+            }
+        }
+
-        let (results, cursor) = self
+        let fetch_results = self
             .tx
             .index
-            .range(
-                &mut self.tx.reads,
-                RangeRequest {
-                    index_name: tablet_index_name.clone(),
-                    printable_index_name: index_name,
-                    interval: interval.clone(),
-                    order,
-                    max_size: max_rows,
-                },
-            )
-            .await?;
-        let developer_results = results
-            .into_iter()
-            .map(|(key, doc, ts)| {
-                let doc = doc.to_developer();
-                anyhow::Ok((key, doc, ts))
-            })
-            .try_collect()?;
-        Ok((developer_results, cursor))
+            .range_batch(&mut self.tx.reads, fetch_requests)
+            .await;
+
+        for (batch_key, fetch_result) in fetch_results {
+            let result = fetch_result.map(|(resolved_results, cursor)| {
+                let developer_results = resolved_results
+                    .into_iter()
+                    .map(|(key, doc, ts)| (key, doc.to_developer(), ts))
+                    .collect();
+                (developer_results, cursor)
+            });
+            results.insert(batch_key, result);
+        }
+        assert_eq!(results.len(), batch_size);
+        results
     }
 }
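
The start_index_range/index_range_batch split above follows a two-phase shape: a start phase answers any request that needs no index fetch (empty interval, virtual table, missing index) or defers it, and a single batched fetch then resolves everything deferred. Below is a minimal std-only sketch of that control flow, under simplifying assumptions: a toy FetchRequest stands in for the real RangeRequest, and the batched fetch is stubbed out as a plain loop.

use std::collections::BTreeMap;

type BatchKey = usize;

// Stand-in for the real RangeRequest; `interval` models the key range to scan.
struct FetchRequest {
    interval: std::ops::Range<u32>,
}

// Phase 1: answer immediately (inner Ok) or hand back a request that still
// needs a real fetch (inner Err). The outer Result carries hard errors, the
// same nested-Result shape the diff gives start_index_range.
fn start(request: FetchRequest) -> Result<Result<Vec<u32>, FetchRequest>, String> {
    if request.interval.is_empty() {
        return Ok(Ok(vec![])); // e.g. empty interval: no fetch needed
    }
    Ok(Err(request))
}

// Phase 2: run phase 1 for every entry, then execute all deferred fetches
// together and merge them back under their original batch keys.
fn batch(
    requests: BTreeMap<BatchKey, FetchRequest>,
) -> BTreeMap<BatchKey, Result<Vec<u32>, String>> {
    let batch_size = requests.len();
    let mut results = BTreeMap::new();
    let mut deferred = BTreeMap::new();
    for (key, request) in requests {
        match start(request) {
            Err(e) => {
                results.insert(key, Err(e));
            },
            Ok(Ok(answer)) => {
                results.insert(key, Ok(answer));
            },
            Ok(Err(request)) => {
                deferred.insert(key, request);
            },
        }
    }
    // Stand-in for the single batched fetch (`range_batch` in the real code).
    for (key, request) in deferred {
        results.insert(key, Ok(request.interval.collect()));
    }
    assert_eq!(results.len(), batch_size); // every key gets exactly one result
    results
}

fn main() {
    let requests = BTreeMap::from([
        (0, FetchRequest { interval: 0..0 }), // answered in phase 1
        (1, FetchRequest { interval: 3..6 }), // fetched in phase 2
    ]);
    for (key, result) in batch(requests) {
        println!("{key}: {result:?}");
    }
}
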
crates/database/src/query/index_range.rs: 147 changes (82 additions & 65 deletions)
@@ -3,6 +3,7 @@ use std::{
     collections::VecDeque,
 };

+use anyhow::Context;
 use async_trait::async_trait;
 use common::{
     document::GenericDocument,
@@ -24,6 +25,7 @@
     },
     version::Version,
 };
+use maplit::btreemap;

 use super::{
     query_scanned_too_many_documents_error,
@@ -35,6 +37,7 @@
 };
 use crate::{
     metrics,
+    transaction::IndexRangeRequest,
     Transaction,
 };

@@ -138,80 +141,94 @@ impl<T: QueryType> IndexRange<T> {
         }
     }

-    #[convex_macro::instrument_future]
-    async fn _next<RT: Runtime>(
+    fn start_next<RT: Runtime>(
         &mut self,
         tx: &mut Transaction<RT>,
         prefetch_hint: Option<usize>,
-    ) -> anyhow::Result<Option<(GenericDocument<T::T>, WriteTimestamp)>> {
-        loop {
-            // If we have an end cursor, for correctness we need to process
-            // the entire interval, so ignore `maximum_rows_read` and `maximum_bytes_read`.
-            let enforce_limits = self.cursor_interval.end_inclusive.is_none();
+    ) -> anyhow::Result<Result<Option<(GenericDocument<T::T>, WriteTimestamp)>, IndexRangeRequest>>
+    {
+        // If we have an end cursor, for correctness we need to process
+        // the entire interval, so ignore `maximum_rows_read` and `maximum_bytes_read`.
+        let enforce_limits = self.cursor_interval.end_inclusive.is_none();

-            if enforce_limits
-                && let Some(maximum_bytes_read) = self.maximum_bytes_read
-                && self.returned_bytes >= maximum_bytes_read
-            {
-                // If we're over our data budget, throw an error.
-                // We do this after we've already exceeded the limit to ensure that
-                // paginated queries always scan at least one item so they can
-                // make progress.
-                return Err(query_scanned_too_much_data(self.returned_bytes).into());
-            }
+        if enforce_limits
+            && let Some(maximum_bytes_read) = self.maximum_bytes_read
+            && self.returned_bytes >= maximum_bytes_read
+        {
+            // If we're over our data budget, throw an error.
+            // We do this after we've already exceeded the limit to ensure that
+            // paginated queries always scan at least one item so they can
+            // make progress.
+            return Err(query_scanned_too_much_data(self.returned_bytes).into());
+        }

-            if let Some((index_position, v, timestamp)) = self.page.pop_front() {
-                let index_bytes = index_position.len();
-                if let Some(intermediate_cursors) = &mut self.intermediate_cursors {
-                    intermediate_cursors.push(CursorPosition::After(index_position.clone()));
-                }
-                self.cursor_interval.curr_exclusive = Some(CursorPosition::After(index_position));
-                self.returned_results += 1;
-                T::record_read_document(tx, &v, self.printable_index_name.table())?;
-                // Database bandwidth for index reads
-                tx.usage_tracker.track_database_egress_size(
-                    self.printable_index_name.table().to_string(),
-                    index_bytes as u64,
-                    self.printable_index_name.is_system_owned(),
-                );
-                self.returned_bytes += v.size();
-                return Ok(Some((v, timestamp)));
-            }
-            if let Some(CursorPosition::End) = self.cursor_interval.curr_exclusive {
-                return Ok(None);
-            }
-            if self.unfetched_interval.is_empty() {
-                // We're out of results. If we have an end cursor then we must
-                // have reached it. Otherwise we're at the end of the entire
-                // query.
-                self.cursor_interval.curr_exclusive = Some(
-                    self.cursor_interval
-                        .end_inclusive
-                        .clone()
-                        .unwrap_or(CursorPosition::End),
-                );
-                return Ok(None);
+        if let Some((index_position, v, timestamp)) = self.page.pop_front() {
+            let index_bytes = index_position.len();
+            if let Some(intermediate_cursors) = &mut self.intermediate_cursors {
+                intermediate_cursors.push(CursorPosition::After(index_position.clone()));
+            }
+            self.cursor_interval.curr_exclusive = Some(CursorPosition::After(index_position));
+            self.returned_results += 1;
+            T::record_read_document(tx, &v, self.printable_index_name.table())?;
+            // Database bandwidth for index reads
+            tx.usage_tracker.track_database_egress_size(
+                self.printable_index_name.table().to_string(),
+                index_bytes as u64,
+                self.printable_index_name.is_system_owned(),
+            );
+            self.returned_bytes += v.size();
+            return Ok(Ok(Some((v, timestamp))));
+        }
+        if let Some(CursorPosition::End) = self.cursor_interval.curr_exclusive {
+            return Ok(Ok(None));
+        }
+        if self.unfetched_interval.is_empty() {
+            // We're out of results. If we have an end cursor then we must
+            // have reached it. Otherwise we're at the end of the entire
+            // query.
+            self.cursor_interval.curr_exclusive = Some(
+                self.cursor_interval
+                    .end_inclusive
+                    .clone()
+                    .unwrap_or(CursorPosition::End),
+            );
+            return Ok(Ok(None));
         }

-            let mut max_rows = prefetch_hint
-                .unwrap_or(DEFAULT_QUERY_PREFETCH)
-                .clamp(1, MAX_QUERY_FETCH);
+        let mut max_rows = prefetch_hint
+            .unwrap_or(DEFAULT_QUERY_PREFETCH)
+            .clamp(1, MAX_QUERY_FETCH);

-            if enforce_limits && let Some(maximum_rows_read) = self.maximum_rows_read {
-                if self.rows_read >= maximum_rows_read {
-                    return Err(query_scanned_too_many_documents_error(self.rows_read).into());
-                }
-                max_rows = cmp::min(max_rows, maximum_rows_read - self.rows_read);
+        if enforce_limits && let Some(maximum_rows_read) = self.maximum_rows_read {
+            if self.rows_read >= maximum_rows_read {
+                return Err(query_scanned_too_many_documents_error(self.rows_read).into());
+            }
-            let (page, fetch_cursor) = T::index_range(
-                tx,
-                &self.stable_index_name,
-                &self.unfetched_interval,
-                self.order,
-                max_rows,
-                self.version.clone(),
-            )
-            .await?;
+            max_rows = cmp::min(max_rows, maximum_rows_read - self.rows_read);
         }
+        Ok(Err(IndexRangeRequest {
+            stable_index_name: self.stable_index_name.clone(),
+            interval: self.unfetched_interval.clone(),
+            order: self.order,
+            max_rows,
+            version: self.version.clone(),
+        }))
+    }
+
+    #[convex_macro::instrument_future]
+    async fn _next<RT: Runtime>(
+        &mut self,
+        tx: &mut Transaction<RT>,
+        prefetch_hint: Option<usize>,
+    ) -> anyhow::Result<Option<(GenericDocument<T::T>, WriteTimestamp)>> {
+        loop {
+            let request = match self.start_next(tx, prefetch_hint)? {
+                Ok(result) => return Ok(result),
+                Err(request) => request,
+            };
+            let (page, fetch_cursor) = T::index_range_batch(tx, btreemap! {0 => request})
+                .await
+                .remove(&0)
+                .context("batch_key missing")??;
             let (_, new_unfetched_interval) =
                 self.unfetched_interval.split(fetch_cursor, self.order);
             anyhow::ensure!(self.unfetched_interval != new_unfetched_interval);
Expand Down
Loading

0 comments on commit f6764b0

Please sign in to comment.