From b0591ed60aac36fbbbe242aeb9163455a2a41ea6 Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Sat, 4 May 2024 10:05:41 -0700 Subject: [PATCH] text search walk stable index name (#25376) like index range queries, text search queries should walk a consistent tablet. otherwise there might be weird issues where it's walking an index and the tablet changes out from under it, possibly changing the indexed fields. GitOrigin-RevId: d576844a9a5b29a5a736057a19a819fe27a01f54 --- crates/application/src/tests/cron_jobs.rs | 2 +- crates/common/src/query.rs | 9 +- crates/common/src/types/index.rs | 10 +- crates/database/src/bootstrap_model/index.rs | 8 +- crates/database/src/query/filter.rs | 5 + crates/database/src/query/index_range.rs | 5 + crates/database/src/query/limit.rs | 5 + crates/database/src/query/mod.rs | 103 ++++++++++++------ crates/database/src/query/search_query.rs | 27 ++++- crates/database/src/tests/mod.rs | 2 +- crates/database/src/transaction.rs | 14 +-- .../src/environment/udf/async_syscall.rs | 10 +- crates/isolate/src/tests/user_error.rs | 5 +- crates/model/src/session_requests/mod.rs | 5 +- 14 files changed, 143 insertions(+), 67 deletions(-) diff --git a/crates/application/src/tests/cron_jobs.rs b/crates/application/src/tests/cron_jobs.rs index 6b03827e..5202e754 100644 --- a/crates/application/src/tests/cron_jobs.rs +++ b/crates/application/src/tests/cron_jobs.rs @@ -194,7 +194,7 @@ async fn test_cron_jobs_helper(rt: TestRuntime, backend_state: BackendState) -> let mut table_model = TableModel::new(&mut tx); assert!(table_model.table_is_empty(&OBJECTS_TABLE).await?); let mut logs_query = cron_log_query(&mut tx)?; - logs_query.expect_none(&mut tx).await?; + assert!(logs_query.next(&mut tx, Some(1)).await?.is_none()); // Resuming the backend should make the jobs execute. let mut model = BackendStateModel::new(&mut tx); diff --git a/crates/common/src/query.rs b/crates/common/src/query.rs index 4ac951ab..027fd5ea 100644 --- a/crates/common/src/query.rs +++ b/crates/common/src/query.rs @@ -33,7 +33,6 @@ use value::{ ConvexObject, ConvexValue, TableId, - TableIdAndTableNumber, }; use crate::{ @@ -52,6 +51,7 @@ use crate::{ IndexName, MaybeValue, TableName, + TabletIndexName, }, value::{ sha256::Sha256 as CommonSha256, @@ -581,12 +581,9 @@ pub struct Search { } impl Search { - pub fn to_internal( - self, - f: &impl Fn(TableName) -> anyhow::Result, - ) -> anyhow::Result { + pub fn to_internal(self, tablet_index_name: TabletIndexName) -> anyhow::Result { Ok(InternalSearch { - index_name: self.index_name.to_resolved(f)?.into(), + index_name: tablet_index_name, table_name: self.table, filters: self .filters diff --git a/crates/common/src/types/index.rs b/crates/common/src/types/index.rs index 79aff154..622ff254 100644 --- a/crates/common/src/types/index.rs +++ b/crates/common/src/types/index.rs @@ -115,7 +115,7 @@ pub type TabletIndexName = GenericIndexName; /// Like TabletIndexName in that it refers to a stable underlying index, /// but it works for virtual tables too. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub enum StableIndexName { Physical(TabletIndexName), Virtual(IndexName, TabletIndexName), @@ -123,6 +123,14 @@ pub enum StableIndexName { } impl StableIndexName { + pub fn tablet_index_name(&self) -> Option<&TabletIndexName> { + match self { + StableIndexName::Physical(tablet_index_name) => Some(tablet_index_name), + StableIndexName::Virtual(_, tablet_index_name) => Some(tablet_index_name), + StableIndexName::Missing => None, + } + } + pub fn virtual_table_number_map( &self, table_mapping: &TableMapping, diff --git a/crates/database/src/bootstrap_model/index.rs b/crates/database/src/bootstrap_model/index.rs index d29df5ab..756cdd15 100644 --- a/crates/database/src/bootstrap_model/index.rs +++ b/crates/database/src/bootstrap_model/index.rs @@ -487,11 +487,9 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> { stable_index_name: &StableIndexName, printable_index_name: &IndexName, ) -> anyhow::Result { - let resolved_index_name = match stable_index_name { - StableIndexName::Physical(index_name) => index_name, - StableIndexName::Virtual(_, index_name) => index_name, - StableIndexName::Missing => anyhow::bail!(index_not_found_error(printable_index_name)), - }; + let resolved_index_name = stable_index_name + .tablet_index_name() + .with_context(|| index_not_found_error(printable_index_name))?; let metadata = self.require_enabled_index_metadata(printable_index_name, resolved_index_name)?; match metadata.config.clone() { diff --git a/crates/database/src/query/filter.rs b/crates/database/src/query/filter.rs index c00a5905..40a97c27 100644 --- a/crates/database/src/query/filter.rs +++ b/crates/database/src/query/filter.rs @@ -5,6 +5,7 @@ use common::{ Expression, }, runtime::Runtime, + types::TabletIndexName, }; use super::{ @@ -71,4 +72,8 @@ impl QueryStream for Filter { fn feed(&mut self, index_range_response: IndexRangeResponse) -> anyhow::Result<()> { self.inner.feed(index_range_response) } + + fn tablet_index_name(&self) -> Option<&TabletIndexName> { + self.inner.tablet_index_name() + } } diff --git a/crates/database/src/query/index_range.rs b/crates/database/src/query/index_range.rs index 9d504579..63da7021 100644 --- a/crates/database/src/query/index_range.rs +++ b/crates/database/src/query/index_range.rs @@ -20,6 +20,7 @@ use common::{ types::{ IndexName, StableIndexName, + TabletIndexName, WriteTimestamp, }, version::Version, @@ -263,6 +264,10 @@ impl QueryStream for IndexRange { fn feed(&mut self, index_range_response: IndexRangeResponse) -> anyhow::Result<()> { self.process_fetch(index_range_response.page, index_range_response.cursor) } + + fn tablet_index_name(&self) -> Option<&TabletIndexName> { + self.stable_index_name.tablet_index_name() + } } impl Drop for IndexRange { diff --git a/crates/database/src/query/limit.rs b/crates/database/src/query/limit.rs index cc022014..449c971a 100644 --- a/crates/database/src/query/limit.rs +++ b/crates/database/src/query/limit.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use common::{ query::CursorPosition, runtime::Runtime, + types::TabletIndexName, }; use super::{ @@ -69,4 +70,8 @@ impl QueryStream for Limit { fn feed(&mut self, index_range_response: IndexRangeResponse) -> anyhow::Result<()> { self.inner.feed(index_range_response) } + + fn tablet_index_name(&self) -> Option<&TabletIndexName> { + self.inner.tablet_index_name() + } } diff --git a/crates/database/src/query/mod.rs b/crates/database/src/query/mod.rs index 4f442ca6..80d67420 100644 --- a/crates/database/src/query/mod.rs +++ b/crates/database/src/query/mod.rs @@ -10,7 +10,11 @@ use common::{ database_index::IndexedFields, INDEX_TABLE, }, - document::GenericDocument, + document::{ + DeveloperDocument, + GenericDocument, + ResolvedDocument, + }, index::IndexKeyBytes, interval::Interval, query::{ @@ -24,6 +28,7 @@ use common::{ runtime::Runtime, types::{ IndexName, + TabletIndexName, WriteTimestamp, }, version::Version, @@ -110,6 +115,10 @@ trait QueryStream: Send { prefetch_hint: Option, ) -> anyhow::Result>; fn feed(&mut self, index_range_response: IndexRangeResponse) -> anyhow::Result<()>; + + /// All queries walk an index of some kind, as long as the table exists. + /// This is that index name, tied to a tablet. + fn tablet_index_name(&self) -> Option<&TabletIndexName>; } pub struct IndexRangeResponse { @@ -391,9 +400,12 @@ impl CompiledQuery { version, )) }, - QuerySource::Search(search) => { - QueryNode::Search(SearchQuery::new(search, cursor_interval, version)) - }, + QuerySource::Search(search) => QueryNode::Search(SearchQuery::new( + stable_index_name, + search, + cursor_interval, + version, + )), }; for operator in query.operators { let next_node = match operator { @@ -445,11 +457,21 @@ impl CompiledQuery { } } + pub fn fingerprint(&self) -> &QueryFingerprint { + &self.query_fingerprint + } + + pub fn is_approaching_data_limit(&self) -> bool { + self.root.is_approaching_data_limit() + } +} + +impl DeveloperQuery { pub async fn next( &mut self, tx: &mut Transaction, prefetch_hint: Option, - ) -> anyhow::Result>> { + ) -> anyhow::Result> { match self.next_with_ts(tx, prefetch_hint).await? { None => Ok(None), Some((document, _)) => Ok(Some(document)), @@ -461,32 +483,56 @@ impl CompiledQuery { &mut self, tx: &mut Transaction, prefetch_hint: Option, - ) -> anyhow::Result, WriteTimestamp)>> { + ) -> anyhow::Result> { query_batch_next(btreemap! {0 => (self, prefetch_hint)}, tx) .await .remove(&0) .context("batch_key missing")? } +} - pub async fn expect_one( +impl ResolvedQuery { + pub async fn next( &mut self, tx: &mut Transaction, - ) -> anyhow::Result> { - let v = self - .next(tx, Some(2)) - .await? - .ok_or_else(|| anyhow::anyhow!("Expected one value for query, received zero"))?; + prefetch_hint: Option, + ) -> anyhow::Result> { + match self.next_with_ts(tx, prefetch_hint).await? { + None => Ok(None), + Some((document, _)) => Ok(Some(document)), + } + } - if self.next(tx, Some(1)).await?.is_some() { - anyhow::bail!("Received more than one value for query"); + #[convex_macro::instrument_future] + pub async fn next_with_ts( + &mut self, + tx: &mut Transaction, + prefetch_hint: Option, + ) -> anyhow::Result> { + let tablet_id = self + .root + .tablet_index_name() + .map(|index_name| *index_name.table()); + let result = query_batch_next(btreemap! {0 => (self, prefetch_hint)}, tx) + .await + .remove(&0) + .context("batch_key missing")??; + if let Some((document, _)) = &result { + // TODO(lee) inject tablet id here which will allow the rest of the query + // pipeline to use DeveloperDocuments only. To ensure this will be + // correct, we do an assertion temporarily. + anyhow::ensure!( + document.table().table_id + == tablet_id.context("document must come from some tablet")? + ); } - Ok(v) + Ok(result) } pub async fn expect_at_most_one( &mut self, tx: &mut Transaction, - ) -> anyhow::Result>> { + ) -> anyhow::Result> { let v = match self.next(tx, Some(2)).await? { Some(v) => v, None => return Ok(None), @@ -496,22 +542,6 @@ impl CompiledQuery { } Ok(Some(v)) } - - pub async fn expect_none(&mut self, tx: &mut Transaction) -> anyhow::Result<()> { - anyhow::ensure!( - self.next(tx, Some(1)).await?.is_none(), - "Expected no value for this query, but received one." - ); - Ok(()) - } - - pub fn fingerprint(&self) -> &QueryFingerprint { - &self.query_fingerprint - } - - pub fn is_approaching_data_limit(&self) -> bool { - self.root.is_approaching_data_limit() - } } pub async fn query_batch_next( @@ -623,6 +653,15 @@ impl QueryStream for QueryNode { QueryNode::Limit(r) => r.feed(index_range_response), } } + + fn tablet_index_name(&self) -> Option<&TabletIndexName> { + match self { + QueryNode::IndexRange(r) => r.tablet_index_name(), + QueryNode::Search(r) => r.tablet_index_name(), + QueryNode::Filter(r) => r.tablet_index_name(), + QueryNode::Limit(r) => r.tablet_index_name(), + } + } } /// Return a system limit for reading too many documents in a query diff --git a/crates/database/src/query/search_query.rs b/crates/database/src/query/search_query.rs index ccef078c..d92ce49c 100644 --- a/crates/database/src/query/search_query.rs +++ b/crates/database/src/query/search_query.rs @@ -9,7 +9,11 @@ use common::{ SearchVersion, }, runtime::Runtime, - types::WriteTimestamp, + types::{ + StableIndexName, + TabletIndexName, + WriteTimestamp, + }, version::{ Version, MIN_NPM_VERSION_FOR_FUZZY_SEARCH, @@ -39,6 +43,11 @@ use crate::{ /// A `QueryStream` that begins by querying a search index. pub struct SearchQuery { + // The tablet index being searched. + // Table names in `query` are just for error messages and usage, and may + // get out of sync with this. + stable_index_name: StableIndexName, + query: Search, // Results are generated on the first call to SearchQuery::next. results: Option>, @@ -50,8 +59,14 @@ pub struct SearchQuery { } impl SearchQuery { - pub fn new(query: Search, cursor_interval: CursorInterval, version: Option) -> Self { + pub fn new( + stable_index_name: StableIndexName, + query: Search, + cursor_interval: CursorInterval, + version: Option, + ) -> Self { Self { + stable_index_name, query, results: None, cursor_interval, @@ -71,7 +86,9 @@ impl SearchQuery { tx: &mut Transaction, ) -> anyhow::Result> { let search_version = self.get_cli_gated_search_version(); - let revisions = tx.search(&self.query, search_version).await?; + let revisions = tx + .search(&self.stable_index_name, &self.query, search_version) + .await?; let revisions_in_range = revisions .into_iter() .filter(|(_, index_key)| self.cursor_interval.contains(index_key)) @@ -145,6 +162,10 @@ impl QueryStream for SearchQuery { fn feed(&mut self, _index_range_response: IndexRangeResponse) -> anyhow::Result<()> { anyhow::bail!("cannot feed an index range response into a search query"); } + + fn tablet_index_name(&self) -> Option<&TabletIndexName> { + self.stable_index_name.tablet_index_name() + } } #[derive(Clone)] diff --git a/crates/database/src/tests/mod.rs b/crates/database/src/tests/mod.rs index 80961eb3..33fc8b71 100644 --- a/crates/database/src/tests/mod.rs +++ b/crates/database/src/tests/mod.rs @@ -1833,7 +1833,7 @@ async fn test_retries(rt: TestRuntime) -> anyhow::Result<()> { let mut tx = db.begin_system().await?; let query = Query::full_table_scan("table".parse()?, Order::Asc); let mut compiled_query = CompiledResolvedQuery::new(&mut tx, query)?; - compiled_query.expect_none(&mut tx).await?; + assert!(compiled_query.next(&mut tx, None).await?.is_none()); Ok(()) } diff --git a/crates/database/src/transaction.rs b/crates/database/src/transaction.rs index 83378294..ea7dd467 100644 --- a/crates/database/src/transaction.rs +++ b/crates/database/src/transaction.rs @@ -905,20 +905,16 @@ impl Transaction { pub async fn search( &mut self, + stable_index_name: &StableIndexName, search: &Search, version: SearchVersion, ) -> anyhow::Result> { - // If the table doesn't exist, short circuit to avoid erroring in the - // table_mapping. Also take a dependency on the table not existing. - if !TableModel::new(self).table_exists(search.index_name.table()) { + let Some(tablet_index_name) = stable_index_name.tablet_index_name() else { return Ok(vec![]); - } - let search = search - .clone() - .to_internal(&self.table_mapping().name_to_id_user_input())?; - let index_name = search.index_name.clone(); + }; + let search = search.clone().to_internal(tablet_index_name.clone())?; self.index - .search(&mut self.reads, &search, index_name, version) + .search(&mut self.reads, &search, tablet_index_name.clone(), version) .await } diff --git a/crates/isolate/src/environment/udf/async_syscall.rs b/crates/isolate/src/environment/udf/async_syscall.rs index bf0e86da..c3f8f619 100644 --- a/crates/isolate/src/environment/udf/async_syscall.rs +++ b/crates/isolate/src/environment/udf/async_syscall.rs @@ -7,7 +7,7 @@ use std::{ use anyhow::Context; use common::{ - document::GenericDocument, + document::DeveloperDocument, execution_context::ExecutionContext, knobs::MAX_SYSCALL_BATCH_SIZE, query::{ @@ -28,8 +28,6 @@ use common::{ use database::{ query::{ query_batch_next, - CompiledQuery, - QueryType, TableFilter, }, soft_data_limit, @@ -957,11 +955,11 @@ struct QueryPageMetadata { } impl> DatabaseSyscallsShared { - async fn read_page_from_query( - mut query: CompiledQuery, + async fn read_page_from_query( + mut query: DeveloperQuery, tx: &mut Transaction, page_size: usize, - ) -> anyhow::Result<(Vec>, QueryPageMetadata)> { + ) -> anyhow::Result<(Vec, QueryPageMetadata)> { let end_cursor = query.end_cursor(); let has_end_cursor = end_cursor.is_some(); let mut page = Vec::with_capacity(page_size); diff --git a/crates/isolate/src/tests/user_error.rs b/crates/isolate/src/tests/user_error.rs index a9cee642..071e6256 100644 --- a/crates/isolate/src/tests/user_error.rs +++ b/crates/isolate/src/tests/user_error.rs @@ -231,8 +231,9 @@ async fn test_private_system_table(rt: TestRuntime) -> anyhow::Result<()> { &mut tx, Query::full_table_scan(BACKEND_STATE_TABLE.clone(), Order::Asc), )? - .expect_one(&mut tx) - .await?; + .expect_at_most_one(&mut tx) + .await? + .expect("backend state should exist"); // But developer UDFs can't query it because it's a private system table. must_let!(let ConvexValue::Array(results) = t.query( diff --git a/crates/model/src/session_requests/mod.rs b/crates/model/src/session_requests/mod.rs index 2155a009..0df57a37 100644 --- a/crates/model/src/session_requests/mod.rs +++ b/crates/model/src/session_requests/mod.rs @@ -131,7 +131,10 @@ impl<'a, RT: Runtime> SessionRequestModel<'a, RT> { let Some((doc, ts)) = query_stream.next_with_ts(self.tx, None).await? else { return Ok(None); }; - query_stream.expect_none(self.tx).await?; + anyhow::ensure!( + query_stream.next(self.tx, Some(1)).await?.is_none(), + "Expected at most one session request record." + ); let WriteTimestamp::Committed(ts) = ts else { anyhow::bail!( "Wrote a session request record in the same transaction as the get? Not \