Skip to content

Commit

Permalink
Use tokio's oneshot everywhere (#30516)
Browse files Browse the repository at this point in the history
Switch our oneshots from `futures` to `tokio` so they're coop budget aware -- if a Tokio task has exhausted its coop budget, `oneshot::Receivers` will preempt the task if awaited. This does require that our futures are all executed within the Tokio runtime, however.

GitOrigin-RevId: da8f8017fd0f07f8e50332682c43ce514ba8c3d4
  • Loading branch information
sujayakar authored and Convex, Inc. committed Oct 9, 2024
1 parent 2839c16 commit 78129b8
Show file tree
Hide file tree
Showing 27 changed files with 108 additions and 98 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,11 @@ codegen-units = 16
[profile.dev.package.num-bigint-dig]
opt-level = 3
codegen-units = 16

[profile.dev.package.tokio]
opt-level = 3
codegen-units = 16

[profile.dev.package.tokio-stream]
opt-level = 3
codegen-units = 16
2 changes: 1 addition & 1 deletion crates/application/src/function_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use common::{
},
};
use float_next_after::NextAfter;
use futures::channel::oneshot;
use http::{
Method,
StatusCode,
Expand All @@ -74,6 +73,7 @@ use serde_json::{
json,
Value as JsonValue,
};
use tokio::sync::oneshot;
use url::Url;
use usage_tracking::{
AggregatedFunctionUsageStats,
Expand Down
6 changes: 2 additions & 4 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,7 @@ use function_log::{
FunctionExecutionPart,
};
use function_runner::FunctionRunner;
use futures::{
channel::oneshot,
stream::BoxStream,
};
use futures::stream::BoxStream;
use headers::{
ContentLength,
ContentType,
Expand Down Expand Up @@ -283,6 +280,7 @@ use table_summary_worker::{
TableSummaryClient,
TableSummaryWorker,
};
use tokio::sync::oneshot;
use usage_tracking::{
FunctionUsageStats,
FunctionUsageTracker,
Expand Down
2 changes: 1 addition & 1 deletion crates/application/src/table_summary_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ use database::{
TableSummaryWriter,
};
use futures::{
channel::oneshot,
pin_mut,
select_biased,
FutureExt,
};
use parking_lot::Mutex;
use tokio::sync::oneshot;

use crate::metrics::log_worker_starting;

Expand Down
6 changes: 2 additions & 4 deletions crates/common/src/bounded_thread_pool.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::sync::Arc;

use futures::{
channel::{
mpsc,
oneshot,
},
channel::mpsc,
future::{
self,
BoxFuture,
Expand All @@ -17,6 +14,7 @@ use futures::{
StreamExt,
};
use parking_lot::Mutex;
use tokio::sync::oneshot;

use crate::{
codel_queue::{
Expand Down
3 changes: 2 additions & 1 deletion crates/common/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::{
use anyhow::Context;
use async_trait::async_trait;
use futures::{
channel::oneshot,
future::{
BoxFuture,
FusedFuture,
Expand Down Expand Up @@ -52,6 +51,7 @@ use proptest::prelude::*;
use rand::RngCore;
use serde::Serialize;
use thiserror::Error;
use tokio::sync::oneshot;
use uuid::Uuid;
use value::heap_size::HeapSize;

Expand Down Expand Up @@ -203,6 +203,7 @@ pub trait Runtime: Clone + Sync + Send + 'static {
/// Spawn a future on a reserved OS thread. This is only really necessary
/// for libraries like `V8` that care about being called from a
/// particular thread.
#[must_use = "Threads are canceled when their `SpawnHandle` is dropped."]
fn spawn_thread<Fut: Future<Output = ()>, F: FnOnce() -> Fut + Send + 'static>(
&self,
f: F,
Expand Down
12 changes: 7 additions & 5 deletions crates/common/src/runtime/testing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::{
};

use futures::{
channel::oneshot,
future::{
self,
BoxFuture,
Expand All @@ -34,10 +33,13 @@ use rand::{
SeedableRng,
};
use rand_chacha::ChaCha12Rng;
use tokio::runtime::{
Builder,
RngSeed,
UnhandledPanic,
use tokio::{
runtime::{
Builder,
RngSeed,
UnhandledPanic,
},
sync::oneshot,
};

use super::{
Expand Down
14 changes: 7 additions & 7 deletions crates/common/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
pub mod split_rw_lock;
pub mod state_channel;

// It's safe to use these `tokio` sync primitives in our runtime-generic code
// since they don't internally depend on the `tokio` runtime. Feel free to add
// more if you need them, but generally prefer using `futures`-based primitives
// if sufficient.
use futures::future;
use tokio::sync::oneshot;
pub use tokio::sync::{
broadcast,
// This channel is useful over `futures::channel::mpsc` since it doesn't require `&mut self` on
// `try_send`. The `futures` implementation conforms to their `Sink` trait which unnecessarily
// requires mutability.
mpsc,
watch,
Mutex,
MutexGuard,
Notify,
};

/// Wait until a sender's corresponding receiver has been closed.
pub async fn oneshot_receiver_closed<T>(sender: &mut oneshot::Sender<T>) {
future::poll_fn(|cx| sender.poll_closed(cx)).await
}
2 changes: 1 addition & 1 deletion crates/convex/examples/convex_chat_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use convex::{
Value,
};
use futures::{
channel::oneshot,
pin_mut,
select_biased,
FutureExt,
StreamExt,
};
use maplit::btreemap;
use tokio::sync::oneshot;

const SETUP_MSG: &str = r"
Please run this Convex Chat client from an initialized Convex project.
Expand Down
10 changes: 5 additions & 5 deletions crates/convex/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use convex_sync_types::{
#[cfg(doc)]
use futures::Stream;
use futures::{
channel::{
mpsc,
oneshot,
},
channel::mpsc,
SinkExt,
StreamExt,
};
use tokio::{
sync::broadcast,
sync::{
broadcast,
oneshot,
},
task::JoinHandle,
};
use tokio_stream::wrappers::BroadcastStream;
Expand Down
10 changes: 5 additions & 5 deletions crates/convex/src/client/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ use convex_sync_types::{
UdfPath,
};
use futures::{
channel::{
mpsc,
oneshot,
},
channel::mpsc,
select_biased,
FutureExt,
StreamExt,
};
use tokio::sync::broadcast;
use tokio::sync::{
broadcast,
oneshot,
};
use tokio_stream::wrappers::BroadcastStream;

use crate::{
Expand Down
6 changes: 2 additions & 4 deletions crates/convex/src/sync/web_socket_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@ use convex_sync_types::{
Timestamp,
};
use futures::{
channel::{
mpsc,
oneshot,
},
channel::mpsc,
select_biased,
FutureExt,
SinkExt,
StreamExt,
};
use tokio::{
net::TcpStream,
sync::oneshot,
task::JoinHandle,
time::{
Instant,
Expand Down
2 changes: 1 addition & 1 deletion crates/database/src/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ use errors::{
ErrorMetadataAnyhowExt,
};
use futures::{
channel::oneshot,
future::{
BoxFuture,
Either,
Expand All @@ -86,6 +85,7 @@ use indexing::index_registry::IndexRegistry;
use minitrace::prelude::*;
use parking_lot::Mutex;
use prometheus::VMHistogram;
use tokio::sync::oneshot;
use usage_tracking::FunctionUsageTracker;
use value::{
heap_size::WithHeapSize,
Expand Down
8 changes: 5 additions & 3 deletions crates/database/src/index_workers/search_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
let rate_limit_pages_per_second = job.build_reason.read_max_pages_per_second();
let developer_config = job.index_config.developer_config.clone();
let params = self.params.clone();
self.runtime.spawn_thread(move || async move {
let mut handle = self.runtime.spawn_thread(move || async move {
let result = Self::build_multipart_segment_on_thread(
params,
rate_limit_pages_per_second,
Expand All @@ -469,8 +469,9 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
build_index_args,
)
.await;
_ = tx.send(result);
let _ = tx.send(result);
});
handle.join().await?;
rx.await?
}

Expand Down Expand Up @@ -612,10 +613,11 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
let (tx, rx) = oneshot::channel();
let rt = self.runtime.clone();
let storage = self.storage.clone();
self.runtime.spawn_thread(move || async move {
let mut handle = self.runtime.spawn_thread(move || async move {
let result = T::upload_new_segment(&rt, storage, new_segment).await;
let _ = tx.send(result);
});
handle.join().await?;
rx.await?
}
}
Expand Down
6 changes: 2 additions & 4 deletions crates/database/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ use common::{
},
};
use futures::{
channel::{
mpsc,
oneshot,
},
channel::mpsc,
select_biased,
FutureExt,
StreamExt,
Expand All @@ -47,6 +44,7 @@ use parking_lot::Mutex;
use prometheus::VMHistogram;
use search::query::TextSearchSubscriptions;
use slab::Slab;
use tokio::sync::oneshot;

use crate::{
metrics,
Expand Down
8 changes: 3 additions & 5 deletions crates/database/src/write_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ use errors::{
ErrorMetadata,
ErrorMetadataAnyhowExt,
};
use futures::{
channel::oneshot,
Future,
};
use futures::Future;
use parking_lot::RwLock;
use tokio::sync::oneshot;
use value::heap_size::{
HeapSize,
WithHeapSize,
Expand Down Expand Up @@ -156,7 +154,7 @@ impl WriteLog {
// Notify waiters
let mut i = 0;
while i < self.waiters.len() {
if ts > self.waiters[i].0 || self.waiters[i].1.is_canceled() {
if ts > self.waiters[i].0 || self.waiters[i].1.is_closed() {
// Remove from the waiters.
let w = self.waiters.swap_remove_back(i).expect("checked above");
// Notify. Ignore if receiver has dropped.
Expand Down
7 changes: 4 additions & 3 deletions crates/function_runner/src/isolate_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use common::pause::PauseClient;
use common::{
errors::report_error,
runtime::Runtime,
sync::oneshot_receiver_closed,
types::UdfType,
};
use futures::FutureExt;
Expand Down Expand Up @@ -108,7 +109,7 @@ impl<RT: Runtime> IsolateWorker<RT> for FunctionRunnerIsolateWorker<RT> {
client_id,
isolate,
isolate_clean,
response.cancellation().boxed(),
oneshot_receiver_closed(&mut response).boxed(),
)
.await;
let status = match &r {
Expand Down Expand Up @@ -158,7 +159,7 @@ impl<RT: Runtime> IsolateWorker<RT> for FunctionRunnerIsolateWorker<RT> {
isolate,
isolate_clean,
request.params.clone(),
response.cancellation().boxed(),
oneshot_receiver_closed(&mut response).boxed(),
)
.await;

Expand Down Expand Up @@ -211,7 +212,7 @@ impl<RT: Runtime> IsolateWorker<RT> for FunctionRunnerIsolateWorker<RT> {
request.http_module_path,
request.routed_path,
request.http_request,
response.cancellation().boxed(),
oneshot_receiver_closed(&mut response).boxed(),
)
.await;
let status = match &r {
Expand Down
Loading

0 comments on commit 78129b8

Please sign in to comment.