Skip to content

Commit

Permalink
Wire the syscall providers into isolate2 (#24616)
Browse files Browse the repository at this point in the history
## Open Source

There's still a bunch of unimplemented functionality here, but it's enough to get a basic mutation working!

GitOrigin-RevId: 649d89d154eeeb7be2bd4affb7ebd409cfa5facd
  • Loading branch information
sujayakar authored and Convex, Inc. committed Apr 12, 2024
1 parent ef04801 commit 76f55b7
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 83 deletions.
19 changes: 12 additions & 7 deletions crates/isolate/src/environment/udf/async_syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl HeapSize for PendingSyscall {

// Checks if the underlying table and the request's expectation for the table
// line up.
fn system_table_guard(name: &TableName, expect_system_table: bool) -> anyhow::Result<()> {
pub fn system_table_guard(name: &TableName, expect_system_table: bool) -> anyhow::Result<()> {
if expect_system_table && !name.is_system() {
return Err(anyhow::anyhow!(ErrorMetadata::bad_request(
"SystemTableError",
Expand Down Expand Up @@ -185,6 +185,10 @@ impl AsyncSyscallBatch {
Self::Unbatched { .. } => 1,
}
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}
}

pub struct QueryManager<RT: Runtime> {
Expand Down Expand Up @@ -221,7 +225,8 @@ impl<RT: Runtime> QueryManager<RT> {
}

// Trait for allowing code reuse between `DatabaseUdfEnvironment` and isolate2.
pub trait SyscallProvider<RT: Runtime> {
#[allow(async_fn_in_trait)]
pub trait AsyncSyscallProvider<RT: Runtime> {
fn rt(&self) -> &RT;
fn tx(&mut self) -> Result<&mut Transaction<RT>, ErrorMetadata>;
fn key_broker(&self) -> &KeyBroker;
Expand Down Expand Up @@ -258,7 +263,7 @@ pub trait SyscallProvider<RT: Runtime> {
) -> anyhow::Result<Option<FileStorageEntry>>;
}

impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
impl<RT: Runtime> AsyncSyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
fn rt(&self) -> &RT {
&self.phase.rt
}
Expand Down Expand Up @@ -361,11 +366,11 @@ impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
///
/// Most of the common logic lives on `Transaction` or `DatabaseSyscallsShared`,
/// and this is mostly just taking care of the argument parsing.
pub struct DatabaseSyscallsV1<RT: Runtime, P: SyscallProvider<RT>> {
pub struct DatabaseSyscallsV1<RT: Runtime, P: AsyncSyscallProvider<RT>> {
_pd: PhantomData<(RT, P)>,
}

impl<RT: Runtime, P: SyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
/// Runs a batch of syscalls, each of which can succeed or fail
/// independently. The returned vec is the same length as the batch.
#[minitrace::trace]
Expand Down Expand Up @@ -893,7 +898,7 @@ impl<RT: Runtime, P: SyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
}
}

struct DatabaseSyscallsShared<RT: Runtime, P: SyscallProvider<RT>> {
struct DatabaseSyscallsShared<RT: Runtime, P: AsyncSyscallProvider<RT>> {
_pd: PhantomData<(RT, P)>,
}

Expand Down Expand Up @@ -923,7 +928,7 @@ struct QueryPageMetadata {
page_status: Option<QueryPageStatus>,
}

impl<RT: Runtime, P: SyscallProvider<RT>> DatabaseSyscallsShared<RT, P> {
impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsShared<RT, P> {
async fn read_page_from_query<T: QueryType>(
mut query: CompiledQuery<RT, T>,
tx: &mut Transaction<RT>,
Expand Down
4 changes: 2 additions & 2 deletions crates/isolate/src/environment/udf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use model::environment_variables::types::{
EnvVarName,
EnvVarValue,
};
mod async_syscall;
pub mod async_syscall;

pub mod outcome;
mod phase;
mod syscall;
pub mod syscall;
use std::{
cmp::Ordering,
collections::VecDeque,
Expand Down
121 changes: 74 additions & 47 deletions crates/isolate/src/environment/udf/syscall.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(non_snake_case)]

use std::convert::TryFrom;

use anyhow::Context;
Expand All @@ -11,6 +9,7 @@ use common::{
use database::{
query::TableFilter,
DeveloperQuery,
Transaction,
};
use errors::ErrorMetadata;
use serde::{
Expand All @@ -27,25 +26,49 @@ use value::{
TableName,
};

use super::async_syscall::{
DatabaseSyscallsV1,
SyscallProvider,
use super::{
async_syscall::QueryManager,
DatabaseUdfEnvironment,
};
use crate::environment::helpers::{
parse_version,
with_argument_error,
ArgName,
};

pub trait SyscallProvider<RT: Runtime> {
fn table_filter(&self) -> TableFilter;
fn query_manager(&mut self) -> &mut QueryManager<RT>;
fn tx(&mut self) -> Result<&mut Transaction<RT>, ErrorMetadata>;
}

impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
fn table_filter(&self) -> TableFilter {
if self.udf_path.is_system() {
TableFilter::IncludePrivateSystemTables
} else {
TableFilter::ExcludePrivateSystemTables
}
}

fn query_manager(&mut self) -> &mut QueryManager<RT> {
&mut self.query_manager
}

fn tx(&mut self) -> Result<&mut Transaction<RT>, ErrorMetadata> {
self.phase.tx()
}
}

pub fn syscall_impl<RT: Runtime, P: SyscallProvider<RT>>(
provider: &mut P,
name: &str,
args: JsonValue,
) -> anyhow::Result<JsonValue> {
match name {
"1.0/queryCleanup" => DatabaseSyscallsV1::syscall_queryCleanup(provider, args),
"1.0/queryStream" => DatabaseSyscallsV1::syscall_queryStream(provider, args),
"1.0/db/normalizeId" => syscall_normalizeId(provider, args),
"1.0/queryCleanup" => syscall_query_cleanup(provider, args),
"1.0/queryStream" => syscall_query_stream(provider, args),
"1.0/db/normalizeId" => syscall_normalize_id(provider, args),

#[cfg(test)]
"throwSystemError" => anyhow::bail!("I can't go for that."),
Expand Down Expand Up @@ -73,7 +96,7 @@ pub fn syscall_impl<RT: Runtime, P: SyscallProvider<RT>>(
}
}

fn syscall_normalizeId<RT: Runtime, P: SyscallProvider<RT>>(
fn syscall_normalize_id<RT: Runtime, P: SyscallProvider<RT>>(
provider: &mut P,
args: JsonValue,
) -> anyhow::Result<JsonValue> {
Expand Down Expand Up @@ -128,48 +151,52 @@ fn syscall_normalizeId<RT: Runtime, P: SyscallProvider<RT>>(
}
}

impl<RT: Runtime, P: SyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
fn syscall_queryStream(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
let _s: common::tracing::NoopSpan = static_span!();
let table_filter = provider.table_filter();
let tx = provider.tx()?;
fn syscall_query_stream<RT: Runtime, P: SyscallProvider<RT>>(
provider: &mut P,
args: JsonValue,
) -> anyhow::Result<JsonValue> {
let _s: common::tracing::NoopSpan = static_span!();
let table_filter = provider.table_filter();
let tx = provider.tx()?;

#[derive(Deserialize)]
struct QueryStreamArgs {
query: JsonValue,
version: Option<String>,
}
let (parsed_query, version) = with_argument_error("queryStream", || {
let args: QueryStreamArgs = serde_json::from_value(args)?;
let parsed_query = Query::try_from(args.query).context(ArgName("query"))?;
let version = parse_version(args.version)?;
Ok((parsed_query, version))
})?;
// TODO: Are all invalid query pipelines developer errors? These could be bugs
// in convex/server.
let compiled_query =
{ DeveloperQuery::new_with_version(tx, parsed_query, version, table_filter)? };
let query_id = provider.query_manager().put_developer(compiled_query);
#[derive(Deserialize)]
struct QueryStreamArgs {
query: JsonValue,
version: Option<String>,
}
let (parsed_query, version) = with_argument_error("queryStream", || {
let args: QueryStreamArgs = serde_json::from_value(args)?;
let parsed_query = Query::try_from(args.query).context(ArgName("query"))?;
let version = parse_version(args.version)?;
Ok((parsed_query, version))
})?;
// TODO: Are all invalid query pipelines developer errors? These could be bugs
// in convex/server.
let compiled_query =
{ DeveloperQuery::new_with_version(tx, parsed_query, version, table_filter)? };
let query_id = provider.query_manager().put_developer(compiled_query);

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct QueryStreamResult {
query_id: u32,
}
Ok(serde_json::to_value(QueryStreamResult { query_id })?)
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct QueryStreamResult {
query_id: u32,
}
Ok(serde_json::to_value(QueryStreamResult { query_id })?)
}

fn syscall_queryCleanup(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
let _s = static_span!();
fn syscall_query_cleanup<RT: Runtime, P: SyscallProvider<RT>>(
provider: &mut P,
args: JsonValue,
) -> anyhow::Result<JsonValue> {
let _s = static_span!();

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct QueryCleanupArgs {
query_id: u32,
}
let args: QueryCleanupArgs =
with_argument_error("queryCleanup", || Ok(serde_json::from_value(args)?))?;
let cleaned_up = provider.query_manager().cleanup_developer(args.query_id);
Ok(serde_json::to_value(cleaned_up)?)
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct QueryCleanupArgs {
query_id: u32,
}
let args: QueryCleanupArgs =
with_argument_error("queryCleanup", || Ok(serde_json::from_value(args)?))?;
let cleaned_up = provider.query_manager().cleanup_developer(args.query_id);
Ok(serde_json::to_value(cleaned_up)?)
}
Loading

0 comments on commit 76f55b7

Please sign in to comment.