diff --git a/Cargo.lock b/Cargo.lock index 983dbaee..3870f554 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8614,7 +8614,6 @@ dependencies = [ "criterion", "derive_more", "errors", - "futures", "hex", "humansize", "imbl", @@ -8626,6 +8625,7 @@ dependencies = [ "serde_json", "sha2", "thiserror", + "tokio", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index e05d8daf..58315d3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -193,3 +193,11 @@ codegen-units = 16 [profile.dev.package.num-bigint-dig] opt-level = 3 codegen-units = 16 + +[profile.dev.package.tokio] +opt-level = 3 +codegen-units = 16 + +[profile.dev.package.tokio-stream] +opt-level = 3 +codegen-units = 16 diff --git a/crates/application/src/function_log.rs b/crates/application/src/function_log.rs index 83c7011f..5e2ed358 100644 --- a/crates/application/src/function_log.rs +++ b/crates/application/src/function_log.rs @@ -52,7 +52,6 @@ use common::{ }, }; use float_next_after::NextAfter; -use futures::channel::oneshot; use http::{ Method, StatusCode, @@ -74,6 +73,7 @@ use serde_json::{ json, Value as JsonValue, }; +use tokio::sync::oneshot; use url::Url; use usage_tracking::{ AggregatedFunctionUsageStats, diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index 1b7c5880..de01d2a7 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -135,10 +135,7 @@ use function_log::{ FunctionExecutionPart, }; use function_runner::FunctionRunner; -use futures::{ - channel::oneshot, - stream::BoxStream, -}; +use futures::stream::BoxStream; use headers::{ ContentLength, ContentType, @@ -283,6 +280,7 @@ use table_summary_worker::{ TableSummaryClient, TableSummaryWorker, }; +use tokio::sync::oneshot; use usage_tracking::{ FunctionUsageStats, FunctionUsageTracker, diff --git a/crates/application/src/table_summary_worker.rs b/crates/application/src/table_summary_worker.rs index cfb28e15..2e81758a 100644 --- a/crates/application/src/table_summary_worker.rs +++ b/crates/application/src/table_summary_worker.rs @@ -22,12 +22,12 @@ use database::{ TableSummaryWriter, }; use futures::{ - channel::oneshot, pin_mut, select_biased, FutureExt, }; use parking_lot::Mutex; +use tokio::sync::oneshot; use crate::metrics::log_worker_starting; diff --git a/crates/common/src/bounded_thread_pool.rs b/crates/common/src/bounded_thread_pool.rs index cd02271b..f119e23e 100644 --- a/crates/common/src/bounded_thread_pool.rs +++ b/crates/common/src/bounded_thread_pool.rs @@ -1,10 +1,7 @@ use std::sync::Arc; use futures::{ - channel::{ - mpsc, - oneshot, - }, + channel::mpsc, future::{ self, BoxFuture, @@ -17,6 +14,7 @@ use futures::{ StreamExt, }; use parking_lot::Mutex; +use tokio::sync::oneshot; use crate::{ codel_queue::{ diff --git a/crates/common/src/runtime/mod.rs b/crates/common/src/runtime/mod.rs index 54ab2952..dcd9fcad 100644 --- a/crates/common/src/runtime/mod.rs +++ b/crates/common/src/runtime/mod.rs @@ -20,7 +20,6 @@ use std::{ use anyhow::Context; use async_trait::async_trait; use futures::{ - channel::oneshot, future::{ BoxFuture, FusedFuture, @@ -52,6 +51,7 @@ use proptest::prelude::*; use rand::RngCore; use serde::Serialize; use thiserror::Error; +use tokio::sync::oneshot; use uuid::Uuid; use value::heap_size::HeapSize; @@ -203,6 +203,7 @@ pub trait Runtime: Clone + Sync + Send + 'static { /// Spawn a future on a reserved OS thread. This is only really necessary /// for libraries like `V8` that care about being called from a /// particular thread. + #[must_use = "Threads are canceled when their `SpawnHandle` is dropped."] fn spawn_thread, F: FnOnce() -> Fut + Send + 'static>( &self, f: F, diff --git a/crates/common/src/runtime/testing/mod.rs b/crates/common/src/runtime/testing/mod.rs index 25fcc0e4..38d84ec1 100644 --- a/crates/common/src/runtime/testing/mod.rs +++ b/crates/common/src/runtime/testing/mod.rs @@ -18,7 +18,6 @@ use std::{ }; use futures::{ - channel::oneshot, future::{ self, BoxFuture, @@ -34,10 +33,13 @@ use rand::{ SeedableRng, }; use rand_chacha::ChaCha12Rng; -use tokio::runtime::{ - Builder, - RngSeed, - UnhandledPanic, +use tokio::{ + runtime::{ + Builder, + RngSeed, + UnhandledPanic, + }, + sync::oneshot, }; use super::{ diff --git a/crates/common/src/sync/mod.rs b/crates/common/src/sync/mod.rs index 0b3d5bbb..f3728518 100644 --- a/crates/common/src/sync/mod.rs +++ b/crates/common/src/sync/mod.rs @@ -1,18 +1,18 @@ pub mod split_rw_lock; pub mod state_channel; -// It's safe to use these `tokio` sync primitives in our runtime-generic code -// since they don't internally depend on the `tokio` runtime. Feel free to add -// more if you need them, but generally prefer using `futures`-based primitives -// if sufficient. +use futures::future; +use tokio::sync::oneshot; pub use tokio::sync::{ broadcast, - // This channel is useful over `futures::channel::mpsc` since it doesn't require `&mut self` on - // `try_send`. The `futures` implementation conforms to their `Sink` trait which unnecessarily - // requires mutability. mpsc, watch, Mutex, MutexGuard, Notify, }; + +/// Wait until a sender's corresponding receiver has been closed. +pub async fn oneshot_receiver_closed(sender: &mut oneshot::Sender) { + future::poll_fn(|cx| sender.poll_closed(cx)).await +} diff --git a/crates/convex/examples/convex_chat_client.rs b/crates/convex/examples/convex_chat_client.rs index 99b8bf43..d1922ef3 100644 --- a/crates/convex/examples/convex_chat_client.rs +++ b/crates/convex/examples/convex_chat_client.rs @@ -21,13 +21,13 @@ use convex::{ Value, }; use futures::{ - channel::oneshot, pin_mut, select_biased, FutureExt, StreamExt, }; use maplit::btreemap; +use tokio::sync::oneshot; const SETUP_MSG: &str = r" Please run this Convex Chat client from an initialized Convex project. diff --git a/crates/convex/src/client/mod.rs b/crates/convex/src/client/mod.rs index e497437e..d345ea95 100644 --- a/crates/convex/src/client/mod.rs +++ b/crates/convex/src/client/mod.rs @@ -12,15 +12,15 @@ use convex_sync_types::{ #[cfg(doc)] use futures::Stream; use futures::{ - channel::{ - mpsc, - oneshot, - }, + channel::mpsc, SinkExt, StreamExt, }; use tokio::{ - sync::broadcast, + sync::{ + broadcast, + oneshot, + }, task::JoinHandle, }; use tokio_stream::wrappers::BroadcastStream; diff --git a/crates/convex/src/client/worker.rs b/crates/convex/src/client/worker.rs index efa1689d..f3d6bd4b 100644 --- a/crates/convex/src/client/worker.rs +++ b/crates/convex/src/client/worker.rs @@ -10,15 +10,15 @@ use convex_sync_types::{ UdfPath, }; use futures::{ - channel::{ - mpsc, - oneshot, - }, + channel::mpsc, select_biased, FutureExt, StreamExt, }; -use tokio::sync::broadcast; +use tokio::sync::{ + broadcast, + oneshot, +}; use tokio_stream::wrappers::BroadcastStream; use crate::{ diff --git a/crates/convex/src/sync/web_socket_manager.rs b/crates/convex/src/sync/web_socket_manager.rs index bae28096..9c8a38b4 100644 --- a/crates/convex/src/sync/web_socket_manager.rs +++ b/crates/convex/src/sync/web_socket_manager.rs @@ -16,10 +16,7 @@ use convex_sync_types::{ Timestamp, }; use futures::{ - channel::{ - mpsc, - oneshot, - }, + channel::mpsc, select_biased, FutureExt, SinkExt, @@ -27,6 +24,7 @@ use futures::{ }; use tokio::{ net::TcpStream, + sync::oneshot, task::JoinHandle, time::{ Instant, diff --git a/crates/database/src/committer.rs b/crates/database/src/committer.rs index 51497f54..b7beee37 100644 --- a/crates/database/src/committer.rs +++ b/crates/database/src/committer.rs @@ -71,7 +71,6 @@ use errors::{ ErrorMetadataAnyhowExt, }; use futures::{ - channel::oneshot, future::{ BoxFuture, Either, @@ -86,6 +85,7 @@ use indexing::index_registry::IndexRegistry; use minitrace::prelude::*; use parking_lot::Mutex; use prometheus::VMHistogram; +use tokio::sync::oneshot; use usage_tracking::FunctionUsageTracker; use value::{ heap_size::WithHeapSize, diff --git a/crates/database/src/index_workers/search_flusher.rs b/crates/database/src/index_workers/search_flusher.rs index 118c65a3..82486a0e 100644 --- a/crates/database/src/index_workers/search_flusher.rs +++ b/crates/database/src/index_workers/search_flusher.rs @@ -455,7 +455,7 @@ impl SearchFlusher { let rate_limit_pages_per_second = job.build_reason.read_max_pages_per_second(); let developer_config = job.index_config.developer_config.clone(); let params = self.params.clone(); - self.runtime.spawn_thread(move || async move { + let mut handle = self.runtime.spawn_thread(move || async move { let result = Self::build_multipart_segment_on_thread( params, rate_limit_pages_per_second, @@ -469,8 +469,9 @@ impl SearchFlusher { build_index_args, ) .await; - _ = tx.send(result); + let _ = tx.send(result); }); + handle.join().await?; rx.await? } @@ -612,10 +613,11 @@ impl SearchFlusher { let (tx, rx) = oneshot::channel(); let rt = self.runtime.clone(); let storage = self.storage.clone(); - self.runtime.spawn_thread(move || async move { + let mut handle = self.runtime.spawn_thread(move || async move { let result = T::upload_new_segment(&rt, storage, new_segment).await; let _ = tx.send(result); }); + handle.join().await?; rx.await? } } diff --git a/crates/database/src/subscription.rs b/crates/database/src/subscription.rs index 316a0b52..ceb73c24 100644 --- a/crates/database/src/subscription.rs +++ b/crates/database/src/subscription.rs @@ -33,10 +33,7 @@ use common::{ }, }; use futures::{ - channel::{ - mpsc, - oneshot, - }, + channel::mpsc, select_biased, FutureExt, StreamExt, @@ -47,6 +44,7 @@ use parking_lot::Mutex; use prometheus::VMHistogram; use search::query::TextSearchSubscriptions; use slab::Slab; +use tokio::sync::oneshot; use crate::{ metrics, diff --git a/crates/database/src/write_log.rs b/crates/database/src/write_log.rs index 7994f35a..54eb29b1 100644 --- a/crates/database/src/write_log.rs +++ b/crates/database/src/write_log.rs @@ -28,11 +28,9 @@ use errors::{ ErrorMetadata, ErrorMetadataAnyhowExt, }; -use futures::{ - channel::oneshot, - Future, -}; +use futures::Future; use parking_lot::RwLock; +use tokio::sync::oneshot; use value::heap_size::{ HeapSize, WithHeapSize, @@ -156,7 +154,7 @@ impl WriteLog { // Notify waiters let mut i = 0; while i < self.waiters.len() { - if ts > self.waiters[i].0 || self.waiters[i].1.is_canceled() { + if ts > self.waiters[i].0 || self.waiters[i].1.is_closed() { // Remove from the waiters. let w = self.waiters.swap_remove_back(i).expect("checked above"); // Notify. Ignore if receiver has dropped. diff --git a/crates/function_runner/src/isolate_worker.rs b/crates/function_runner/src/isolate_worker.rs index 4d72d38c..ecc40621 100644 --- a/crates/function_runner/src/isolate_worker.rs +++ b/crates/function_runner/src/isolate_worker.rs @@ -7,6 +7,7 @@ use common::pause::PauseClient; use common::{ errors::report_error, runtime::Runtime, + sync::oneshot_receiver_closed, types::UdfType, }; use futures::FutureExt; @@ -108,7 +109,7 @@ impl IsolateWorker for FunctionRunnerIsolateWorker { client_id, isolate, isolate_clean, - response.cancellation().boxed(), + oneshot_receiver_closed(&mut response).boxed(), ) .await; let status = match &r { @@ -158,7 +159,7 @@ impl IsolateWorker for FunctionRunnerIsolateWorker { isolate, isolate_clean, request.params.clone(), - response.cancellation().boxed(), + oneshot_receiver_closed(&mut response).boxed(), ) .await; @@ -211,7 +212,7 @@ impl IsolateWorker for FunctionRunnerIsolateWorker { request.http_module_path, request.routed_path, request.http_request, - response.cancellation().boxed(), + oneshot_receiver_closed(&mut response).boxed(), ) .await; let status = match &r { diff --git a/crates/function_runner/src/server.rs b/crates/function_runner/src/server.rs index f397b1d5..4fd20a82 100644 --- a/crates/function_runner/src/server.rs +++ b/crates/function_runner/src/server.rs @@ -55,10 +55,7 @@ use database::{ TransactionTextSnapshot, }; use file_storage::TransactionalFileStorage; -use futures::channel::{ - mpsc, - oneshot, -}; +use futures::channel::mpsc; use isolate::{ client::{ initialize_v8, @@ -104,6 +101,7 @@ use storage::{ StorageUseCase, }; use sync_types::Timestamp; +use tokio::sync::oneshot; use usage_tracking::{ FunctionUsageStats, FunctionUsageTracker, @@ -754,7 +752,6 @@ mod tests { use common::pause::PauseController; use database::test_helpers::DbFixtures; use errors::ErrorMetadataAnyhowExt; - use futures::channel::oneshot; use isolate::{ client::{ initialize_v8, @@ -766,6 +763,7 @@ mod tests { use model::test_helpers::DbFixturesWithModel; use runtime::testing::TestRuntime; use storage::LocalDirStorage; + use tokio::sync::oneshot; use crate::server::{ FunctionRunnerCore, diff --git a/crates/isolate/src/client.rs b/crates/isolate/src/client.rs index 22fb39c7..9f72de1e 100644 --- a/crates/isolate/src/client.rs +++ b/crates/isolate/src/client.rs @@ -65,6 +65,7 @@ use common::{ }, schemas::DatabaseSchema, static_span, + sync::oneshot_receiver_closed, types::{ ModuleEnvironment, UdfType, @@ -85,10 +86,7 @@ use errors::{ }; use file_storage::TransactionalFileStorage; use futures::{ - channel::{ - mpsc, - oneshot, - }, + channel::mpsc, future, pin_mut, select, @@ -139,6 +137,7 @@ use sync_types::{ CanonicalizedModulePath, CanonicalizedUdfPath, }; +use tokio::sync::oneshot; use usage_tracking::FunctionUsageStats; use value::{ id_v6::DeveloperDocumentId, @@ -1747,7 +1746,7 @@ impl IsolateWorker for BackendIsolateWorker { client_id, isolate, isolate_clean, - response.cancellation().boxed(), + oneshot_receiver_closed(&mut response).boxed(), ) .await; let status = match &r { @@ -1799,7 +1798,7 @@ impl IsolateWorker for BackendIsolateWorker { request.http_module_path, request.routed_path, request.http_request, - response.cancellation().boxed(), + oneshot_receiver_closed(&mut response).boxed(), ) .await; let status = match &r { @@ -1845,7 +1844,7 @@ impl IsolateWorker for BackendIsolateWorker { isolate, isolate_clean, request.params.clone(), - response.cancellation().boxed(), + oneshot_receiver_closed(&mut response).boxed(), ) .await; let status = match &r { diff --git a/crates/isolate/src/isolate2/client.rs b/crates/isolate/src/isolate2/client.rs index b6556749..13d7e954 100644 --- a/crates/isolate/src/isolate2/client.rs +++ b/crates/isolate/src/isolate2/client.rs @@ -11,13 +11,13 @@ use common::{ types::UdfType, }; use deno_core::ModuleSpecifier; -use futures::channel::{ - mpsc, - oneshot, -}; +use futures::channel::mpsc; use serde_json::Value as JsonValue; use sync_types::CanonicalizedUdfPath; -use tokio::sync::Semaphore; +use tokio::sync::{ + oneshot, + Semaphore, +}; use value::{ ConvexArray, ConvexValue, @@ -169,7 +169,7 @@ impl IsolateThreadClient { pub async fn send( &mut self, request: IsolateThreadRequest, - mut rx: oneshot::Receiver>, + rx: oneshot::Receiver>, ) -> anyhow::Result { if self.user_time_remaining.is_zero() { anyhow::bail!("User time exhausted"); @@ -181,17 +181,17 @@ impl IsolateThreadClient { // Start the user timer after we acquire the permit. let user_start = Instant::now(); - let mut user_timeout = self.rt.wait(self.user_time_remaining); + let user_timeout = self.rt.wait(self.user_time_remaining); self.sender .try_send(request) .map_err(|e| e.into_send_error())?; - let result = futures::select_biased! { + let result = tokio::select! { + result = rx => result, _ = user_timeout => { // XXX: We need to terminate the isolate handle here in // case user code is in an infinite loop. anyhow::bail!("User time exhausted"); }, - result = rx => result, }; // Deduct the time spent in the isolate thread from our remaining user time. diff --git a/crates/isolate/src/isolate2/runner.rs b/crates/isolate/src/isolate2/runner.rs index ef0cf653..3c936a07 100644 --- a/crates/isolate/src/isolate2/runner.rs +++ b/crates/isolate/src/isolate2/runner.rs @@ -29,6 +29,7 @@ use common::{ Runtime, UnixTimestamp, }, + sync::oneshot_receiver_closed, types::{ PersistenceVersion, UdfType, @@ -42,11 +43,7 @@ use database::{ }; use errors::ErrorMetadata; use futures::{ - channel::{ - mpsc, - oneshot, - }, - FutureExt, + channel::mpsc, StreamExt, }; use keybroker::KeyBroker; @@ -69,7 +66,10 @@ use parking_lot::Mutex; use rand::SeedableRng; use rand_chacha::ChaCha12Rng; use serde_json::Value as JsonValue; -use tokio::sync::Semaphore; +use tokio::sync::{ + oneshot, + Semaphore, +}; use value::{ ConvexArray, ConvexObject, @@ -1009,12 +1009,12 @@ async fn tokio_thread( query_journal, ); - let r = futures::select_biased! { - r = request.fuse() => r, + let r = tokio::select! { + r = request => r, // Eventually we'll attempt to cleanup the isolate thread in these conditions. _ = rt.wait(total_timeout) => Err(anyhow::anyhow!("Total timeout exceeded")), - _ = sender.cancellation().fuse() => Err(anyhow::anyhow!("Cancelled")), + _ = oneshot_receiver_closed(&mut sender) => Err(anyhow::anyhow!("Cancelled")), }; let _ = sender.send(r.map(|r| (tx, r))); drop(client); diff --git a/crates/isolate/src/test_helpers.rs b/crates/isolate/src/test_helpers.rs index f04a93f0..9afa539e 100644 --- a/crates/isolate/src/test_helpers.rs +++ b/crates/isolate/src/test_helpers.rs @@ -72,10 +72,7 @@ use database::{ }; use file_storage::TransactionalFileStorage; use futures::{ - channel::{ - mpsc, - oneshot, - }, + channel::mpsc, select, Future, FutureExt, @@ -127,6 +124,7 @@ use storage::{ Storage, }; use sync_types::UdfPath; +use tokio::sync::oneshot; use usage_tracking::FunctionUsageStats; use value::{ id_v6::DeveloperDocumentId, diff --git a/crates/local_backend/src/subs/mod.rs b/crates/local_backend/src/subs/mod.rs index 92cc4693..b931057b 100644 --- a/crates/local_backend/src/subs/mod.rs +++ b/crates/local_backend/src/subs/mod.rs @@ -405,6 +405,7 @@ mod tests { SinkExt, StreamExt, }; + use tokio::sync::oneshot; use tokio_tungstenite::connect_async; use tungstenite::error::Error as TungsteniteError; @@ -448,7 +449,7 @@ mod tests { ); let port = portpicker::pick_unused_port().expect("No ports free"); let addr = format!("127.0.0.1:{port}").parse()?; - let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); let proxy_server = tokio::spawn(app.serve(addr, async move { shutdown_rx.await.unwrap(); })); diff --git a/crates/runtime/src/prod.rs b/crates/runtime/src/prod.rs index 1df71eb5..c510ad0e 100644 --- a/crates/runtime/src/prod.rs +++ b/crates/runtime/src/prod.rs @@ -27,7 +27,6 @@ use common::{ }, }; use futures::{ - channel::oneshot, future::{ BoxFuture, FusedFuture, @@ -42,6 +41,7 @@ use tokio::{ Handle as TokioRuntimeHandle, Runtime as TokioRuntime, }, + sync::oneshot, time::{ sleep, Duration, @@ -116,7 +116,7 @@ impl ThreadHandle { Fut: Future, F: FnOnce() -> Fut + Send + 'static, { - let (cancel_tx, mut cancel_rx) = oneshot::channel(); + let (cancel_tx, cancel_rx) = oneshot::channel(); let (done_tx, done_rx) = oneshot::channel(); let thread_handle = thread::Builder::new() .stack_size(*RUNTIME_STACK_SIZE) @@ -124,9 +124,17 @@ impl ThreadHandle { let _guard = tokio_handle.enter(); let thread_body = async move { let future = f(); - let was_canceled = futures::select! { - _ = cancel_rx => true, - _ = future.fuse() => false, + tokio::pin!(future); + let was_canceled = tokio::select! { + r = cancel_rx => { + if r.is_ok() { + true + } else { + future.await; + false + } + }, + _ = &mut future => false, }; let _ = done_tx.send(was_canceled); }; diff --git a/crates/value/Cargo.toml b/crates/value/Cargo.toml index 26b02ea5..2f43a2a4 100644 --- a/crates/value/Cargo.toml +++ b/crates/value/Cargo.toml @@ -17,7 +17,6 @@ byteorder = { workspace = true } bytes = { workspace = true } derive_more = { workspace = true } errors = { path = "../errors" } -futures = { workspace = true } hex = { workspace = true } humansize = { workspace = true } imbl = { workspace = true } @@ -30,6 +29,7 @@ serde_json = { workspace = true } sha2 = { workspace = true } sync_types = { package = "convex_sync_types", path = "../convex/sync_types" } thiserror = { workspace = true } +tokio = { workspace = true } uuid = { workspace = true } [target.'cfg(not(target_os="windows"))'.dependencies] @@ -46,7 +46,9 @@ errors = { path = "../errors", features = ["testing"] } metrics = { path = "../metrics", features = ["testing"] } proptest = { workspace = true } proptest-derive = { workspace = true } -sync_types = { package = "convex_sync_types", path = "../convex/sync_types", features = ["testing"] } +sync_types = { package = "convex_sync_types", path = "../convex/sync_types", features = [ + "testing", +] } [features] testing = [ diff --git a/crates/value/src/heap_size.rs b/crates/value/src/heap_size.rs index 4f4d3caa..64691cfb 100644 --- a/crates/value/src/heap_size.rs +++ b/crates/value/src/heap_size.rs @@ -24,7 +24,6 @@ use std::{ ptr::NonNull, }; -use futures::channel::oneshot; #[cfg(any(test, feature = "testing"))] use proptest::{ prelude::Arbitrary, @@ -46,6 +45,7 @@ use sync_types::{ Timestamp, UserIdentityAttributes, }; +use tokio::sync::oneshot; pub trait HeapSize { fn heap_size(&self) -> usize;