Skip to content

Commit

Permalink
feat(base): Make ObservableMap::stream works on `wasm32-unknown-unk…
Browse files Browse the repository at this point in the history
…nown`.

This patch updates `eyeball-im` and `eyeball-im-util` to integrate
jplatte/eyeball#63. With this new feature, we
can have a single implementation of `ObservableMap` (instead of 2: one
for all targets, one for `wasm32-u-u`). It makes it possible to get
`Client::rooms_stream` available on all targets now.
  • Loading branch information
Hywan committed Nov 13, 2024
1 parent a920c3f commit af84c79
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 191 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use std::{
};

use eyeball::{SharedObservable, Subscriber};
#[cfg(not(target_arch = "wasm32"))]
use eyeball_im::{Vector, VectorDiff};
#[cfg(not(target_arch = "wasm32"))]
use futures_util::Stream;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_crypto::{
Expand Down Expand Up @@ -236,7 +234,6 @@ impl BaseClient {

/// Get a stream of all the rooms changes, in addition to the existing
/// rooms.
#[cfg(not(target_arch = "wasm32"))]
pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
self.store.rooms_stream()
}
Expand Down
3 changes: 0 additions & 3 deletions crates/matrix-sdk-base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use std::{
sync::{Arc, RwLock as StdRwLock},
};

#[cfg(not(target_arch = "wasm32"))]
use eyeball_im::{Vector, VectorDiff};
#[cfg(not(target_arch = "wasm32"))]
use futures_util::Stream;
use once_cell::sync::OnceCell;

Expand Down Expand Up @@ -267,7 +265,6 @@ impl Store {

/// Get a stream of all the rooms changes, in addition to the existing
/// rooms.
#[cfg(not(target_arch = "wasm32"))]
pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
self.rooms.read().unwrap().stream()
}
Expand Down
288 changes: 106 additions & 182 deletions crates/matrix-sdk-base/src/store/observable_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,212 +14,137 @@

//! An [`ObservableMap`] implementation.
#[cfg(not(target_arch = "wasm32"))]
mod impl_non_wasm32 {
use std::{borrow::Borrow, collections::HashMap, hash::Hash};

use eyeball_im::{ObservableVector, Vector, VectorDiff};
use futures_util::Stream;

/// An observable map.
///
/// This is an “observable map” naive implementation. Just like regular
/// hashmap, we have a redirection from a key to a position, and from a
/// position to a value. The (key, position) tuples are stored in an
/// [`HashMap`]. The (position, value) tuples are stored in an
/// [`ObservableVector`]. The (key, position) tuple is only provided for
/// fast _reading_ implementations, like `Self::get` and
/// `Self::get_or_create`. The (position, value) tuples are observable,
/// this is what interests us the most here.
///
/// Why not implementing a new `ObservableMap` type in `eyeball-im` instead
/// of this custom implementation? Because we want to continue providing
/// `VectorDiff` when observing the changes, so that the rest of the API in
/// the Matrix Rust SDK aren't broken. Indeed, an `ObservableMap` must
/// produce `MapDiff`, which would be quite different.
/// Plus, we would like to re-use all our existing code, test, stream
/// adapters and so on.
///
/// This is a trade-off. This implementation is simple enough for the
/// moment, and basically does the job.
#[derive(Debug)]
pub(crate) struct ObservableMap<K, V>
where
V: Clone + Send + Sync + 'static,
{
/// The (key, position) tuples.
mapping: HashMap<K, usize>,
use std::{borrow::Borrow, collections::HashMap, hash::Hash};

use eyeball_im::{ObservableVector, Vector, VectorDiff};
use futures_util::Stream;

/// An observable map.
///
/// This is an “observable map” naive implementation. Just like regular
/// hashmap, we have a redirection from a key to a position, and from a
/// position to a value. The (key, position) tuples are stored in an
/// [`HashMap`]. The (position, value) tuples are stored in an
/// [`ObservableVector`]. The (key, position) tuple is only provided for
/// fast _reading_ implementations, like `Self::get` and
/// `Self::get_or_create`. The (position, value) tuples are observable,
/// this is what interests us the most here.
///
/// Why not implementing a new `ObservableMap` type in `eyeball-im` instead
/// of this custom implementation? Because we want to continue providing
/// `VectorDiff` when observing the changes, so that the rest of the API in
/// the Matrix Rust SDK aren't broken. Indeed, an `ObservableMap` must
/// produce `MapDiff`, which would be quite different.
/// Plus, we would like to re-use all our existing code, test, stream
/// adapters and so on.
///
/// This is a trade-off. This implementation is simple enough for the
/// moment, and basically does the job.
#[derive(Debug)]
pub(crate) struct ObservableMap<K, V>
where
V: Clone + 'static,
{
/// The (key, position) tuples.
mapping: HashMap<K, usize>,

/// The values where the indices are the `position` part of
/// `Self::mapping`.
values: ObservableVector<V>,
}

/// The values where the indices are the `position` part of
/// `Self::mapping`.
values: ObservableVector<V>,
impl<K, V> ObservableMap<K, V>
where
K: Hash + Eq,
V: Clone + 'static,
{
/// Create a new `Self`.
pub(crate) fn new() -> Self {
Self { mapping: HashMap::new(), values: ObservableVector::new() }
}

impl<K, V> ObservableMap<K, V>
where
K: Hash + Eq,
V: Clone + Send + Sync + 'static,
{
/// Create a new `Self`.
pub(crate) fn new() -> Self {
Self { mapping: HashMap::new(), values: ObservableVector::new() }
}

/// Insert a new `V` in the collection.
///
/// If the `V` value already exists, it will be updated to the new one.
pub(crate) fn insert(&mut self, key: K, value: V) -> usize {
match self.mapping.get(&key) {
Some(position) => {
self.values.set(*position, value);

*position
}
None => {
let position = self.values.len();

self.values.push_back(value);
self.mapping.insert(key, position);
/// Insert a new `V` in the collection.
///
/// If the `V` value already exists, it will be updated to the new one.
pub(crate) fn insert(&mut self, key: K, value: V) -> usize {
match self.mapping.get(&key) {
Some(position) => {
self.values.set(*position, value);

position
}
*position
}
}
None => {
let position = self.values.len();

/// Reading one `V` value based on their ID, if it exists.
pub(crate) fn get<L>(&self, key: &L) -> Option<&V>
where
K: Borrow<L>,
L: Hash + Eq + ?Sized,
{
self.mapping.get(key).and_then(|position| self.values.get(*position))
}
self.values.push_back(value);
self.mapping.insert(key, position);

/// Reading one `V` value based on their ID, or create a new one (by
/// using `default`).
pub(crate) fn get_or_create<L, F>(&mut self, key: &L, default: F) -> &V
where
K: Borrow<L>,
L: Hash + Eq + ?Sized + ToOwned<Owned = K>,
F: FnOnce() -> V,
{
let position = match self.mapping.get(key) {
Some(position) => *position,
None => {
let value = default();
let position = self.values.len();

self.values.push_back(value);
self.mapping.insert(key.to_owned(), position);

position
}
};

self.values
.get(position)
.expect("Value should be present or has just been inserted, but it's missing")
}

/// Return an iterator over the existing values.
pub(crate) fn iter(&self) -> impl Iterator<Item = &V> {
self.values.iter()
}

/// Get a [`Stream`] of the values.
pub(crate) fn stream(&self) -> (Vector<V>, impl Stream<Item = Vec<VectorDiff<V>>>) {
self.values.subscribe().into_values_and_batched_stream()
}

/// Remove a `V` value based on their ID, if it exists.
///
/// Returns the removed value.
pub(crate) fn remove<L>(&mut self, key: &L) -> Option<V>
where
K: Borrow<L>,
L: Hash + Eq + ?Sized,
{
let position = self.mapping.remove(key)?;
Some(self.values.remove(position))
position
}
}
}
}

#[cfg(target_arch = "wasm32")]
mod impl_wasm32 {
use std::{borrow::Borrow, collections::BTreeMap, hash::Hash};

/// An observable map for Wasm. It's a simple wrapper around `BTreeMap`.
#[derive(Debug)]
pub(crate) struct ObservableMap<K, V>(BTreeMap<K, V>)
/// Reading one `V` value based on their ID, if it exists.
pub(crate) fn get<L>(&self, key: &L) -> Option<&V>
where
V: Clone + 'static;
K: Borrow<L>,
L: Hash + Eq + ?Sized,
{
self.mapping.get(key).and_then(|position| self.values.get(*position))
}

impl<K, V> ObservableMap<K, V>
/// Reading one `V` value based on their ID, or create a new one (by
/// using `default`).
pub(crate) fn get_or_create<L, F>(&mut self, key: &L, default: F) -> &V
where
K: Hash + Eq + Ord,
V: Clone + 'static,
K: Borrow<L>,
L: Hash + Eq + ?Sized + ToOwned<Owned = K>,
F: FnOnce() -> V,
{
/// Create a new `Self`.
pub(crate) fn new() -> Self {
Self(BTreeMap::new())
}
let position = match self.mapping.get(key) {
Some(position) => *position,
None => {
let value = default();
let position = self.values.len();

/// Insert a new `V` in the collection.
///
/// If the `V` value already exists, it will be updated to the new one.
pub(crate) fn insert(&mut self, key: K, value: V) {
self.0.insert(key, value);
}
self.values.push_back(value);
self.mapping.insert(key.to_owned(), position);

/// Reading one `V` value based on their ID, if it exists.
pub(crate) fn get<L>(&self, key: &L) -> Option<&V>
where
K: Borrow<L>,
L: Hash + Eq + Ord + ?Sized,
{
self.0.get(key)
}
position
}
};

/// Reading one `V` value based on their ID, or create a new one (by
/// using `default`).
pub(crate) fn get_or_create<L, F>(&mut self, key: &L, default: F) -> &V
where
K: Borrow<L>,
L: Hash + Eq + ?Sized + ToOwned<Owned = K>,
F: FnOnce() -> V,
{
self.0.entry(key.to_owned()).or_insert_with(default)
}
self.values
.get(position)
.expect("Value should be present or has just been inserted, but it's missing")
}

/// Return an iterator over the existing values.
pub(crate) fn iter(&self) -> impl Iterator<Item = &V> {
self.0.values()
}
/// Return an iterator over the existing values.
pub(crate) fn iter(&self) -> impl Iterator<Item = &V> {
self.values.iter()
}

/// Remove a `V` value based on their ID, if it exists.
///
/// Returns the removed value.
pub(crate) fn remove<L>(&mut self, key: &L) -> Option<V>
where
K: Borrow<L>,
L: Hash + Eq + Ord + ?Sized,
{
self.0.remove(key)
}
/// Get a [`Stream`] of the values.
pub(crate) fn stream(&self) -> (Vector<V>, impl Stream<Item = Vec<VectorDiff<V>>>) {
self.values.subscribe().into_values_and_batched_stream()
}
}

#[cfg(not(target_arch = "wasm32"))]
pub(crate) use impl_non_wasm32::ObservableMap;
#[cfg(target_arch = "wasm32")]
pub(crate) use impl_wasm32::ObservableMap;
/// Remove a `V` value based on their ID, if it exists.
///
/// Returns the removed value.
pub(crate) fn remove<L>(&mut self, key: &L) -> Option<V>
where
K: Borrow<L>,
L: Hash + Eq + ?Sized,
{
let position = self.mapping.remove(key)?;
Some(self.values.remove(position))
}
}

#[cfg(test)]
mod tests {
#[cfg(not(target_arch = "wasm32"))]
use eyeball_im::VectorDiff;
#[cfg(not(target_arch = "wasm32"))]
use stream_assert::{assert_closed, assert_next_eq, assert_pending};

use super::ObservableMap;
Expand Down Expand Up @@ -314,7 +239,6 @@ mod tests {
);
}

#[cfg(not(target_arch = "wasm32"))]
#[test]
fn test_stream() {
let mut map = ObservableMap::<char, char>::new();
Expand Down

0 comments on commit af84c79

Please sign in to comment.