Skip to content

Commit 2726c53

Browse files
make blockstore put a command action
this way it can be called from a component via event messages
1 parent e369606 commit 2726c53

File tree

14 files changed

+504
-141
lines changed

14 files changed

+504
-141
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/peerpiper-browser/src/bindgen.rs

Lines changed: 34 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,20 @@
33
//! as Wurbo).
44
//! That wat the WIT interface can simply import the same API and use it to communicate with the
55
6-
pub mod blockstore_idb;
7-
8-
use futures::SinkExt;
96
use futures::{channel::mpsc, StreamExt};
107
use gloo_utils::format::JsValueSerdeExt;
11-
pub use peerpiper_core::events::PeerPiperCommand;
8+
pub use peerpiper_core::events::{PeerPiperCommand, SystemCommand};
9+
use peerpiper_core::Commander;
1210
use std::sync::Mutex;
1311
use std::sync::OnceLock;
1412
use wasm_bindgen::prelude::*;
1513
use wasm_bindgen_futures::spawn_local;
16-
use wnfs::common::CODEC_RAW;
17-
use wnfs::common::{BlockStore, Storable};
18-
use wnfs_unixfs_file::{
19-
builder::{Config, FileBuilder},
20-
chunker::{Chunker, ChunkerConfig},
21-
};
14+
15+
use crate::blockstore::BrowserBlockStore;
2216

2317
const MAX_CHANNELS: usize = 16;
2418

25-
/// This wraps command_sender in a Mutex so we can call it from multiple wasm_bindgen functions
26-
/// without worrying about thread safety.
27-
/// Make the OnceLock inner value mutable so we can call get_mut() on it.
28-
/// This makes our wasm API clean enough to interface directly to WIT world.
29-
static COMMAND_SENDER: OnceLock<Mutex<mpsc::Sender<PeerPiperCommand>>> = OnceLock::new();
30-
/// BrowserBlockStore
31-
static BSTORE: OnceLock<Mutex<blockstore_idb::BrowserBlockStore>> = OnceLock::new();
19+
static COMMANDER: OnceLock<Mutex<Commander<BrowserBlockStore>>> = OnceLock::new();
3220

3321
cfg_if::cfg_if! {
3422
if #[cfg(feature = "logging")] {
@@ -60,11 +48,13 @@ cfg_if::cfg_if! {
6048
pub async fn start() -> Result<(), JsValue> {
6149
init_log();
6250

63-
// Set up a blockstore in the browser
64-
let blockstore = blockstore_idb::BrowserBlockStore::new("peerpiper");
65-
blockstore.open().await?;
66-
67-
BSTORE.get_or_init(|| Mutex::new(blockstore));
51+
let blockstore = BrowserBlockStore::new("peerpiper");
52+
blockstore
53+
.open()
54+
.await
55+
.map_err(|err| JsValue::from_str(&format!("Error opening blockstore: {:?}", err)))?;
56+
let commander = Commander::new(blockstore);
57+
COMMANDER.get_or_init(|| Mutex::new(commander));
6858

6959
Ok(())
7060
}
@@ -76,8 +66,13 @@ pub async fn connect(libp2p_endpoint: &str, on_event: &js_sys::Function) -> Resu
7666
// command_sender will be used by other wasm_bindgen functions to send commands to the network
7767
// so we will need to wrap it in a Mutex or something to make it thread safe.
7868
let (command_sender, command_receiver) = mpsc::channel(8);
79-
// move command_sender into COMMAND_SENDER
80-
COMMAND_SENDER.get_or_init(|| Mutex::new(command_sender));
69+
// move command_sender into COMMANDER
70+
COMMANDER
71+
.get()
72+
.ok_or_else(|| JsError::new("Commander not initialized. Did `start()` complete?"))?
73+
.lock()
74+
.map_err(|err| JsError::new(&format!("Failed to lock commander: {}", err)))?
75+
.with_network(command_sender);
8176

8277
let endpoint = libp2p_endpoint.to_string().clone();
8378

@@ -99,49 +94,11 @@ pub async fn connect(libp2p_endpoint: &str, on_event: &js_sys::Function) -> Resu
9994
Ok(())
10095
}
10196

102-
/// Uses COMMAND_SENDER (if initialized) to send a command to the network.
103-
/// Else, returns an error.
104-
pub async fn send_command(command: PeerPiperCommand) -> Result<(), JsError> {
105-
tracing::trace!("Sending command");
106-
let command_sender = COMMAND_SENDER.get().ok_or_else(|| {
107-
JsError::new(
108-
"Command sender not initialized. Did you call `connect()` first to establish a connection?",
109-
)
110-
})?;
111-
112-
command_sender
113-
.lock()
114-
.map_err(|err| JsError::new(&format!("Failed to lock command sender: {}", err)))?
115-
.send(command)
116-
.await
117-
.map_err(|err| JsError::new(&format!("Failed to send command: {}", err)))?;
118-
Ok(())
119-
}
120-
121-
/// Publish to this topic String these bytes
122-
#[wasm_bindgen]
123-
pub async fn publish(topic: String, data: Vec<u8>) -> Result<(), JsError> {
124-
send_command(PeerPiperCommand::Publish { topic, data }).await
125-
}
126-
127-
/// Subscribe to this topic String
128-
#[wasm_bindgen]
129-
pub async fn subscribe(topic: String) -> Result<(), JsError> {
130-
send_command(PeerPiperCommand::Subscribe { topic }).await
131-
}
132-
133-
/// Unsubscribe from this topic String
134-
/// This will stop receiving messages from this topic.
135-
#[wasm_bindgen]
136-
pub async fn unsubscribe(topic: String) -> Result<(), JsError> {
137-
send_command(PeerPiperCommand::Unsubscribe { topic }).await
138-
}
139-
140-
/// Takes any json string and tries to deserialize it into a PeerPiperCommand,
141-
/// then sends it to the network.
97+
/// Takes any json string from a Guest Component, and tries to deserialize it into a PeerPiperCommand,
98+
/// then sends it to the COMMANDER who routes it to either the network or the system depending on the command.
14299
/// If it fails, returns an error.
143100
#[wasm_bindgen]
144-
pub async fn command(json: &str) -> Result<(), JsError> {
101+
pub async fn command(json: &str) -> Result<JsValue, JsError> {
145102
let example_publish = PeerPiperCommand::Publish {
146103
topic: "example".to_string(),
147104
data: vec![1, 2, 3],
@@ -153,36 +110,21 @@ pub async fn command(json: &str) -> Result<(), JsError> {
153110
serde_json::to_string(&example_publish).unwrap()
154111
))
155112
})?;
156-
send_command(command).await
157-
}
158-
159-
/// Allows the user to save a file to the system (IndexedDB. TODO: Memory too?)
160-
#[wasm_bindgen]
161-
pub async fn save(data: Vec<u8>) -> Result<(), JsError> {
162-
tracing::info!("Saving to blockstore bytes {:?}", data.len());
163113

164-
let blockstore = BSTORE
114+
let maybe_result = COMMANDER
165115
.get()
166-
.ok_or_else(|| {
167-
JsError::new(
168-
"Blockstore not initialized. Did you call `start()` first to establish a connection?",
169-
)
170-
})?
116+
.ok_or_else(|| JsError::new("Commander not initialized. Did `start()` complete?"))?
171117
.lock()
172-
.unwrap();
173-
174-
// The chunker needs to be here because it is specific to IndexedDB having a max size of 256 *
175-
// 1024 bytes. In another system (like a desktop disk) it could be chunked differently.
176-
let root_cid = FileBuilder::new()
177-
.content_bytes(data.clone())
178-
.fixed_chunker(256 * 1024)
179-
.build()
180-
.map_err(|err| JsError::new(&format!("Failed to build file: {}", err)))?
181-
.store(&blockstore.clone())
118+
.map_err(|err| JsError::new(&format!("Failed to lock commander: {}", err)))?
119+
.order(command)
182120
.await
183-
.map_err(|err| JsError::new(&format!("Failed to store file: {}", err)))?;
184-
185-
tracing::info!("Saved file to blockstore with CID: {:?}", root_cid);
121+
.map_err(|err| JsError::new(&format!("Failed to send command: {}", err)))?;
186122

187-
Ok(())
123+
// convert the ReturnValues enum to a JsValue (Cid as String, Vec<u8> as Uint8Array, or null)
124+
let js_val = match maybe_result {
125+
peerpiper_core::ReturnValues::Data(data) => JsValue::from_serde(&data)?,
126+
peerpiper_core::ReturnValues::ID(cid) => JsValue::from_str(&cid.to_string()),
127+
peerpiper_core::ReturnValues::None => JsValue::null(),
128+
};
129+
Ok(js_val)
188130
}
File renamed without changes.

crates/peerpiper-browser/src/bindgen/blockstore_idb.rs renamed to crates/peerpiper-browser/src/blockstore/blockstore_idb.rs

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -139,38 +139,3 @@ impl BrowserBlockStore {
139139
.ok_or(JsValue::from_str("Error converting to bool"))?)
140140
}
141141
}
142-
143-
impl WNFSBlockStore for BrowserBlockStore {
144-
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
145-
let key = CID::parse(&cid.to_string());
146-
let js_uint8array = self
147-
.get_idb(&key)
148-
.await
149-
.map_err(|_| BlockStoreError::CIDNotFound(*cid))?;
150-
let bytes: Bytes = js_uint8array.to_vec().into();
151-
Ok(bytes)
152-
}
153-
154-
async fn put_block_keyed(
155-
&self,
156-
cid: Cid,
157-
bytes: impl Into<Bytes> + CondSend,
158-
) -> Result<(), BlockStoreError> {
159-
let key = CID::parse(&cid.to_string());
160-
161-
let bytes: Bytes = bytes.into();
162-
163-
let val = js_sys::Uint8Array::from(bytes.as_ref());
164-
let _cid = self.put_idb(&key, val).await;
165-
166-
Ok(())
167-
}
168-
169-
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
170-
let key = CID::parse(&cid.to_string());
171-
Ok(self
172-
.has_in_idb(&key)
173-
.await
174-
.map_err(|_| BlockStoreError::CIDNotFound(*cid))?)
175-
}
176-
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/// WebNative File System Blockstore module
2+
mod wnfs_blockstore;
3+
4+
use crate::error;
5+
use js_sys::Uint8Array;
6+
use parking_lot::Mutex;
7+
use peerpiper_core::SystemCommandHandler;
8+
use send_wrapper::SendWrapper;
9+
use tokio::io::AsyncReadExt as _;
10+
use wasm_bindgen::prelude::*;
11+
use wnfs::common::utils::Arc;
12+
13+
use wnfs::common::libipld::Cid;
14+
use wnfs_unixfs_file::unixfs::UnixFsFile;
15+
16+
#[wasm_bindgen(module = "/src/blockstore/blockstore-idb.js")]
17+
extern "C" {
18+
19+
/// CID Class
20+
pub type CID;
21+
22+
/// CID Constructor
23+
#[wasm_bindgen(constructor)]
24+
pub fn new(version: u64, codec: u64, multihash: &Uint8Array, bytes: &Uint8Array) -> CID;
25+
26+
/// static parse(string)
27+
/// Parse a CID from a string
28+
#[wasm_bindgen(static_method_of = CID)]
29+
pub fn parse(string: &str) -> CID;
30+
31+
/// CID method toString()
32+
#[wasm_bindgen(method, js_name = toString)]
33+
pub fn to_string(this: &CID) -> String;
34+
35+
#[derive(Debug)]
36+
pub type IDBBlockstore;
37+
38+
#[wasm_bindgen(constructor)]
39+
pub fn new(location: &str) -> IDBBlockstore;
40+
41+
/// Open method, open(): Promise<void>
42+
#[wasm_bindgen(method)]
43+
pub fn open(this: &IDBBlockstore) -> js_sys::Promise;
44+
45+
/// close(): Promise<void>
46+
#[wasm_bindgen(method)]
47+
pub fn close(this: &IDBBlockstore) -> js_sys::Promise;
48+
49+
/// delete(key): Promise<void>
50+
#[wasm_bindgen(method)]
51+
pub fn delete(this: &IDBBlockstore, key: &str) -> js_sys::Promise;
52+
53+
/// deleteMany(source, options?): AwaitIterable<CID<unknown, number, number, Version>>
54+
#[wasm_bindgen(method)]
55+
pub fn delete_many(this: &IDBBlockstore, source: &JsValue) -> js_sys::Promise;
56+
57+
/// destroy(): Promise<void>
58+
#[wasm_bindgen(method)]
59+
pub fn destroy(this: &IDBBlockstore) -> js_sys::Promise;
60+
61+
/// get(key): Promise<Uint8Array>
62+
#[wasm_bindgen(method)]
63+
pub fn get(this: &IDBBlockstore, key: &CID) -> js_sys::Promise;
64+
65+
/// getAll(options?): AwaitIterable<Pair>
66+
#[wasm_bindgen(method)]
67+
pub fn get_all(this: &IDBBlockstore) -> js_sys::Promise;
68+
69+
/// getMany(source, options?): AwaitIterable<Pair>
70+
/// Retrieve values for the passed keys
71+
///
72+
/// Parameters
73+
/// source: AwaitIterable<CID<unknown, number, number, Version>>
74+
/// Optional options: AbortOptions
75+
/// Returns AwaitIterable<Pair>
76+
#[wasm_bindgen(method)]
77+
pub fn get_many(this: &IDBBlockstore, source: &JsValue) -> js_sys::Promise;
78+
79+
/// has(key): Promise<boolean>
80+
#[wasm_bindgen(method)]
81+
pub fn has(this: &IDBBlockstore, key: &CID) -> js_sys::Promise;
82+
83+
/// put(key, val: Uint8Array): Promise<CID<unknown, number, number, Version>>
84+
#[wasm_bindgen(method)]
85+
pub fn put(this: &IDBBlockstore, key: &CID, val: Uint8Array) -> js_sys::Promise;
86+
87+
/// putMany(source, options?): AwaitIterable<CID<unknown, number, number, Version>>
88+
#[wasm_bindgen(method)]
89+
pub fn put_many(this: &IDBBlockstore, source: &JsValue) -> js_sys::Promise;
90+
91+
}
92+
93+
impl SystemCommandHandler for BrowserBlockStore {
94+
type Error = crate::error::Error;
95+
async fn put(&self, data: Vec<u8>) -> Result<Cid, Self::Error> {
96+
let root_cid = wnfs_blockstore::put_chunks(self.clone(), data)
97+
.await
98+
.map_err(|err| error::Error::Anyhow(err))?;
99+
Ok(root_cid)
100+
}
101+
102+
async fn get(&self, cid: String) -> Result<Vec<u8>, Self::Error> {
103+
let cid = Cid::try_from(cid).map_err(|err| {
104+
error::Error::Anyhow(anyhow::Error::msg(format!("Failed to parse CID: {}", err)))
105+
})?;
106+
let file = UnixFsFile::load(&cid, &self).await.map_err(|err| {
107+
error::Error::Anyhow(anyhow::Error::msg(format!("Failed to load file: {}", err)))
108+
})?;
109+
110+
let mut buffer = Vec::new();
111+
let mut reader = file.into_content_reader(&self, None).map_err(|err| {
112+
error::Error::Anyhow(anyhow::Error::msg(format!(
113+
"Failed to create content reader: {}",
114+
err
115+
)))
116+
})?;
117+
reader.read_to_end(&mut buffer).await.map_err(|err| {
118+
error::Error::Anyhow(anyhow::Error::msg(format!("Failed to read file: {}", err)))
119+
})?;
120+
121+
Ok(buffer)
122+
}
123+
}
124+
125+
#[derive(Debug, Clone)]
126+
pub struct BrowserBlockStore {
127+
pub(crate) idb: SendWrapper<Arc<Mutex<IDBBlockstore>>>,
128+
}
129+
130+
impl BrowserBlockStore {
131+
/// Creates a new in-memory block store.
132+
pub fn new(namespace: &str) -> Self {
133+
let blockstore = IDBBlockstore::new(namespace);
134+
Self {
135+
idb: SendWrapper::new(Arc::new(Mutex::new(blockstore))),
136+
}
137+
}
138+
139+
/// Opens the blockstore.
140+
pub async fn open(&self) -> Result<(), JsValue> {
141+
let promise = self.idb.lock().open();
142+
let _ = wasm_bindgen_futures::JsFuture::from(promise)
143+
.await
144+
.map_err(|_| JsValue::from_str("Error opening blockstore"));
145+
Ok(())
146+
}
147+
148+
pub async fn get_idb(&self, cid: &CID) -> Result<Uint8Array, JsValue> {
149+
let promise = self.idb.lock().get(cid);
150+
let js_val = wasm_bindgen_futures::JsFuture::from(promise)
151+
.await
152+
.map_err(|_| JsValue::from_str("Error getting block"))?;
153+
Ok(js_val.into())
154+
}
155+
156+
/// Puts bytes into the blockstore.
157+
pub async fn put_idb(&self, cid: &CID, bytes: Uint8Array) -> Result<CID, JsValue> {
158+
let promise = self.idb.lock().put(cid, bytes);
159+
let js_val = wasm_bindgen_futures::JsFuture::from(promise)
160+
.await
161+
.map_err(|_| JsValue::from_str("Error putting block"))?;
162+
Ok(js_val.into())
163+
}
164+
165+
/// Checks if the blockstore has a block with the given CID.
166+
pub async fn has_in_idb(&self, cid: &CID) -> Result<bool, JsValue> {
167+
let promise = self.idb.lock().has(cid);
168+
let js_val = wasm_bindgen_futures::JsFuture::from(promise)
169+
.await
170+
.map_err(|_| JsValue::from_str("Error checking for block"))?;
171+
Ok(js_val
172+
.as_bool()
173+
.ok_or(JsValue::from_str("Error converting to bool"))?)
174+
}
175+
}

0 commit comments

Comments
 (0)