Fix deactivation running off the main process (#18716)
Best reviewed commit by commit. With the new dedicated MAS API (https://github.com/element-hq/synapse/pull/18520), it's possible that deactivation starts off the main process, which was not possible because of a few calls. I basically looked at everything that the deactivation handler was doing, reviewed whether it could run on workers or not, and find a workaround when possible --------- Co-authored-by: Eric Eastwood <erice@element.io>
This commit is contained in:
1
changelog.d/18716.bugfix
Normal file
1
changelog.d/18716.bugfix
Normal file
@@ -0,0 +1 @@
|
|||||||
|
Fix user failing to deactivate with MAS when `/_synapse/mas` is handled by a worker.
|
||||||
@@ -178,6 +178,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
|||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/deactivate$",
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)",
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)",
|
||||||
"^/_matrix/client/(r0|v3)/delete_devices$",
|
"^/_matrix/client/(r0|v3)/delete_devices$",
|
||||||
"^/_matrix/client/versions$",
|
"^/_matrix/client/versions$",
|
||||||
|
|||||||
@@ -238,6 +238,7 @@ information.
|
|||||||
^/_matrix/client/unstable/im.nheko.summary/summary/.*$
|
^/_matrix/client/unstable/im.nheko.summary/summary/.*$
|
||||||
^/_matrix/client/(r0|v3|unstable)/account/3pid$
|
^/_matrix/client/(r0|v3|unstable)/account/3pid$
|
||||||
^/_matrix/client/(r0|v3|unstable)/account/whoami$
|
^/_matrix/client/(r0|v3|unstable)/account/whoami$
|
||||||
|
^/_matrix/client/(r0|v3|unstable)/account/deactivate$
|
||||||
^/_matrix/client/(r0|v3)/delete_devices$
|
^/_matrix/client/(r0|v3)/delete_devices$
|
||||||
^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)
|
^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)
|
||||||
^/_matrix/client/versions$
|
^/_matrix/client/versions$
|
||||||
|
|||||||
@@ -220,6 +220,7 @@ class AuthHandler:
|
|||||||
self._password_localdb_enabled = hs.config.auth.password_localdb_enabled
|
self._password_localdb_enabled = hs.config.auth.password_localdb_enabled
|
||||||
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
|
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
|
||||||
self._account_validity_handler = hs.get_account_validity_handler()
|
self._account_validity_handler = hs.get_account_validity_handler()
|
||||||
|
self._pusher_pool = hs.get_pusherpool()
|
||||||
|
|
||||||
# Ratelimiter for failed auth during UIA. Uses same ratelimit config
|
# Ratelimiter for failed auth during UIA. Uses same ratelimit config
|
||||||
# as per `rc_login.failed_attempts`.
|
# as per `rc_login.failed_attempts`.
|
||||||
@@ -1652,7 +1653,7 @@ class AuthHandler:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if medium == "email":
|
if medium == "email":
|
||||||
await self.store.delete_pusher_by_app_id_pushkey_user_id(
|
await self._pusher_pool.remove_pusher(
|
||||||
app_id="m.email", pushkey=address, user_id=user_id
|
app_id="m.email", pushkey=address, user_id=user_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,9 @@ from typing import TYPE_CHECKING, Optional
|
|||||||
from synapse.api.constants import Membership
|
from synapse.api.constants import Membership
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
|
from synapse.replication.http.deactivate_account import (
|
||||||
|
ReplicationNotifyAccountDeactivatedServlet,
|
||||||
|
)
|
||||||
from synapse.types import Codes, Requester, UserID, create_requester
|
from synapse.types import Codes, Requester, UserID, create_requester
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -44,6 +47,7 @@ class DeactivateAccountHandler:
|
|||||||
self._room_member_handler = hs.get_room_member_handler()
|
self._room_member_handler = hs.get_room_member_handler()
|
||||||
self._identity_handler = hs.get_identity_handler()
|
self._identity_handler = hs.get_identity_handler()
|
||||||
self._profile_handler = hs.get_profile_handler()
|
self._profile_handler = hs.get_profile_handler()
|
||||||
|
self._pusher_pool = hs.get_pusherpool()
|
||||||
self.user_directory_handler = hs.get_user_directory_handler()
|
self.user_directory_handler = hs.get_user_directory_handler()
|
||||||
self._server_name = hs.hostname
|
self._server_name = hs.hostname
|
||||||
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
|
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
|
||||||
@@ -52,10 +56,16 @@ class DeactivateAccountHandler:
|
|||||||
self._user_parter_running = False
|
self._user_parter_running = False
|
||||||
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
|
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
|
||||||
|
|
||||||
|
self._notify_account_deactivated_client = None
|
||||||
|
|
||||||
# Start the user parter loop so it can resume parting users from rooms where
|
# Start the user parter loop so it can resume parting users from rooms where
|
||||||
# it left off (if it has work left to do).
|
# it left off (if it has work left to do).
|
||||||
if hs.config.worker.run_background_tasks:
|
if hs.config.worker.worker_app is None:
|
||||||
hs.get_reactor().callWhenRunning(self._start_user_parting)
|
hs.get_reactor().callWhenRunning(self._start_user_parting)
|
||||||
|
else:
|
||||||
|
self._notify_account_deactivated_client = (
|
||||||
|
ReplicationNotifyAccountDeactivatedServlet.make_client(hs)
|
||||||
|
)
|
||||||
|
|
||||||
self._account_validity_enabled = (
|
self._account_validity_enabled = (
|
||||||
hs.config.account_validity.account_validity_enabled
|
hs.config.account_validity.account_validity_enabled
|
||||||
@@ -145,7 +155,7 @@ class DeactivateAccountHandler:
|
|||||||
# Most of the pushers will have been deleted when we logged out the
|
# Most of the pushers will have been deleted when we logged out the
|
||||||
# associated devices above, but we still need to delete pushers not
|
# associated devices above, but we still need to delete pushers not
|
||||||
# associated with devices, e.g. email pushers.
|
# associated with devices, e.g. email pushers.
|
||||||
await self.store.delete_all_pushers_for_user(user_id)
|
await self._pusher_pool.delete_all_pushers_for_user(user_id)
|
||||||
|
|
||||||
# Add the user to a table of users pending deactivation (ie.
|
# Add the user to a table of users pending deactivation (ie.
|
||||||
# removal from all the rooms they're a member of)
|
# removal from all the rooms they're a member of)
|
||||||
@@ -169,10 +179,6 @@ class DeactivateAccountHandler:
|
|||||||
logger.info("Marking %s as erased", user_id)
|
logger.info("Marking %s as erased", user_id)
|
||||||
await self.store.mark_user_erased(user_id)
|
await self.store.mark_user_erased(user_id)
|
||||||
|
|
||||||
# Now start the process that goes through that list and
|
|
||||||
# parts users from rooms (if it isn't already running)
|
|
||||||
self._start_user_parting()
|
|
||||||
|
|
||||||
# Reject all pending invites and knocks for the user, so that the
|
# Reject all pending invites and knocks for the user, so that the
|
||||||
# user doesn't show up in the "invited" section of rooms' members list.
|
# user doesn't show up in the "invited" section of rooms' members list.
|
||||||
await self._reject_pending_invites_and_knocks_for_user(user_id)
|
await self._reject_pending_invites_and_knocks_for_user(user_id)
|
||||||
@@ -193,15 +199,37 @@ class DeactivateAccountHandler:
|
|||||||
# Delete any server-side backup keys
|
# Delete any server-side backup keys
|
||||||
await self.store.bulk_delete_backup_keys_and_versions_for_user(user_id)
|
await self.store.bulk_delete_backup_keys_and_versions_for_user(user_id)
|
||||||
|
|
||||||
|
# Notify modules and start the room parting process.
|
||||||
|
await self.notify_account_deactivated(user_id, by_admin=by_admin)
|
||||||
|
|
||||||
|
return identity_server_supports_unbinding
|
||||||
|
|
||||||
|
async def notify_account_deactivated(
|
||||||
|
self,
|
||||||
|
user_id: str,
|
||||||
|
by_admin: bool = False,
|
||||||
|
) -> None:
|
||||||
|
"""Notify modules and start the room parting process.
|
||||||
|
Goes through replication if this is not the main process.
|
||||||
|
"""
|
||||||
|
if self._notify_account_deactivated_client is not None:
|
||||||
|
await self._notify_account_deactivated_client(
|
||||||
|
user_id=user_id,
|
||||||
|
by_admin=by_admin,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Now start the process that goes through that list and
|
||||||
|
# parts users from rooms (if it isn't already running)
|
||||||
|
self._start_user_parting()
|
||||||
|
|
||||||
# Let modules know the user has been deactivated.
|
# Let modules know the user has been deactivated.
|
||||||
await self._third_party_rules.on_user_deactivation_status_changed(
|
await self._third_party_rules.on_user_deactivation_status_changed(
|
||||||
user_id,
|
user_id,
|
||||||
True,
|
True,
|
||||||
by_admin,
|
by_admin=by_admin,
|
||||||
)
|
)
|
||||||
|
|
||||||
return identity_server_supports_unbinding
|
|
||||||
|
|
||||||
async def _reject_pending_invites_and_knocks_for_user(self, user_id: str) -> None:
|
async def _reject_pending_invites_and_knocks_for_user(self, user_id: str) -> None:
|
||||||
"""Reject pending invites and knocks addressed to a given user ID.
|
"""Reject pending invites and knocks addressed to a given user ID.
|
||||||
|
|
||||||
|
|||||||
@@ -31,7 +31,10 @@ from synapse.metrics.background_process_metrics import (
|
|||||||
)
|
)
|
||||||
from synapse.push import Pusher, PusherConfig, PusherConfigException
|
from synapse.push import Pusher, PusherConfig, PusherConfigException
|
||||||
from synapse.push.pusher import PusherFactory
|
from synapse.push.pusher import PusherFactory
|
||||||
from synapse.replication.http.push import ReplicationRemovePusherRestServlet
|
from synapse.replication.http.push import (
|
||||||
|
ReplicationDeleteAllPushersForUserRestServlet,
|
||||||
|
ReplicationRemovePusherRestServlet,
|
||||||
|
)
|
||||||
from synapse.types import JsonDict, RoomStreamToken, StrCollection
|
from synapse.types import JsonDict, RoomStreamToken, StrCollection
|
||||||
from synapse.util.async_helpers import concurrently_execute
|
from synapse.util.async_helpers import concurrently_execute
|
||||||
from synapse.util.threepids import canonicalise_email
|
from synapse.util.threepids import canonicalise_email
|
||||||
@@ -78,10 +81,14 @@ class PusherPool:
|
|||||||
|
|
||||||
# We can only delete pushers on master.
|
# We can only delete pushers on master.
|
||||||
self._remove_pusher_client = None
|
self._remove_pusher_client = None
|
||||||
|
self._delete_all_pushers_for_user_client = None
|
||||||
if hs.config.worker.worker_app:
|
if hs.config.worker.worker_app:
|
||||||
self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client(
|
self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client(
|
||||||
hs
|
hs
|
||||||
)
|
)
|
||||||
|
self._delete_all_pushers_for_user_client = (
|
||||||
|
ReplicationDeleteAllPushersForUserRestServlet.make_client(hs)
|
||||||
|
)
|
||||||
|
|
||||||
# Record the last stream ID that we were poked about so we can get
|
# Record the last stream ID that we were poked about so we can get
|
||||||
# changes since then. We set this to the current max stream ID on
|
# changes since then. We set this to the current max stream ID on
|
||||||
@@ -454,6 +461,13 @@ class PusherPool:
|
|||||||
app_id, pushkey, user_id
|
app_id, pushkey, user_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def delete_all_pushers_for_user(self, user_id: str) -> None:
|
||||||
|
"""Deletes all pushers for a user."""
|
||||||
|
if self._delete_all_pushers_for_user_client is not None:
|
||||||
|
await self._delete_all_pushers_for_user_client(user_id=user_id)
|
||||||
|
else:
|
||||||
|
await self.store.delete_all_pushers_for_user(user_id=user_id)
|
||||||
|
|
||||||
def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
|
def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
|
||||||
"""Stops a pusher with the given app ID and push key if one is running.
|
"""Stops a pusher with the given app ID and push key if one is running.
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING
|
|||||||
from synapse.http.server import JsonResource
|
from synapse.http.server import JsonResource
|
||||||
from synapse.replication.http import (
|
from synapse.replication.http import (
|
||||||
account_data,
|
account_data,
|
||||||
|
deactivate_account,
|
||||||
delayed_events,
|
delayed_events,
|
||||||
devices,
|
devices,
|
||||||
federation,
|
federation,
|
||||||
@@ -66,3 +67,4 @@ class ReplicationRestResource(JsonResource):
|
|||||||
login.register_servlets(hs, self)
|
login.register_servlets(hs, self)
|
||||||
register.register_servlets(hs, self)
|
register.register_servlets(hs, self)
|
||||||
delayed_events.register_servlets(hs, self)
|
delayed_events.register_servlets(hs, self)
|
||||||
|
deactivate_account.register_servlets(hs, self)
|
||||||
|
|||||||
81
synapse/replication/http/deactivate_account.py
Normal file
81
synapse/replication/http/deactivate_account.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
#
|
||||||
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
||||||
|
#
|
||||||
|
# Copyright (C) 2023 New Vector, Ltd
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as
|
||||||
|
# published by the Free Software Foundation, either version 3 of the
|
||||||
|
# License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# See the GNU Affero General Public License for more details:
|
||||||
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
||||||
|
#
|
||||||
|
# Originally licensed under the Apache License, Version 2.0:
|
||||||
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
||||||
|
#
|
||||||
|
# [This file includes modifications made by New Vector Limited]
|
||||||
|
#
|
||||||
|
#
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import TYPE_CHECKING, Tuple
|
||||||
|
|
||||||
|
from twisted.web.server import Request
|
||||||
|
|
||||||
|
from synapse.http.server import HttpServer
|
||||||
|
from synapse.replication.http._base import ReplicationEndpoint
|
||||||
|
from synapse.types import JsonDict
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from synapse.server import HomeServer
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ReplicationNotifyAccountDeactivatedServlet(ReplicationEndpoint):
|
||||||
|
"""Notify that an account has been deactivated.
|
||||||
|
|
||||||
|
Request format:
|
||||||
|
|
||||||
|
POST /_synapse/replication/notify_account_deactivated/:user_id
|
||||||
|
|
||||||
|
{
|
||||||
|
"by_admin": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
NAME = "notify_account_deactivated"
|
||||||
|
PATH_ARGS = ("user_id",)
|
||||||
|
|
||||||
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
super().__init__(hs)
|
||||||
|
self.deactivate_account_handler = hs.get_deactivate_account_handler()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def _serialize_payload( # type: ignore[override]
|
||||||
|
user_id: str,
|
||||||
|
by_admin: bool,
|
||||||
|
) -> JsonDict:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
user_id: The user ID which has been deactivated.
|
||||||
|
by_admin: Whether the user was deactivated by an admin.
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
"by_admin": by_admin,
|
||||||
|
}
|
||||||
|
|
||||||
|
async def _handle_request( # type: ignore[override]
|
||||||
|
self, request: Request, content: JsonDict, user_id: str
|
||||||
|
) -> Tuple[int, JsonDict]:
|
||||||
|
by_admin = content["by_admin"]
|
||||||
|
await self.deactivate_account_handler.notify_account_deactivated(
|
||||||
|
user_id, by_admin=by_admin
|
||||||
|
)
|
||||||
|
return 200, {}
|
||||||
|
|
||||||
|
|
||||||
|
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||||
|
ReplicationNotifyAccountDeactivatedServlet(hs).register(http_server)
|
||||||
@@ -118,6 +118,39 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
|
|||||||
return 200, {}
|
return 200, {}
|
||||||
|
|
||||||
|
|
||||||
|
class ReplicationDeleteAllPushersForUserRestServlet(ReplicationEndpoint):
|
||||||
|
"""Deletes all pushers for a user.
|
||||||
|
|
||||||
|
Request format:
|
||||||
|
|
||||||
|
POST /_synapse/replication/delete_all_pushers_for_user/:user_id
|
||||||
|
|
||||||
|
{}
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
NAME = "delete_all_pushers_for_user"
|
||||||
|
PATH_ARGS = ("user_id",)
|
||||||
|
CACHE = False
|
||||||
|
|
||||||
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
super().__init__(hs)
|
||||||
|
|
||||||
|
self._store = hs.get_datastores().main
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override]
|
||||||
|
return {}
|
||||||
|
|
||||||
|
async def _handle_request( # type: ignore[override]
|
||||||
|
self, request: Request, content: JsonDict, user_id: str
|
||||||
|
) -> Tuple[int, JsonDict]:
|
||||||
|
await self._store.delete_all_pushers_for_user(user_id)
|
||||||
|
|
||||||
|
return 200, {}
|
||||||
|
|
||||||
|
|
||||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||||
ReplicationRemovePusherRestServlet(hs).register(http_server)
|
ReplicationRemovePusherRestServlet(hs).register(http_server)
|
||||||
ReplicationCopyPusherRestServlet(hs).register(http_server)
|
ReplicationCopyPusherRestServlet(hs).register(http_server)
|
||||||
|
ReplicationDeleteAllPushersForUserRestServlet(hs).register(http_server)
|
||||||
|
|||||||
@@ -896,23 +896,26 @@ class AccountStatusRestServlet(RestServlet):
|
|||||||
|
|
||||||
|
|
||||||
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
|
||||||
|
ThreepidRestServlet(hs).register(http_server)
|
||||||
|
WhoamiRestServlet(hs).register(http_server)
|
||||||
|
|
||||||
|
if not hs.config.experimental.msc3861.enabled:
|
||||||
|
DeactivateAccountRestServlet(hs).register(http_server)
|
||||||
|
|
||||||
|
# These servlets are only registered on the main process
|
||||||
if hs.config.worker.worker_app is None:
|
if hs.config.worker.worker_app is None:
|
||||||
|
ThreepidBindRestServlet(hs).register(http_server)
|
||||||
|
ThreepidUnbindRestServlet(hs).register(http_server)
|
||||||
|
|
||||||
if not hs.config.experimental.msc3861.enabled:
|
if not hs.config.experimental.msc3861.enabled:
|
||||||
EmailPasswordRequestTokenRestServlet(hs).register(http_server)
|
EmailPasswordRequestTokenRestServlet(hs).register(http_server)
|
||||||
DeactivateAccountRestServlet(hs).register(http_server)
|
|
||||||
PasswordRestServlet(hs).register(http_server)
|
PasswordRestServlet(hs).register(http_server)
|
||||||
EmailThreepidRequestTokenRestServlet(hs).register(http_server)
|
EmailThreepidRequestTokenRestServlet(hs).register(http_server)
|
||||||
MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
|
MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
|
||||||
AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
|
AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
|
||||||
AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
|
AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
|
||||||
ThreepidRestServlet(hs).register(http_server)
|
|
||||||
if hs.config.worker.worker_app is None:
|
|
||||||
ThreepidBindRestServlet(hs).register(http_server)
|
|
||||||
ThreepidUnbindRestServlet(hs).register(http_server)
|
|
||||||
if not hs.config.experimental.msc3861.enabled:
|
|
||||||
ThreepidAddRestServlet(hs).register(http_server)
|
ThreepidAddRestServlet(hs).register(http_server)
|
||||||
ThreepidDeleteRestServlet(hs).register(http_server)
|
ThreepidDeleteRestServlet(hs).register(http_server)
|
||||||
WhoamiRestServlet(hs).register(http_server)
|
|
||||||
|
|
||||||
if hs.config.worker.worker_app is None and hs.config.experimental.msc3720_enabled:
|
if hs.config.experimental.msc3720_enabled:
|
||||||
AccountStatusRestServlet(hs).register(http_server)
|
AccountStatusRestServlet(hs).register(http_server)
|
||||||
|
|||||||
@@ -2596,6 +2596,36 @@ class RegistrationWorkerStore(StatsStore, CacheInvalidationWorkerStore):
|
|||||||
|
|
||||||
await self.db_pool.runInteraction("delete_access_token", f)
|
await self.db_pool.runInteraction("delete_access_token", f)
|
||||||
|
|
||||||
|
async def user_set_password_hash(
|
||||||
|
self, user_id: str, password_hash: Optional[str]
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
NB. This does *not* evict any cache because the one use for this
|
||||||
|
removes most of the entries subsequently anyway so it would be
|
||||||
|
pointless. Use flush_user separately.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def user_set_password_hash_txn(txn: LoggingTransaction) -> None:
|
||||||
|
self.db_pool.simple_update_one_txn(
|
||||||
|
txn, "users", {"name": user_id}, {"password_hash": password_hash}
|
||||||
|
)
|
||||||
|
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
||||||
|
|
||||||
|
await self.db_pool.runInteraction(
|
||||||
|
"user_set_password_hash", user_set_password_hash_txn
|
||||||
|
)
|
||||||
|
|
||||||
|
async def add_user_pending_deactivation(self, user_id: str) -> None:
|
||||||
|
"""
|
||||||
|
Adds a user to the table of users who need to be parted from all the rooms they're
|
||||||
|
in
|
||||||
|
"""
|
||||||
|
await self.db_pool.simple_insert(
|
||||||
|
"users_pending_deactivation",
|
||||||
|
values={"user_id": user_id},
|
||||||
|
desc="add_user_pending_deactivation",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
|
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
|
||||||
def __init__(
|
def __init__(
|
||||||
@@ -2820,25 +2850,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
|
|||||||
|
|
||||||
return next_id
|
return next_id
|
||||||
|
|
||||||
async def user_set_password_hash(
|
|
||||||
self, user_id: str, password_hash: Optional[str]
|
|
||||||
) -> None:
|
|
||||||
"""
|
|
||||||
NB. This does *not* evict any cache because the one use for this
|
|
||||||
removes most of the entries subsequently anyway so it would be
|
|
||||||
pointless. Use flush_user separately.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def user_set_password_hash_txn(txn: LoggingTransaction) -> None:
|
|
||||||
self.db_pool.simple_update_one_txn(
|
|
||||||
txn, "users", {"name": user_id}, {"password_hash": password_hash}
|
|
||||||
)
|
|
||||||
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
|
|
||||||
|
|
||||||
await self.db_pool.runInteraction(
|
|
||||||
"user_set_password_hash", user_set_password_hash_txn
|
|
||||||
)
|
|
||||||
|
|
||||||
async def user_set_consent_version(
|
async def user_set_consent_version(
|
||||||
self, user_id: str, consent_version: str
|
self, user_id: str, consent_version: str
|
||||||
) -> None:
|
) -> None:
|
||||||
@@ -2891,17 +2902,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
|
|||||||
|
|
||||||
await self.db_pool.runInteraction("user_set_consent_server_notice_sent", f)
|
await self.db_pool.runInteraction("user_set_consent_server_notice_sent", f)
|
||||||
|
|
||||||
async def add_user_pending_deactivation(self, user_id: str) -> None:
|
|
||||||
"""
|
|
||||||
Adds a user to the table of users who need to be parted from all the rooms they're
|
|
||||||
in
|
|
||||||
"""
|
|
||||||
await self.db_pool.simple_insert(
|
|
||||||
"users_pending_deactivation",
|
|
||||||
values={"user_id": user_id},
|
|
||||||
desc="add_user_pending_deactivation",
|
|
||||||
)
|
|
||||||
|
|
||||||
async def validate_threepid_session(
|
async def validate_threepid_session(
|
||||||
self, session_id: str, client_secret: str, token: str, current_ts: int
|
self, session_id: str, client_secret: str, token: str, current_ts: int
|
||||||
) -> Optional[str]:
|
) -> Optional[str]:
|
||||||
|
|||||||
@@ -70,8 +70,6 @@ class UserErasureWorkerStore(CacheInvalidationWorkerStore):
|
|||||||
|
|
||||||
return {u: u in erased_users for u in user_ids}
|
return {u: u in erased_users for u in user_ids}
|
||||||
|
|
||||||
|
|
||||||
class UserErasureStore(UserErasureWorkerStore):
|
|
||||||
async def mark_user_erased(self, user_id: str) -> None:
|
async def mark_user_erased(self, user_id: str) -> None:
|
||||||
"""Indicate that user_id wishes their message history to be erased.
|
"""Indicate that user_id wishes their message history to be erased.
|
||||||
|
|
||||||
@@ -113,3 +111,7 @@ class UserErasureStore(UserErasureWorkerStore):
|
|||||||
self._invalidate_cache_and_stream(txn, self.is_user_erased, (user_id,))
|
self._invalidate_cache_and_stream(txn, self.is_user_erased, (user_id,))
|
||||||
|
|
||||||
await self.db_pool.runInteraction("mark_user_not_erased", f)
|
await self.db_pool.runInteraction("mark_user_not_erased", f)
|
||||||
|
|
||||||
|
|
||||||
|
class UserErasureStore(UserErasureWorkerStore):
|
||||||
|
pass
|
||||||
|
|||||||
Reference in New Issue
Block a user