From da987b4acc58a04acd9a38a8f8afd809ffd1b8d4 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Fri, 12 May 2023 15:41:15 +0200 Subject: [PATCH] Save shutdown and purge state in DB --- synapse/handlers/pagination.py | 56 ++++++++++++++++--------- synapse/handlers/room.py | 76 +++++++++++++++++++++++++--------- 2 files changed, 93 insertions(+), 39 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 4c4c159218..2ddada8a56 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import json import logging from typing import TYPE_CHECKING, Dict, List, Optional, Set @@ -359,6 +360,7 @@ class PaginationHandler: room_id: str, delete_id: str, force: bool = False, + shutdown_response: Optional[ShutdownRoomResponse] = None, ) -> None: """Purge the given room from the database. @@ -366,6 +368,7 @@ class PaginationHandler: room_id: room to be purged delete_id: the delete ID for this purge force: set true to skip checking for joined users. + shutdown_response: optional response coming from the shutdown phase """ logger.info("starting purge room_id %s", room_id) @@ -376,7 +379,22 @@ class PaginationHandler: if joined: raise SynapseError(400, "Users are still joined to this room") - await self._storage_controllers.purge_events.purge_room(room_id, delete_id) + await self.store.upsert_room_to_purge( + room_id, + delete_id, + DeleteStatus.STATUS_PURGING, + shutdown_response=json.dumps(shutdown_response), + ) + + await self._storage_controllers.purge_events.purge_room(room_id) + + await self.store.upsert_room_to_purge( + room_id, + delete_id, + DeleteStatus.STATUS_COMPLETE, + timestamp=self.clock.time_msec(), + shutdown_response=json.dumps(shutdown_response), + ) logger.info("purge complete for room_id %s", room_id) @@ -572,6 +590,7 @@ class PaginationHandler: room_id: str, delete_id: str, shutdown_params: ShutdownRoomParams, + shutdown_response: Optional[ShutdownRoomResponse] = None, ) -> None: """ Shuts down and purges a room. @@ -582,8 +601,9 @@ class PaginationHandler: delete_id: The ID for this delete. room_id: The ID of the room to shut down. shutdown_params: parameters for the shutdown, cf `RoomShutdownHandler.ShutdownRoomParams` + shutdown_response: current status of the shutdown, if it was interrupted - Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`: + Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus` and in DB """ self._purges_in_progress_by_room.add(room_id) @@ -593,6 +613,7 @@ class PaginationHandler: room_id=room_id, delete_id=delete_id, shutdown_params=shutdown_params, + shutdown_response=shutdown_response, ) self._delete_by_id[delete_id].shutdown_room = shutdown_response @@ -602,25 +623,16 @@ class PaginationHandler: room_id, delete_id, shutdown_params["force_purge"], + shutdown_response=self._delete_by_id[delete_id].shutdown_room, ) - - if purge: - logger.info("starting purge room_id %s", room_id) - - # first check that we have no users in this room - if not force_purge: - joined = await self.store.is_host_joined( - room_id, self._server_name - ) - if joined: - raise SynapseError( - 400, "Users are still joined to this room" - ) - - await self._storage_controllers.purge_events.purge_room(room_id) - - logger.info("purge complete for room_id %s", room_id) + await self.store.upsert_room_to_purge( + room_id, + delete_id, + DeleteStatus.STATUS_COMPLETE, + timestamp=self.clock.time_msec(), + shutdown_response=json.dumps(shutdown_response), + ) self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE except Exception: f = Failure() @@ -630,6 +642,12 @@ class PaginationHandler: ) self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED self._delete_by_id[delete_id].error = f.getErrorMessage() + await self.store.upsert_room_to_purge( + room_id, + delete_id, + DeleteStatus.STATUS_FAILED, + error=f.getErrorMessage(), + ) finally: self._purges_in_progress_by_room.discard(room_id) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bd7dc79afb..73810f66d1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -14,6 +14,7 @@ """Contains functions for performing actions on rooms.""" import itertools +import json import logging import math import random @@ -1821,6 +1822,7 @@ class RoomShutdownHandler: room_id: str, delete_id: str, shutdown_params: ShutdownRoomParams, + shutdown_response: Optional[ShutdownRoomResponse] = None, ) -> ShutdownRoomResponse: """ Shuts down a room. Moves all local users and room aliases automatically @@ -1839,6 +1841,7 @@ class RoomShutdownHandler: room_id: The ID of the room to shut down. delete_id: The delete ID identifying this delete request shutdown_params: parameters for the shutdown, cf `ShutdownRoomParams` + shutdown_response: current status of the shutdown, if it was interrupted Returns: a dict matching `ShutdownRoomResponse`. """ @@ -1868,6 +1871,22 @@ class RoomShutdownHandler: 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN ) + if not shutdown_response: + shutdown_response = { + "kicked_users": [], + "failed_to_kick_users": [], + "local_aliases": [], + "new_room_id": None, + } + + await self.store.upsert_room_to_purge( + room_id, + delete_id, + DeleteStatus.STATUS_SHUTTING_DOWN, + shutdown_params=json.dumps(shutdown_params), + shutdown_response=json.dumps(shutdown_response), + ) + # Action the block first (even if the room doesn't exist yet) if block: # This will work even if the room is already blocked, but that is @@ -1876,14 +1895,10 @@ class RoomShutdownHandler: if not await self.store.get_room(room_id): # if we don't know about the room, there is nothing left to do. - return { - "kicked_users": [], - "failed_to_kick_users": [], - "local_aliases": [], - "new_room_id": None, - } + return shutdown_response - if new_room_user_id is not None: + new_room_id = shutdown_response.get("new_room_id") + if new_room_user_id is not None and new_room_id is None: if not self.hs.is_mine_id(new_room_user_id): raise SynapseError( 400, "User must be our own: %s" % (new_room_user_id,) @@ -1903,6 +1918,15 @@ class RoomShutdownHandler: ratelimit=False, ) + shutdown_response["new_room_id"] = new_room_id + await self.store.upsert_room_to_purge( + room_id, + delete_id, + DeleteStatus.STATUS_SHUTTING_DOWN, + shutdown_params=json.dumps(shutdown_params), + shutdown_response=json.dumps(shutdown_response), + ) + logger.info( "Shutting down room %r, joining to new room: %r", room_id, new_room_id ) @@ -1916,12 +1940,9 @@ class RoomShutdownHandler: stream_id, ) else: - new_room_id = None logger.info("Shutting down room %r", room_id) users = await self.store.get_users_in_room(room_id) - kicked_users = [] - failed_to_kick_users = [] for user_id in users: if not self.hs.is_mine_id(user_id): continue @@ -1965,15 +1986,33 @@ class RoomShutdownHandler: require_consent=False, ) - kicked_users.append(user_id) + shutdown_response["kicked_users"].append(user_id) + await self.store.upsert_room_to_purge( + room_id, + delete_id, + DeleteStatus.STATUS_SHUTTING_DOWN, + shutdown_params=json.dumps(shutdown_params), + shutdown_response=json.dumps(shutdown_response), + ) except Exception: logger.exception( "Failed to leave old room and join new room for %r", user_id ) - failed_to_kick_users.append(user_id) + shutdown_response["failed_to_kick_users"].append(user_id) + await self.store.upsert_room_to_purge( + room_id, + delete_id, + DeleteStatus.STATUS_SHUTTING_DOWN, + shutdown_params=json.dumps(shutdown_params), + shutdown_response=json.dumps(shutdown_response), + ) # Send message in new room and move aliases if new_room_user_id: + room_creator_requester = create_requester( + new_room_user_id, authenticated_entity=requester_user_id + ) + await self.event_creation_handler.create_and_send_nonmember_event( room_creator_requester, { @@ -1985,18 +2024,15 @@ class RoomShutdownHandler: ratelimit=False, ) - aliases_for_room = await self.store.get_aliases_for_room(room_id) + shutdown_response["local_aliases"] = list( + await self.store.get_aliases_for_room(room_id) + ) assert new_room_id is not None await self.store.update_aliases_for_room( room_id, new_room_id, requester_user_id ) else: - aliases_for_room = [] + shutdown_response["local_aliases"] = [] - return { - "kicked_users": kicked_users, - "failed_to_kick_users": failed_to_kick_users, - "local_aliases": list(aliases_for_room), - "new_room_id": new_room_id, - } + return shutdown_response