Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump redis from 6-bullseye to 7-bullseye in /docker #1

Open
wants to merge 8 commits into
base: t2bot.io
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/Dockerfile-workers
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ FROM debian:bullseye-slim AS deps_base
# which makes it much easier to copy (but we need to make sure we use an image
# based on the same debian version as the synapse image, to make sure we get
# the expected version of libc.
FROM redis:6-bullseye AS redis_base
FROM redis:7-bullseye AS redis_base

# now build the final image, based on the regular Synapse docker image
FROM matrixdotorg/synapse:$SYNAPSE_VERSION
Expand Down
19 changes: 18 additions & 1 deletion synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import urllib.parse
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Optional, Tuple

Expand Down Expand Up @@ -324,6 +325,10 @@ async def push_bulk(
"left": list(device_list_summary.left),
}

if len(serialized_events) == 0 and len(ephemeral) == 0:
logger.info("Returning early on transaction: no events to send")
return True

try:
await self.put_json(
uri=uri,
Expand Down Expand Up @@ -365,7 +370,19 @@ async def push_bulk(
def _serialize(
self, service: "ApplicationService", events: Iterable[EventBase]
) -> List[JsonDict]:
new_events = []
time_now = self.clock.time_msec()

for event in events:
if int(round(time.time() * 1000)) - event.origin_server_ts > (15 * 60 * 1000):
logger.warning("Dropping event (due to age) %s" % event.event_id)
continue
if service.id != "github" and service.is_interested_in_user(event.sender) and event.sender.endswith(":t2bot.io"):
logger.warning("Dropping event (due to echo) %s" % event.event_id)
continue
logger.info("Allowing @ fallback: %s" % event.event_id)
new_events.append(event)

return [
serialize_event(
e,
Expand All @@ -384,5 +401,5 @@ def _serialize(
),
),
)
for e in events
for e in new_events
]
2 changes: 1 addition & 1 deletion synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ async def get_pdu(
destinations: Collection[str],
event_id: str,
room_version: RoomVersion,
timeout: Optional[int] = None,
timeout: Optional[int] = 15000,
) -> Optional[PulledPduInfo]:
"""Requests the PDU with given origin and ID from the remote home
servers.
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@

# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
TRANSACTION_CONCURRENCY_LIMIT = 50 # T2B: Raise from 10

logger = logging.getLogger(__name__)

Expand Down
5 changes: 5 additions & 0 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,11 @@ def send_presence_to_destinations(
for destination in destinations:
if destination == self.server_name:
continue

# T2B: Skip sending presence to servers we know don't support it
if destination == "matrix.org":
continue

if not self._federation_shard_config.should_handle(
self._instance_name, destination
):
Expand Down
2 changes: 1 addition & 1 deletion synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def get_room_state(
)

async def get_event(
self, destination: str, event_id: str, timeout: Optional[int] = None
self, destination: str, event_id: str, timeout: Optional[int] = 15000
) -> JsonDict:
"""Requests the pdu with give id and origin from the given server.

Expand Down
22 changes: 20 additions & 2 deletions synapse/federation/transport/server/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,32 @@ async def on_PUT(

logger.debug("Decoded %s: %s", transaction_id, str(transaction_data))

edus_before_filter = len(transaction_data.get("edus", []))

filtered_edus = []
for edu in transaction_data.get("edus", []):
edu_type = edu.get('edu_type', 'io.t2bot.ignored')
if edu_type == 'io.t2bot.ignored':
continue
if edu_type == 'm.presence':
continue
if edu_type == 'm.receipt':
continue
if edu_type == 'm.typing':
continue
filtered_edus.append(edu)

logger.info(
"Received txn %s from %s. (PDUs: %d, EDUs: %d)",
"Received txn %s from %s. (PDUs: %d, Accepted EDUs: %d, Ignored EDUs: %d)",
transaction_id,
origin,
len(transaction_data.get("pdus", [])),
len(transaction_data.get("edus", [])),
len(filtered_edus),
edus_before_filter - len(filtered_edus),
)

transaction_data["edus"] = filtered_edus

if issue_8631_logger.isEnabledFor(logging.DEBUG):
DEVICE_UPDATE_EDUS = [
EduTypes.DEVICE_LIST_UPDATE,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -2151,7 +2151,7 @@ async def _run_push_actions_and_persist_event(
# persist_events_and_notify directly.)
assert not event.internal_metadata.outlier

if not backfilled and not context.rejected:
if False and not backfilled and not context.rejected:
min_depth = await self._store.get_min_depth(event.room_id)
if min_depth is None or min_depth > event.depth:
# XXX richvdh 2021/10/07: I don't really understand what this
Expand Down
7 changes: 4 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1462,9 +1462,10 @@ async def _persist_events(
a room that has been un-partial stated.
"""

await self._bulk_push_rule_evaluator.action_for_events_by_user(
events_and_context
)
# T2B: Disable push processing.
#await self._bulk_push_rule_evaluator.action_for_events_by_user(
# events_and_context
#)

try:
# If we're a worker we need to hit out to the master.
Expand Down
17 changes: 16 additions & 1 deletion synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def __init__(self, hs: "HomeServer"):
# Guard to ensure we only process deltas one at a time
self._is_processing = False

if self.update_user_directory:
# T2B: Disable user directory
if self.update_user_directory and False:
self.notifier.add_replication_callback(self.notify_new_event)

# We kick this off so that we don't have to wait for a change before
Expand Down Expand Up @@ -109,6 +110,11 @@ async def search_users(

def notify_new_event(self) -> None:
"""Called when there may be more deltas to process"""

# T2B: Disable user directory
if True:
return

if not self.update_user_directory:
return

Expand All @@ -133,6 +139,10 @@ async def handle_local_profile_change(
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.

# T2B: Disable user directory
if True:
return

if await self.store.should_include_local_user_in_dir(user_id):
await self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
Expand All @@ -142,6 +152,11 @@ async def handle_local_user_deactivated(self, user_id: str) -> None:
"""Called when a user ID is deactivated"""
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.

# T2B: Disable user directory
if True:
return

await self.store.remove_from_user_dir(user_id)

async def _unsafe_process(self) -> None:
Expand Down