Skip to content

Commit

Permalink
[parallel queries] refactor IndexRange next (#23755)
Browse files Browse the repository at this point in the history
small refactor to split out the post-processing for index range fetches.

GitOrigin-RevId: e2dd984161e8a51fb306db4463b2b71b7bef9fc9
  • Loading branch information
ldanilek authored and Convex, Inc. committed Mar 21, 2024
1 parent 0580822 commit e9a9daf
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions crates/database/src/query/index_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ impl<T: QueryType> IndexRange<T> {
}))
}

fn process_fetch(
&mut self,
page: Vec<(IndexKeyBytes, GenericDocument<T::T>, WriteTimestamp)>,
fetch_cursor: CursorPosition,
) -> anyhow::Result<()> {
let (_, new_unfetched_interval) = self.unfetched_interval.split(fetch_cursor, self.order);
anyhow::ensure!(self.unfetched_interval != new_unfetched_interval);
self.unfetched_interval = new_unfetched_interval;
self.page_count += 1;
self.rows_read += page.len();
self.page.extend(page);
Ok(())
}

#[convex_macro::instrument_future]
async fn _next<RT: Runtime>(
&mut self,
Expand All @@ -229,13 +243,7 @@ impl<T: QueryType> IndexRange<T> {
.await
.remove(&0)
.context("batch_key missing")??;
let (_, new_unfetched_interval) =
self.unfetched_interval.split(fetch_cursor, self.order);
anyhow::ensure!(self.unfetched_interval != new_unfetched_interval);
self.unfetched_interval = new_unfetched_interval;
self.page_count += 1;
self.rows_read += page.len();
self.page.extend(page);
self.process_fetch(page, fetch_cursor)?;
}
}
}
Expand Down

0 comments on commit e9a9daf

Please sign in to comment.