Skip to content

Commit b1ef4ca

Browse files
Daniel SalinasDaniel Salinas
authored andcommitted
Add an enqueue time to the send queue system
1 parent bf6fa4c commit b1ef4ca

File tree

6 files changed

+71
-24
lines changed

6 files changed

+71
-24
lines changed

crates/matrix-sdk-base/src/store/memory_store.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use ruma::{
3030
},
3131
serde::Raw,
3232
time::Instant,
33-
CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId,
34-
OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
33+
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
34+
OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
3535
};
3636
use tracing::{debug, instrument, trace, warn};
3737

@@ -809,12 +809,15 @@ impl StateStore for MemoryStore {
809809
kind: QueuedRequestKind,
810810
priority: usize,
811811
) -> Result<(), Self::Error> {
812-
self.send_queue_events
813-
.write()
814-
.unwrap()
815-
.entry(room_id.to_owned())
816-
.or_default()
817-
.push(QueuedRequest { kind, transaction_id, error: None, priority });
812+
self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().push(
813+
QueuedRequest {
814+
kind,
815+
transaction_id,
816+
error: None,
817+
priority,
818+
enqueue_time: Some(MilliSecondsSinceUnixEpoch::now()),
819+
},
820+
);
818821
Ok(())
819822
}
820823

@@ -912,6 +915,7 @@ impl StateStore for MemoryStore {
912915
parent_transaction_id: parent_transaction_id.to_owned(),
913916
own_transaction_id,
914917
parent_key: None,
918+
enqueue_time: None,
915919
},
916920
);
917921
Ok(())

crates/matrix-sdk-base/src/store/send_queue.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use ruma::{
2323
AnyMessageLikeEventContent, EventContent as _, RawExt as _,
2424
},
2525
serde::Raw,
26-
OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, UInt,
26+
MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId,
27+
TransactionId, UInt,
2728
};
2829
use serde::{Deserialize, Serialize};
2930

@@ -131,6 +132,9 @@ pub struct QueuedRequest {
131132
/// The bigger the value, the higher the priority at which this request
132133
/// should be handled.
133134
pub priority: usize,
135+
136+
/// The time that the request was original attempted.
137+
pub enqueue_time: Option<MilliSecondsSinceUnixEpoch>,
134138
}
135139

136140
impl QueuedRequest {
@@ -365,6 +369,9 @@ pub struct DependentQueuedRequest {
365369
/// If the parent request has been sent, the parent's request identifier
366370
/// returned by the server once the local echo has been sent out.
367371
pub parent_key: Option<SentRequestKey>,
372+
373+
/// The time that the request was original attempted.
374+
pub enqueue_time: Option<MilliSecondsSinceUnixEpoch>,
368375
}
369376

370377
impl DependentQueuedRequest {

crates/matrix-sdk-indexeddb/src/state_store/mod.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ use ruma::{
4444
GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
4545
},
4646
serde::Raw,
47-
CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId,
48-
OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
47+
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
48+
OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
4949
};
5050
use serde::{de::DeserializeOwned, Deserialize, Serialize};
5151
use tracing::{debug, warn};
@@ -444,6 +444,8 @@ struct PersistedQueuedRequest {
444444
is_wedged: Option<bool>,
445445

446446
event: Option<SerializableEventContent>,
447+
448+
enqueue_time: Option<MilliSecondsSinceUnixEpoch>,
447449
}
448450

449451
impl PersistedQueuedRequest {
@@ -464,7 +466,13 @@ impl PersistedQueuedRequest {
464466
// By default, events without a priority have a priority of 0.
465467
let priority = self.priority.unwrap_or(0);
466468

467-
Some(QueuedRequest { kind, transaction_id: self.transaction_id, error, priority })
469+
Some(QueuedRequest {
470+
kind,
471+
transaction_id: self.transaction_id,
472+
error,
473+
priority,
474+
enqueue_time: self.enqueue_time,
475+
})
468476
}
469477
}
470478

@@ -1389,6 +1397,7 @@ impl_state_store!({
13891397
is_wedged: None,
13901398
event: None,
13911399
priority: Some(priority),
1400+
enqueue_time: Some(MilliSecondsSinceUnixEpoch::now()),
13921401
});
13931402

13941403
// Save the new vector into db.
@@ -1584,6 +1593,7 @@ impl_state_store!({
15841593
parent_transaction_id: parent_txn_id.to_owned(),
15851594
own_transaction_id: own_txn_id,
15861595
parent_key: None,
1596+
enqueue_time: None,
15871597
});
15881598

15891599
// Save the new vector into db.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- Migration script to add the enqueue_time column to the send_queue_events table
2+
ALTER TABLE "send_queue_events"
3+
ADD COLUMN "enqueue_time" INTEGER NOT NULL DEFAULT (strftime('%s', 'now'));
4+
5+
ALTER TABLE "dependent_send_queue_events"
6+
ADD COLUMN "enqueue_time" INTEGER NOT NULL DEFAULT (strftime('%s', 'now'));

crates/matrix-sdk-sqlite/src/state_store.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ use ruma::{
3232
GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
3333
},
3434
serde::Raw,
35-
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, OwnedUserId,
36-
RoomId, RoomVersionId, TransactionId, UserId,
35+
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36+
OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId,
3737
};
3838
use rusqlite::{OptionalExtension, Transaction};
3939
use serde::{de::DeserializeOwned, Deserialize, Serialize};
@@ -69,7 +69,7 @@ mod keys {
6969
/// This is used to figure whether the sqlite database requires a migration.
7070
/// Every new SQL migration should imply a bump of this number, and changes in
7171
/// the [`SqliteStateStore::run_migrations`] function..
72-
const DATABASE_VERSION: u8 = 10;
72+
const DATABASE_VERSION: u8 = 11;
7373

7474
/// A sqlite based cryptostore.
7575
#[derive(Clone)]
@@ -318,6 +318,17 @@ impl SqliteStateStore {
318318
.await?;
319319
}
320320

321+
if from < 11 && to >= 11 {
322+
conn.with_transaction(move |txn| {
323+
// Run the migration.
324+
txn.execute_batch(include_str!(
325+
"../migrations/state_store/010_send_queue_enqueue_time.sql"
326+
))?;
327+
txn.set_db_version(11)
328+
})
329+
.await?;
330+
}
331+
321332
Ok(())
322333
}
323334

@@ -1753,7 +1764,6 @@ impl StateStore for SqliteStateStore {
17531764
let room_id_value = self.serialize_value(&room_id.to_owned())?;
17541765

17551766
let content = self.serialize_json(&content)?;
1756-
17571767
// The transaction id is used both as a key (in remove/update) and a value (as
17581768
// it's useful for the callers), so we keep it as is, and neither hash
17591769
// it (with encode_key) or encrypt it (through serialize_value). After
@@ -1824,26 +1834,28 @@ impl StateStore for SqliteStateStore {
18241834
// Note: ROWID is always present and is an auto-incremented integer counter. We
18251835
// want to maintain the insertion order, so we can sort using it.
18261836
// Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1827-
let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize)> = self
1837+
let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, u64)> = self
18281838
.acquire()
18291839
.await?
18301840
.prepare(
1831-
"SELECT transaction_id, content, wedge_reason, priority FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1841+
"SELECT transaction_id, content, wedge_reason, priority, enqueue_time FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
18321842
|mut stmt| {
18331843
stmt.query((room_id,))?
1834-
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)))
1844+
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
18351845
.collect()
18361846
},
18371847
)
18381848
.await?;
18391849

18401850
let mut requests = Vec::with_capacity(res.len());
18411851
for entry in res {
1852+
let enqueue_time = MilliSecondsSinceUnixEpoch(UInt::new(entry.4).unwrap());
18421853
requests.push(QueuedRequest {
18431854
transaction_id: entry.0.into(),
18441855
kind: self.deserialize_json(&entry.1)?,
18451856
error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
18461857
priority: entry.3,
1858+
enqueue_time: Some(enqueue_time),
18471859
});
18481860
}
18491861

@@ -1909,7 +1921,6 @@ impl StateStore for SqliteStateStore {
19091921
// See comment in `save_send_queue_event`.
19101922
let parent_txn_id = parent_txn_id.to_string();
19111923
let own_txn_id = own_txn_id.to_string();
1912-
19131924
self.acquire()
19141925
.await?
19151926
.with_transaction(move |txn| {
@@ -2011,26 +2022,28 @@ impl StateStore for SqliteStateStore {
20112022
let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
20122023

20132024
// Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2014-
let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>)> = self
2025+
let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, u64)> = self
20152026
.acquire()
20162027
.await?
20172028
.prepare(
2018-
"SELECT own_transaction_id, parent_transaction_id, parent_key, content FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2029+
"SELECT own_transaction_id, parent_transaction_id, parent_key, content, enqueue_time FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
20192030
|mut stmt| {
20202031
stmt.query((room_id,))?
2021-
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)))
2032+
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
20222033
.collect()
20232034
},
20242035
)
20252036
.await?;
20262037

20272038
let mut dependent_events = Vec::with_capacity(res.len());
20282039
for entry in res {
2040+
let enqueue_time = MilliSecondsSinceUnixEpoch(UInt::new(entry.4).unwrap());
20292041
dependent_events.push(DependentQueuedRequest {
20302042
own_transaction_id: entry.0.into(),
20312043
parent_transaction_id: entry.1.into(),
20322044
parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
20332045
kind: self.deserialize_json(&entry.3)?,
2046+
enqueue_time: Some(enqueue_time),
20342047
});
20352048
}
20362049

@@ -2427,7 +2440,6 @@ mod migration_tests {
24272440
let room_id_value = this.serialize_value(&room_id.to_owned())?;
24282441

24292442
let content = this.serialize_json(&content)?;
2430-
24312443
txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
24322444
.execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
24332445

crates/matrix-sdk/src/send_queue.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2275,6 +2275,7 @@ mod tests {
22752275
.unwrap(),
22762276
},
22772277
parent_key: None,
2278+
enqueue_time: None,
22782279
};
22792280
let res = canonicalize_dependent_requests(&[edit]);
22802281

@@ -2295,6 +2296,7 @@ mod tests {
22952296
parent_transaction_id: txn.clone(),
22962297
kind: DependentQueuedRequestKind::RedactEvent,
22972298
parent_key: None,
2299+
enqueue_time: None,
22982300
};
22992301

23002302
let edit = DependentQueuedRequest {
@@ -2307,6 +2309,7 @@ mod tests {
23072309
.unwrap(),
23082310
},
23092311
parent_key: None,
2312+
enqueue_time: None,
23102313
};
23112314

23122315
inputs.push({
@@ -2346,6 +2349,7 @@ mod tests {
23462349
.unwrap(),
23472350
},
23482351
parent_key: None,
2352+
enqueue_time: None,
23492353
})
23502354
.collect::<Vec<_>>();
23512355

@@ -2377,6 +2381,7 @@ mod tests {
23772381
kind: DependentQueuedRequestKind::RedactEvent,
23782382
parent_transaction_id: txn1.clone(),
23792383
parent_key: None,
2384+
enqueue_time: None,
23802385
},
23812386
// This one pertains to txn2.
23822387
DependentQueuedRequest {
@@ -2389,6 +2394,7 @@ mod tests {
23892394
},
23902395
parent_transaction_id: txn2.clone(),
23912396
parent_key: None,
2397+
enqueue_time: None,
23922398
},
23932399
];
23942400

@@ -2419,6 +2425,7 @@ mod tests {
24192425
kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
24202426
parent_transaction_id: txn.clone(),
24212427
parent_key: None,
2428+
enqueue_time: None,
24222429
};
24232430

24242431
let edit_id = ChildTransactionId::new();
@@ -2432,6 +2439,7 @@ mod tests {
24322439
},
24332440
parent_transaction_id: txn,
24342441
parent_key: None,
2442+
enqueue_time: None,
24352443
};
24362444

24372445
let res = canonicalize_dependent_requests(&[react, edit]);

0 commit comments

Comments
 (0)