From 12d9609dd662e95036bc57871fc35bb5176ef4e4 Mon Sep 17 00:00:00 2001 From: Daniel Salinas Date: Thu, 12 Dec 2024 11:18:52 -0500 Subject: [PATCH] Add created_at time persisted send_queue system --- bindings/matrix-sdk-ffi/src/timeline/mod.rs | 2 + .../src/store/integration_tests.rs | 107 ++++++++++++++++-- .../matrix-sdk-base/src/store/memory_store.rs | 23 ++-- .../matrix-sdk-base/src/store/send_queue.rs | 10 +- crates/matrix-sdk-base/src/store/traits.rs | 12 +- .../src/state_store/mod.rs | 21 +++- .../010_send_queue_enqueue_time.sql | 6 + crates/matrix-sdk-sqlite/src/state_store.rs | 92 +++++++++++---- .../src/timeline/event_handler.rs | 1 + .../src/timeline/event_item/local.rs | 4 +- .../src/timeline/event_item/mod.rs | 5 + .../matrix-sdk-ui/src/timeline/tests/echo.rs | 2 +- .../tests/integration/timeline/echo.rs | 4 +- .../tests/integration/timeline/queue.rs | 4 +- crates/matrix-sdk/src/send_queue.rs | 49 +++++++- crates/matrix-sdk/src/send_queue/upload.rs | 15 ++- 16 files changed, 292 insertions(+), 65 deletions(-) create mode 100644 crates/matrix-sdk-sqlite/migrations/state_store/010_send_queue_enqueue_time.sql diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index 89f226dc6a0..1d466040b68 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -1084,6 +1084,7 @@ pub struct EventTimelineItem { timestamp: u64, reactions: Vec, local_send_state: Option, + local_created_at: Option, read_receipts: HashMap, origin: Option, can_be_replied_to: bool, @@ -1121,6 +1122,7 @@ impl From for EventTimelineItem { timestamp: item.timestamp().0.into(), reactions, local_send_state: item.send_state().map(|s| s.into()), + local_created_at: item.local_created_at().map(|t| t.0.into()), read_receipts, origin: item.origin(), can_be_replied_to: item.can_be_replied_to(), diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index c6b3b5a7c3c..94eb18221ca 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -29,7 +29,8 @@ use ruma::{ }, owned_event_id, owned_mxc_uri, room_id, serde::Raw, - uint, user_id, EventId, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId, + uint, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId, + TransactionId, UserId, }; use serde_json::{json, value::Value as JsonValue}; @@ -980,13 +981,21 @@ impl StateStoreIntegrationTests for DynStateStore { let ev = SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into()) .unwrap(); - self.save_send_queue_request(room_id, txn.clone(), ev.into(), 0).await?; + self.save_send_queue_request( + room_id, + txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + ev.into(), + 0, + ) + .await?; // Add a single dependent queue request. self.save_dependent_queued_request( room_id, &txn, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::RedactEvent, ) .await?; @@ -1242,7 +1251,15 @@ impl StateStoreIntegrationTests for DynStateStore { let event0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into()) .unwrap(); - self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id, + txn0.clone(), + MilliSecondsSinceUnixEpoch::now(), + event0.into(), + 0, + ) + .await + .unwrap(); // Reading it will work. let pending = self.load_send_queue_requests(room_id).await.unwrap(); @@ -1266,7 +1283,15 @@ impl StateStoreIntegrationTests for DynStateStore { ) .unwrap(); - self.save_send_queue_request(room_id, txn, event.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id, + txn, + MilliSecondsSinceUnixEpoch::now(), + event.into(), + 0, + ) + .await + .unwrap(); } // Reading all the events should work. @@ -1364,7 +1389,15 @@ impl StateStoreIntegrationTests for DynStateStore { let event = SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into()) .unwrap(); - self.save_send_queue_request(room_id2, txn.clone(), event.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id2, + txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + event.into(), + 0, + ) + .await + .unwrap(); } // Add and remove one event for room3. @@ -1374,7 +1407,15 @@ impl StateStoreIntegrationTests for DynStateStore { let event = SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into()) .unwrap(); - self.save_send_queue_request(room_id3, txn.clone(), event.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id3, + txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + event.into(), + 0, + ) + .await + .unwrap(); self.remove_send_queue_request(room_id3, &txn).await.unwrap(); } @@ -1399,21 +1440,45 @@ impl StateStoreIntegrationTests for DynStateStore { let ev0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into()) .unwrap(); - self.save_send_queue_request(room_id, low0_txn.clone(), ev0.into(), 2).await.unwrap(); + self.save_send_queue_request( + room_id, + low0_txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + ev0.into(), + 2, + ) + .await + .unwrap(); // Saving one request with higher priority should work. let high_txn = TransactionId::new(); let ev1 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into()) .unwrap(); - self.save_send_queue_request(room_id, high_txn.clone(), ev1.into(), 10).await.unwrap(); + self.save_send_queue_request( + room_id, + high_txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + ev1.into(), + 10, + ) + .await + .unwrap(); // Saving another request with the low priority should work. let low1_txn = TransactionId::new(); let ev2 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into()) .unwrap(); - self.save_send_queue_request(room_id, low1_txn.clone(), ev2.into(), 2).await.unwrap(); + self.save_send_queue_request( + room_id, + low1_txn.clone(), + MilliSecondsSinceUnixEpoch::now(), + ev2.into(), + 2, + ) + .await + .unwrap(); // The requests should be ordered from higher priority to lower, and when equal, // should use the insertion order instead. @@ -1453,7 +1518,15 @@ impl StateStoreIntegrationTests for DynStateStore { let event0 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into()) .unwrap(); - self.save_send_queue_request(room_id, txn0.clone(), event0.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id, + txn0.clone(), + MilliSecondsSinceUnixEpoch::now(), + event0.into(), + 0, + ) + .await + .unwrap(); // No dependents, to start with. assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty()); @@ -1464,6 +1537,7 @@ impl StateStoreIntegrationTests for DynStateStore { room_id, &txn0, child_txn.clone(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::RedactEvent, ) .await @@ -1515,12 +1589,21 @@ impl StateStoreIntegrationTests for DynStateStore { let event1 = SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into()) .unwrap(); - self.save_send_queue_request(room_id, txn1.clone(), event1.into(), 0).await.unwrap(); + self.save_send_queue_request( + room_id, + txn1.clone(), + MilliSecondsSinceUnixEpoch::now(), + event1.into(), + 0, + ) + .await + .unwrap(); self.save_dependent_queued_request( room_id, &txn0, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::RedactEvent, ) .await @@ -1531,6 +1614,7 @@ impl StateStoreIntegrationTests for DynStateStore { room_id, &txn1, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::EditEvent { new_content: SerializableEventContent::new( &RoomMessageEventContent::text_plain("edit").into(), @@ -1563,6 +1647,7 @@ impl StateStoreIntegrationTests for DynStateStore { room_id, &txn, child_txn.clone(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::RedactEvent, ) .await diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index abc5fca09b4..fa6f37e39c1 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -30,8 +30,8 @@ use ruma::{ }, serde::Raw, time::Instant, - CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, - OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId, + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, + OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId, }; use tracing::{debug, instrument, warn}; @@ -733,16 +733,19 @@ impl StateStore for MemoryStore { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, kind: QueuedRequestKind, priority: usize, ) -> Result<(), Self::Error> { - self.inner - .write() - .unwrap() - .send_queue_events - .entry(room_id.to_owned()) - .or_default() - .push(QueuedRequest { kind, transaction_id, error: None, priority }); + self.inner.write().unwrap().send_queue_events.entry(room_id.to_owned()).or_default().push( + QueuedRequest { + kind, + transaction_id, + error: None, + priority, + created_at: Some(created_at), + }, + ); Ok(()) } @@ -841,6 +844,7 @@ impl StateStore for MemoryStore { room: &RoomId, parent_transaction_id: &TransactionId, own_transaction_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: DependentQueuedRequestKind, ) -> Result<(), Self::Error> { self.inner @@ -854,6 +858,7 @@ impl StateStore for MemoryStore { parent_transaction_id: parent_transaction_id.to_owned(), own_transaction_id, parent_key: None, + created_at: Some(created_at), }); Ok(()) } diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index ece50344e9f..289def806e6 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -23,7 +23,8 @@ use ruma::{ AnyMessageLikeEventContent, EventContent as _, RawExt as _, }, serde::Raw, - OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, UInt, + MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, + TransactionId, UInt, }; use serde::{Deserialize, Serialize}; @@ -131,6 +132,9 @@ pub struct QueuedRequest { /// The bigger the value, the higher the priority at which this request /// should be handled. pub priority: usize, + + /// The time that the request was original attempted. + pub created_at: Option, } impl QueuedRequest { @@ -371,6 +375,10 @@ pub struct DependentQueuedRequest { /// If the parent request has been sent, the parent's request identifier /// returned by the server once the local echo has been sent out. pub parent_key: Option, + + /// The time that the request was original attempted. + #[serde(skip_serializing_if = "Option::is_none")] + pub created_at: Option, } impl DependentQueuedRequest { diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index 6e34f4fe263..51b321bc8a2 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -35,8 +35,8 @@ use ruma::{ }, serde::Raw, time::SystemTime, - EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, - TransactionId, UserId, + EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId, + OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId, }; use serde::{Deserialize, Serialize}; @@ -359,6 +359,7 @@ pub trait StateStore: AsyncTraitDeps { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, request: QueuedRequestKind, priority: usize, ) -> Result<(), Self::Error>; @@ -421,6 +422,7 @@ pub trait StateStore: AsyncTraitDeps { room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: DependentQueuedRequestKind, ) -> Result<(), Self::Error>; @@ -657,11 +659,12 @@ impl StateStore for EraseStateStoreError { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: QueuedRequestKind, priority: usize, ) -> Result<(), Self::Error> { self.0 - .save_send_queue_request(room_id, transaction_id, content, priority) + .save_send_queue_request(room_id, transaction_id, created_at, content, priority) .await .map_err(Into::into) } @@ -711,10 +714,11 @@ impl StateStore for EraseStateStoreError { room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: DependentQueuedRequestKind, ) -> Result<(), Self::Error> { self.0 - .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, content) + .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content) .await .map_err(Into::into) } diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index b8ca7442b27..1714ea252eb 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -44,8 +44,8 @@ use ruma::{ GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent, }, serde::Raw, - CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, - OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId, + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, + OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tracing::{debug, warn}; @@ -439,6 +439,10 @@ struct PersistedQueuedRequest { priority: Option, + /// The time the original message was first attempted to be sent at. + #[serde(skip_serializing_if = "Option::is_none")] + created_at: Option, + // Migrated fields: keep these private, they're not used anymore elsewhere in the code base. /// Deprecated (from old format), now replaced with error field. is_wedged: Option, @@ -464,7 +468,13 @@ impl PersistedQueuedRequest { // By default, events without a priority have a priority of 0. let priority = self.priority.unwrap_or(0); - Some(QueuedRequest { kind, transaction_id: self.transaction_id, error, priority }) + Some(QueuedRequest { + kind, + transaction_id: self.transaction_id, + error, + priority, + created_at: self.created_at, + }) } } @@ -1358,6 +1368,7 @@ impl_state_store!({ &self, room_id: &RoomId, transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, kind: QueuedRequestKind, priority: usize, ) -> Result<()> { @@ -1379,7 +1390,6 @@ impl_state_store!({ || Ok(Vec::new()), |val| self.deserialize_value::>(&val), )?; - // Push the new request. prev.push(PersistedQueuedRequest { room_id: room_id.to_owned(), @@ -1389,6 +1399,7 @@ impl_state_store!({ is_wedged: None, event: None, priority: Some(priority), + created_at: Some(created_at), }); // Save the new vector into db. @@ -1558,6 +1569,7 @@ impl_state_store!({ room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: DependentQueuedRequestKind, ) -> Result<()> { let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id); @@ -1584,6 +1596,7 @@ impl_state_store!({ parent_transaction_id: parent_txn_id.to_owned(), own_transaction_id: own_txn_id, parent_key: None, + created_at: Some(created_at), }); // Save the new vector into db. diff --git a/crates/matrix-sdk-sqlite/migrations/state_store/010_send_queue_enqueue_time.sql b/crates/matrix-sdk-sqlite/migrations/state_store/010_send_queue_enqueue_time.sql new file mode 100644 index 00000000000..7dcf3bf6d04 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/state_store/010_send_queue_enqueue_time.sql @@ -0,0 +1,6 @@ +-- Migration script to add the created_at column to the send_queue_events table +ALTER TABLE "send_queue_events" +ADD COLUMN "created_at" INTEGER DEFAULT NULL; + +ALTER TABLE "dependent_send_queue_events" +ADD COLUMN "created_at" INTEGER DEFAULT NULL; diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index 36ff843cc71..ba9f2b90999 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -32,8 +32,8 @@ use ruma::{ GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, }, serde::Raw, - CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, OwnedUserId, - RoomId, RoomVersionId, TransactionId, UserId, + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, + OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId, }; use rusqlite::{OptionalExtension, Transaction}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -69,7 +69,7 @@ mod keys { /// This is used to figure whether the sqlite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`SqliteStateStore::run_migrations`] function.. -const DATABASE_VERSION: u8 = 10; +const DATABASE_VERSION: u8 = 11; /// A sqlite based cryptostore. #[derive(Clone)] @@ -318,6 +318,17 @@ impl SqliteStateStore { .await?; } + if from < 11 && to >= 11 { + conn.with_transaction(move |txn| { + // Run the migration. + txn.execute_batch(include_str!( + "../migrations/state_store/010_send_queue_enqueue_time.sql" + ))?; + txn.set_db_version(11) + }) + .await?; + } + Ok(()) } @@ -1746,6 +1757,7 @@ impl StateStore for SqliteStateStore { &self, room_id: &RoomId, transaction_id: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: QueuedRequestKind, priority: usize, ) -> Result<(), Self::Error> { @@ -1753,16 +1765,16 @@ impl StateStore for SqliteStateStore { let room_id_value = self.serialize_value(&room_id.to_owned())?; let content = self.serialize_json(&content)?; - // The transaction id is used both as a key (in remove/update) and a value (as // it's useful for the callers), so we keep it as is, and neither hash // it (with encode_key) or encrypt it (through serialize_value). After // all, it carries no personal information, so this is considered fine. + let created_at_ts: u64 = created_at.0.into(); self.acquire() .await? .with_transaction(move |txn| { - txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority) VALUES (?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority))?; + txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?; Ok(()) }) .await @@ -1824,14 +1836,14 @@ impl StateStore for SqliteStateStore { // Note: ROWID is always present and is an auto-incremented integer counter. We // want to maintain the insertion order, so we can sort using it. // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`. - let res: Vec<(String, Vec, Option>, usize)> = self + let res: Vec<(String, Vec, Option>, usize, u64)> = self .acquire() .await? .prepare( - "SELECT transaction_id, content, wedge_reason, priority FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID", + "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID", |mut stmt| { stmt.query((room_id,))? - .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))) + .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?))) .collect() }, ) @@ -1839,11 +1851,13 @@ impl StateStore for SqliteStateStore { let mut requests = Vec::with_capacity(res.len()); for entry in res { + let created_at = UInt::new(entry.4).map(MilliSecondsSinceUnixEpoch); requests.push(QueuedRequest { transaction_id: entry.0.into(), kind: self.deserialize_json(&entry.1)?, error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?, priority: entry.3, + created_at, }); } @@ -1901,6 +1915,7 @@ impl StateStore for SqliteStateStore { room_id: &RoomId, parent_txn_id: &TransactionId, own_txn_id: ChildTransactionId, + created_at: MilliSecondsSinceUnixEpoch, content: DependentQueuedRequestKind, ) -> Result<()> { let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); @@ -1910,15 +1925,22 @@ impl StateStore for SqliteStateStore { let parent_txn_id = parent_txn_id.to_string(); let own_txn_id = own_txn_id.to_string(); + let created_at_ts: u64 = created_at.0.into(); self.acquire() .await? .with_transaction(move |txn| { txn.prepare_cached( r#"INSERT INTO dependent_send_queue_events - (room_id, parent_transaction_id, own_transaction_id, content) - VALUES (?, ?, ?, ?)"#, + (room_id, parent_transaction_id, own_transaction_id, content, created_at) + VALUES (?, ?, ?, ?, ?)"#, )? - .execute((room_id, parent_txn_id, own_txn_id, content))?; + .execute(( + room_id, + parent_txn_id, + own_txn_id, + content, + created_at_ts, + ))?; Ok(()) }) .await @@ -2011,14 +2033,14 @@ impl StateStore for SqliteStateStore { let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id); // Note: transaction_id is not encoded, see why in `save_send_queue_event`. - let res: Vec<(String, String, Option>, Vec)> = self + let res: Vec<(String, String, Option>, Vec, u64)> = self .acquire() .await? .prepare( - "SELECT own_transaction_id, parent_transaction_id, parent_key, content FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID", + "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID", |mut stmt| { stmt.query((room_id,))? - .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))) + .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?))) .collect() }, ) @@ -2026,11 +2048,13 @@ impl StateStore for SqliteStateStore { let mut dependent_events = Vec::with_capacity(res.len()); for entry in res { + let created_at = UInt::new(entry.4).map(MilliSecondsSinceUnixEpoch); dependent_events.push(DependentQueuedRequest { own_transaction_id: entry.0.into(), parent_transaction_id: entry.1.into(), parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?, kind: self.deserialize_json(&entry.3)?, + created_at, }); } @@ -2384,16 +2408,15 @@ mod migration_tests { let wedge_tx = wedged_event_transaction_id.clone(); let local_tx = local_event_transaction_id.clone(); - db.save_dependent_queued_request( - room_id, - &local_tx, - ChildTransactionId::new(), - DependentQueuedRequestKind::RedactEvent, - ) - .await - .unwrap(); - conn.with_transaction(move |txn| { + add_dependent_send_queue_event_v7( + &db, + txn, + room_id, + &local_tx, + ChildTransactionId::new(), + DependentQueuedRequestKind::RedactEvent, + )?; add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?; add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?; Result::<_, Error>::Ok(()) @@ -2431,6 +2454,29 @@ mod migration_tests { txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")? .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?; + Ok(()) + } + fn add_dependent_send_queue_event_v7( + this: &SqliteStateStore, + txn: &Transaction<'_>, + room_id: &RoomId, + parent_txn_id: &TransactionId, + own_txn_id: ChildTransactionId, + content: DependentQueuedRequestKind, + ) -> Result<(), Error> { + let room_id_value = this.serialize_value(&room_id.to_owned())?; + + let parent_txn_id = parent_txn_id.to_string(); + let own_txn_id = own_txn_id.to_string(); + let content = this.serialize_json(&content)?; + + txn.prepare_cached( + "INSERT INTO dependent_send_queue_events + (room_id, parent_transaction_id, own_transaction_id, content) + VALUES (?, ?, ?, ?)", + )? + .execute((room_id_value, parent_txn_id, own_txn_id, content))?; + Ok(()) } } diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index 475c1f30a14..9b0ea87d65b 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -1033,6 +1033,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { send_state: EventSendState::NotSentYet, transaction_id: txn_id.to_owned(), send_handle: send_handle.clone(), + created_at: send_handle.clone().and_then(|h| h.created_at), } .into(), diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs index 2890b6eab8e..3f5cd48c953 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/local.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/local.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use as_variant::as_variant; use matrix_sdk::{send_queue::SendHandle, Error}; -use ruma::{EventId, OwnedEventId, OwnedTransactionId}; +use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId}; use super::TimelineEventItemId; @@ -30,6 +30,8 @@ pub(in crate::timeline) struct LocalEventTimelineItem { pub transaction_id: OwnedTransactionId, /// A handle to manipulate this event before it is sent, if possible. pub send_handle: Option, + /// The time that the event was created locally + pub created_at: Option, } impl LocalEventTimelineItem { diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index a10f9fc95a6..f632e9371a7 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -268,6 +268,11 @@ impl EventTimelineItem { as_variant!(&self.kind, EventTimelineItemKind::Local(local) => &local.send_state) } + /// Get the local time that the event was enqueued at. + pub fn local_created_at(&self) -> Option { + as_variant!(&self.kind, EventTimelineItemKind::Local(local) => local.created_at).flatten() + } + /// Get the unique identifier of this item. /// /// Returns the transaction ID for a local echo item that has not been sent diff --git a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs index 533c1c2c56d..5f611adcd71 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/echo.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/echo.rs @@ -334,7 +334,7 @@ async fn test_no_reuse_of_counters() { let local_id = assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { let event_item = item.as_event().unwrap(); assert!(event_item.is_local_echo()); - assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(event_item.send_state(), Some(EventSendState::NotSentYet{ .. })); assert!(!event_item.can_be_replied_to()); item.unique_id().to_owned() }); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index f00ad135d05..202f294d88f 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -172,7 +172,7 @@ async fn test_retry_failed() { // First, local echo is added. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet{ ..})); }); // Sending fails, because the error is a transient one that's recoverable, @@ -318,7 +318,7 @@ async fn test_cancel_failed() { // Local echo is added (immediately) assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); + assert_matches!(value.send_state(), Some(EventSendState::NotSentYet{ ..})); }); // Sending fails, the mock server has no matching route diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index 32a11923dfc..1ab3357418e 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -273,7 +273,7 @@ async fn test_reloaded_failed_local_echoes_are_marked_as_failed() { // Local echoes are updated with the failed send state as soon as the error // response has been received. assert_let!(Some(VectorDiff::Set { index: 0, value: first }) = timeline_stream.next().await); - let (error, is_recoverable) = assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { error, is_recoverable } => (error, is_recoverable)); + let (error, is_recoverable) = assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { error, is_recoverable, .. } => (error, is_recoverable)); // The error is not recoverable. assert!(!is_recoverable); @@ -292,7 +292,7 @@ async fn test_reloaded_failed_local_echoes_are_marked_as_failed() { assert_eq!(initial.len(), 1); assert_eq!(initial[0].content().as_message().unwrap().body(), "wall of text"); assert_let!( - Some(EventSendState::SendingFailed { error, is_recoverable }) = initial[0].send_state() + Some(EventSendState::SendingFailed { error, is_recoverable, .. }) = initial[0].send_state() ); // Same recoverable status as above. diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 1b9aec6d57c..d1fdb4e0711 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -162,7 +162,7 @@ use ruma::{ AnyMessageLikeEventContent, EventContent as _, }, serde::Raw, - OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, + MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, }; use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -444,7 +444,8 @@ impl RoomSendQueue { let content = SerializableEventContent::from_raw(content, event_type); - let transaction_id = self.inner.queue.push(content.clone().into()).await?; + let created_at = MilliSecondsSinceUnixEpoch::now(); + let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?; trace!(%transaction_id, "manager sends a raw event to the background task"); self.inner.notifier.notify_one(); @@ -453,6 +454,7 @@ impl RoomSendQueue { room: self.clone(), transaction_id: transaction_id.clone(), media_handles: None, + created_at: Some(created_at), }; let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { @@ -950,6 +952,7 @@ impl QueueStorage { async fn push( &self, request: QueuedRequestKind, + created_at: MilliSecondsSinceUnixEpoch, ) -> Result { let transaction_id = TransactionId::new(); @@ -961,6 +964,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, transaction_id.clone(), + created_at, request, Self::LOW_PRIORITY, ) @@ -1116,6 +1120,7 @@ impl QueueStorage { &self.room_id, transaction_id, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::RedactEvent, ) .await?; @@ -1156,6 +1161,7 @@ impl QueueStorage { &self.room_id, transaction_id, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::EditEvent { new_content: serializable }, ) .await?; @@ -1175,11 +1181,13 @@ impl QueueStorage { /// Push requests (and dependents) to upload a media. /// /// See the module-level description for details of the whole processus. + #[allow(clippy::too_many_arguments)] async fn push_media( &self, event: RoomMessageEventContent, content_type: Mime, send_event_txn: OwnedTransactionId, + created_at: MilliSecondsSinceUnixEpoch, upload_file_txn: OwnedTransactionId, file_media_request: MediaRequestParameters, thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>, @@ -1187,7 +1195,6 @@ impl QueueStorage { let guard = self.store.lock().await; let client = guard.client()?; let store = client.store(); - let thumbnail_info = if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) = thumbnail @@ -1199,6 +1206,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, upload_thumbnail_txn.clone(), + created_at, QueuedRequestKind::MediaUpload { content_type: thumbnail_content_type.to_string(), cache_key: thumbnail_media_request, @@ -1215,6 +1223,7 @@ impl QueueStorage { &self.room_id, &upload_thumbnail_txn, upload_file_txn.clone().into(), + created_at, DependentQueuedRequestKind::UploadFileWithThumbnail { content_type: content_type.to_string(), cache_key: file_media_request, @@ -1230,6 +1239,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, upload_file_txn.clone(), + created_at, QueuedRequestKind::MediaUpload { content_type: content_type.to_string(), cache_key: file_media_request, @@ -1249,6 +1259,7 @@ impl QueueStorage { &self.room_id, &upload_file_txn, send_event_txn.into(), + created_at, DependentQueuedRequestKind::FinishUpload { local_echo: event, file_upload: upload_file_txn.clone(), @@ -1266,6 +1277,7 @@ impl QueueStorage { &self, transaction_id: &TransactionId, key: String, + created_at: MilliSecondsSinceUnixEpoch, ) -> Result, RoomSendQueueStorageError> { let guard = self.store.lock().await; let client = guard.client()?; @@ -1295,6 +1307,7 @@ impl QueueStorage { &self.room_id, transaction_id, reaction_txn_id.clone(), + created_at, DependentQueuedRequestKind::ReactEvent { key }, ) .await?; @@ -1323,6 +1336,7 @@ impl QueueStorage { room: room.clone(), transaction_id: queued.transaction_id, media_handles: None, + created_at: queued.created_at, }, send_error: queued.error, }, @@ -1382,6 +1396,7 @@ impl QueueStorage { upload_thumbnail_txn: thumbnail_info.map(|info| info.txn), upload_file_txn: file_upload, }), + created_at: dep.created_at, }, send_error: None, }, @@ -1405,6 +1420,7 @@ impl QueueStorage { client: &Client, dependent_request: DependentQueuedRequest, new_updates: &mut Vec, + created_at: MilliSecondsSinceUnixEpoch, ) -> Result { let store = client.store(); @@ -1471,6 +1487,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, dependent_request.own_transaction_id.into(), + created_at, serializable.into(), Self::HIGH_PRIORITY, ) @@ -1558,6 +1575,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, dependent_request.own_transaction_id.into(), + created_at, serializable.into(), Self::HIGH_PRIORITY, ) @@ -1660,7 +1678,15 @@ impl QueueStorage { for dependent in canonicalized_dependent_requests { let dependent_id = dependent.own_transaction_id.clone(); - match self.try_apply_single_dependent_request(&client, dependent, new_updates).await { + match self + .try_apply_single_dependent_request( + &client, + dependent, + new_updates, + MilliSecondsSinceUnixEpoch::now(), + ) + .await + { Ok(should_remove) => { if should_remove { // The dependent request has been successfully applied, forget about it. @@ -1888,6 +1914,9 @@ pub struct SendHandle { /// Additional handles for a media upload. media_handles: Option, + + /// The time that this send handle was first created + pub created_at: Option, } impl SendHandle { @@ -2073,8 +2102,9 @@ impl SendHandle { ) -> Result, RoomSendQueueStorageError> { trace!("received an intent to react"); + let created_at = MilliSecondsSinceUnixEpoch::now(); if let Some(reaction_txn_id) = - self.room.inner.queue.react(&self.transaction_id, key.clone()).await? + self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await? { trace!("successfully queued react"); @@ -2138,6 +2168,7 @@ impl SendReactionHandle { room: self.room.clone(), transaction_id: self.transaction_id.clone().into(), media_handles: None, + created_at: Some(MilliSecondsSinceUnixEpoch::now()), }; handle.abort().await @@ -2275,6 +2306,7 @@ mod tests { .unwrap(), }, parent_key: None, + created_at: None, }; let res = canonicalize_dependent_requests(&[edit]); @@ -2295,6 +2327,7 @@ mod tests { parent_transaction_id: txn.clone(), kind: DependentQueuedRequestKind::RedactEvent, parent_key: None, + created_at: None, }; let edit = DependentQueuedRequest { @@ -2307,6 +2340,7 @@ mod tests { .unwrap(), }, parent_key: None, + created_at: None, }; inputs.push({ @@ -2346,6 +2380,7 @@ mod tests { .unwrap(), }, parent_key: None, + created_at: None, }) .collect::>(); @@ -2377,6 +2412,7 @@ mod tests { kind: DependentQueuedRequestKind::RedactEvent, parent_transaction_id: txn1.clone(), parent_key: None, + created_at: None, }, // This one pertains to txn2. DependentQueuedRequest { @@ -2389,6 +2425,7 @@ mod tests { }, parent_transaction_id: txn2.clone(), parent_key: None, + created_at: None, }, ]; @@ -2419,6 +2456,7 @@ mod tests { kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() }, parent_transaction_id: txn.clone(), parent_key: None, + created_at: None, }; let edit_id = ChildTransactionId::new(); @@ -2432,6 +2470,7 @@ mod tests { }, parent_transaction_id: txn, parent_key: None, + created_at: None, }; let res = canonicalize_dependent_requests(&[react, edit]); diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index 7c32c8fa000..6513faca6da 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -28,7 +28,7 @@ use ruma::{ room::message::{FormattedBody, MessageType, RoomMessageEventContent}, AnyMessageLikeEventContent, }, - OwnedTransactionId, TransactionId, + MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId, }; use tracing::{debug, error, instrument, trace, warn, Span}; @@ -184,6 +184,7 @@ impl RoomSendQueue { config.mentions, ); + let created_at = MilliSecondsSinceUnixEpoch::now(); // Save requests in the queue storage. self.inner .queue @@ -191,6 +192,7 @@ impl RoomSendQueue { event_content.clone(), content_type, send_event_txn.clone().into(), + created_at, upload_file_txn.clone(), file_media_request, queue_thumbnail_info, @@ -205,6 +207,7 @@ impl RoomSendQueue { room: self.clone(), transaction_id: send_event_txn.clone().into(), media_handles: Some(MediaHandles { upload_thumbnail_txn, upload_file_txn }), + created_at: Some(created_at), }; let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { @@ -305,6 +308,7 @@ impl QueueStorage { .save_send_queue_request( &self.room_id, event_txn, + MilliSecondsSinceUnixEpoch::now(), new_content.into(), Self::HIGH_PRIORITY, ) @@ -349,7 +353,13 @@ impl QueueStorage { client .store() - .save_send_queue_request(&self.room_id, next_upload_txn, request, Self::HIGH_PRIORITY) + .save_send_queue_request( + &self.room_id, + next_upload_txn, + MilliSecondsSinceUnixEpoch::now(), + request, + Self::HIGH_PRIORITY, + ) .await .map_err(RoomSendQueueStorageError::StateStoreError)?; @@ -577,6 +587,7 @@ impl QueueStorage { &self.room_id, txn, ChildTransactionId::new(), + MilliSecondsSinceUnixEpoch::now(), DependentQueuedRequestKind::EditEvent { new_content: new_serialized }, ) .await?;