Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/cachekit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ workers = ["dep:worker", "dep:js-sys", "dep:getrandom"]
encryption = ["cachekit-core/encryption"]
l1 = ["dep:moka"]
macros = ["dep:cachekit-macros"]
# Opt into ?Send / Rc on native targets (for tokio::task::LocalSet, single-threaded
# runtimes, etc.). Same code paths that wasm32 already uses.
unsync = []

[dependencies]
cachekit-core = { version = "0.2", features = ["messagepack"] }
Expand Down
3 changes: 2 additions & 1 deletion crates/cachekit/src/backend/cachekitio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ pub(crate) fn from_http_status_sanitized(status: u16, body: &[u8], api_key: &str
// ── Backend impl ──────────────────────────────────────────────────────────────

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
#[cfg_attr(not(feature = "unsync"), async_trait)]
#[cfg_attr(feature = "unsync", async_trait(?Send))]
impl Backend for CachekitIO {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, BackendError> {
let req = self.with_standard_headers(
Expand Down
3 changes: 2 additions & 1 deletion crates/cachekit/src/backend/cachekitio_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ struct LockAcquireResponse {
}

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
#[cfg_attr(not(feature = "unsync"), async_trait)]
#[cfg_attr(feature = "unsync", async_trait(?Send))]
impl LockableBackend for CachekitIO {
async fn acquire_lock(
&self,
Expand Down
3 changes: 2 additions & 1 deletion crates/cachekit/src/backend/cachekitio_ttl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ struct RefreshTtlRequest {
}

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
#[cfg_attr(not(feature = "unsync"), async_trait)]
#[cfg_attr(feature = "unsync", async_trait(?Send))]
impl TtlInspectable for CachekitIO {
async fn ttl(&self, key: &str) -> Result<Option<Duration>, BackendError> {
let url = format!(
Expand Down
38 changes: 29 additions & 9 deletions crates/cachekit/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ pub struct HealthStatus {

/// Async cache backend abstraction.
///
/// Implementors must be `Send + Sync` on native targets.
/// On `wasm32` targets `Send` is relaxed (`?Send`) because the Workers runtime
/// is single-threaded and `reqwest`/`worker::Fetch` futures are `!Send`.
#[cfg(not(target_arch = "wasm32"))]
/// Implementors must be `Send + Sync` on native targets (unless the `unsync`
/// feature is enabled). On `wasm32` targets or with `unsync`, `Send` is relaxed
/// (`?Send`) because the runtime is single-threaded.
#[cfg(not(any(target_arch = "wasm32", feature = "unsync")))]
#[async_trait]
pub trait Backend: Send + Sync {
/// Retrieve the raw bytes stored under `key`, or `None` if absent.
Expand All @@ -51,25 +51,38 @@ pub trait Backend: Send + Sync {
async fn health(&self) -> Result<HealthStatus, BackendError>;
}

#[cfg(target_arch = "wasm32")]
/// Async cache backend abstraction (`?Send` variant).
///
/// Active when compiling for `wasm32` or with the `unsync` feature.
/// Identical API to the `Send + Sync` variant but without thread-safety bounds.
#[cfg(any(target_arch = "wasm32", feature = "unsync"))]
#[async_trait(?Send)]
pub trait Backend {
/// Retrieve the raw bytes stored under `key`, or `None` if absent.
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, BackendError>;

/// Store `value` under `key`, optionally expiring after `ttl`.
async fn set(
&self,
key: &str,
value: Vec<u8>,
ttl: Option<Duration>,
) -> Result<(), BackendError>;

/// Remove `key` and return `true` if it existed.
async fn delete(&self, key: &str) -> Result<bool, BackendError>;

/// Return `true` if `key` exists without fetching the value.
async fn exists(&self, key: &str) -> Result<bool, BackendError>;

/// Return health/status information for this backend.
async fn health(&self) -> Result<HealthStatus, BackendError>;
}

// ── TtlInspectable ───────────────────────────────────────────────────────────

/// Optional extension for backends that can report the remaining TTL of a key.
#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(any(target_arch = "wasm32", feature = "unsync")))]
#[async_trait]
pub trait TtlInspectable: Backend {
/// Return the remaining TTL for `key`, or `None` if the key does not exist
Expand All @@ -84,11 +97,15 @@ pub trait TtlInspectable: Backend {
}
}

#[cfg(target_arch = "wasm32")]
/// Optional extension for backends that can report the remaining TTL of a key (`?Send` variant).
#[cfg(any(target_arch = "wasm32", feature = "unsync"))]
#[async_trait(?Send)]
pub trait TtlInspectable: Backend {
/// Return the remaining TTL for `key`, or `None` if the key does not exist
/// or has no expiry.
async fn ttl(&self, key: &str) -> Result<Option<Duration>, BackendError>;

/// Refresh the TTL on an existing key. Default: not supported.
async fn refresh_ttl(&self, _key: &str, _ttl: Duration) -> Result<bool, BackendError> {
Err(BackendError::permanent(
"refresh_ttl not supported by this backend",
Expand All @@ -99,7 +116,7 @@ pub trait TtlInspectable: Backend {
// ── LockableBackend ─────────────────────────────────────────────────────────

/// Optional extension for backends that support distributed locking.
#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(any(target_arch = "wasm32", feature = "unsync")))]
#[async_trait]
pub trait LockableBackend: Backend {
/// Acquire a distributed lock. Returns lock_id if acquired, None if contested.
Expand All @@ -112,14 +129,17 @@ pub trait LockableBackend: Backend {
async fn release_lock(&self, key: &str, lock_id: &str) -> Result<bool, BackendError>;
}

#[cfg(target_arch = "wasm32")]
/// Optional extension for backends that support distributed locking (`?Send` variant).
#[cfg(any(target_arch = "wasm32", feature = "unsync"))]
#[async_trait(?Send)]
pub trait LockableBackend: Backend {
/// Acquire a distributed lock. Returns lock_id if acquired, None if contested.
async fn acquire_lock(
&self,
key: &str,
timeout_ms: u64,
) -> Result<Option<String>, BackendError>;
/// Release a distributed lock. Returns true if released.
async fn release_lock(&self, key: &str, lock_id: &str) -> Result<bool, BackendError>;
}

Expand Down
6 changes: 4 additions & 2 deletions crates/cachekit/src/backend/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ impl RedisBackend {
// ── Backend impl ──────────────────────────────────────────────────────────────

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
#[cfg_attr(not(feature = "unsync"), async_trait)]
#[cfg_attr(feature = "unsync", async_trait(?Send))]
impl Backend for RedisBackend {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, BackendError> {
let result: Option<bytes::Bytes> = self.client.get(key).await.map_err(redis_err)?;
Expand Down Expand Up @@ -127,7 +128,8 @@ impl Backend for RedisBackend {
// ── TtlInspectable impl ───────────────────────────────────────────────────────

#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
#[cfg_attr(not(feature = "unsync"), async_trait)]
#[cfg_attr(feature = "unsync", async_trait(?Send))]
impl TtlInspectable for RedisBackend {
async fn ttl(&self, key: &str) -> Result<Option<Duration>, BackendError> {
// Redis TTL return values:
Expand Down
38 changes: 25 additions & 13 deletions crates/cachekit/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,35 @@ use crate::serializer;

// ── SharedBackend type alias ──────────────────────────────────────────────────

/// Thread-safe reference to a heap-allocated backend.
/// Reference-counted pointer to a heap-allocated backend.
///
/// On native targets we require `Send + Sync` for use across threads.
/// On `wasm32` the Workers runtime is single-threaded so `Rc` is sufficient.
#[cfg(not(target_arch = "wasm32"))]
/// On native targets (without `unsync`) we require `Send + Sync` via `Arc`.
/// On `wasm32` or with the `unsync` feature, `Rc` is used instead — the runtime
/// is single-threaded so `Send` bounds are unnecessary.
#[cfg(not(any(target_arch = "wasm32", feature = "unsync")))]
pub type SharedBackend = std::sync::Arc<dyn Backend>;

#[cfg(target_arch = "wasm32")]
/// Reference-counted pointer to a heap-allocated backend (`?Send` variant).
#[cfg(any(target_arch = "wasm32", feature = "unsync"))]
pub type SharedBackend = std::rc::Rc<dyn Backend>;

// ── SharedEncryption type alias ──────────────────────────────────────────────

/// Thread-safe reference to the encryption layer.
/// Reference-counted pointer to the encryption layer.
///
/// On native targets `Arc` is used (requires `Sync`).
/// On `wasm32` the Workers runtime is single-threaded so `Rc` is sufficient
/// and avoids the `!Sync` problem caused by `Cell<u64>` inside cachekit-core's
/// nonce counter.
#[cfg(all(feature = "encryption", not(target_arch = "wasm32")))]
/// On native targets (without `unsync`) `Arc` is used (requires `Sync`).
/// On `wasm32` or with `unsync`, `Rc` is used — avoids the `!Sync` problem
/// caused by `Cell<u64>` inside cachekit-core's nonce counter.
#[cfg(all(
feature = "encryption",
not(any(target_arch = "wasm32", feature = "unsync"))
))]
type SharedEncryption = std::sync::Arc<crate::encryption::EncryptionLayer>;

#[cfg(all(feature = "encryption", target_arch = "wasm32"))]
#[cfg(all(
feature = "encryption",
any(target_arch = "wasm32", feature = "unsync")
))]
type SharedEncryption = std::rc::Rc<crate::encryption::EncryptionLayer>;

// ── Key validation ────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -105,8 +112,13 @@ impl CacheKit {
.build()
.map_err(|e| CachekitError::Config(e.to_string()))?;

#[cfg(not(feature = "unsync"))]
let shared: SharedBackend = std::sync::Arc::new(backend);
#[cfg(feature = "unsync")]
let shared: SharedBackend = std::rc::Rc::new(backend);

let mut builder = CacheKitBuilder::default()
.backend(std::sync::Arc::new(backend))
.backend(shared)
.default_ttl(config.default_ttl)
.max_payload_bytes(config.max_payload_bytes)
.l1_capacity(config.l1_capacity);
Expand Down
4 changes: 2 additions & 2 deletions crates/cachekit/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ pub struct BackendError {
/// Human-readable description.
pub message: String,
/// The underlying error that caused this backend error, if any.
#[cfg(not(target_arch = "wasm32"))]
#[cfg(not(any(target_arch = "wasm32", feature = "unsync")))]
#[source]
pub source: Option<Box<dyn std::error::Error + Send + Sync>>,
/// The underlying error that caused this backend error, if any.
#[cfg(target_arch = "wasm32")]
#[cfg(any(target_arch = "wasm32", feature = "unsync"))]
#[source]
pub source: Option<Box<dyn std::error::Error>>,
}
Expand Down
6 changes: 3 additions & 3 deletions crates/cachekit/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct User {

fn mock_client() -> CacheKit {
CacheKit::builder()
.backend(MockBackend::new())
.backend(MockBackend::shared())
.default_ttl(Duration::from_secs(60))
.no_l1()
.build()
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn client_delete() {
#[tokio::test]
async fn client_payload_too_large() {
let client = CacheKit::builder()
.backend(MockBackend::new())
.backend(MockBackend::shared())
.max_payload_bytes(10)
.no_l1()
.build()
Expand Down Expand Up @@ -161,7 +161,7 @@ async fn client_key_validation() {

// Boundary case: exactly 1024 bytes should be accepted
let client2 = CacheKit::builder()
.backend(MockBackend::new())
.backend(MockBackend::shared())
.no_l1()
.build()
.expect("client builds");
Expand Down
38 changes: 33 additions & 5 deletions crates/cachekit/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,56 @@
#![allow(dead_code)]

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::Mutex;

use cachekit::backend::{Backend, HealthStatus};
use cachekit::client::SharedBackend;
use cachekit::error::BackendError;

/// In-memory mock backend backed by a `Mutex<HashMap>` for use in tests.
///
/// The internal store uses `Arc<Mutex>` so cloning the backend shares state,
/// regardless of whether the outer wrapper is `Arc` or `Rc`.
#[derive(Debug, Default, Clone)]
pub struct MockBackend {
pub store: Arc<Mutex<HashMap<String, Vec<u8>>>>,
pub store: std::sync::Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

impl MockBackend {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
/// Create a new mock and immediately wrap it as a [`SharedBackend`].
///
/// Use [`MockBackend::new_with_handle`] when tests need to inspect the
/// store directly (e.g., to verify ciphertext).
pub fn shared() -> SharedBackend {
let mock = Self::default();
Self::into_shared(mock)
}

/// Create a new mock, returning both a [`SharedBackend`] and a clone
/// for direct store inspection.
pub fn new_with_handle() -> (SharedBackend, Self) {
let mock = Self::default();
let handle = mock.clone();
(Self::into_shared(mock), handle)
}

fn into_shared(mock: Self) -> SharedBackend {
#[cfg(not(any(target_arch = "wasm32", feature = "unsync")))]
{
std::sync::Arc::new(mock)
}
#[cfg(any(target_arch = "wasm32", feature = "unsync"))]
{
std::rc::Rc::new(mock)
}
}
}

#[async_trait]
#[cfg_attr(not(any(target_arch = "wasm32", feature = "unsync")), async_trait)]
#[cfg_attr(any(target_arch = "wasm32", feature = "unsync"), async_trait(?Send))]
impl Backend for MockBackend {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, BackendError> {
Ok(self.store.lock().await.get(key).cloned())
Expand Down
Loading