Skip to content

Move device changes off the main process #18581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 30 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3c8f07d
Remove the `device_id_exists_cache` from the device store.
sandhose Jun 16, 2025
3a25d31
Move the store/delete/update device methods to the DeviceWorkerStore
sandhose Jun 16, 2025
0392492
Type the store explicitly on the DeviceHandler
sandhose Jun 16, 2025
b853ed1
Config
sandhose Jun 18, 2025
4e867d6
Simplify the creation of MultiWriterStreamToken
sandhose Jun 18, 2025
5b02bd4
Make the device list token a MultiWriterStreamToken
sandhose Jun 20, 2025
d89459f
Move all the device storage methods to workers
sandhose Jun 20, 2025
869b376
Add a replication endpoint for DeviceHandler.notify_device_update
sandhose Jun 20, 2025
3931565
Move _check_device_name_length to a separate function
sandhose Jun 20, 2025
c98cc53
Move DeviceHandler.check_device_registered to work on any worker
sandhose Jun 20, 2025
9cbb3a2
Move delete_devices to work on any worker
sandhose Jun 20, 2025
e32644b
Move update_device and upsert_device to be available on all workers
sandhose Jun 20, 2025
ce21996
Move dehydrated devices operations on workers
sandhose Jun 20, 2025
a6b5b4e
Make notify_user_signature_update avail in workers through replication
sandhose Jun 20, 2025
3c49179
TEMP: newsfile
sandhose Jun 23, 2025
c4eaff4
Instanciate the device handler on device list writers
sandhose Jun 23, 2025
e9ff0e1
Route device list updates EDUs to the right writers
sandhose Jun 23, 2025
340d06b
Register replication endpoints on device writers
sandhose Jun 23, 2025
6246b85
Remove the dependency on DeviceHandler in most places
sandhose Jun 23, 2025
1242a6d
Setup device_lists workers in complement
sandhose Jun 23, 2025
8b31903
Move all the E2E keys store methods to the worker store
sandhose Jun 24, 2025
0861867
Handle signing key updates EDUs on device lists writers
sandhose Jun 24, 2025
d4af1c2
Poke the fed sender if it's on the same instance as device list writers
sandhose Jun 24, 2025
2ac8e0e
Allow more than one device list worker
sandhose Jun 24, 2025
4459831
Consolidate multi user device resyncs through the DeviceListUpdater
sandhose Jun 25, 2025
7f2919e
Only handle signing key updates on a single device_lists writer
sandhose Jun 25, 2025
a2c0315
Run two device_lists writers in complement
sandhose Jun 25, 2025
4cf7dd3
Run _handle_new_device_update_async on the first device list writer only
sandhose Jun 25, 2025
71c5566
Ensure a few operations are only running on the first device list writer
sandhose Jun 25, 2025
9392adc
Run the 'delete_stale_devices' background task on the background worker
sandhose Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18581.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Temporary for CI to pass.
1 change: 1 addition & 0 deletions docker/complement/conf/start_for_complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
client_reader, \
appservice, \
pusher, \
device_lists:2, \
stream_writers=account_data+presence+receipts+to_device+typing"

fi
Expand Down
34 changes: 16 additions & 18 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,15 @@
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"device_lists": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$"
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"typing": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
Expand Down Expand Up @@ -412,16 +421,17 @@ def add_worker_roles_to_shared_config(
# streams
instance_map = shared_config.setdefault("instance_map", {})

# This is a list of the stream_writers that there can be only one of. Events can be
# sharded, and therefore doesn't belong here.
singular_stream_writers = [
# This is a list of the stream_writers.
stream_writers = {
"account_data",
"events",
"device_lists",
"presence",
"receipts",
"to_device",
"typing",
"push_rules",
]
}

# Worker-type specific sharding config. Now a single worker can fulfill multiple
# roles, check each.
Expand All @@ -434,25 +444,13 @@ def add_worker_roles_to_shared_config(
if "event_persister" in worker_types_set:
# Event persisters write to the events stream, so we need to update
# the list of event stream writers
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
worker_name
)
worker_types_set.add("events")

# Map of stream writer instance names to host/ports combos
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
# Update the list of stream writers. It's convenient that the name of the worker
# type is the same as the stream to write. Iterate over the whole list in case there
# is more than one.
for worker in worker_types_set:
if worker in singular_stream_writers:
if worker in stream_writers:
shared_config.setdefault("stream_writers", {}).setdefault(
worker, []
).append(worker_name)
Expand Down
2 changes: 2 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -4291,6 +4291,8 @@ This setting has the following sub-options:

* `push_rules` (string): Name of a worker assigned to the `push_rules` stream.

* `device_lists` (string): Name of a worker assigned to the `device_lists` stream.

Example configuration:
```yaml
stream_writers:
Expand Down
3 changes: 3 additions & 0 deletions schema/synapse-config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5323,6 +5323,9 @@ properties:
push_rules:
type: string
description: Name of a worker assigned to the `push_rules` stream.
device_lists:
type: string
description: Name of a worker assigned to the `device_lists` stream.
default: {}
examples:
- events: worker1
Expand Down
14 changes: 12 additions & 2 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,13 @@ class WriterLocations:
can only be a single instance.
account_data: The instances that write to the account data streams. Currently
can only be a single instance.
receipts: The instances that write to the receipts stream. Currently
can only be a single instance.
receipts: The instances that write to the receipts stream.
presence: The instances that write to the presence stream. Currently
can only be a single instance.
push_rules: The instances that write to the push stream. Currently
can only be a single instance.
device_lists: The instances that write to the device list stream. Currently
can only be a single instance.
"""

events: List[str] = attr.ib(
Expand Down Expand Up @@ -194,6 +195,10 @@ class WriterLocations:
default=["master"],
converter=_instance_to_list_converter,
)
device_lists: List[str] = attr.ib(
default=["master"],
converter=_instance_to_list_converter,
)


@attr.s(auto_attribs=True)
Expand Down Expand Up @@ -415,6 +420,11 @@ def read_config(
"Must only specify one instance to handle `push` messages."
)

if len(self.writers.device_lists) == 0:
raise ConfigError(
"Must specify at least one instance to handle `device_lists` messages."
)

self.events_shard_config = RoutableShardedWorkerHandlingConfig(
self.writers.events
)
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,8 @@ async def _get_device_list_summary(

# Fetch the users who have modified their device list since then.
users_with_changed_device_lists = await self.store.get_all_devices_changed(
from_key, to_key=new_key
MultiWriterStreamToken(stream=from_key),
to_key=MultiWriterStreamToken(stream=new_key),
)

# Filter out any users the application service is not interested in
Expand Down
5 changes: 0 additions & 5 deletions synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.handlers.device import DeviceHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import Codes, Requester, UserID, create_requester

Expand Down Expand Up @@ -84,10 +83,6 @@ async def deactivate_account(
Returns:
True if identity server supports removing threepids, otherwise False.
"""

# This can only be called on the main process.
assert isinstance(self._device_handler, DeviceHandler)

# Check if this user can be deactivated
if not await self._third_party_rules.check_can_deactivate_user(
user_id, by_admin
Expand Down
Loading
Loading