Skip to content

Commit

Permalink
text search walk stable index name (#25376)
Browse files Browse the repository at this point in the history
like index range queries, text search queries should walk a consistent tablet. otherwise there might be weird issues where it's walking an index and the tablet changes out from under it, possibly changing the indexed fields.

GitOrigin-RevId: d576844a9a5b29a5a736057a19a819fe27a01f54
  • Loading branch information
ldanilek authored and Convex, Inc. committed May 4, 2024
1 parent 2693de9 commit b0591ed
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 67 deletions.
2 changes: 1 addition & 1 deletion crates/application/src/tests/cron_jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ async fn test_cron_jobs_helper(rt: TestRuntime, backend_state: BackendState) ->
let mut table_model = TableModel::new(&mut tx);
assert!(table_model.table_is_empty(&OBJECTS_TABLE).await?);
let mut logs_query = cron_log_query(&mut tx)?;
logs_query.expect_none(&mut tx).await?;
assert!(logs_query.next(&mut tx, Some(1)).await?.is_none());

// Resuming the backend should make the jobs execute.
let mut model = BackendStateModel::new(&mut tx);
Expand Down
9 changes: 3 additions & 6 deletions crates/common/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use value::{
ConvexObject,
ConvexValue,
TableId,
TableIdAndTableNumber,
};

use crate::{
Expand All @@ -52,6 +51,7 @@ use crate::{
IndexName,
MaybeValue,
TableName,
TabletIndexName,
},
value::{
sha256::Sha256 as CommonSha256,
Expand Down Expand Up @@ -581,12 +581,9 @@ pub struct Search {
}

impl Search {
pub fn to_internal(
self,
f: &impl Fn(TableName) -> anyhow::Result<TableIdAndTableNumber>,
) -> anyhow::Result<InternalSearch> {
pub fn to_internal(self, tablet_index_name: TabletIndexName) -> anyhow::Result<InternalSearch> {
Ok(InternalSearch {
index_name: self.index_name.to_resolved(f)?.into(),
index_name: tablet_index_name,
table_name: self.table,
filters: self
.filters
Expand Down
10 changes: 9 additions & 1 deletion crates/common/src/types/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,22 @@ pub type TabletIndexName = GenericIndexName<TableId>;

/// Like TabletIndexName in that it refers to a stable underlying index,
/// but it works for virtual tables too.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum StableIndexName {
Physical(TabletIndexName),
Virtual(IndexName, TabletIndexName),
Missing,
}

impl StableIndexName {
pub fn tablet_index_name(&self) -> Option<&TabletIndexName> {
match self {
StableIndexName::Physical(tablet_index_name) => Some(tablet_index_name),
StableIndexName::Virtual(_, tablet_index_name) => Some(tablet_index_name),
StableIndexName::Missing => None,
}
}

pub fn virtual_table_number_map(
&self,
table_mapping: &TableMapping,
Expand Down
8 changes: 3 additions & 5 deletions crates/database/src/bootstrap_model/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,11 +487,9 @@ impl<'a, RT: Runtime> IndexModel<'a, RT> {
stable_index_name: &StableIndexName,
printable_index_name: &IndexName,
) -> anyhow::Result<IndexedFields> {
let resolved_index_name = match stable_index_name {
StableIndexName::Physical(index_name) => index_name,
StableIndexName::Virtual(_, index_name) => index_name,
StableIndexName::Missing => anyhow::bail!(index_not_found_error(printable_index_name)),
};
let resolved_index_name = stable_index_name
.tablet_index_name()
.with_context(|| index_not_found_error(printable_index_name))?;
let metadata =
self.require_enabled_index_metadata(printable_index_name, resolved_index_name)?;
match metadata.config.clone() {
Expand Down
5 changes: 5 additions & 0 deletions crates/database/src/query/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use common::{
Expression,
},
runtime::Runtime,
types::TabletIndexName,
};

use super::{
Expand Down Expand Up @@ -71,4 +72,8 @@ impl<T: QueryType> QueryStream<T> for Filter<T> {
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
self.inner.feed(index_range_response)
}

fn tablet_index_name(&self) -> Option<&TabletIndexName> {
self.inner.tablet_index_name()
}
}
5 changes: 5 additions & 0 deletions crates/database/src/query/index_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common::{
types::{
IndexName,
StableIndexName,
TabletIndexName,
WriteTimestamp,
},
version::Version,
Expand Down Expand Up @@ -263,6 +264,10 @@ impl<T: QueryType> QueryStream<T> for IndexRange<T> {
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
self.process_fetch(index_range_response.page, index_range_response.cursor)
}

fn tablet_index_name(&self) -> Option<&TabletIndexName> {
self.stable_index_name.tablet_index_name()
}
}

impl<T: QueryType> Drop for IndexRange<T> {
Expand Down
5 changes: 5 additions & 0 deletions crates/database/src/query/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use async_trait::async_trait;
use common::{
query::CursorPosition,
runtime::Runtime,
types::TabletIndexName,
};

use super::{
Expand Down Expand Up @@ -69,4 +70,8 @@ impl<T: QueryType> QueryStream<T> for Limit<T> {
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
self.inner.feed(index_range_response)
}

fn tablet_index_name(&self) -> Option<&TabletIndexName> {
self.inner.tablet_index_name()
}
}
103 changes: 71 additions & 32 deletions crates/database/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ use common::{
database_index::IndexedFields,
INDEX_TABLE,
},
document::GenericDocument,
document::{
DeveloperDocument,
GenericDocument,
ResolvedDocument,
},
index::IndexKeyBytes,
interval::Interval,
query::{
Expand All @@ -24,6 +28,7 @@ use common::{
runtime::Runtime,
types::{
IndexName,
TabletIndexName,
WriteTimestamp,
},
version::Version,
Expand Down Expand Up @@ -110,6 +115,10 @@ trait QueryStream<T: QueryType>: Send {
prefetch_hint: Option<usize>,
) -> anyhow::Result<QueryStreamNext<T>>;
fn feed(&mut self, index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()>;

/// All queries walk an index of some kind, as long as the table exists.
/// This is that index name, tied to a tablet.
fn tablet_index_name(&self) -> Option<&TabletIndexName>;
}

pub struct IndexRangeResponse<T: TableIdentifier> {
Expand Down Expand Up @@ -391,9 +400,12 @@ impl<RT: Runtime, T: QueryType> CompiledQuery<RT, T> {
version,
))
},
QuerySource::Search(search) => {
QueryNode::Search(SearchQuery::new(search, cursor_interval, version))
},
QuerySource::Search(search) => QueryNode::Search(SearchQuery::new(
stable_index_name,
search,
cursor_interval,
version,
)),
};
for operator in query.operators {
let next_node = match operator {
Expand Down Expand Up @@ -445,11 +457,21 @@ impl<RT: Runtime, T: QueryType> CompiledQuery<RT, T> {
}
}

pub fn fingerprint(&self) -> &QueryFingerprint {
&self.query_fingerprint
}

pub fn is_approaching_data_limit(&self) -> bool {
self.root.is_approaching_data_limit()
}
}

impl<RT: Runtime> DeveloperQuery<RT> {
pub async fn next(
&mut self,
tx: &mut Transaction<RT>,
prefetch_hint: Option<usize>,
) -> anyhow::Result<Option<GenericDocument<T::T>>> {
) -> anyhow::Result<Option<DeveloperDocument>> {
match self.next_with_ts(tx, prefetch_hint).await? {
None => Ok(None),
Some((document, _)) => Ok(Some(document)),
Expand All @@ -461,32 +483,56 @@ impl<RT: Runtime, T: QueryType> CompiledQuery<RT, T> {
&mut self,
tx: &mut Transaction<RT>,
prefetch_hint: Option<usize>,
) -> anyhow::Result<Option<(GenericDocument<T::T>, WriteTimestamp)>> {
) -> anyhow::Result<Option<(DeveloperDocument, WriteTimestamp)>> {
query_batch_next(btreemap! {0 => (self, prefetch_hint)}, tx)
.await
.remove(&0)
.context("batch_key missing")?
}
}

pub async fn expect_one(
impl<RT: Runtime> ResolvedQuery<RT> {
pub async fn next(
&mut self,
tx: &mut Transaction<RT>,
) -> anyhow::Result<GenericDocument<T::T>> {
let v = self
.next(tx, Some(2))
.await?
.ok_or_else(|| anyhow::anyhow!("Expected one value for query, received zero"))?;
prefetch_hint: Option<usize>,
) -> anyhow::Result<Option<ResolvedDocument>> {
match self.next_with_ts(tx, prefetch_hint).await? {
None => Ok(None),
Some((document, _)) => Ok(Some(document)),
}
}

if self.next(tx, Some(1)).await?.is_some() {
anyhow::bail!("Received more than one value for query");
#[convex_macro::instrument_future]
pub async fn next_with_ts(
&mut self,
tx: &mut Transaction<RT>,
prefetch_hint: Option<usize>,
) -> anyhow::Result<Option<(ResolvedDocument, WriteTimestamp)>> {
let tablet_id = self
.root
.tablet_index_name()
.map(|index_name| *index_name.table());
let result = query_batch_next(btreemap! {0 => (self, prefetch_hint)}, tx)
.await
.remove(&0)
.context("batch_key missing")??;
if let Some((document, _)) = &result {
// TODO(lee) inject tablet id here which will allow the rest of the query
// pipeline to use DeveloperDocuments only. To ensure this will be
// correct, we do an assertion temporarily.
anyhow::ensure!(
document.table().table_id
== tablet_id.context("document must come from some tablet")?
);
}
Ok(v)
Ok(result)
}

pub async fn expect_at_most_one(
&mut self,
tx: &mut Transaction<RT>,
) -> anyhow::Result<Option<GenericDocument<T::T>>> {
) -> anyhow::Result<Option<ResolvedDocument>> {
let v = match self.next(tx, Some(2)).await? {
Some(v) => v,
None => return Ok(None),
Expand All @@ -496,22 +542,6 @@ impl<RT: Runtime, T: QueryType> CompiledQuery<RT, T> {
}
Ok(Some(v))
}

pub async fn expect_none(&mut self, tx: &mut Transaction<RT>) -> anyhow::Result<()> {
anyhow::ensure!(
self.next(tx, Some(1)).await?.is_none(),
"Expected no value for this query, but received one."
);
Ok(())
}

pub fn fingerprint(&self) -> &QueryFingerprint {
&self.query_fingerprint
}

pub fn is_approaching_data_limit(&self) -> bool {
self.root.is_approaching_data_limit()
}
}

pub async fn query_batch_next<RT: Runtime, T: QueryType>(
Expand Down Expand Up @@ -623,6 +653,15 @@ impl<T: QueryType> QueryStream<T> for QueryNode<T> {
QueryNode::Limit(r) => r.feed(index_range_response),
}
}

fn tablet_index_name(&self) -> Option<&TabletIndexName> {
match self {
QueryNode::IndexRange(r) => r.tablet_index_name(),
QueryNode::Search(r) => r.tablet_index_name(),
QueryNode::Filter(r) => r.tablet_index_name(),
QueryNode::Limit(r) => r.tablet_index_name(),
}
}
}

/// Return a system limit for reading too many documents in a query
Expand Down
27 changes: 24 additions & 3 deletions crates/database/src/query/search_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use common::{
SearchVersion,
},
runtime::Runtime,
types::WriteTimestamp,
types::{
StableIndexName,
TabletIndexName,
WriteTimestamp,
},
version::{
Version,
MIN_NPM_VERSION_FOR_FUZZY_SEARCH,
Expand Down Expand Up @@ -39,6 +43,11 @@ use crate::{

/// A `QueryStream` that begins by querying a search index.
pub struct SearchQuery<T: QueryType> {
// The tablet index being searched.
// Table names in `query` are just for error messages and usage, and may
// get out of sync with this.
stable_index_name: StableIndexName,

query: Search,
// Results are generated on the first call to SearchQuery::next.
results: Option<SearchResultIterator<T>>,
Expand All @@ -50,8 +59,14 @@ pub struct SearchQuery<T: QueryType> {
}

impl<T: QueryType> SearchQuery<T> {
pub fn new(query: Search, cursor_interval: CursorInterval, version: Option<Version>) -> Self {
pub fn new(
stable_index_name: StableIndexName,
query: Search,
cursor_interval: CursorInterval,
version: Option<Version>,
) -> Self {
Self {
stable_index_name,
query,
results: None,
cursor_interval,
Expand All @@ -71,7 +86,9 @@ impl<T: QueryType> SearchQuery<T> {
tx: &mut Transaction<RT>,
) -> anyhow::Result<SearchResultIterator<T>> {
let search_version = self.get_cli_gated_search_version();
let revisions = tx.search(&self.query, search_version).await?;
let revisions = tx
.search(&self.stable_index_name, &self.query, search_version)
.await?;
let revisions_in_range = revisions
.into_iter()
.filter(|(_, index_key)| self.cursor_interval.contains(index_key))
Expand Down Expand Up @@ -145,6 +162,10 @@ impl<T: QueryType> QueryStream<T> for SearchQuery<T> {
fn feed(&mut self, _index_range_response: IndexRangeResponse<T::T>) -> anyhow::Result<()> {
anyhow::bail!("cannot feed an index range response into a search query");
}

fn tablet_index_name(&self) -> Option<&TabletIndexName> {
self.stable_index_name.tablet_index_name()
}
}

#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion crates/database/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1833,7 +1833,7 @@ async fn test_retries(rt: TestRuntime) -> anyhow::Result<()> {
let mut tx = db.begin_system().await?;
let query = Query::full_table_scan("table".parse()?, Order::Asc);
let mut compiled_query = CompiledResolvedQuery::new(&mut tx, query)?;
compiled_query.expect_none(&mut tx).await?;
assert!(compiled_query.next(&mut tx, None).await?.is_none());
Ok(())
}

Expand Down
Loading

0 comments on commit b0591ed

Please sign in to comment.