From 0a74b2201ba2cb5a991fdc7ab0e8ad2eb6f94ddd Mon Sep 17 00:00:00 2001
From: Joel Dice
Date: Mon, 27 Jan 2025 10:28:03 -0700
Subject: [PATCH] remove `CachingStoreManager` from `factor-key-value`

The semantic (non-)guarantees for wasi-keyvalue are still [under
discussion](https://github.com/WebAssembly/wasi-keyvalue/pull/56), but
meanwhile the behavior of Spin's write-behind cache has caused [some
headaches](https://github.com/fermyon/spin/issues/2952), so I'm removing it
until we have more clarity on what's allowed and what's disallowed by the
proposed standard.

The original motivation behind `CachingStoreManager` was to reflect the
anticipated behavior of an eventually-consistent, low-latency, cloud-based
distributed store and, per [Hyrum's Law](https://www.hyrumslaw.com/), help
app developers avoid depending on the behavior of a local, centralized store
which would not match that of a distributed store. However, the write-behind
caching approach interacts poorly with the lazy connection establishment
which some `StoreManager` implementations use, leading writes to apparently
succeed even when the connection fails.

Subsequent discussion regarding the above issue arrived at a consensus that
we should not consider a write to have succeeded until and unless we've
successfully connected to and received a write confirmation from at least
one replica in a distributed system. I.e. rather than the replication factor
(RF) = 0 we've been effectively providing up to this point, we should provide
RF=1. The latter still provides low-latency performance when the nearest
replica is reasonably close, but improves upon RF=0 in that it shifts
responsibility for the write from Spin to the backing store prior to
returning "success" to the application.

Note that RF=1 (and indeed anything less than RF=ALL) cannot guarantee that
the write will be seen immediately (or, in the extreme case of an
unrecoverable failure, at all) by readers connected to other replicas.
Applications requiring a stronger consistency model should use an ACID-style
backing store rather than an eventually consistent one.
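
A minimal sketch of what this means from a guest's point of view (this
assumes the Spin Rust SDK's `spin_sdk::key_value` API; the store label
"default" and the key/value below are placeholders): with the write-behind
cache gone, `set` does not return until the backing store has acknowledged
the write, so a failed or unreachable store now surfaces as an error on the
call itself rather than being silently dropped later.

    use spin_sdk::key_value::{Error, Store};

    fn record_visit() -> Result<(), Error> {
        // Open the store by label; some backends establish the connection lazily.
        let store = Store::open("default")?;
        // With RF=1 semantics this call completes only once the backing store
        // has accepted the write; an unreachable store yields an `Err` here
        // instead of appearing to succeed.
        store.set("last-visit", b"2025-01-27")?;
        Ok(())
    }
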
Signed-off-by: Joel Dice
---
 Cargo.lock                          |   1 -
 crates/factor-key-value/Cargo.toml  |   1 -
 crates/factor-key-value/src/lib.rs  |   7 +-
 crates/factor-key-value/src/util.rs | 346 +---------------------------
 4 files changed, 5 insertions(+), 350 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 318106c7eb..81f17bd739 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7804,7 +7804,6 @@ name = "spin-factor-key-value"
 version = "3.2.0-pre0"
 dependencies = [
  "anyhow",
- "lru",
  "serde",
  "spin-core",
  "spin-factors",
diff --git a/crates/factor-key-value/Cargo.toml b/crates/factor-key-value/Cargo.toml
index af38d9f8e5..d414a68390 100644
--- a/crates/factor-key-value/Cargo.toml
+++ b/crates/factor-key-value/Cargo.toml
@@ -6,7 +6,6 @@ edition = { workspace = true }
 
 [dependencies]
 anyhow = { workspace = true }
-lru = "0.12"
 serde = { workspace = true }
 spin-core = { path = "../core" }
 spin-factors = { path = "../factors" }
diff --git a/crates/factor-key-value/src/lib.rs b/crates/factor-key-value/src/lib.rs
index 2d9dbb2d9d..f893263af9 100644
--- a/crates/factor-key-value/src/lib.rs
+++ b/crates/factor-key-value/src/lib.rs
@@ -18,7 +18,7 @@ pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key
 pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
 pub use runtime_config::RuntimeConfig;
 use spin_core::async_trait;
-pub use util::{CachingStoreManager, DelegatingStoreManager};
+pub use util::DelegatingStoreManager;
 
 /// A factor that provides key-value storage.
 #[derive(Default)]
@@ -54,8 +54,7 @@ impl Factor for KeyValueFactor {
         let store_managers = ctx.take_runtime_config().unwrap_or_default();
 
         let delegating_manager = DelegatingStoreManager::new(store_managers);
-        let caching_manager = CachingStoreManager::new(delegating_manager);
-        let store_manager = Arc::new(caching_manager);
+        let store_manager = Arc::new(delegating_manager);
 
         // Build component -> allowed stores map
         let mut component_allowed_stores = HashMap::new();
@@ -100,7 +99,7 @@ impl Factor for KeyValueFactor {
     }
 }
 
-type AppStoreManager = CachingStoreManager<DelegatingStoreManager>;
+type AppStoreManager = DelegatingStoreManager;
 
 pub struct AppState {
     /// The store manager for the app.
diff --git a/crates/factor-key-value/src/util.rs b/crates/factor-key-value/src/util.rs
index 523a61f9e7..ea3fa5caa6 100644
--- a/crates/factor-key-value/src/util.rs
+++ b/crates/factor-key-value/src/util.rs
@@ -1,17 +1,6 @@
-use crate::{Cas, Error, Store, StoreManager, SwapError};
-use lru::LruCache;
+use crate::{Error, Store, StoreManager};
 use spin_core::async_trait;
-use std::{
-    collections::{HashMap, HashSet},
-    future::Future,
-    num::NonZeroUsize,
-    sync::Arc,
-};
-use tokio::{
-    sync::Mutex as AsyncMutex,
-    task::{self, JoinHandle},
-};
-use tracing::Instrument;
+use std::{collections::HashMap, sync::Arc};
 
 /// A [`StoreManager`] which delegates to other `StoreManager`s based on the store label.
 pub struct DelegatingStoreManager {
@@ -45,334 +34,3 @@ impl StoreManager for DelegatingStoreManager {
         None
     }
 }
-
-/// Wrap each `Store` produced by the inner `StoreManager` in an asynchronous,
-/// write-behind cache.
-///
-/// This serves two purposes:
-///
-/// - Improve performance with slow and/or distant stores
-///
-/// - Provide a relaxed consistency guarantee vs. what a fully synchronous store
-///   provides
-///
-/// The latter is intended to prevent guests from coming to rely on the
-/// synchronous consistency model of an existing implementation which may later
-/// be replaced with one providing a more relaxed, asynchronous (i.e.
-/// "eventual") consistency model. See also <https://www.hyrumslaw.com/> and
-/// <https://xkcd.com/1172/>.
-///
-/// This implementation provides a "read-your-writes", asynchronous consistency
-/// model such that values are immediately available for reading as soon as they
-/// are written as long as the read(s) hit the same cache as the write(s).
-/// Reads and writes through separate caches (e.g. separate guest instances or
-/// separately-opened references to the same store within a single instance) are
-/// _not_ guaranteed to be consistent; not only is cross-cache consistency
-/// subject to scheduling and/or networking delays, a given tuple is never
-/// refreshed from the backing store once added to a cache since this
-/// implementation is intended for use only by short-lived guest instances.
-///
-/// Note that, because writes are asynchronous and return immediately,
-/// durability is _not_ guaranteed. I/O errors may occur asynchronously after
-/// the write operation has returned control to the guest, which may result in
-/// the write being lost without the guest knowing. In the future, a separate
-/// `write-durable` function could be added to key-value.wit to provide either
-/// synchronous or asynchronous feedback on durability for guests which need it.
-pub struct CachingStoreManager<T> {
-    capacity: NonZeroUsize,
-    inner: T,
-}
-
-const DEFAULT_CACHE_SIZE: usize = 256;
-
-impl<T> CachingStoreManager<T> {
-    pub fn new(inner: T) -> Self {
-        Self::new_with_capacity(NonZeroUsize::new(DEFAULT_CACHE_SIZE).unwrap(), inner)
-    }
-
-    pub fn new_with_capacity(capacity: NonZeroUsize, inner: T) -> Self {
-        Self { capacity, inner }
-    }
-}
-
-#[async_trait]
-impl<T: StoreManager> StoreManager for CachingStoreManager<T> {
-    async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error> {
-        Ok(Arc::new(CachingStore {
-            inner: self.inner.get(name).await?,
-            state: Arc::new(AsyncMutex::new(CachingStoreState {
-                cache: LruCache::new(self.capacity),
-                previous_task: None,
-            })),
-        }))
-    }
-
-    fn is_defined(&self, store_name: &str) -> bool {
-        self.inner.is_defined(store_name)
-    }
-
-    fn summary(&self, store_name: &str) -> Option<String> {
-        self.inner.summary(store_name)
-    }
-}
-
-struct CachingStoreState {
-    cache: LruCache<String, Option<Vec<u8>>>,
-    previous_task: Option<JoinHandle<Result<(), Error>>>,
-}
-
-impl CachingStoreState {
-    /// Wrap the specified task in an outer task which waits for `self.previous_task` before proceeding, and spawn
-    /// the result. This ensures that write order is preserved.
-    fn spawn(&mut self, task: impl Future<Output = Result<(), Error>> + Send + 'static) {
-        let previous_task = self.previous_task.take();
-        let task = async move {
-            if let Some(previous_task) = previous_task {
-                previous_task
-                    .await
-                    .map_err(|e| Error::Other(format!("{e:?}")))??
-            }
-
-            task.await
-        };
-        self.previous_task = Some(task::spawn(task.in_current_span()))
-    }
-
-    async fn flush(&mut self) -> Result<(), Error> {
-        if let Some(previous_task) = self.previous_task.take() {
-            previous_task
-                .await
-                .map_err(|e| Error::Other(format!("{e:?}")))??
-        }
-
-        Ok(())
-    }
-}
-
-struct CachingStore {
-    inner: Arc<dyn Store>,
-    state: Arc<AsyncMutex<CachingStoreState>>,
-}
-
-#[async_trait]
-impl Store for CachingStore {
-    async fn after_open(&self) -> Result<(), Error> {
-        self.inner.after_open().await
-    }
-
-    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
-        // Retrieve the specified value from the cache, lazily populating the cache as necessary.
-
-        let mut state = self.state.lock().await;
-
-        if let Some(value) = state.cache.get(key).cloned() {
-            return Ok(value);
-        }
-
-        // Flush any outstanding writes prior to reading from store. This is necessary because we need to
-        // guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
-        // cache prior to their corresponding writes reaching the backing store.
-        state.flush().await?;
-
-        let value = self.inner.get(key).await?;
-
-        state.cache.put(key.to_owned(), value.clone());
-
-        Ok(value)
-    }
-
-    async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
-        // Update the cache and spawn a task to update the backing store asynchronously.
-
-        let mut state = self.state.lock().await;
-
-        state.cache.put(key.to_owned(), Some(value.to_owned()));
-
-        let inner = self.inner.clone();
-        let key = key.to_owned();
-        let value = value.to_owned();
-        state.spawn(async move { inner.set(&key, &value).await });
-
-        Ok(())
-    }
-
-    async fn delete(&self, key: &str) -> Result<(), Error> {
-        // Update the cache and spawn a task to update the backing store asynchronously.
-
-        let mut state = self.state.lock().await;
-
-        state.cache.put(key.to_owned(), None);
-
-        let inner = self.inner.clone();
-        let key = key.to_owned();
-        state.spawn(async move { inner.delete(&key).await });
-
-        Ok(())
-    }
-
-    async fn exists(&self, key: &str) -> Result<bool, Error> {
-        Ok(self.get(key).await?.is_some())
-    }
-
-    async fn get_keys(&self) -> Result<Vec<String>, Error> {
-        // Get the keys from the backing store, remove any which are `None` in the cache, and add any which are
-        // `Some` in the cache, returning the result.
-        //
-        // Note that we don't bother caching the result, since we expect this function won't be called more than
-        // once for a given store in normal usage, and maintaining consistency would be complicated.
-
-        let mut state = self.state.lock().await;
-
-        // Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior
-        // to their corresponding writes reaching the backing store.
-        state.flush().await?;
-
-        Ok(self
-            .inner
-            .get_keys()
-            .await?
-            .into_iter()
-            .filter(|k| {
-                state
-                    .cache
-                    .peek(k)
-                    .map(|v| v.as_ref().is_some())
-                    .unwrap_or(true)
-            })
-            .chain(
-                state
-                    .cache
-                    .iter()
-                    .filter_map(|(k, v)| v.as_ref().map(|_| k.to_owned())),
-            )
-            .collect::<HashSet<_>>()
-            .into_iter()
-            .collect())
-    }
-
-    async fn get_many(
-        &self,
-        keys: Vec<String>,
-    ) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
-        let mut state = self.state.lock().await;
-
-        // Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior
-        // to their corresponding writes reaching the backing store.
-        state.flush().await?;
-
-        let mut found: Vec<(String, Option<Vec<u8>>)> = Vec::new();
-        let mut not_found: Vec<String> = Vec::new();
-        for key in keys {
-            match state.cache.get(key.as_str()) {
-                Some(Some(value)) => found.push((key, Some(value.clone()))),
-                _ => not_found.push(key),
-            }
-        }
-
-        if !not_found.is_empty() {
-            let keys_and_values = self.inner.get_many(not_found).await?;
-            for (key, value) in keys_and_values {
-                found.push((key.clone(), value.clone()));
-                state.cache.put(key, value);
-            }
-        }
-
-        Ok(found)
-    }
-
-    async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> anyhow::Result<(), Error> {
-        let mut state = self.state.lock().await;
-
-        for (key, value) in key_values.clone() {
-            state.cache.put(key, Some(value));
-        }
-
-        self.inner.set_many(key_values).await
-    }
-
-    async fn delete_many(&self, keys: Vec<String>) -> anyhow::Result<(), Error> {
-        let mut state = self.state.lock().await;
-
-        for key in keys.clone() {
-            state.cache.put(key, None);
-        }
-
-        self.inner.delete_many(keys).await
-    }
-
-    async fn increment(&self, key: String, delta: i64) -> anyhow::Result<i64, Error> {
-        let mut state = self.state.lock().await;
-        let counter = self.inner.increment(key.clone(), delta).await?;
-        state
-            .cache
-            .put(key, Some(i64::to_le_bytes(counter).to_vec()));
-        Ok(counter)
-    }
-
-    async fn new_compare_and_swap(
-        &self,
-        bucket_rep: u32,
-        key: &str,
-    ) -> anyhow::Result<Arc<dyn Cas>, Error> {
-        let inner = self.inner.new_compare_and_swap(bucket_rep, key).await?;
-        Ok(Arc::new(CompareAndSwap {
-            bucket_rep,
-            state: self.state.clone(),
-            key: key.to_string(),
-            inner_cas: inner,
-        }))
-    }
-}
-
-struct CompareAndSwap {
-    bucket_rep: u32,
-    key: String,
-    state: Arc<AsyncMutex<CachingStoreState>>,
-    inner_cas: Arc<dyn Cas>,
-}
-
-#[async_trait]
-impl Cas for CompareAndSwap {
-    async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error> {
-        let mut state = self.state.lock().await;
-        state.flush().await?;
-        let res = self.inner_cas.current().await;
-        match res.clone() {
-            Ok(value) => {
-                state.cache.put(self.key.clone(), value.clone());
-                state.flush().await?;
-                Ok(value)
-            }
-            Err(err) => Err(err),
-        }?;
-        res
-    }
-
-    async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError> {
-        let mut state = self.state.lock().await;
-        state
-            .flush()
-            .await
-            .map_err(|_e| SwapError::Other("failed flushing".to_string()))?;
-        let res = self.inner_cas.swap(value.clone()).await;
-        match res {
-            Ok(()) => {
-                state.cache.put(self.key.clone(), Some(value));
-                state
-                    .flush()
-                    .await
-                    .map_err(|_e| SwapError::Other("failed flushing".to_string()))?;
-                Ok(())
-            }
-            Err(err) => Err(err),
-        }
-    }
-
-    async fn bucket_rep(&self) -> u32 {
-        self.bucket_rep
-    }
-
-    async fn key(&self) -> String {
-        self.key.clone()
-    }
-}