From e9a9daffae0857b6ee4be2c389a0746c91bdbfc0 Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Wed, 20 Mar 2024 23:30:28 -0400 Subject: [PATCH] [parallel queries] refactor IndexRange next (#23755) small refactor to split out the post-processing for index range fetches. GitOrigin-RevId: e2dd984161e8a51fb306db4463b2b71b7bef9fc9 --- crates/database/src/query/index_range.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/crates/database/src/query/index_range.rs b/crates/database/src/query/index_range.rs index 79f9919d..b35b4212 100644 --- a/crates/database/src/query/index_range.rs +++ b/crates/database/src/query/index_range.rs @@ -214,6 +214,20 @@ impl IndexRange { })) } + fn process_fetch( + &mut self, + page: Vec<(IndexKeyBytes, GenericDocument, WriteTimestamp)>, + fetch_cursor: CursorPosition, + ) -> anyhow::Result<()> { + let (_, new_unfetched_interval) = self.unfetched_interval.split(fetch_cursor, self.order); + anyhow::ensure!(self.unfetched_interval != new_unfetched_interval); + self.unfetched_interval = new_unfetched_interval; + self.page_count += 1; + self.rows_read += page.len(); + self.page.extend(page); + Ok(()) + } + #[convex_macro::instrument_future] async fn _next( &mut self, @@ -229,13 +243,7 @@ impl IndexRange { .await .remove(&0) .context("batch_key missing")??; - let (_, new_unfetched_interval) = - self.unfetched_interval.split(fetch_cursor, self.order); - anyhow::ensure!(self.unfetched_interval != new_unfetched_interval); - self.unfetched_interval = new_unfetched_interval; - self.page_count += 1; - self.rows_read += page.len(); - self.page.extend(page); + self.process_fetch(page, fetch_cursor)?; } } }