Skip to content

Commit

Permalink
WIP, hack, blood and tears
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Nov 21, 2024
1 parent 2344f00 commit 4fc22de
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl EventCacheStore for MemoryStore {
async fn reload_linked_chunk(
&self,
_room_id: &RoomId,
) -> Result<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>, Self::Error> {
) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, Self::Error> {
// TODO(hywan)
Ok(Default::default())
}
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk-base/src/event_cache/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub trait EventCacheStore: AsyncTraitDeps {
async fn reload_linked_chunk(
&self,
room_id: &RoomId,
) -> Result<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>, Self::Error>;
) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, Self::Error>;

/// Add a media file's content in the media store.
///
Expand Down Expand Up @@ -174,7 +174,7 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
async fn reload_linked_chunk(
&self,
room_id: &RoomId,
) -> Result<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>, Self::Error> {
) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, Self::Error> {
self.0.reload_linked_chunk(room_id).await.map_err(Into::into)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CREATE TABLE "linked_chunks" (
-- Identifier of the chunk, unique per room.
"id" INTEGER,
-- Which room does this chunk belong to? (hashed key shared with the two other tables)
"room_id" BLOB NO NULL,
"room_id" BLOB NOT NULL,

-- Previous chunk in the linked list.
"previous" INTEGER,
Expand All @@ -21,10 +21,10 @@ CREATE TABLE "gaps" (
-- Which chunk does this gap refer to?
"chunk_id" INTEGER NOT NULL,
-- Which room does this event belong to? (hashed key shared with linked_chunks)
"room_id" BLOB NO NULL,
"room_id" BLOB NOT NULL,

-- The previous batch token of a gap (encrypted value).
"prev_token" TEXT NOT NULL,
"prev_token" BLOB NOT NULL,

-- If the owning chunk gets deleted, delete the entry too.
FOREIGN KEY(chunk_id, room_id) REFERENCES linked_chunks(id, room_id) ON DELETE CASCADE
Expand All @@ -35,14 +35,14 @@ CREATE TABLE "events" (
-- Which chunk does this event refer to?
"chunk_id" INTEGER NOT NULL,
-- Which room does this event belong to? (hashed key shared with linked_chunks)
"room_id" BLOB NO NULL,
"room_id" BLOB NOT NULL,

-- `OwnedEventId` for events, can be null if malformed.
"event_id" TEXT,
-- JSON serialized `Raw<AnySyncTimelineEvent>` (encrypted value).
"raw" BLOB NOT NULL,
-- Index (position) in the chunk.
"index" INTEGER NOT NULL,
-- Position (index) in the chunk.
"position" INTEGER NOT NULL,

-- If the owning chunk gets deleted, delete the entry too.
FOREIGN KEY(chunk_id, room_id) REFERENCES linked_chunks(id, room_id) ON DELETE CASCADE
Expand Down
108 changes: 74 additions & 34 deletions crates/matrix-sdk-sqlite/src/event_cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use matrix_sdk_store_encryption::StoreCipher;
use ruma::{MilliSecondsSinceUnixEpoch, RoomId};
use rusqlite::{OptionalExtension, Transaction};
use tokio::fs;
use tracing::debug;
use tracing::{debug, trace};

use crate::{
error::{Error, Result},
Expand Down Expand Up @@ -201,20 +201,35 @@ impl EventCacheStore for SqliteEventCacheStore {
room_id: &RoomId,
updates: &[Update<Event, Gap>],
) -> Result<(), Self::Error> {
let room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

for up in updates {
match up {
Update::NewItemsChunk { previous, new, next } => {
let new = new.clone();
let previous = previous.clone();
let next = next.clone();
let room_id = room_id.clone();
let hashed_room_id = hashed_room_id.clone();

trace!(
%room_id,
"new events chunk (prev={:?}, i={}, next={:?})",
previous.as_ref().map(ChunkIdentifier::index),
new.index(),
next.as_ref().map(ChunkIdentifier::index)
);

self.acquire()
.await?
.with_transaction(move |txn| {
insert_chunk(txn, &room_id, previous.as_ref(), new, next.as_ref(), "E")
insert_chunk(
txn,
&hashed_room_id,
previous.as_ref(),
new,
next.as_ref(),
"E",
)
})
.await?;
}
Expand All @@ -223,17 +238,26 @@ impl EventCacheStore for SqliteEventCacheStore {
let new = new.clone();
let previous = previous.clone();
let next = next.clone();
let room_id = room_id.clone();
let hashed_room_id = hashed_room_id.clone();

let serialized = serde_json::to_vec(&gap.prev_token)?;
let prev_token = self.encode_value(serialized)?;

let prev_token = self.encode_value(gap.prev_token.clone().into_bytes())?;
trace!(
%room_id,
"new gap chunk (prev={:?}, i={}, next={:?})",
previous.as_ref().map(ChunkIdentifier::index),
new.index(),
next.as_ref().map(ChunkIdentifier::index)
);

self.acquire()
.await?
.with_transaction(move |txn| -> rusqlite::Result<()> {
// Insert the chunk as a gap.
insert_chunk(
txn,
&room_id,
&hashed_room_id,
previous.as_ref(),
new,
next.as_ref(),
Expand All @@ -248,7 +272,7 @@ impl EventCacheStore for SqliteEventCacheStore {
INSERT INTO gaps(chunk_id, room_id, prev_token)
VALUES (?, ?, ?)
"#,
(new.index(), room_id, prev_token),
(new.index(), hashed_room_id, prev_token),
)?;

Ok(())
Expand All @@ -257,32 +281,34 @@ impl EventCacheStore for SqliteEventCacheStore {
}

Update::RemoveChunk(chunk_identifier) => {
let room_id = room_id.clone();
let hashed_room_id = hashed_room_id.clone();
let chunk_id = chunk_identifier.index();

trace!(%room_id, "removing chunk @ {chunk_id}");

self.acquire()
.await?
.with_transaction(move |txn| -> rusqlite::Result<()> {
// Find chunk to delete.
let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
"SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
(chunk_id, &room_id),
(chunk_id, &hashed_room_id),
|row| Ok((row.get(0)?, row.get(1)?))
)?;

// Replace its previous' next to its own next.
if let Some(previous) = previous {
txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &room_id))?;
txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
}

// Replace its next' previous to its own previous.
if let Some(next) = next {
txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &room_id))?;
txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
}

// Now delete it, and let cascading delete corresponding entries in the
// other data tables.
txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, room_id))?;
txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, hashed_room_id))?;

Ok(())
})
Expand All @@ -291,7 +317,9 @@ impl EventCacheStore for SqliteEventCacheStore {

Update::PushItems { at, items } => {
let chunk_id = at.chunk_identifier().index();
let room_id = room_id.clone();
let hashed_room_id = hashed_room_id.clone();

trace!(%room_id, "pushing items @ {chunk_id}");

let entries = items
.into_iter()
Expand All @@ -301,7 +329,7 @@ impl EventCacheStore for SqliteEventCacheStore {
let raw = self.encode_value(serialized)?;
let event_id = event.event_id().map(|event_id| event_id.to_string());
let index = at.index() + i;
Ok((chunk_id, event_id, raw, index))
Ok((event_id, raw, index))
})
.collect::<Result<Vec<_>, _>>()?;

Expand All @@ -311,10 +339,10 @@ impl EventCacheStore for SqliteEventCacheStore {
for entry in entries {
txn.execute(
r#"
INSERT INTO events(chunk_id, room_id, event_id, raw, index)
VALUES (?, ?, ?, ?)
INSERT INTO events(chunk_id, room_id, event_id, raw, position)
VALUES (?, ?, ?, ?, ?)
"#,
(entry.0, &room_id, entry.1, entry.2, entry.3),
(chunk_id, &hashed_room_id, entry.0, entry.1, entry.2),
)?;
}

Expand All @@ -324,25 +352,27 @@ impl EventCacheStore for SqliteEventCacheStore {
}

Update::RemoveItem { at } => {
let room_id = room_id.clone();
let hashed_room_id = hashed_room_id.clone();
let chunk_id = at.chunk_identifier().index();
let index = at.index();

trace!(%room_id, "removing item @ {chunk_id}:{index}");

self.acquire()
.await?
.with_transaction(move |txn| -> rusqlite::Result<()> {
// Remove the entry.
txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND index = ?", (&room_id, chunk_id, index))?;
txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;

// Decrement the index of each item after the one we're going to
// remove.
txn.execute(
r#"
UPDATE events
SET index = index - 1
WHERE room_id = ? AND chunk_id = ? AND index > ?
SET position = position - 1
WHERE room_id = ? AND chunk_id = ? AND position > ?
"#,
(&room_id, chunk_id, index)
(&hashed_room_id, chunk_id, index)
)?;

Ok(())
Expand All @@ -351,15 +381,17 @@ impl EventCacheStore for SqliteEventCacheStore {
}

Update::DetachLastItems { at } => {
let room_id = room_id.clone();
let hashed_room_id = hashed_room_id.clone();
let chunk_id = at.chunk_identifier().index();
let index = at.index();

trace!(%room_id, "truncating items >= {chunk_id}:{index}");

self.acquire()
.await?
.with_transaction(move |txn| -> rusqlite::Result<()> {
// Remove these entries.
txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND index >= ?", (&room_id, chunk_id, index))?;
txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
Ok(())
})
.await?;
Expand Down Expand Up @@ -391,8 +423,11 @@ impl EventCacheStore for SqliteEventCacheStore {
async fn reload_linked_chunk(
&self,
room_id: &RoomId,
) -> Result<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>, Self::Error> {
let room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, Self::Error> {
let room_id = room_id.to_owned();
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

let this = self.clone();

let result = self
.acquire()
Expand All @@ -404,7 +439,7 @@ impl EventCacheStore for SqliteEventCacheStore {
.prepare(
"SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ?",
)?
.query_map((&room_id,), |row| {
.query_map((&hashed_room_id,), |row| {
Ok((
row.get::<_, u64>(0)?,
row.get::<_, Option<u64>>(1)?,
Expand All @@ -415,15 +450,19 @@ impl EventCacheStore for SqliteEventCacheStore {
{
let (id, previous, next, chunk_type) = data?;

trace!(%room_id, "reloaded chunk {id} of type {chunk_type}");

match chunk_type.as_str() {
"G" => {
// It's a gap! There's at most one row for it in the database, so a
// call to `query_row` is sufficient.
let prev_token: String = txn.query_row(
let encoded_prev_token: Vec<u8> = txn.query_row(
"SELECT prev_token FROM gaps WHERE chunk_id = ? AND room_id = ?",
(id, &room_id),
(id, &hashed_room_id),
|row| row.get(0),
)?;
let prev_token_bytes = this.decode_value(&encoded_prev_token)?;
let prev_token = serde_json::from_slice(&prev_token_bytes)?;

let previous = previous.map(ChunkIdentifier::from_raw);
let next = next.map(ChunkIdentifier::from_raw);
Expand All @@ -444,12 +483,13 @@ impl EventCacheStore for SqliteEventCacheStore {
r#"
SELECT raw FROM events
WHERE chunk_id = ? AND room_id = ?
ORDER BY index ASC
ORDER BY position ASC
"#,
)?
.query_map((id, &room_id), |row| row.get::<_, Vec<u8>>(0))?
.query_map((id, &hashed_room_id), |row| row.get::<_, Vec<u8>>(0))?
{
let raw = event_data?;
let encoded_raw = event_data?;
let raw = this.decode_value(&encoded_raw)?;
let raw_event = serde_json::from_slice(&raw)?;

// TODO: keep encryption information around!
Expand All @@ -473,7 +513,7 @@ impl EventCacheStore for SqliteEventCacheStore {

builder.set_observable();

Ok(builder.build().unwrap_or_default())
Ok(builder.build())
})
.await?;

Expand Down
19 changes: 18 additions & 1 deletion crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use tokio::sync::{
broadcast::{error::RecvError, Receiver},
Mutex, RwLock,
};
use tracing::{error, info_span, instrument, trace, warn, Instrument as _, Span};
use tracing::{debug, error, info_span, instrument, trace, warn, Instrument as _, Span};

use self::paginator::PaginatorError;
use crate::{client::WeakClient, Client};
Expand Down Expand Up @@ -173,6 +173,22 @@ impl EventCache {
pub fn subscribe(&self) -> Result<()> {
let client = self.inner.client()?;

// TODO shady as F, don't do that for realz
let c = client.clone();
let this = self.clone();
std::thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async {
for room in c.rooms() {
if let Err(err) = this.for_room(room.room_id()).await {
warn!("couldn't reload room cache for room {}: {err}", room.room_id());
}
}
});
})
.join()
.unwrap();

let _ = self.inner.drop_handles.get_or_init(|| {
// Spawn the task that will listen to all the room updates at once.
let listen_updates_task = spawn(Self::listen_task(
Expand Down Expand Up @@ -228,6 +244,7 @@ impl EventCache {

async move {
while ignore_user_list_stream.next().await.is_some() {
debug!("received a ignore user list change");
if let Err(err) = inner.clear_all_rooms().await {
warn!("error when clearing all room after an ignore list update: {err}");
}
Expand Down
Loading

0 comments on commit 4fc22de

Please sign in to comment.