[modules] populate cache with multiple values (#25152)
When we start storing modules in S3, we will want to populate the entire module cache at once by reading the S3 zip file only once. To do this, we add a method `get_and_prepopulate` to `AsyncLru` that has the same functionality as `get`, except that its `ValueGenerator` can return values for additional keys, prepopulating them for future `get`s.

This commit starts using the method in `ModuleCacheWorker`, with no benefit over the existing API yet -- it just exercises the code path in preparation for the S3 use case.
Also add a unit test.

GitOrigin-RevId: 1191f1558778384cee7690e3f4fb9fcc17ffb07a
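
For orientation before reading the diff, here is a minimal sketch of the new call shape, modeled on the unit test added in this commit. It is not part of the change: the import paths, the example key/value types, and the `example` function are assumptions for illustration only.

use std::collections::HashMap;

use async_lru::async_lru::AsyncLru; // assumed module path
use common::runtime::Runtime; // assumed module path
use futures::FutureExt;

// Hypothetical caller: a single generator future returns values for several keys.
// The requested key's value is returned; the other keys are cached for later gets.
async fn example<RT: Runtime>(cache: &AsyncLru<RT, &'static str, u32>) -> anyhow::Result<()> {
    let generator = async move {
        let mut values: HashMap<&'static str, anyhow::Result<u32>> = HashMap::new();
        values.insert("k1", Ok(1)); // the key we asked for
        values.insert("k2", Ok(2)); // prepopulated as a side effect
        values
    }
    .boxed();

    let v1 = cache.get_and_prepopulate("k1", generator).await?;
    assert_eq!(*v1, 1);

    // Served from the cache; this fallback generator should never run.
    let v2 = cache.get("k2", async move { Ok(99) }.boxed()).await?;
    assert_eq!(*v2, 2);
    Ok(())
}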
ldanilek authored and Convex, Inc. committed Apr 29, 2024
1 parent b6d0f46 commit 2e7f0eb
Showing 2 changed files with 110 additions and 19 deletions.
40 changes: 33 additions & 7 deletions crates/application/src/module_cache/mod.rs
@@ -1,5 +1,8 @@
 use std::{
-    collections::BTreeSet,
+    collections::{
+        BTreeSet,
+        HashMap,
+    },
     sync::Arc,
     time::Duration,
 };
@@ -108,13 +111,16 @@ impl<RT: Runtime> ModuleCacheWorker<RT> {
         // if the number of modules is high and lots of UDFs are using old
         // versions, but on average they should be populated and remain.
         let num_loaded = referenced_versions.len();
-        for key in referenced_versions {
-            let fetcher = ModuleVersionFetcher {
-                database: self.database.clone(),
-                modules_storage: self.modules_storage.clone(),
-            };
+        let fetcher = ModuleVersionFetcher {
+            database: self.database.clone(),
+            modules_storage: self.modules_storage.clone(),
+        };
+        if let Some(first_key) = referenced_versions.first().cloned() {
             self.cache
-                .get(key, fetcher.generate_value(key).boxed())
+                .get_and_prepopulate(
+                    first_key,
+                    fetcher.generate_values(referenced_versions).boxed(),
+                )
                 .await?;
         }
 
@@ -152,6 +158,26 @@ impl<RT: Runtime> ModuleVersionFetcher<RT> {
             .await?
             .into_value())
     }
+
+    async fn generate_values(
+        self,
+        keys: BTreeSet<(ResolvedDocumentId, ModuleVersion)>,
+    ) -> HashMap<(ResolvedDocumentId, ModuleVersion), anyhow::Result<ModuleVersionMetadata>> {
+        let mut hashmap = HashMap::new();
+        for key in keys {
+            hashmap.insert(
+                key,
+                try {
+                    let mut tx = self.database.begin(Identity::system()).await?;
+                    ModuleModel::new(&mut tx)
+                        .get_version(key.0, key.1)
+                        .await?
+                        .into_value()
+                },
+            );
+        }
+        hashmap
+    }
 }
 
 pub struct ModuleCache<RT: Runtime> {
89 changes: 77 additions & 12 deletions crates/async_lru/src/async_lru.rs
@@ -1,5 +1,6 @@
 use core::hash::Hash;
 use std::{
+    collections::HashMap,
     fmt::Debug,
     sync::Arc,
 };
@@ -68,7 +69,8 @@ pub struct AsyncLru<RT: Runtime, Key, Value> {
     pause_client: Option<Arc<tokio::sync::Mutex<PauseClient>>>,
 }
 
-pub type ValueGenerator<Value> = BoxFuture<'static, anyhow::Result<Value>>;
+pub type SingleValueGenerator<Value> = BoxFuture<'static, anyhow::Result<Value>>;
+pub type ValueGenerator<Key, Value> = BoxFuture<'static, HashMap<Key, anyhow::Result<Value>>>;
 
 impl<RT: Runtime, Key, Value> Clone for AsyncLru<RT, Key, Value> {
     fn clone(&self) -> Self {
@@ -142,7 +144,7 @@ type BuildValueResult<Value> = Result<Arc<Value>, Arc<anyhow::Error>>;
 
 type BuildValueRequest<Key, Value> = (
     Key,
-    ValueGenerator<Value>,
+    ValueGenerator<Key, Value>,
     async_broadcast::Sender<BuildValueResult<Value>>,
 );
 
@@ -247,8 +249,8 @@ impl<
         inner.current_size += new_value.size();
         // Ideally we'd not change the LRU order by putting here...
         if let Some(old_value) = inner.cache.put(key, new_value) {
-            anyhow::ensure!(!matches!(old_value, CacheResult::Ready { .. }));
-            // Just in case we ever assign a size to our Waiting entries.
+            // Allow overwriting entries (Waiting or Ready) which may have been populated
+            // by racing requests with prefetches.
             inner.current_size -= old_value.size();
         }
         Self::trim_to_size(&mut inner);
@@ -300,21 +302,45 @@ impl<
         inner.current_size
     }
 
-    pub async fn get(
+    pub async fn get_and_prepopulate(
         &self,
         key: Key,
-        value_generator: ValueGenerator<Value>,
+        value_generator: ValueGenerator<Key, Value>,
     ) -> anyhow::Result<Arc<Value>> {
         let timer = async_lru_get_timer(self.label);
         let result = self._get(&key, value_generator).await;
         timer.finish(result.is_ok());
         result
     }
 
+    pub async fn get(
+        &self,
+        key: Key,
+        value_generator: SingleValueGenerator<Value>,
+    ) -> anyhow::Result<Arc<Value>>
+    where
+        Key: Clone,
+    {
+        let timer = async_lru_get_timer(self.label);
+        let key_ = key.clone();
+        let result = self
+            ._get(
+                &key_,
+                Box::pin(async move {
+                    let mut hashmap = HashMap::new();
+                    hashmap.insert(key, value_generator.await);
+                    hashmap
+                }),
+            )
+            .await;
+        timer.finish(result.is_ok());
+        result
+    }
+
     async fn _get(
         &self,
         key: &Key,
-        value_generator: ValueGenerator<Value>,
+        value_generator: ValueGenerator<Key, Value>,
     ) -> anyhow::Result<Arc<Value>> {
         match self.get_sync(key, value_generator)? {
             Status::Ready(value) => Ok(value),
@@ -336,7 +362,7 @@ impl<
     fn get_sync(
         &self,
         key: &Key,
-        value_generator: ValueGenerator<Value>,
+        value_generator: ValueGenerator<Key, Value>,
     ) -> anyhow::Result<Status<Value>> {
         let mut inner = self.inner.lock();
         log_async_lru_size(inner.cache.len(), inner.current_size, self.label);
return;
}

let value = generator.await;
let values = generator.await;

let to_broadcast = Self::update_value(rt, inner, key, value).map_err(Arc::new);
let _ = tx.broadcast(to_broadcast).await;
for (k, value) in values {
let is_requested_key = k == key;
let to_broadcast =
Self::update_value(rt.clone(), inner.clone(), k, value).map_err(Arc::new);
if is_requested_key {
let _ = tx.broadcast(to_broadcast).await;
}
}
}
})
.await;
@@ -420,7 +452,10 @@
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
+    use std::{
+        collections::HashMap,
+        sync::Arc,
+    };
 
     use common::{
         pause::PauseController,
@@ -536,6 +571,36 @@ mod tests {
         Ok(())
     }
 
+    #[convex_macro::test_runtime]
+    async fn test_get_and_prepopulate(rt: TestRuntime) -> anyhow::Result<()> {
+        let cache = AsyncLru::new(rt, 10, 1, "label");
+        let first = cache
+            .get_and_prepopulate(
+                "k1",
+                async move {
+                    let mut hashmap = HashMap::new();
+                    hashmap.insert("k1", Ok(1));
+                    hashmap.insert("k2", Ok(2));
+                    hashmap.insert("k3", Err(anyhow::anyhow!("k3 failed")));
+                    hashmap
+                }
+                .boxed(),
+            )
+            .await?;
+        assert_eq!(*first, 1);
+        let k1_again = cache
+            .get("k1", GenerateRandomValue::generate_value("k1").boxed())
+            .await?;
+        assert_eq!(*k1_again, 1);
+        let k2_prepopulated = cache
+            .get("k2", GenerateRandomValue::generate_value("k2").boxed())
+            .await?;
+        assert_eq!(*k2_prepopulated, 2);
+        let k3_prepopulated = cache.get("k3", async move { Ok(3) }.boxed()).await?;
+        assert_eq!(*k3_prepopulated, 3);
+        Ok(())
+    }
+
     #[convex_macro::test_runtime]
     async fn get_generates_new_value_after_eviction(rt: TestRuntime) -> anyhow::Result<()> {
         let cache = AsyncLru::new(rt, 1, 1, "label");
