Skip to content

Commit

Permalink
Wire up queries to isolate2 (#24784)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 10d1b67cdd8ff9ba38822d8563ae17beb0af1382
  • Loading branch information
sujayakar authored and Convex, Inc. committed Apr 17, 2024
1 parent bc76986 commit 4c52b74
Show file tree
Hide file tree
Showing 10 changed files with 776 additions and 541 deletions.
59 changes: 45 additions & 14 deletions crates/isolate/src/environment/udf/async_syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common::{
},
types::PersistenceVersion,
value::ConvexValue,
version::Version,
};
use database::{
query::{
Expand Down Expand Up @@ -76,6 +77,7 @@ use crate::{
ArgName,
},
helpers::UdfArgsJson,
isolate2::client::QueryId,
metrics::async_syscall_timer,
};

Expand Down Expand Up @@ -214,6 +216,14 @@ impl<RT: Runtime> QueryManager<RT> {
}
}

pub enum ManagedQuery<RT: Runtime> {
Pending {
query: Query,
version: Option<Version>,
},
Active(DeveloperQuery<RT>),
}

// Trait for allowing code reuse between `DatabaseUdfEnvironment` and isolate2.
#[allow(async_fn_in_trait)]
pub trait AsyncSyscallProvider<RT: Runtime> {
Expand All @@ -229,7 +239,9 @@ pub trait AsyncSyscallProvider<RT: Runtime> {

fn log_async_syscall(&mut self, name: String, duration: Duration, is_success: bool);

fn query_manager(&mut self) -> &mut QueryManager<RT>;
fn take_query(&mut self, query_id: QueryId) -> Option<ManagedQuery<RT>>;
fn insert_query(&mut self, query_id: QueryId, query: DeveloperQuery<RT>);
fn cleanup_query(&mut self, query_id: QueryId) -> bool;

fn prev_journal(&mut self) -> &mut QueryJournal;
fn next_journal(&mut self) -> &mut QueryJournal;
Expand Down Expand Up @@ -291,8 +303,18 @@ impl<RT: Runtime> AsyncSyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
.log_async_syscall(name, duration, is_success);
}

fn query_manager(&mut self) -> &mut QueryManager<RT> {
&mut self.query_manager
fn take_query(&mut self, query_id: QueryId) -> Option<ManagedQuery<RT>> {
self.query_manager
.take_developer(query_id)
.map(ManagedQuery::Active)
}

fn insert_query(&mut self, query_id: QueryId, query: DeveloperQuery<RT>) {
self.query_manager.insert_developer(query_id, query);
}

fn cleanup_query(&mut self, query_id: QueryId) -> bool {
self.query_manager.cleanup_developer(query_id)
}

fn prev_journal(&mut self) -> &mut QueryJournal {
Expand Down Expand Up @@ -707,13 +729,24 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
let args: QueryStreamNextArgs = serde_json::from_value(args)?;
Ok(args.query_id)
})?;
let local_query = provider
.query_manager()
.take_developer(query_id)
.context(ErrorMetadata::not_found(
"QueryNotFound",
"in-progress query not found",
))?;
let managed_query =
provider
.take_query(query_id)
.context(ErrorMetadata::not_found(
"QueryNotFound",
"in-progress query not found",
))?;
let local_query = match managed_query {
ManagedQuery::Pending { query, version } => {
DeveloperQuery::new_with_version(
provider.tx()?,
query,
version,
table_filter,
)?
},
ManagedQuery::Active(local_query) => local_query,
};
Some((Some(query_id), local_query))
},
AsyncRead::Get(args) => {
Expand Down Expand Up @@ -790,9 +823,7 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
for (batch_key, (query_id, local_query)) in queries_to_fetch {
let result: anyhow::Result<_> = try {
if let Some(query_id) = query_id {
provider
.query_manager()
.insert_developer(query_id, local_query);
provider.insert_query(query_id, local_query);
}
let maybe_next = fetch_results
.remove(&batch_key)
Expand All @@ -806,7 +837,7 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {

if let Some(query_id) = query_id {
if done {
provider.query_manager().cleanup_developer(query_id);
provider.cleanup_query(query_id);
}
serde_json::to_value(QueryStreamNextResult {
value: value.into(),
Expand Down
65 changes: 36 additions & 29 deletions crates/isolate/src/environment/udf/syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use common::{
query::Query,
runtime::Runtime,
static_span,
version::Version,
};
use database::{
query::TableFilter,
DeveloperQuery,
Transaction,
};
use errors::ErrorMetadata;
use serde::{
Expand All @@ -23,13 +23,12 @@ use serde_json::{
use value::{
id_v6::DocumentIdV6,
InternalId,
TableIdAndTableNumber,
TableName,
TableNumber,
};

use super::{
async_syscall::QueryManager,
DatabaseUdfEnvironment,
};
use super::DatabaseUdfEnvironment;
use crate::environment::helpers::{
parse_version,
with_argument_error,
Expand All @@ -38,8 +37,12 @@ use crate::environment::helpers::{

pub trait SyscallProvider<RT: Runtime> {
fn table_filter(&self) -> TableFilter;
fn query_manager(&mut self) -> &mut QueryManager<RT>;
fn tx(&mut self) -> Result<&mut Transaction<RT>, ErrorMetadata>;

fn lookup_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableIdAndTableNumber>>;
fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableNumber>>;

fn start_query(&mut self, query: Query, version: Option<Version>) -> anyhow::Result<u32>;
fn cleanup_query(&mut self, query_id: u32) -> bool;
}

impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
Expand All @@ -51,12 +54,29 @@ impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
}
}

fn query_manager(&mut self) -> &mut QueryManager<RT> {
&mut self.query_manager
fn lookup_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableIdAndTableNumber>> {
let table_mapping = self.phase.tx()?.table_mapping();
Ok(table_mapping.id_and_number_if_exists(name))
}

fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableNumber>> {
let virtual_table_mapping = self.phase.tx()?.virtual_table_mapping();
Ok(virtual_table_mapping.number_if_exists(name))
}

fn start_query(&mut self, query: Query, version: Option<Version>) -> anyhow::Result<u32> {
let table_filter = self.table_filter();
let tx = self.phase.tx()?;
// TODO: Are all invalid query pipelines developer errors? These could be bugs
// in convex/server.
let compiled_query =
{ DeveloperQuery::new_with_version(tx, query, version, table_filter)? };
let query_id = self.query_manager.put_developer(compiled_query);
Ok(query_id)
}

fn tx(&mut self) -> Result<&mut Transaction<RT>, ErrorMetadata> {
self.phase.tx()
fn cleanup_query(&mut self, query_id: u32) -> bool {
self.query_manager.cleanup_developer(query_id)
}
}

Expand Down Expand Up @@ -111,18 +131,11 @@ fn syscall_normalize_id<RT: Runtime, P: SyscallProvider<RT>>(
let table_name: TableName = args.table.parse().context(ArgName("table"))?;
Ok((table_name, args.id_string))
})?;
let virtual_table_number = provider
.tx()?
.virtual_table_mapping()
.number_if_exists(&table_name);
let virtual_table_number = provider.lookup_virtual_table(&table_name)?;
let table_number = match virtual_table_number {
Some(table_number) => Some(table_number),
None => {
let physical_table_number = provider
.tx()?
.table_mapping()
.id_and_number_if_exists(&table_name)
.map(|t| t.table_number);
let physical_table_number = provider.lookup_table(&table_name)?.map(|t| t.table_number);
match provider.table_filter() {
TableFilter::IncludePrivateSystemTables => physical_table_number,
TableFilter::ExcludePrivateSystemTables if table_name.is_system() => None,
Expand Down Expand Up @@ -155,9 +168,7 @@ fn syscall_query_stream<RT: Runtime, P: SyscallProvider<RT>>(
provider: &mut P,
args: JsonValue,
) -> anyhow::Result<JsonValue> {
let _s: common::tracing::NoopSpan = static_span!();
let table_filter = provider.table_filter();
let tx = provider.tx()?;
let _s = static_span!();

#[derive(Deserialize)]
struct QueryStreamArgs {
Expand All @@ -170,11 +181,7 @@ fn syscall_query_stream<RT: Runtime, P: SyscallProvider<RT>>(
let version = parse_version(args.version)?;
Ok((parsed_query, version))
})?;
// TODO: Are all invalid query pipelines developer errors? These could be bugs
// in convex/server.
let compiled_query =
{ DeveloperQuery::new_with_version(tx, parsed_query, version, table_filter)? };
let query_id = provider.query_manager().put_developer(compiled_query);
let query_id = provider.start_query(parsed_query, version)?;

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
Expand All @@ -197,6 +204,6 @@ fn syscall_query_cleanup<RT: Runtime, P: SyscallProvider<RT>>(
}
let args: QueryCleanupArgs =
with_argument_error("queryCleanup", || Ok(serde_json::from_value(args)?))?;
let cleaned_up = provider.query_manager().cleanup_developer(args.query_id);
let cleaned_up = provider.cleanup_query(args.query_id);
Ok(serde_json::to_value(cleaned_up)?)
}
Loading

0 comments on commit 4c52b74

Please sign in to comment.