Merge branch 'release-v1.135' into develop

Andrew Morgan
2025-07-30 14:04:29 +01:00
15 changed files with 247 additions and 58 deletions

View File

@@ -1,3 +1,16 @@
# Synapse 1.135.0rc2 (2025-07-30)

### Bugfixes

- Fix users failing to deactivate their account via MAS when `/_synapse/mas` is handled by a worker. ([\#18716](https://github.com/element-hq/synapse/issues/18716))

### Internal Changes

- Fix a performance regression introduced in [#18238](https://github.com/element-hq/synapse/issues/18238) by adding a cache to `is_server_admin`. ([\#18747](https://github.com/element-hq/synapse/issues/18747))

# Synapse 1.135.0rc1 (2025-07-22)

### Features
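The `is_server_admin` cache mentioned in the internal-changes entry follows the cached-store-method pattern visible in the registration-store diff further down. A minimal sketch of that pattern (simplified, bodies elided; the class name here is illustrative, not the real one):

```python
from synapse.util.caches.descriptors import cached


class SketchedRegistrationStore:
    @cached(max_entries=100000)
    async def is_server_admin(self, user) -> bool:
        # Repeated admin checks hit this in-memory cache instead of running
        # a database query on every call.
        ...

    async def set_server_admin(self, user, admin: bool) -> None:
        def set_server_admin_txn(txn) -> None:
            ...
            # Any write must invalidate the cached entry (and stream the
            # invalidation to workers) so stale admin status is never served.
            self._invalidate_cache_and_stream(
                txn, self.is_server_admin, (user.to_string(),)
            )

        await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn)
```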

debian/changelog
View File

@@ -1,3 +1,9 @@
matrix-synapse-py3 (1.135.0~rc2) stable; urgency=medium

  * New Synapse release 1.135.0rc2.

 -- Synapse Packaging team <packages@matrix.org>  Wed, 30 Jul 2025 12:19:14 +0100

matrix-synapse-py3 (1.135.0~rc1) stable; urgency=medium

  * New Synapse release 1.135.0rc1.

View File

@@ -178,6 +178,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/deactivate$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)",
"^/_matrix/client/(r0|v3)/delete_devices$",
"^/_matrix/client/versions$",

View File

@@ -238,6 +238,7 @@ information.
^/_matrix/client/unstable/im.nheko.summary/summary/.*$
^/_matrix/client/(r0|v3|unstable)/account/3pid$
^/_matrix/client/(r0|v3|unstable)/account/whoami$
^/_matrix/client/(r0|v3|unstable)/account/deactivate$
^/_matrix/client/(r0|v3)/delete_devices$
^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)
^/_matrix/client/versions$

View File

@@ -101,7 +101,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.135.0rc1"
version = "1.135.0rc2"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"

View File

@@ -222,6 +222,7 @@ class AuthHandler:
self._password_localdb_enabled = hs.config.auth.password_localdb_enabled
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
self._account_validity_handler = hs.get_account_validity_handler()
self._pusher_pool = hs.get_pusherpool()
# Ratelimiter for failed auth during UIA. Uses same ratelimit config
# as per `rc_login.failed_attempts`.
@@ -1662,7 +1663,7 @@ class AuthHandler:
)
if medium == "email":
-await self.store.delete_pusher_by_app_id_pushkey_user_id(
+await self._pusher_pool.remove_pusher(
app_id="m.email", pushkey=address, user_id=user_id
)
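The handler change above swaps a direct datastore call for a call into the pusher pool so that, when this code runs on a worker, the deletion is forwarded to the main process over replication. A simplified sketch of what `PusherPool.remove_pusher` does here, inferred by analogy with the `delete_all_pushers_for_user` change further down (not copied verbatim from the source):

```python
# Sketch only: the dispatch pattern assumed from the PusherPool diff below.
async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
    self.maybe_stop_pusher(app_id, pushkey, user_id)

    if self._remove_pusher_client is not None:
        # Running on a worker: ask the main process to delete the pusher.
        await self._remove_pusher_client(
            app_id=app_id, pushkey=pushkey, user_id=user_id
        )
    else:
        # Main process: delete the pusher row directly.
        await self.store.delete_pusher_by_app_id_pushkey_user_id(
            app_id, pushkey, user_id
        )
```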

View File

@@ -25,6 +25,9 @@ from typing import TYPE_CHECKING, Optional
from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.deactivate_account import (
ReplicationNotifyAccountDeactivatedServlet,
)
from synapse.types import Codes, Requester, UserID, create_requester
if TYPE_CHECKING:
@@ -45,6 +48,7 @@ class DeactivateAccountHandler:
self._room_member_handler = hs.get_room_member_handler()
self._identity_handler = hs.get_identity_handler()
self._profile_handler = hs.get_profile_handler()
self._pusher_pool = hs.get_pusherpool()
self.user_directory_handler = hs.get_user_directory_handler()
self._server_name = hs.hostname
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
@@ -53,10 +57,16 @@ class DeactivateAccountHandler:
self._user_parter_running = False
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
self._notify_account_deactivated_client = None
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
-if hs.config.worker.run_background_tasks:
+if hs.config.worker.worker_app is None:
hs.get_reactor().callWhenRunning(self._start_user_parting)
else:
self._notify_account_deactivated_client = (
ReplicationNotifyAccountDeactivatedServlet.make_client(hs)
)
self._account_validity_enabled = (
hs.config.account_validity.account_validity_enabled
@@ -146,7 +156,7 @@ class DeactivateAccountHandler:
# Most of the pushers will have been deleted when we logged out the
# associated devices above, but we still need to delete pushers not
# associated with devices, e.g. email pushers.
-await self.store.delete_all_pushers_for_user(user_id)
+await self._pusher_pool.delete_all_pushers_for_user(user_id)
# Add the user to a table of users pending deactivation (ie.
# removal from all the rooms they're a member of)
@@ -170,10 +180,6 @@ class DeactivateAccountHandler:
logger.info("Marking %s as erased", user_id)
await self.store.mark_user_erased(user_id)
-# Now start the process that goes through that list and
-# parts users from rooms (if it isn't already running)
-self._start_user_parting()
# Reject all pending invites and knocks for the user, so that the
# user doesn't show up in the "invited" section of rooms' members list.
await self._reject_pending_invites_and_knocks_for_user(user_id)
@@ -194,15 +200,37 @@ class DeactivateAccountHandler:
# Delete any server-side backup keys
await self.store.bulk_delete_backup_keys_and_versions_for_user(user_id)
# Notify modules and start the room parting process.
await self.notify_account_deactivated(user_id, by_admin=by_admin)
return identity_server_supports_unbinding
async def notify_account_deactivated(
self,
user_id: str,
by_admin: bool = False,
) -> None:
"""Notify modules and start the room parting process.
Goes through replication if this is not the main process.
"""
if self._notify_account_deactivated_client is not None:
await self._notify_account_deactivated_client(
user_id=user_id,
by_admin=by_admin,
)
return
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
self._start_user_parting()
# Let modules know the user has been deactivated.
await self._third_party_rules.on_user_deactivation_status_changed(
user_id,
True,
-by_admin,
+by_admin=by_admin,
)
-return identity_server_supports_unbinding
async def _reject_pending_invites_and_knocks_for_user(self, user_id: str) -> None:
"""Reject pending invites and knocks addressed to a given user ID.

View File

@@ -32,7 +32,10 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
-from synapse.replication.http.push import ReplicationRemovePusherRestServlet
+from synapse.replication.http.push import (
+    ReplicationDeleteAllPushersForUserRestServlet,
+    ReplicationRemovePusherRestServlet,
+)
from synapse.types import JsonDict, RoomStreamToken, StrCollection
from synapse.util.async_helpers import concurrently_execute
from synapse.util.threepids import canonicalise_email
@@ -84,10 +87,14 @@ class PusherPool:
# We can only delete pushers on master.
self._remove_pusher_client = None
self._delete_all_pushers_for_user_client = None
if hs.config.worker.worker_app:
self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client(
hs
)
self._delete_all_pushers_for_user_client = (
ReplicationDeleteAllPushersForUserRestServlet.make_client(hs)
)
# Record the last stream ID that we were poked about so we can get
# changes since then. We set this to the current max stream ID on
@@ -468,6 +475,13 @@ class PusherPool:
app_id, pushkey, user_id
)
async def delete_all_pushers_for_user(self, user_id: str) -> None:
"""Deletes all pushers for a user."""
if self._delete_all_pushers_for_user_client is not None:
await self._delete_all_pushers_for_user_client(user_id=user_id)
else:
await self.store.delete_all_pushers_for_user(user_id=user_id)
def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
"""Stops a pusher with the given app ID and push key if one is running.

View File

@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING
from synapse.http.server import JsonResource
from synapse.replication.http import (
account_data,
deactivate_account,
delayed_events,
devices,
federation,
@@ -64,3 +65,4 @@ class ReplicationRestResource(JsonResource):
login.register_servlets(hs, self)
register.register_servlets(hs, self)
delayed_events.register_servlets(hs, self)
deactivate_account.register_servlets(hs, self)

View File

@@ -0,0 +1,81 @@
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
from typing import TYPE_CHECKING, Tuple
from twisted.web.server import Request
from synapse.http.server import HttpServer
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import JsonDict
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class ReplicationNotifyAccountDeactivatedServlet(ReplicationEndpoint):
"""Notify that an account has been deactivated.
Request format:
POST /_synapse/replication/notify_account_deactivated/:user_id
{
"by_admin": true,
}
"""
NAME = "notify_account_deactivated"
PATH_ARGS = ("user_id",)
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.deactivate_account_handler = hs.get_deactivate_account_handler()
@staticmethod
async def _serialize_payload( # type: ignore[override]
user_id: str,
by_admin: bool,
) -> JsonDict:
"""
Args:
user_id: The user ID which has been deactivated.
by_admin: Whether the user was deactivated by an admin.
"""
return {
"by_admin": by_admin,
}
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> Tuple[int, JsonDict]:
by_admin = content["by_admin"]
await self.deactivate_account_handler.notify_account_deactivated(
user_id, by_admin=by_admin
)
return 200, {}
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationNotifyAccountDeactivatedServlet(hs).register(http_server)
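For reference, a hedged sketch of the caller side of this endpoint, mirroring the `make_client` wiring in the deactivation handler above (the user ID is illustrative):

```python
# On a worker: build the replication client once, then call it like a function.
client = ReplicationNotifyAccountDeactivatedServlet.make_client(hs)

# The keyword arguments match _serialize_payload plus the path argument; this
# ends up as POST /_synapse/replication/notify_account_deactivated/<user_id>
# with a JSON body of {"by_admin": true}.
await client(user_id="@alice:example.com", by_admin=True)
```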

View File

@@ -118,6 +118,39 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
return 200, {}
class ReplicationDeleteAllPushersForUserRestServlet(ReplicationEndpoint):
"""Deletes all pushers for a user.
Request format:
POST /_synapse/replication/delete_all_pushers_for_user/:user_id
{}
"""
NAME = "delete_all_pushers_for_user"
PATH_ARGS = ("user_id",)
CACHE = False
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self._store = hs.get_datastores().main
@staticmethod
async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override]
return {}
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> Tuple[int, JsonDict]:
await self._store.delete_all_pushers_for_user(user_id)
return 200, {}
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationRemovePusherRestServlet(hs).register(http_server)
ReplicationCopyPusherRestServlet(hs).register(http_server)
ReplicationDeleteAllPushersForUserRestServlet(hs).register(http_server)
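A short usage sketch of the new pusher-deletion path, combining the `PusherPool` and replication changes above (the user ID is illustrative):

```python
# Works the same on workers and on the main process:
pusher_pool = hs.get_pusherpool()
await pusher_pool.delete_all_pushers_for_user("@alice:example.com")

# On the main process this deletes the rows directly via the datastore; on a
# worker it issues POST /_synapse/replication/delete_all_pushers_for_user/<user_id>
# with an empty JSON body, and the main process performs the deletion.
```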

View File

@@ -905,23 +905,26 @@ class AccountStatusRestServlet(RestServlet):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ThreepidRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)
if not hs.config.experimental.msc3861.enabled:
DeactivateAccountRestServlet(hs).register(http_server)
# These servlets are only registered on the main process
if hs.config.worker.worker_app is None:
ThreepidBindRestServlet(hs).register(http_server)
ThreepidUnbindRestServlet(hs).register(http_server)
if not hs.config.experimental.msc3861.enabled:
EmailPasswordRequestTokenRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
PasswordRestServlet(hs).register(http_server)
EmailThreepidRequestTokenRestServlet(hs).register(http_server)
MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
ThreepidRestServlet(hs).register(http_server)
if hs.config.worker.worker_app is None:
ThreepidBindRestServlet(hs).register(http_server)
ThreepidUnbindRestServlet(hs).register(http_server)
if not hs.config.experimental.msc3861.enabled:
ThreepidAddRestServlet(hs).register(http_server)
ThreepidDeleteRestServlet(hs).register(http_server)
WhoamiRestServlet(hs).register(http_server)
-if hs.config.worker.worker_app is None and hs.config.experimental.msc3720_enabled:
+if hs.config.experimental.msc3720_enabled:
AccountStatusRestServlet(hs).register(http_server)

View File

@@ -673,6 +673,7 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore):
desc="delete_account_validity_for_user",
)
@cached(max_entries=100000)
async def is_server_admin(self, user: UserID) -> bool:
"""Determines if a user is an admin of this homeserver.
@@ -707,6 +708,9 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore):
self._invalidate_cache_and_stream(
txn, self.get_user_by_id, (user.to_string(),)
)
self._invalidate_cache_and_stream(
txn, self.is_server_admin, (user.to_string(),)
)
await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn)
@@ -2596,6 +2600,36 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore):
await self.db_pool.runInteraction("delete_access_token", f)
async def user_set_password_hash(
self, user_id: str, password_hash: Optional[str]
) -> None:
"""
NB. This does *not* evict any cache because the one use for this
removes most of the entries subsequently anyway so it would be
pointless. Use flush_user separately.
"""
def user_set_password_hash_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_update_one_txn(
txn, "users", {"name": user_id}, {"password_hash": password_hash}
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
await self.db_pool.runInteraction(
"user_set_password_hash", user_set_password_hash_txn
)
async def add_user_pending_deactivation(self, user_id: str) -> None:
"""
Adds a user to the table of users who need to be parted from all the rooms they're
in
"""
await self.db_pool.simple_insert(
"users_pending_deactivation",
values={"user_id": user_id},
desc="add_user_pending_deactivation",
)
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
def __init__(
@@ -2820,25 +2854,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
return next_id
async def user_set_password_hash(
self, user_id: str, password_hash: Optional[str]
) -> None:
"""
NB. This does *not* evict any cache because the one use for this
removes most of the entries subsequently anyway so it would be
pointless. Use flush_user separately.
"""
def user_set_password_hash_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_update_one_txn(
txn, "users", {"name": user_id}, {"password_hash": password_hash}
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
await self.db_pool.runInteraction(
"user_set_password_hash", user_set_password_hash_txn
)
async def user_set_consent_version(
self, user_id: str, consent_version: str
) -> None:
@@ -2891,17 +2906,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
await self.db_pool.runInteraction("user_set_consent_server_notice_sent", f)
async def add_user_pending_deactivation(self, user_id: str) -> None:
"""
Adds a user to the table of users who need to be parted from all the rooms they're
in
"""
await self.db_pool.simple_insert(
"users_pending_deactivation",
values={"user_id": user_id},
desc="add_user_pending_deactivation",
)
async def validate_threepid_session(
self, session_id: str, client_secret: str, token: str, current_ts: int
) -> Optional[str]:

View File

@@ -70,8 +70,6 @@ class UserErasureWorkerStore(CacheInvalidationWorkerStore):
return {u: u in erased_users for u in user_ids}
-class UserErasureStore(UserErasureWorkerStore):
async def mark_user_erased(self, user_id: str) -> None:
"""Indicate that user_id wishes their message history to be erased.
@@ -113,3 +111,7 @@ class UserErasureStore(UserErasureWorkerStore):
self._invalidate_cache_and_stream(txn, self.is_user_erased, (user_id,))
await self.db_pool.runInteraction("mark_user_not_erased", f)
+class UserErasureStore(UserErasureWorkerStore):
+    pass

View File

@@ -1181,7 +1181,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
bundled_aggregations,
)
-self._test_bundled_aggregations(RelationTypes.REFERENCE, assert_annotations, 9)
+self._test_bundled_aggregations(RelationTypes.REFERENCE, assert_annotations, 7)
def test_thread(self) -> None:
"""
@@ -1226,21 +1226,21 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
# The "user" sent the root event and is making queries for the bundled
# aggregations: they have participated.
-self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 10)
+self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 7)
# The "user2" sent replies in the thread and is making queries for the
# bundled aggregations: they have participated.
#
# Note that this re-uses some cached values, so the total number of
# queries is much smaller.
self._test_bundled_aggregations(
-RelationTypes.THREAD, _gen_assert(True), 7, access_token=self.user2_token
+RelationTypes.THREAD, _gen_assert(True), 4, access_token=self.user2_token
)
# A user with no interactions with the thread: they have not participated.
user3_id, user3_token = self._create_user("charlie")
self.helper.join(self.room, user=user3_id, tok=user3_token)
self._test_bundled_aggregations(
-RelationTypes.THREAD, _gen_assert(False), 7, access_token=user3_token
+RelationTypes.THREAD, _gen_assert(False), 4, access_token=user3_token
)
def test_thread_with_bundled_aggregations_for_latest(self) -> None:
@@ -1287,7 +1287,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
bundled_aggregations["latest_event"].get("unsigned"),
)
-self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 10)
+self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 7)
def test_nested_thread(self) -> None:
"""