From 59e2ebe0f916a399bfc2d4646de422a52c2c3961 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Fri, 28 Nov 2025 10:49:10 +0100 Subject: [PATCH 1/3] chore: automatically map uid to client id --- collab/src/core/collab.rs | 31 +++++++- collab/src/core/user_data.rs | 139 ++++++++++++++++++++++++++++------- collab/src/entity/mod.rs | 2 +- 3 files changed, 141 insertions(+), 31 deletions(-) diff --git a/collab/src/core/collab.rs b/collab/src/core/collab.rs index f44e1989f..edba93d6d 100644 --- a/collab/src/core/collab.rs +++ b/collab/src/core/collab.rs @@ -29,7 +29,7 @@ use yrs::{ use crate::entity::{EncodedCollab, EncoderVersion}; use crate::error::CollabError; -use crate::preclude::JsonValue; +use crate::preclude::{JsonValue, PermanentUserData}; use uuid::Uuid; pub const DATA_SECTION: &str = "data"; @@ -87,17 +87,25 @@ pub struct CollabContext { /// The current transaction that is being executed. current_txn: Option>, version: Option, + /// Structure managing list of editors. + editors: Option, } unsafe impl Send for CollabContext {} unsafe impl Sync for CollabContext {} impl CollabContext { - fn new(origin: CollabOrigin, awareness: Awareness, version: Option) -> Self { + fn new( + origin: CollabOrigin, + awareness: Awareness, + version: Option, + user_data: Option, + ) -> Self { CollabContext { origin, awareness, version, + editors: user_data, undo_manager: None, current_txn: None, } @@ -111,6 +119,10 @@ impl CollabContext { &mut self.version } + pub fn user_data(&self) -> Option<&PermanentUserData> { + self.editors.as_ref() + } + pub fn with_txn(&mut self, f: F) -> Result where F: FnOnce(&mut TransactionMut) -> T, @@ -304,6 +316,7 @@ pub struct CollabOptions { pub data_source: Option, pub client_id: ClientID, pub skip_gc: bool, + pub remember_user: bool, } impl Display for CollabOptions { @@ -324,6 +337,7 @@ impl CollabOptions { data_source: None, client_id, skip_gc: false, + remember_user: false, } } @@ -332,6 +346,11 @@ impl CollabOptions { self } + pub fn with_remember_user(mut self, remember_user: bool) -> Self { + self.remember_user = remember_user; + self + } + pub fn with_gc(mut self, gc: bool) -> Self { self.skip_gc = !gc; self @@ -356,12 +375,18 @@ impl Collab { let plugins = Plugins::new(vec![]); let state = Arc::new(State::new(&object_id.to_string())); let awareness = Awareness::new(doc); + let user_data = if options.remember_user { + Some(PermanentUserData::new(awareness.doc(), origin.clone())) + } else { + None + }; let mut this = Self { object_id, context: CollabContext::new( origin, awareness, options.data_source.as_ref().and_then(DataSource::version), + user_data, ), state, data, @@ -431,7 +456,7 @@ impl Collab { object_id, // if not the fact that we need origin here, it would be // not necessary either - context: CollabContext::new(origin, awareness, None), + context: CollabContext::new(origin, awareness, None, None), state, data, meta, diff --git a/collab/src/core/user_data.rs b/collab/src/core/user_data.rs index c6ae8d8a4..9e0f87ffb 100644 --- a/collab/src/core/user_data.rs +++ b/collab/src/core/user_data.rs @@ -1,4 +1,5 @@ use crate::core::origin::CollabOrigin; +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::ops::Range; use std::sync::Arc; @@ -16,7 +17,7 @@ pub type UserDescription = Arc; #[derive(Default)] struct State { - clients: HashMap, + clients: HashMap, dss: HashMap, action_queue: Vec, current_users: Vec, @@ -54,6 +55,13 @@ impl PermanentUserData { } }); + let client_id = doc.client_id(); + let uid: Option> = if let CollabOrigin::Client(c) = &local_origin { + Some(c.uid.to_string().into()) + } else { + None + }; + let local_origin: Origin = local_origin.into(); let s = state.clone(); let users_clone = users.clone(); @@ -72,18 +80,34 @@ impl PermanentUserData { } // if transaction was local add delete set to current user's ds array - let ds = txn.delete_set(); - if txn.origin() == Some(&local_origin) && !ds.is_empty() { - let encoded_ds = ds.encode_v1(); - let lock = s.read(); - for user_description in &lock.current_users { - let user: MapRef = users_clone - .get(txn, user_description) - .unwrap() - .cast() - .unwrap(); - let yds: ArrayRef = user.get(txn, "ds").unwrap().cast().unwrap(); - yds.push_back(txn, encoded_ds.clone()); + let has_deletes = !txn.delete_set().is_empty(); + let has_inserts = txn.after_state() != txn.before_state(); + if txn.origin() == Some(&local_origin) && has_deletes || has_inserts { + // the transaction originates locally and it made some writes + + if let Some(uid) = &uid { + // check if we already defined the current user + let mut lock = s.write(); + if let Entry::Vacant(e) = lock.clients.entry(client_id) { + e.insert(uid.clone()); + drop(lock); + Self::map_user_internal(users_clone.clone(), s.clone(), txn, client_id, uid.clone()); + } + } + + if has_deletes { + // store new deletes info in permanent user data part of the document + let encoded_ds = txn.delete_set().encode_v1(); + let lock = s.read(); + for user_description in &lock.current_users { + let user: MapRef = users_clone + .get(txn, user_description) + .unwrap() + .cast() + .unwrap(); + let yds: ArrayRef = user.get(txn, "ds").unwrap().cast().unwrap(); + yds.push_back(txn, encoded_ds.clone()); + } } } }) @@ -211,10 +235,25 @@ impl PermanentUserData { client_id: ClientID, description: S, ) { - let user_description = description.into(); - let user = match self.users.get(tx, &user_description) { + Self::map_user_internal( + self.users.clone(), + self.state.clone(), + tx, + client_id, + description.into(), + ); + } + + fn map_user_internal( + users: MapRef, + state: Arc>, + tx: &mut TransactionMut, + client_id: ClientID, + user_description: UserDescription, + ) { + let user = match users.get(tx, &user_description) { Some(Out::YMap(value)) => value, - _ => self.users.insert( + _ => users.insert( tx, user_description.clone(), MapPrelim::from([ @@ -227,22 +266,24 @@ impl PermanentUserData { ids.push_back(tx, Any::BigInt(client_id as i64)); // check if current user was overridden - let state = self.state.clone(); - let users = self.users.clone(); let description_clone = user_description.clone(); - self.users.observe_with("pud", move |txn, _| { - let old_user = users.get(txn, &description_clone); + let weak_state = Arc::downgrade(&state); + let users_clone = users.clone(); + users.observe_with("pud", move |txn, _| { + let old_user = users_clone.get(txn, &description_clone); if old_user != Some(Out::YMap(user.clone())) { // user was overridden - let mut lock = state.write(); - lock - .action_queue - .push(Action::UserOverridden(description_clone.clone())); + if let Some(state) = weak_state.upgrade() { + let mut lock = state.write(); + lock + .action_queue + .push(Action::UserOverridden(description_clone.clone())); + } } }); // keep track of current user - self.state.write().current_users.push(user_description); + state.write().current_users.push(user_description); } /// Get user description by client id. @@ -358,9 +399,12 @@ enum Action { #[cfg(test)] mod test { + use crate::core::collab::{CollabOptions, default_client_id}; use crate::core::origin::{CollabClient, CollabOrigin}; - use crate::preclude::{Doc, PermanentUserData}; - use std::collections::HashSet; + use crate::document::{BlockType, Document, DocumentData, DocumentMeta, generate_id}; + use crate::preclude::{Collab, Doc, PermanentUserData}; + use std::collections::{HashMap, HashSet}; + use uuid::Uuid; use yrs::updates::decoder::Decode; use yrs::{ReadTxn, Snapshot, StateVector, Text, Transact, Update}; @@ -465,4 +509,45 @@ mod test { } } } + + #[test] + fn collab_fills_user_data_automatically() { + let uid = 1; + let client_id = default_client_id(); + let oid = Uuid::new_v4(); + let origin = CollabOrigin::Client(CollabClient::new(uid, "device-1")); + let options = CollabOptions::new(oid, client_id).with_remember_user(true); + let collab = Collab::new_with_options(origin, options).unwrap(); + let page_id = generate_id(); + let mut document = Document::create_with_data( + collab, + DocumentData { + page_id: page_id.clone(), + blocks: HashMap::from([( + page_id.clone(), + crate::document::Block { + id: page_id.clone(), + ty: BlockType::Page.to_string(), + parent: "".to_string(), + children: "".to_string(), + external_id: None, + external_type: None, + data: Default::default(), + }, + )]), + meta: DocumentMeta { + children_map: Default::default(), + text_map: Some(HashMap::default()), + }, + }, + ) + .unwrap(); + document.initialize(); + + let users = document.user_data().unwrap(); + assert_eq!( + users.user_by_client_id(client_id).unwrap(), + uid.to_string().into() + ); + } } diff --git a/collab/src/entity/mod.rs b/collab/src/entity/mod.rs index 80277f1ac..b3c609444 100644 --- a/collab/src/entity/mod.rs +++ b/collab/src/entity/mod.rs @@ -20,7 +20,7 @@ pub struct EncodedCollab { pub doc_state: Bytes, #[serde(default)] pub version: EncoderVersion, - #[serde(default)] + #[serde(default, skip_serializing_if = "Option::is_none")] pub collab_version: Option, } From e7f90f40dbc5e9c7bd563062586d9116f7bbb8eb Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Fri, 28 Nov 2025 10:56:18 +0100 Subject: [PATCH 2/3] chore: add test to confirm that user data is not written automatically for readonly transactions --- collab/src/core/user_data.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/collab/src/core/user_data.rs b/collab/src/core/user_data.rs index 9e0f87ffb..70782fad2 100644 --- a/collab/src/core/user_data.rs +++ b/collab/src/core/user_data.rs @@ -405,8 +405,9 @@ mod test { use crate::preclude::{Collab, Doc, PermanentUserData}; use std::collections::{HashMap, HashSet}; use uuid::Uuid; + use yrs::types::ToJson; use yrs::updates::decoder::Decode; - use yrs::{ReadTxn, Snapshot, StateVector, Text, Transact, Update}; + use yrs::{ReadTxn, Snapshot, StateVector, Text, Transact, Update, any}; #[test] fn add_or_remove_user_mappings() { @@ -550,4 +551,21 @@ mod test { uid.to_string().into() ); } + + #[test] + fn collab_doesnt_fill_user_data_automatically_if_no_data_was_written() { + let uid = 1; + let client_id = default_client_id(); + let oid = Uuid::new_v4(); + let origin = CollabOrigin::Client(CollabClient::new(uid, "device-1")); + let options = CollabOptions::new(oid, client_id).with_remember_user(true); + let mut collab = Collab::new_with_options(origin, options).unwrap(); + + // we use mutable transaction but we don't write anything + let json = collab.data.to_json(&collab.context.transact_mut()); + assert_eq!(json, any!({})); + + let pud = collab.user_data().unwrap(); + assert!(pud.user_by_client_id(client_id).is_none()); + } } From b625c26311e2886508c6693b61cea85b801944d4 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 1 Dec 2025 07:29:29 +0100 Subject: [PATCH 3/3] chore: improved editors between implementation --- Cargo.lock | 4 +- Cargo.toml | 2 +- collab/src/core/user_data.rs | 103 ++++++++++++++--------------------- 3 files changed, 45 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bcc4f8038..9ea191e5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2525,9 +2525,9 @@ dependencies = [ [[package]] name = "yrs" -version = "0.24.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f904a99678a852d7cbc6958c94087f739c10cfb19642635951219c525a5fdb89" +checksum = "f6893d39bc55d014e4a1d0e71d06c0c41590d5cdeac35c126be44998bc320cff" dependencies = [ "arc-swap", "async-lock", diff --git a/Cargo.toml b/Cargo.toml index f2388f9f4..60797426a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["collab"] resolver = "2" [workspace.dependencies] -yrs = { version = "0.24", features = ["sync"] } +yrs = { version = "0.25", features = ["sync"] } anyhow = "1.0.94" thiserror = "1.0.39" serde = { version = "1.0.157", features = ["derive"] } diff --git a/collab/src/core/user_data.rs b/collab/src/core/user_data.rs index 70782fad2..e1250881f 100644 --- a/collab/src/core/user_data.rs +++ b/collab/src/core/user_data.rs @@ -1,7 +1,6 @@ use crate::core::origin::CollabOrigin; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; -use std::ops::Range; use std::sync::Arc; use yrs::block::ClientID; use yrs::types::Change; @@ -306,28 +305,29 @@ impl PermanentUserData { /// Return set of users that made edits between two snapshots. pub fn editors_between(&self, from: &Snapshot, to: &Snapshot) -> HashSet { let mut result = HashSet::new(); + let lock = self.state.read(); // get client ids that have changes between from and to snapshots for (client_id, &to_clock) in to.state_map.iter() { let from_clock = from.state_map.get(client_id); if to_clock > from_clock { - if let Some(user) = self.user_by_client_id(*client_id) { - result.insert(user); + if let Some(user) = lock.clients.get(client_id) { + result.insert(user.clone()); } } } // also check deleted ids - //TODO: this is not very efficient, consider optimizing if needed - let ds_diff = diff_delete_sets(&from.delete_set, &to.delete_set); - for (client_id, ranges) in ds_diff.iter() { - for range in ranges.iter() { - for clock in range.start..range.end { - let id = ID::new(*client_id, clock); - if let Some(user) = self.user_by_deleted_id(&id) { - result.insert(user); - } - } + for (user, ds) in lock.dss.iter() { + if result.contains(user) { + continue; // we already have that user + } + // pick the shared part between current user and `to` delete set + let intersect = ds.intersect(&to.delete_set); + if !intersect.is_empty() && !intersect.subset_of(&from.delete_set) { + // if the shared part doesn't fully belong to `from` delete set, it means that there were + // some deletes made by this user that fit into the window between from-to + result.insert(user.clone()); } } @@ -335,60 +335,41 @@ impl PermanentUserData { } } -fn diff_delete_sets(old_ds: &DeleteSet, new_ds: &DeleteSet) -> DeleteSet { - let mut diff_ds = DeleteSet::new(); - - for (client_id, new_range) in new_ds.iter() { - let old_range = old_ds - .range(client_id) - .unwrap_or(&Default::default()) - .clone(); - let mut old_iter = old_range.iter(); - - for new_range in new_range.iter() { - if let Some(old_range) = old_iter.next() { - if intersects(new_range, old_range) { - // overlapping ranges, need to check for new deletions - if new_range.start < old_range.start { - // new deletion before old range - diff_ds.insert( - ID::new(*client_id, new_range.start), - old_range.start - new_range.start, - ); - } - if new_range.end > old_range.end { - // new deletion after old range - diff_ds.insert( - ID::new(*client_id, old_range.end), - new_range.end - old_range.end, - ); +trait DeleteSetExt { + fn intersect(&self, other: &Self) -> Self; + fn subset_of(&self, other: &Self) -> bool; +} + +impl DeleteSetExt for DeleteSet { + fn intersect(&self, other: &Self) -> Self { + let mut result = DeleteSet::new(); + for (client, ranges) in self.iter() { + if let Some(other_ranges) = other.range(client) { + for a in ranges.iter() { + for b in other_ranges.iter() { + if a.start <= b.end && a.end >= b.start { + // there's an intersection + let start = a.start.max(b.start); + let len = a.end.min(b.end) - start; + result.insert(ID::new(*client, start), len); + } } - } else if new_range.end <= old_range.start { - // new deletion before old range - diff_ds.insert( - ID::new(*client_id, new_range.start), - new_range.end - new_range.start, - ); - } else if new_range.start >= old_range.end { - // new deletion after old range, continue to next old range - continue; } - } else { - // all remaining new_ranges are new deletions - diff_ds.insert( - ID::new(*client_id, new_range.start), - new_range.end - new_range.start, - ); } } + result } - diff_ds -} - -#[inline] -fn intersects(x: &Range, y: &Range) -> bool { - x.start < y.end && y.start < x.end + fn subset_of(&self, other: &Self) -> bool { + for (client_id, ranges) in self.iter() { + match other.range(client_id) { + None => return false, + Some(other_ranges) if !ranges.subset_of(other_ranges) => return false, + _ => { /* continue */ }, + } + } + true + } } #[derive(Debug)]