diff --git a/changelog.d/18873.feature b/changelog.d/18873.feature new file mode 100644 index 00000000000..bab4a0a075b --- /dev/null +++ b/changelog.d/18873.feature @@ -0,0 +1 @@ +Implement experimental [MSC3871](https://github.com/matrix-org/matrix-spec-proposals/pull/3871) to indicate `gaps` in the `/messages` timeline. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 34aae7ef3ce..3998186eff4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -259,7 +259,9 @@ async def _maybe_backfill_inner( _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY) for event_id, depth in await self.store.get_backfill_points_in_room( room_id=room_id, - current_depth=current_depth, + # Per the docstring, it's best to pad the `current_depth` by the + # number of messages you plan to backfill from these points. + current_depth=current_depth + limit, # We only need to end up with 5 extremities combined with the # insertion event extremities to make the `/backfill` request # but fetch an order of magnitude more to make sure there is diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index df1a7e714ce..6957ce74b65 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -414,12 +414,14 @@ async def purge_room( @trace async def get_messages( self, + *, requester: Requester, room_id: str, pagin_config: PaginationConfig, as_client_event: bool = True, event_filter: Optional[Filter] = None, use_admin_priviledge: bool = False, + backfill: bool = True, ) -> JsonDict: """Get messages in a room. @@ -432,6 +434,8 @@ async def get_messages( use_admin_priviledge: if `True`, return all events, regardless of whether `user` has access to them. To be used **ONLY** from the admin API. + backfill: If false, we skip backfill altogether. When true, we backfill as a + best effort. Returns: Pagination API results @@ -522,7 +526,7 @@ async def get_messages( event_filter=event_filter, ) - if pagin_config.direction == Direction.BACKWARDS: + if backfill and pagin_config.direction == Direction.BACKWARDS: # We use a `Set` because there can be multiple events at a given depth # and we only care about looking at the unique continum of depths to # find gaps. @@ -622,6 +626,7 @@ async def get_messages( if not events: return { "chunk": [], + "gaps": [], "start": await from_token.to_string(self.store), } @@ -641,6 +646,7 @@ async def get_messages( if not events: return { "chunk": [], + "gaps": [], "start": await from_token.to_string(self.store), "end": await next_token.to_string(self.store), } @@ -666,6 +672,10 @@ async def get_messages( events, user_id ) + gaps = await self.store.get_events_next_to_gaps( + events=events, direction=pagin_config.direction + ) + time_now = self.clock.time_msec() serialize_options = SerializeEventConfig( @@ -681,6 +691,18 @@ async def get_messages( bundle_aggregations=aggregations, ) ), + "gaps": [ + { + "prev_pagination_token": await from_token.copy_and_replace( + StreamKeyType.ROOM, gap.prev_token + ).to_string(self.store), + "event_id": gap.event_id, + "next_pagination_token": await from_token.copy_and_replace( + StreamKeyType.ROOM, gap.next_token + ).to_string(self.store), + } + for gap in gaps + ], "start": await from_token.to_string(self.store), "end": await next_token.to_string(self.store), } diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 64deae76507..6a25b392513 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -811,6 +811,17 @@ def __init__(self, hs: "HomeServer"): async def on_GET( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: + """ + Query paremeters: + dir + from + to + limit + filter + backfill: If false, we skip backfill altogether. When true, we backfill as a + best effort. + """ + processing_start_time = self.clock.time_msec() # Fire off and hope that we get a result by the end. # @@ -840,12 +851,15 @@ async def on_GET( ): as_client_event = False + backfill = parse_boolean(request, "backfill", default=True) + msgs = await self.pagination_handler.get_messages( room_id=room_id, requester=requester, pagin_config=pagination_config, as_client_event=as_client_event, event_filter=event_filter, + backfill=backfill, ) processing_end_time = self.clock.time_msec() diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 26a91109dfe..4bef5fa5d21 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1212,6 +1212,30 @@ async def get_backfill_points_in_room( equal to the `current_depth`. Sorted by depth, highest to lowest (descending) so the closest events to the `current_depth` are first in the list. + Note: We can only do approximate depth comparisons. Backwards extremeties are + the oldest events we know of in the room but we only know of them because some + other event referenced them by prev_event and aren't persisted in our database + yet (meaning we don't know their depth specifically). So we need to look for the + approximate depth from the events connected to the current backwards + extremeties. + + It's best to pad the `current_depth` by the number of messages you plan to + backfill from these points. + + Example: + + - Your pagination token represents a scroll position at `depth` of `100`. + - We have a backfill point at an approximate depth of `125` + - You plan to backfill `50` events from that backfill point. + + When we pad our `current_depth`, `100` + `50` = `150`, we pick up the backfill + point at `125` (because <= `150`, our `current_depth`), backfill `50` events to + a depth of `75` in the timeline (exposing new events that we can return `100` -> + `75`). + + When we don't pad our `current_depth`, `100` is lower than any of the backfill + points so we don't pick any and miss out on backfilling any events. + We ignore extremities that are newer than the user's current scroll position (ie, those with depth greater than `current_depth`) as: 1. we don't really care about getting events that have happened @@ -1223,7 +1247,7 @@ async def get_backfill_points_in_room( Args: room_id: Room where we want to find the oldest events - current_depth: The depth at the user's current scrollback position + current_depth: The depth at the user's current scrollback position (see notes above). limit: The max number of backfill points to return Returns: diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index cc031d8996c..285dc15fcc4 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -34,6 +34,7 @@ Mapping, MutableMapping, Optional, + Sequence, Set, Tuple, cast, @@ -42,6 +43,7 @@ import attr from prometheus_client import Gauge +from typing_extensions import assert_never from twisted.internet import defer @@ -83,13 +85,17 @@ LoggingTransaction, make_tuple_in_list_sql_clause, ) + +# from synapse.storage.databases.main.stream import ( +# generate_next_token, +# ) from synapse.storage.types import Cursor from synapse.storage.util.id_generators import ( AbstractStreamIdGenerator, MultiWriterIdGenerator, ) from synapse.storage.util.sequence import build_sequence_generator -from synapse.types import JsonDict, get_domain_from_id +from synapse.types import JsonDict, RoomStreamToken, get_domain_from_id from synapse.types.state import StateFilter from synapse.types.storage import _BackgroundUpdates from synapse.util import unwrapFirstError @@ -100,6 +106,7 @@ from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure +from synapse.util.tokens import generate_next_token if TYPE_CHECKING: from synapse.server import HomeServer @@ -214,6 +221,34 @@ class EventRedactBehaviour(Enum): block = auto() +@attr.s(slots=True, frozen=True, auto_attribs=True) +class EventGapEntry: + """ + Represents a gap in the timeline. + + From MSC3871: Gappy timeline + """ + + event_id: str + """ + The target event ID which we see a gap before or after. + """ + + prev_token: RoomStreamToken + """ + The token position before the target `event_id` + + Remember: tokens are positions between events + """ + + next_token: RoomStreamToken + """ + The token position after the target `event_id` + + Remember: tokens are positions between events + """ + + class EventsWorkerStore(SQLBaseStore): # Whether to use dedicated DB threads for event fetching. This is only used # if there are multiple DB threads available. When used will lock the DB @@ -2315,15 +2350,24 @@ def is_event_next_to_backward_gap_txn(txn: LoggingTransaction) -> bool: is_event_next_to_backward_gap_txn, ) - async def is_event_next_to_forward_gap(self, event: EventBase) -> bool: - """Check if the given event is next to a forward gap of missing events. - The gap in front of the latest events is not considered a gap. + async def is_event_next_to_forward_gap( + self, event: EventBase, *, ignore_gap_after_latest: bool = True + ) -> bool: + """ + Check if the given event is next to a forward gap of missing events. + + By default when `ignore_gap_after_latest = True`, the gap in front of the + latest events is not considered a gap. + A(False)--->B(False)--->C(False)---> A(False)--->B(False)---> --->D(True)--->E(False) + When `ignore_gap_after_latest = False`, `A` would be considered next to a gap. + Args: - room_id: room where the event lives event: event to check (can't be an `outlier`) + ignore_gap_after_latest: Whether the gap after the latest events (forward + extremeties) in the room should be considered as an actual gap. Returns: Boolean indicating whether it's an extremity @@ -2335,38 +2379,39 @@ async def is_event_next_to_forward_gap(self, event: EventBase) -> bool: ) def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool: - # If the event in question is a forward extremity, we will just - # consider any potential forward gap as not a gap since it's one of - # the latest events in the room. - # - # `event_forward_extremities` does not include backfilled or outlier - # events so we can't rely on it to find forward gaps. We can only - # use it to determine whether a message is the latest in the room. - # - # We can't combine this query with the `forward_edge_query` below - # because if the event in question has no forward edges (isn't - # referenced by any other event's prev_events) but is in - # `event_forward_extremities`, we don't want to return 0 rows and - # say it's next to a gap. - forward_extremity_query = """ - SELECT 1 FROM event_forward_extremities - WHERE - room_id = ? - AND event_id = ? - LIMIT 1 - """ + if ignore_gap_after_latest: + # If the event in question is a forward extremity, we will just + # consider any potential forward gap as not a gap since it's one of + # the latest events in the room. + # + # `event_forward_extremities` does not include backfilled or outlier + # events so we can't rely on it to find forward gaps. We can only + # use it to determine whether a message is the latest in the room. + # + # We can't combine this query with the `forward_edge_query` below + # because if the event in question has no forward edges (isn't + # referenced by any other event's prev_events) but is in + # `event_forward_extremities`, we don't want to return 0 rows and + # say it's next to a gap. + forward_extremity_query = """ + SELECT 1 FROM event_forward_extremities + WHERE + room_id = ? + AND event_id = ? + LIMIT 1 + """ - # We consider any forward extremity as the latest in the room and - # not a forward gap. - # - # To expand, even though there is technically a gap at the front of - # the room where the forward extremities are, we consider those the - # latest messages in the room so asking other homeservers for more - # is useless. The new latest messages will just be federated as - # usual. - txn.execute(forward_extremity_query, (event.room_id, event.event_id)) - if txn.fetchone(): - return False + # We consider any forward extremity as the latest in the room and + # not a forward gap. + # + # To expand, even though there is technically a gap at the front of + # the room where the forward extremities are, we consider those the + # latest messages in the room so asking other homeservers for more + # is useless. The new latest messages will just be federated as + # usual. + txn.execute(forward_extremity_query, (event.room_id, event.event_id)) + if txn.fetchone(): + return False # Check to see whether the event in question is already referenced # by another event. If we don't see any edges, we're next to a @@ -2398,6 +2443,61 @@ def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool: is_event_next_to_gap_txn, ) + async def get_events_next_to_gaps( + self, events: Sequence[EventBase], direction: Direction + ) -> Sequence[EventGapEntry]: + """ + Find all of the events that have gaps next to them. + + When going backwards, we look for backward gaps (i.e. missing prev_events). + + When going forwards, we look for forward gaps (i.e. events that aren't + referenced by any other events). + + Args: + events: topological ordered list of events + direction: which side of the events to check for gaps. This should match the + direction we're paginating in. + """ + + gaps = [] + for event in events: + # FIXME: We should use a bulk look-up instead of N+1 queries. + if direction == Direction.BACKWARDS: + is_next_to_gap = await self.is_event_next_to_backward_gap(event) + elif direction == Direction.FORWARDS: + is_next_to_gap = await self.is_event_next_to_forward_gap( + event, ignore_gap_after_latest=False + ) + else: + assert_never(direction) + + if not is_next_to_gap: + continue + + stream_ordering = event.internal_metadata.stream_ordering + assert stream_ordering is not None, ( + "persisted events should have stream_ordering" + ) + + gaps.append( + EventGapEntry( + prev_token=generate_next_token( + direction=Direction.BACKWARDS, + last_topo_ordering=event.depth, + last_stream_ordering=stream_ordering, + ), + event_id=event.event_id, + next_token=generate_next_token( + direction=Direction.FORWARDS, + last_topo_ordering=event.depth, + last_stream_ordering=stream_ordering, + ), + ) + ) + + return gaps + async def get_event_id_for_timestamp( self, room_id: str, timestamp: int, direction: Direction ) -> Optional[str]: diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 5edac56ec3c..5f43a45b6e3 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -48,13 +48,13 @@ make_in_list_sql_clause, ) from synapse.storage.databases.main.stream import ( - generate_next_token, generate_pagination_bounds, generate_pagination_where_clause, ) from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, MultiWriterStreamToken, StreamKeyType, StreamToken from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.tokens import generate_next_token if TYPE_CHECKING: from synapse.server import HomeServer diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 66280f2f9af..4650d14e324 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -86,6 +86,7 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter +from synapse.util.tokens import generate_next_token if TYPE_CHECKING: from synapse.server import HomeServer @@ -294,30 +295,6 @@ def generate_pagination_bounds( return order, from_bound, to_bound -def generate_next_token( - direction: Direction, last_topo_ordering: Optional[int], last_stream_ordering: int -) -> RoomStreamToken: - """ - Generate the next room stream token based on the currently returned data. - - Args: - direction: Whether pagination is going forwards or backwards. - last_topo_ordering: The last topological ordering being returned. - last_stream_ordering: The last stream ordering being returned. - - Returns: - A new RoomStreamToken to return to the client. - """ - if direction == Direction.BACKWARDS: - # Tokens are positions between events. - # This token points *after* the last event in the chunk. - # We need it to point to the event before it in the chunk - # when we are going backwards so we subtract one from the - # stream part. - last_stream_ordering -= 1 - return RoomStreamToken(topological=last_topo_ordering, stream=last_stream_ordering) - - def _make_generic_sql_bound( bound: str, column_names: Tuple[str, str], diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 943f211b118..e38c1b961fe 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -604,7 +604,7 @@ def get_stream_pos_for_instance(self, instance_name: str) -> int: return self.instance_map.get(instance_name, self.stream) def is_before_or_eq(self, other_token: Self) -> bool: - """Wether this token is before the other token, i.e. every constituent + """Whether this token is before the other token, i.e. every constituent part is before the other. Essentially it is `self <= other`. @@ -694,7 +694,7 @@ class RoomStreamToken(AbstractMultiWriterStreamToken): --- - Historic tokens start with a "t" followed by the `depth` + Historical tokens start with a "t" followed by the `depth` (`topological_ordering` in the event graph) of the event that comes before the position of the token, followed by "-", followed by the `stream_ordering` of the event that comes before the position of the token. @@ -827,17 +827,15 @@ def as_historical_tuple(self) -> Tuple[int, int]: return self.topological, self.stream - def get_stream_pos_for_instance(self, instance_name: str) -> int: - """Get the stream position that the given writer was at at this token. + def is_before_or_eq(self, other_token: Self) -> bool: + is_before_or_eq_stream_ordering = super().is_before_or_eq(other_token) + if not is_before_or_eq_stream_ordering: + return False - This only makes sense for "live" tokens that may have a vector clock - component, and so asserts that this is a "live" token. - """ - assert self.topological is None + if self.topological is not None and other_token.topological is not None: + return self.topological <= other_token.topological - # If we don't have an entry for the instance we can assume that it was - # at `self.stream`. - return self.instance_map.get(instance_name, self.stream) + return True async def to_string(self, store: "DataStore") -> str: """See class level docstring for information about the format.""" diff --git a/synapse/util/tokens.py b/synapse/util/tokens.py new file mode 100644 index 00000000000..0142ea9a430 --- /dev/null +++ b/synapse/util/tokens.py @@ -0,0 +1,47 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# + +from typing import Optional + +from synapse.api.constants import Direction +from synapse.types import RoomStreamToken + + +def generate_next_token( + direction: Direction, last_topo_ordering: Optional[int], last_stream_ordering: int +) -> RoomStreamToken: + """ + Generate the next room stream token based on the currently returned data. + + Args: + direction: Whether pagination is going forwards or backwards. + last_topo_ordering: The last topological ordering being returned. + last_stream_ordering: The last stream ordering being returned. + + Returns: + A new RoomStreamToken to return to the client. + """ + if direction == Direction.BACKWARDS: + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. + last_stream_ordering -= 1 + + # TODO: Is this okay to do? Kinda seems more correct + if last_topo_ordering is not None: + last_topo_ordering -= 1 + + return RoomStreamToken(topological=last_topo_ordering, stream=last_stream_ordering) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 24a28fbdd28..6c75253029a 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -24,6 +24,7 @@ """Tests REST events for /rooms paths.""" import json +import logging from http import HTTPStatus from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union from unittest.mock import AsyncMock, Mock, call, patch @@ -59,7 +60,14 @@ sync, ) from synapse.server import HomeServer -from synapse.types import JsonDict, RoomAlias, UserID, create_requester +from synapse.types import ( + JsonDict, + RoomAlias, + StreamKeyType, + StreamToken, + UserID, + create_requester, +) from synapse.util import Clock from synapse.util.stringutils import random_string @@ -70,6 +78,8 @@ from tests.unittest import override_config from tests.utils import default_config +logger = logging.getLogger(__name__) + PATH_PREFIX = b"/_matrix/client/api/v1" @@ -1739,8 +1749,8 @@ def test_autojoin_rooms(self) -> None: self.assertEqual(len(rooms), 4) -class RoomMessagesTestCase(RoomBase): - """Tests /rooms/$room_id/messages/$user_id/$msg_id REST events.""" +class RoomSendMessagesTestCase(RoomBase): + """Tests /rooms/{roomId}/send/{eventType}/{txnId} REST events.""" user_id = "@sid1:red" @@ -2242,6 +2252,11 @@ class RoomMessageListTestCase(RoomBase): user_id = "@sid1:red" def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = self.hs.get_datastores().main + persistence = self.hs.get_storage_controllers().persistence + assert persistence is not None + self.persistence = persistence + self.room_id = self.helper.create_room_as(self.user_id) def test_topo_token_is_accepted(self) -> None: @@ -2371,6 +2386,195 @@ def test_room_message_filter_query_validation(self) -> None: channel.json_body["errcode"], Codes.NOT_JSON, channel.json_body ) + def _setup_gappy_timeline(self) -> Tuple[Dict[str, str], Dict[str, str]]: + """ + Set up a gappy timeline for testing. + + We create a chain of events but only persist every other event so we have gaps + everywhere. + + (`p` means the event was persisted and known to this local server) + ``` + p p p p p + old history <- foo -> bar <- baz -> qux <- corge <- grault <- garply <- waldo <- fred + ``` + + We also have some that are persisted at + the beginning of the room but that's just a quirk of how we set this test + fixture up. The "old history" is supposed to represent the point that we've + actually back-paginated so far from our server. + + Returns: + Tuple of: + 1. Mapping from message to event IDs. + 2. Mapping from event IDs to messages. + """ + + message_list = [ + "old history", + "foo", + "bar", + "baz", + "qux", + "corge", + "grault", + "garply", + "waldo", + "fred", + ] + message_to_event_id_map = {} + event_id_to_message_map = {} + + # Make a straight line of events where only every other is persisted + forward_extremity_event_ids = list( + self.get_success( + self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id) + ) + ) + previous_depth = 0 + for message_index, message_text in enumerate(message_list): + event, event_context = self.get_success( + create_event( + self.hs, + prev_event_ids=forward_extremity_event_ids, + type=EventTypes.Message, + content={"body": message_text, "msgtype": "m.text"}, + sender=self.user_id, + room_id=self.room_id, + room_version=self.get_success( + self.store.get_room_version_id(self.room_id) + ), + ) + ) + message_to_event_id_map[message_text] = event.event_id + event_id_to_message_map[event.event_id] = message_text + # Update the forward extremity to the new event + forward_extremity_event_ids = [ + event.event_id, + # Because we only persist every other event, if we just give Synapse a + # unknown event ID as a `prev_event_id`, it wont' be able to calculate + # `depth` in the DAG and will just default it to a `depth` of 1. + # + # Let's just connect it to one of the previous-previous events so that + # Synapse has some known `prev_event_id` to calculate the `depth` from. + forward_extremity_event_ids[0], + ] + + # Persist every other event (do the odds, so we start with *not* persisting + # the event representing the "old history") + if message_index % 2 == 1: + event, _, _ = self.get_success( + self.persistence.persist_event(event, event_context) + ) + # For sanity sake because `/messages` uses topological ordering, let's + # assert that the `depth` is increasing. + self.assertGreater( + event.depth, + previous_depth, + "Expected event depth to increase as we persist events", + ) + previous_depth = event.depth + + return message_to_event_id_map, event_id_to_message_map + + def test_gaps_going_backwards(self) -> None: + message_to_event_id_map, event_id_to_message_map = self._setup_gappy_timeline() + + # Craft a token the represents the position just after the "corge" event. + # When looking backwards, we should see the "corge" event. + corge_room_stream_token = self.get_success( + self.store.get_topological_token_for_event(message_to_event_id_map["corge"]) + ) + current_token = self.hs.get_event_sources().get_current_token() + corge_token = self.get_success( + current_token.copy_and_replace( + StreamKeyType.ROOM, + corge_room_stream_token, + ).to_string(self.store) + ) + + messages_type_filter = '{"types": ["m.room.message"]}' + channel = self.make_request( + "GET", + "/rooms/%s/messages?dir=b&from=%s&filter=%s" + % (self.room_id, corge_token, messages_type_filter), + ) + self.assertEqual(HTTPStatus.OK, channel.code) + logger.info("asdf %s", channel.json_body) + + # Make sure the timeline includes everything from "corge" backwards (inclusive) + # + actual_messages = [ + event_id_to_message_map.get(event["event_id"], event["event_id"]) + for event in channel.json_body["chunk"] + ] + expected_messages = [ + "corge", + # "qux", + "baz", + # "bar", + "foo", + # "old history", + ] + # Because the `assertEquals` assertion to assert exact order gives horrible diff + # output when it fails, let's use `assertIncludes` as a first step to sanity + # check everything is there before we assert the exact order. + self.assertIncludes( + set(actual_messages), + set(expected_messages), + exact=True, + ) + # Asser the actual order + self.assertEqual(actual_messages, expected_messages) + + # Make sure the gaps are correct + actual_gaps = [ + event_id_to_message_map.get(gap["event_id"], gap["event_id"]) + for gap in channel.json_body["gaps"] + ] + expected_gaps = expected_messages + # We only need to assert gaps are in the list (the order doesn't matter) + self.assertIncludes( + set(actual_gaps), + set(expected_gaps), + exact=True, + ) + # Ensure that the tokens point to the correct positions + for gap in channel.json_body["gaps"]: + event_room_stream_token = self.get_success( + self.store.get_topological_token_for_event(gap["event_id"]) + ) + + # Make sure that the `prev_pagination_token` points to the position before + # the event + prev_pagination_token = self.get_success( + StreamToken.from_string(self.store, gap["prev_pagination_token"]) + ) + assert prev_pagination_token.room_key.topological is not None, ( + "expected `gap.prev_pagination_token` to be a topological token since it was returned from `/messages`" + ) + assert prev_pagination_token.room_key.is_before_or_eq( + event_room_stream_token + ), ( + "expected the `gap.prev_pagination_token` to point to the position before the event" + ) + + # Make sure that the `next_pagination_token` points to the position after + # the event + next_pagination_token = self.get_success( + StreamToken.from_string(self.store, gap["next_pagination_token"]) + ) + assert next_pagination_token.room_key.topological is not None, ( + "expected `gap.next_pagination_token` to be a topological token since it was returned from `/messages`" + ) + assert not event_room_stream_token.is_before_or_eq( + prev_pagination_token.room_key + ), ( + "expected the `gap.next_pagination_token` to point to the position after the event" + ) + + # TODO: `test_gaps_going_forwards` + class RoomMessageFilterTestCase(RoomBase): """Tests /rooms/$room_id/messages REST events."""