Skip to content

Commit

Permalink
simplify ShortBoxFuture (#28983)
Browse files Browse the repository at this point in the history
i published ShortBoxFuture to crates since it seems a generally useful utility. and reduced the three lifetimes down to two.

GitOrigin-RevId: 6b007e40288a1ca28ecb19bec58c0f9d88cf56b5
  • Loading branch information
ldanilek authored and Convex, Inc. committed Aug 16, 2024
1 parent e19e4fa commit a5ee5d0
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 70 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ serde = { version = "1", features = [ "derive" ] }
serde_json = { version = "1", features = [ "float_roundtrip", "preserve_order" ] }
sha1 = { version = "0.10.5", features = [ "oid" ] }
sha2 = { version = "0.10.8" }
short_future = { version = "0.1.1" }
slab = "0.4.9"
sodiumoxide = "^0.2"
sourcemap = "8"
Expand Down
1 change: 1 addition & 0 deletions crates/application/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ semver = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
shape_inference = { path = "../shape_inference" }
short_future = { workspace = true }
slugify = "0.1.0"
sourcemap = { workspace = true }
storage = { path = "../storage" }
Expand Down
36 changes: 10 additions & 26 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ use database::{
IndexWorker,
OccRetryStats,
SearchIndexWorkers,
ShortBoxFuture,
Snapshot,
SnapshotPage,
StreamingExportTableFilter,
Expand Down Expand Up @@ -264,6 +263,7 @@ use search::{
};
use semver::Version;
use serde_json::Value as JsonValue;
use short_future::ShortBoxFuture;
use snapshot_import::{
clear_tables,
store_uploaded_import,
Expand Down Expand Up @@ -2814,12 +2814,8 @@ impl<RT: Runtime> Application<RT> {
F: Send + Sync,
F: for<'c> Fn(
&'c mut Transaction<RT>,
) -> ShortBoxFuture<
'_,
'b,
'c,
anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>,
>,
)
-> ShortBoxFuture<'c, 'b, anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>>,
{
let (t, events) = f(tx).0.await?;
DeploymentAuditLogModel::new(tx)
Expand All @@ -2839,12 +2835,8 @@ impl<RT: Runtime> Application<RT> {
T: Send + 'static,
F: for<'b> Fn(
&'b mut Transaction<RT>,
) -> ShortBoxFuture<
'_,
'a,
'b,
anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>,
>,
)
-> ShortBoxFuture<'b, 'a, anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>>,
{
self.execute_with_audit_log_events_and_occ_retries_with_pause_client(
identity,
Expand All @@ -2867,12 +2859,8 @@ impl<RT: Runtime> Application<RT> {
T: Send + 'static,
F: for<'b> Fn(
&'b mut Transaction<RT>,
) -> ShortBoxFuture<
'_,
'a,
'b,
anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>,
>,
)
-> ShortBoxFuture<'b, 'a, anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>>,
{
self.execute_with_audit_log_events_and_occ_retries_with_pause_client(
identity,
Expand All @@ -2895,12 +2883,8 @@ impl<RT: Runtime> Application<RT> {
T: Send + 'static,
F: for<'b> Fn(
&'b mut Transaction<RT>,
) -> ShortBoxFuture<
'_,
'a,
'b,
anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>,
>,
)
-> ShortBoxFuture<'b, 'a, anyhow::Result<(T, Vec<DeploymentAuditLogEvent>)>>,
{
let db = self.database.clone();
let (ts, (t, events), stats) = db
Expand Down Expand Up @@ -2936,7 +2920,7 @@ impl<RT: Runtime> Application<RT> {
where
F: Send + Sync,
T: Send + 'static,
F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'_, 'a, 'b, anyhow::Result<T>>,
F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'b, 'a, anyhow::Result<T>>,
{
self.database
.execute_with_occ_retries(identity, usage, pause_client, write_source, f)
Expand Down
1 change: 1 addition & 0 deletions crates/database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ search = { path = "../search" }
serde = { workspace = true }
serde_json = { workspace = true }
shape_inference = { path = "../shape_inference" }
short_future = { workspace = true }
slab = { workspace = true }
storage = { path = "../storage" }
sync_types = { package = "convex_sync_types", path = "../convex/sync_types" }
Expand Down
47 changes: 4 additions & 43 deletions crates/database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
BTreeMap,
BTreeSet,
},
marker::PhantomData,
ops::Bound,
sync::{
atomic::{
Expand Down Expand Up @@ -103,10 +102,8 @@ use errors::{
};
use events::usage::UsageEventLogger;
use futures::{
future::BoxFuture,
pin_mut,
stream::BoxStream,
Future,
FutureExt,
StreamExt,
TryStreamExt,
Expand All @@ -128,6 +125,7 @@ use search::{
TextIndexManager,
TextIndexManagerState,
};
use short_future::ShortBoxFuture;
use storage::Storage;
use sync_types::backoff::Backoff;
use usage_tracking::{
Expand Down Expand Up @@ -693,43 +691,6 @@ impl ShutdownSignal {
}
}

/// ShortBoxFuture<'_, 'a, 'b, T> is a future with a shorter lifetime.
/// It is equivalent to BoxFuture<'a + 'b, T>, working
/// around limitations of HRTBs and explicit lifetime bounds.
/// This is useful when wrapping async closures, where the closure returns a
/// future that depends on both:
/// 1. references in the enclosing scope with lifetime 'a.
/// 2. references in the closure's arguments with lifetime 'b.
/// For example:
///
/// async fn with_retries<'a>(&'a self, f: F)
/// where F: for<'b> Fn(&'b Transaction) -> ShortBoxFuture<'_, 'a, 'b, ()>
/// {
/// let tx = self.begin();
/// for i in 0..2 {
/// f(&tx).await;
/// }
/// }
///
/// async fn go(&self) {
/// let document = ResolvedDocument::new();
/// with_retries(|tx| ShortBoxFuture::new(async {
/// tx.get(document.id()).await;
/// })).await
/// }
pub struct ShortBoxFuture<'c, 'a: 'c, 'b: 'c, T>(
pub BoxFuture<'c, T>,
PhantomData<&'a ()>,
PhantomData<&'b ()>,
);
impl<'c, 'a: 'c, 'b: 'c, T, F: Future<Output = T> + Send + 'c> From<F>
for ShortBoxFuture<'c, 'a, 'b, T>
{
fn from(f: F) -> Self {
Self(Box::pin(f), PhantomData, PhantomData)
}
}

#[derive(Clone)]
pub struct StreamingExportTableFilter {
pub table_name: Option<TableName>,
Expand Down Expand Up @@ -1275,7 +1236,7 @@ impl<RT: Runtime> Database<RT> {
where
T: Send,
R: Fn(&Error) -> bool,
F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'_, 'a, 'b, anyhow::Result<T>>,
F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'b, 'a, anyhow::Result<T>>,
{
let write_source = write_source.into();
let result = {
Expand Down Expand Up @@ -1341,7 +1302,7 @@ impl<RT: Runtime> Database<RT> {
) -> anyhow::Result<(Timestamp, T, OccRetryStats)>
where
T: Send,
F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'_, 'a, 'b, anyhow::Result<T>>,
F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'b, 'a, anyhow::Result<T>>,
{
let backoff = Backoff::new(INITIAL_OCC_BACKOFF, MAX_OCC_BACKOFF);
let is_retriable = |e: &Error| e.is_occ();
Expand Down Expand Up @@ -1372,7 +1333,7 @@ impl<RT: Runtime> Database<RT> {
) -> anyhow::Result<(Timestamp, T, OccRetryStats)>
where
T: Send,
F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'_, 'a, 'b, anyhow::Result<T>>,
F: for<'b> Fn(&'b mut Transaction<RT>) -> ShortBoxFuture<'b, 'a, anyhow::Result<T>>,
{
let backoff = Backoff::new(INITIAL_OVERLOADED_BACKOFF, MAX_OVERLOADED_BACKOFF);
let is_retriable = |e: &Error| e.is_occ() || e.is_overloaded();
Expand Down
1 change: 0 additions & 1 deletion crates/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ pub use self::{
DatabaseSnapshot,
DocumentDeltas,
OccRetryStats,
ShortBoxFuture,
ShutdownSignal,
SnapshotPage,
StreamingExportTableFilter,
Expand Down

0 comments on commit a5ee5d0

Please sign in to comment.