Skip to content

Add support for MSC4293 - Redact on Kick/Ban #18540

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18540.feature
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this is the right architecture for dealing with such redactions. Pulling out all events sent by the user has a few potential issues: a) if there are a lot of events it might take a lot of time, and b) won't redact events that we haven't backfilled over federation yet. The MSC also seems to suggest that events should get unredacted if the ban is replaced with another state event without a redactions?

An alternate method may be to have a separate table that is room_redactions(room_id, sender), which we then check (as well as redactions table) when fetching the event? The new room_redactions would be updated when we add the ban/kick to the current_state_events table, and removed if it gets replaced?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MSC also seems to suggest that events should get unredacted if the ban is replaced with another state event without a redactions?

They shouldn't get unredacted, but the auto-redaction should cease for newly received events. I'll try to clarify this in the MSC itself.

An example series of events:

  1. Alice sends a message
  2. Bob bans Alice, with redact_events: true
  3. [Servers and clients redact Alice's message in step 1]
  4. Due to propagation delays, Alice's second message arrives after the ban. The server (and client, if it received it) redacts that message too.
  5. Later, Bob unbans Alice, setting redact_events: false
  6. Again due to propagation delays, or because Alice rejoined, Alice's third message arrives. It is not redacted.

The messages in steps 1 and 4 remain redacted even after step 5.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erikjohnston Looking at it I think a room_redactions table might work (with an added end_ts column or something to indicate the end of a redaction application in the case that the ban is rescinded, etc), but would still need to pull all of the user's event ids (but not full events) out of the db because they are needed to invalidate the cache.

I don't have a sense of the time difference to pull event ids out of the database vs pulling the full events so it is unclear to me whether having to still pull the ids will negate the benefit of the new table - right now the PR does both so it is an obvious improvement but is it enough or does another approach need to be found?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with an added end_ts column or something to indicate the end of a redaction application in the case that the ban is rescinded, etc

Wouldn't removing the row from the table be enough to stop redacting new events?

I don't have a sense of the time difference to pull event ids out of the database vs pulling the full events so it is unclear to me whether having to still pull the ids will negate the benefit of the new table

Pulling out only event IDs will certainly be much less data than full events, which should contribute directly to time taken executing the query. By how much depends on how large a user's events typically are. But you're right in that we'll still need to do the hard work of querying for every event a user has sent in a room.


The overview of this PR is:

The redactions table is updated to change the unique constraint from event_id to (event_id, redacts). This is so one event (a kick/ban membership) can redact multiple other events.

When a new event comes in over federation, add redacted_because to its unsigned and add a redaction to the local DB, then invalidate the event cache for that event.

/ban and /kick are updated with a org.matrix.msc4293.redact_events JSON body parameter. If provided, that field is added to the content of the ban/kick membership event.

When any event is persisted (local or over federation), all event IDs that a user has sent in a room are pulled out and entries in redactions are created for them. Each event has redacted_because added to it. The get_event cache is invalidated for each of these events.


What's more concerning to me is that the query to lookup all events a user has sent in a room happens is blocking during processing of a membership event. Perhaps that should be moved to a background task?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but would still need to pull all of the user's event ids (but not full events) out of the db because they are needed to invalidate the cache.

I think you can invalidate those caches by room as well as by event ID?

Looking at it I think a room_redactions table might work (with an added end_ts column or something to indicate the end of a redaction application in the case that the ban is rescinded, etc)

We would probably want to do this by some sort of stream ordering or whatever. We might do this as a two step:

  1. On receipt of ban, add room ID / target to table and record the range of stream orderings that should be redacted.
  2. On receipt of new events (e.g. via backfill), check if they match the ban and if so record them as redacted.

or something

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for [MSC4293](https://github.com/matrix-org/matrix-spec-proposals/pull/4293) - Redact on Kick/Ban.
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,3 +569,6 @@ def read_config(

# MSC4155: Invite filtering
self.msc4155_enabled: bool = experimental.get("msc4155_enabled", False)

# MSC4293: Redact on Kick/Ban
self.msc4293_enabled: bool = experimental.get("msc4293_enabled", False)
60 changes: 60 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -1966,6 +1966,9 @@ async def _check_for_soft_fail(
Does nothing for events in rooms with partial state, since we may not have an
accurate membership event for the sender in the current state.

Also checks if event should be redacted due to a MSC4293 redaction flag in kick/ban
event for user

Args:
event
context: The `EventContext` which we are about to persist the event with.
Expand Down Expand Up @@ -2065,6 +2068,63 @@ async def _check_for_soft_fail(
soft_failed_event_counter.inc()
event.internal_metadata.soft_failed = True

if not self._config.experimental.msc4293_enabled:
return
# Use already calculated auth events to determine if the event should be redacted due to kick/ban
state_map = {}
for auth_event in current_auth_events:
state_map[(auth_event.type, auth_event.state_key)] = auth_event

for auth_event in current_auth_events:
if auth_event.get("state_key") is None:
continue
if (
auth_event.type != EventTypes.Member
or auth_event.state_key != event.sender
):
continue
if auth_event.membership not in [Membership.LEAVE, Membership.BAN]:
continue
# self-bans currently are not authorized so we don't check for that
# case
if (
auth_event.membership == Membership.LEAVE
and auth_event.sender == auth_event.state_key
):
continue
# we have a ban or kick for this sender,
# check for redaction flag and apply if found
autoredact = auth_event.content.get(
"org.matrix.msc4293.redact_events", False
)
if not autoredact or not isinstance(autoredact, bool):
continue

# check that the sender of the kick/ban can redact
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
if event_auth.check_redaction(room_version_obj, event, state_map):
# copy through the redacting event
event.unsigned["redacted_because"] = auth_event.get_dict()
await self._store.db_pool.simple_upsert(
table="redactions",
keyvalues={
"event_id": auth_event.event_id,
"redacts": event.event_id,
},
values={"received_ts": self._clock.time_msec()},
insertion_values={
"event_id": auth_event.event_id,
"redacts": event.event_id,
"received_ts": self._clock.time_msec(),
},
)
await self._store.db_pool.runInteraction(
"invalidate cache",
self._store.invalidate_get_event_cache_after_txn,
event.event_id,
)
Comment on lines +2121 to +2125
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we actually inserted the event into the cache at this point yet?

break

async def _load_or_fetch_auth_events_for_event(
self, destination: Optional[str], event: EventBase
) -> Collection[EventBase]:
Expand Down
32 changes: 20 additions & 12 deletions synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.room_member_handler = hs.get_room_member_handler()
self.auth = hs.get_auth()
self.config = hs.config

def register(self, http_server: HttpServer) -> None:
# /rooms/$roomid/[join|invite|leave|ban|unban|kick]
Expand All @@ -1111,12 +1112,12 @@ async def _do(
}:
raise AuthError(403, "Guest access not allowed")

content = parse_json_object_from_request(request, allow_empty_body=True)
request_body = parse_json_object_from_request(request, allow_empty_body=True)

if membership_action == "invite" and all(
key in content for key in ("medium", "address")
key in request_body for key in ("medium", "address")
):
if not all(key in content for key in ("id_server", "id_access_token")):
if not all(key in request_body for key in ("id_server", "id_access_token")):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"`id_server` and `id_access_token` are required when doing 3pid invite",
Expand All @@ -1127,12 +1128,12 @@ async def _do(
await self.room_member_handler.do_3pid_invite(
room_id,
requester.user,
content["medium"],
content["address"],
content["id_server"],
request_body["medium"],
request_body["address"],
request_body["id_server"],
requester,
txn_id,
content["id_access_token"],
request_body["id_access_token"],
)
except ShadowBanError:
# Pretend the request succeeded.
Expand All @@ -1141,12 +1142,19 @@ async def _do(

target = requester.user
if membership_action in ["invite", "ban", "unban", "kick"]:
assert_params_in_dict(content, ["user_id"])
target = UserID.from_string(content["user_id"])
assert_params_in_dict(request_body, ["user_id"])
target = UserID.from_string(request_body["user_id"])

event_content = None
if "reason" in content:
event_content = {"reason": content["reason"]}
if "reason" in request_body:
event_content = {"reason": request_body["reason"]}
if self.config.experimental.msc4293_enabled:
if "org.matrix.msc4293.redact_events" in request_body:
if event_content is None:
event_content = {}
event_content["org.matrix.msc4293.redact_events"] = request_body[
"org.matrix.msc4293.redact_events"
]

try:
await self.room_member_handler.update_membership(
Expand All @@ -1155,7 +1163,7 @@ async def _do(
room_id=room_id,
action=membership_action,
txn_id=txn_id,
third_party_signed=content.get("third_party_signed", None),
third_party_signed=request_body.get("third_party_signed", None),
content=event_content,
)
except ShadowBanError:
Expand Down
85 changes: 83 additions & 2 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@
(EventTypes.Tombstone, ""),
)

# An arbitrarily large number
MAX_EVENTS = 1000000


@attr.s(slots=True, auto_attribs=True)
class DeltaState:
Expand Down Expand Up @@ -376,6 +379,85 @@ async def _persist_events_and_state_updates(

event_counter.labels(event.type, origin_type, origin_entity).inc()

# check for msc4293 redact_events flag and apply if found
if (
not self.hs.config.experimental.msc4293_enabled
or event.type != EventTypes.Member
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sanity check:

Suggested change
or event.type != EventTypes.Member
or event.type != EventTypes.Member
or event.state_key is None

):
continue
if event.membership not in [Membership.LEAVE, Membership.BAN]:
continue
redact = event.content.get("org.matrix.msc4293.redact_events", False)
if not redact or not isinstance(redact, bool):
continue
# self-bans currently are not authorized so we don't check for that
# case
if (
event.membership == Membership.LEAVE
and event.sender == event.state_key
):
continue
# check that sender can redact
state_filter = StateFilter.from_types([(EventTypes.PowerLevels, "")])
state = await self.store.get_partial_filtered_current_state_ids(
event.room_id, state_filter
)
pl_id = state[(EventTypes.PowerLevels, "")]
pl_event = await self.store.get_event(pl_id)
if pl_event:
sender_level = pl_event.content.get("users", {}).get(event.sender)
if sender_level is None:
sender_level = pl_event.content.get("users_default", 0)

redact_level = pl_event.content.get("redact")
if not redact_level:
redact_level = pl_event.content.get("events_default", 0)

room_redaction_level = pl_event.content.get("events", {}).get(
"m.room.redaction"
)
if room_redaction_level:
if sender_level < room_redaction_level:
continue

if sender_level > redact_level:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if sender_level > redact_level:
if sender_level >= redact_level:

ids_to_redact = (
await self.store.get_events_sent_by_user_in_room(
event.state_key, event.room_id, limit=MAX_EVENTS
)
)
if not ids_to_redact:
continue

key_values = [(event.event_id, x) for x in ids_to_redact]
value_values = [
(self._clock.time_msec(),) for x in ids_to_redact
]
Comment on lines +433 to +435
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
value_values = [
(self._clock.time_msec(),) for x in ids_to_redact
]
time_now_ms = self._clock.time_msec()
value_values = [
(time_now_ms,) for x in ids_to_redact
]

rather than calling self._clock.time_msec() repeatedly.

await self.db_pool.simple_upsert_many(
table="redactions",
key_names=["event_id", "redacts"],
key_values=key_values,
value_names=["received_ts"],
value_values=value_values,
desc="redact_on_ban_redaction_txn",
)

redacted_events = await self.store.get_events_as_list(
ids_to_redact
)
for redacted_event in redacted_events:
redacted_event.unsigned["redacted_because"] = event
Comment on lines +445 to +449
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We appear to pull out redacted_events here, but don't update the event in the DB again.

Can we run a query to update the events in the DB that doesn't require pulling the event contents to the application layer?


# normally the cache entry for a redacted event would be invalidated
# by an arriving redaction event, but since we are not creating redaction
# events we invalidate manually
for id in ids_to_redact:
await self.db_pool.runInteraction(
"invalidate cache",
self.store.invalidate_get_event_cache_after_txn,
id,
)

if new_forward_extremities:
self.store.get_latest_event_ids_in_room.prefill(
(room_id,), frozenset(new_forward_extremities)
Expand Down Expand Up @@ -2768,9 +2850,8 @@ def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
self.db_pool.simple_upsert_txn(
txn,
table="redactions",
keyvalues={"event_id": event.event_id},
keyvalues={"event_id": event.event_id, "redacts": event.redacts},
values={
"redacts": event.redacts,
"received_ts": self._clock.time_msec(),
},
)
Expand Down
22 changes: 22 additions & 0 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1962,6 +1962,28 @@ def __init__(
):
super().__init__(database, db_conn, hs)

self.db_pool.updates.register_background_index_update(
"redactions_add_event_id_idx",
"redactions_event_id",
"redactions",
["event_id"],
)

self.db_pool.updates.register_background_index_update(
"redactions_add_redacts_idx",
"redactions_redacts",
"redactions",
["redacts"],
)

self.db_pool.updates.register_background_index_update(
"redactions_add_have_censored_ts",
"redactions_have_censored_ts",
"redactions",
["received_ts"],
where_clause="NOT have_censored",
)

self.db_pool.updates.register_background_update_handler(
"insert_room_retention",
self._background_insert_retention,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine


def run_create(
cur: LoggingTransaction,
database_engine: BaseDatabaseEngine,
) -> None:
"""
Drop unique constraint event_id and add unique constraint (event_id, redacts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why these changes are necessary? As I understand, it's so one event can redact multiple other events.

"""
if isinstance(database_engine, PostgresEngine):
add_constraint_sql = """
ALTER TABLE ONLY redactions ADD CONSTRAINT redactions_event_id_redacts_key UNIQUE (event_id, redacts);
"""
cur.execute(add_constraint_sql)

drop_constraint_sql = """
ALTER TABLE ONLY redactions DROP CONSTRAINT redactions_event_id_key;
"""
cur.execute(drop_constraint_sql)

else:
# in SQLite we need to rewrite the table to change the constraint.
# First drop any temporary table that might be here from a previous failed migration.
cur.execute("DROP TABLE IF EXISTS temp_redactions")

create_sql = """
CREATE TABLE temp_redactions (
event_id TEXT NOT NULL,
redacts TEXT NOT NULL,
have_censored BOOL NOT NULL DEFAULT false,
received_ts BIGINT,
UNIQUE(event_id, redacts)
);
"""
cur.execute(create_sql)

copy_sql = """
INSERT INTO temp_redactions (
event_id,
redacts,
have_censored,
received_ts
) SELECT r.event_id, r.redacts, r.have_censored, r.received_ts FROM redactions AS r;
"""
cur.execute(copy_sql)

drop_sql = """
DROP TABLE redactions
"""
cur.execute(drop_sql)

rename_sql = """
ALTER TABLE temp_redactions RENAME to redactions
"""
cur.execute(rename_sql)

sqlite3_idx_update_sql = """
INSERT INTO background_updates (ordering, update_name, progress_json) \
VALUES (?, ?, ?);
"""
cur.execute(sqlite3_idx_update_sql, (9206, "redactions_add_redacts_idx", "{}"))
cur.execute(
sqlite3_idx_update_sql, (9206, "redactions_add_have_censored_ts", "{}")
)

# in either case the event_id index needs to be re-created
idx_sql = """
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (?, ?, ?);
"""
cur.execute(idx_sql, (9206, "redactions_add_event_id_idx", "{}"))
Loading
Loading