1
0

Save shutdown and purge state in DB

This commit is contained in:
Mathieu Velten
2023-05-12 15:41:15 +02:00
parent 7d9665e9ec
commit da987b4acc
2 changed files with 93 additions and 39 deletions
+37 -19
View File
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Set
@@ -359,6 +360,7 @@ class PaginationHandler:
room_id: str,
delete_id: str,
force: bool = False,
shutdown_response: Optional[ShutdownRoomResponse] = None,
) -> None:
"""Purge the given room from the database.
@@ -366,6 +368,7 @@ class PaginationHandler:
room_id: room to be purged
delete_id: the delete ID for this purge
force: set true to skip checking for joined users.
shutdown_response: optional response coming from the shutdown phase
"""
logger.info("starting purge room_id %s", room_id)
@@ -376,7 +379,22 @@ class PaginationHandler:
if joined:
raise SynapseError(400, "Users are still joined to this room")
await self._storage_controllers.purge_events.purge_room(room_id, delete_id)
await self.store.upsert_room_to_purge(
room_id,
delete_id,
DeleteStatus.STATUS_PURGING,
shutdown_response=json.dumps(shutdown_response),
)
await self._storage_controllers.purge_events.purge_room(room_id)
await self.store.upsert_room_to_purge(
room_id,
delete_id,
DeleteStatus.STATUS_COMPLETE,
timestamp=self.clock.time_msec(),
shutdown_response=json.dumps(shutdown_response),
)
logger.info("purge complete for room_id %s", room_id)
@@ -572,6 +590,7 @@ class PaginationHandler:
room_id: str,
delete_id: str,
shutdown_params: ShutdownRoomParams,
shutdown_response: Optional[ShutdownRoomResponse] = None,
) -> None:
"""
Shuts down and purges a room.
@@ -582,8 +601,9 @@ class PaginationHandler:
delete_id: The ID for this delete.
room_id: The ID of the room to shut down.
shutdown_params: parameters for the shutdown, cf `RoomShutdownHandler.ShutdownRoomParams`
shutdown_response: current status of the shutdown, if it was interrupted
Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus` and in DB
"""
self._purges_in_progress_by_room.add(room_id)
@@ -593,6 +613,7 @@ class PaginationHandler:
room_id=room_id,
delete_id=delete_id,
shutdown_params=shutdown_params,
shutdown_response=shutdown_response,
)
self._delete_by_id[delete_id].shutdown_room = shutdown_response
@@ -602,25 +623,16 @@ class PaginationHandler:
room_id,
delete_id,
shutdown_params["force_purge"],
shutdown_response=self._delete_by_id[delete_id].shutdown_room,
)
if purge:
logger.info("starting purge room_id %s", room_id)
# first check that we have no users in this room
if not force_purge:
joined = await self.store.is_host_joined(
room_id, self._server_name
)
if joined:
raise SynapseError(
400, "Users are still joined to this room"
)
await self._storage_controllers.purge_events.purge_room(room_id)
logger.info("purge complete for room_id %s", room_id)
await self.store.upsert_room_to_purge(
room_id,
delete_id,
DeleteStatus.STATUS_COMPLETE,
timestamp=self.clock.time_msec(),
shutdown_response=json.dumps(shutdown_response),
)
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
except Exception:
f = Failure()
@@ -630,6 +642,12 @@ class PaginationHandler:
)
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
self._delete_by_id[delete_id].error = f.getErrorMessage()
await self.store.upsert_room_to_purge(
room_id,
delete_id,
DeleteStatus.STATUS_FAILED,
error=f.getErrorMessage(),
)
finally:
self._purges_in_progress_by_room.discard(room_id)
+56 -20
View File
@@ -14,6 +14,7 @@
"""Contains functions for performing actions on rooms."""
import itertools
import json
import logging
import math
import random
@@ -1821,6 +1822,7 @@ class RoomShutdownHandler:
room_id: str,
delete_id: str,
shutdown_params: ShutdownRoomParams,
shutdown_response: Optional[ShutdownRoomResponse] = None,
) -> ShutdownRoomResponse:
"""
Shuts down a room. Moves all local users and room aliases automatically
@@ -1839,6 +1841,7 @@ class RoomShutdownHandler:
room_id: The ID of the room to shut down.
delete_id: The delete ID identifying this delete request
shutdown_params: parameters for the shutdown, cf `ShutdownRoomParams`
shutdown_response: current status of the shutdown, if it was interrupted
Returns: a dict matching `ShutdownRoomResponse`.
"""
@@ -1868,6 +1871,22 @@ class RoomShutdownHandler:
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
)
if not shutdown_response:
shutdown_response = {
"kicked_users": [],
"failed_to_kick_users": [],
"local_aliases": [],
"new_room_id": None,
}
await self.store.upsert_room_to_purge(
room_id,
delete_id,
DeleteStatus.STATUS_SHUTTING_DOWN,
shutdown_params=json.dumps(shutdown_params),
shutdown_response=json.dumps(shutdown_response),
)
# Action the block first (even if the room doesn't exist yet)
if block:
# This will work even if the room is already blocked, but that is
@@ -1876,14 +1895,10 @@ class RoomShutdownHandler:
if not await self.store.get_room(room_id):
# if we don't know about the room, there is nothing left to do.
return {
"kicked_users": [],
"failed_to_kick_users": [],
"local_aliases": [],
"new_room_id": None,
}
return shutdown_response
if new_room_user_id is not None:
new_room_id = shutdown_response.get("new_room_id")
if new_room_user_id is not None and new_room_id is None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
400, "User must be our own: %s" % (new_room_user_id,)
@@ -1903,6 +1918,15 @@ class RoomShutdownHandler:
ratelimit=False,
)
shutdown_response["new_room_id"] = new_room_id
await self.store.upsert_room_to_purge(
room_id,
delete_id,
DeleteStatus.STATUS_SHUTTING_DOWN,
shutdown_params=json.dumps(shutdown_params),
shutdown_response=json.dumps(shutdown_response),
)
logger.info(
"Shutting down room %r, joining to new room: %r", room_id, new_room_id
)
@@ -1916,12 +1940,9 @@ class RoomShutdownHandler:
stream_id,
)
else:
new_room_id = None
logger.info("Shutting down room %r", room_id)
users = await self.store.get_users_in_room(room_id)
kicked_users = []
failed_to_kick_users = []
for user_id in users:
if not self.hs.is_mine_id(user_id):
continue
@@ -1965,15 +1986,33 @@ class RoomShutdownHandler:
require_consent=False,
)
kicked_users.append(user_id)
shutdown_response["kicked_users"].append(user_id)
await self.store.upsert_room_to_purge(
room_id,
delete_id,
DeleteStatus.STATUS_SHUTTING_DOWN,
shutdown_params=json.dumps(shutdown_params),
shutdown_response=json.dumps(shutdown_response),
)
except Exception:
logger.exception(
"Failed to leave old room and join new room for %r", user_id
)
failed_to_kick_users.append(user_id)
shutdown_response["failed_to_kick_users"].append(user_id)
await self.store.upsert_room_to_purge(
room_id,
delete_id,
DeleteStatus.STATUS_SHUTTING_DOWN,
shutdown_params=json.dumps(shutdown_params),
shutdown_response=json.dumps(shutdown_response),
)
# Send message in new room and move aliases
if new_room_user_id:
room_creator_requester = create_requester(
new_room_user_id, authenticated_entity=requester_user_id
)
await self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
@@ -1985,18 +2024,15 @@ class RoomShutdownHandler:
ratelimit=False,
)
aliases_for_room = await self.store.get_aliases_for_room(room_id)
shutdown_response["local_aliases"] = list(
await self.store.get_aliases_for_room(room_id)
)
assert new_room_id is not None
await self.store.update_aliases_for_room(
room_id, new_room_id, requester_user_id
)
else:
aliases_for_room = []
shutdown_response["local_aliases"] = []
return {
"kicked_users": kicked_users,
"failed_to_kick_users": failed_to_kick_users,
"local_aliases": list(aliases_for_room),
"new_room_id": new_room_id,
}
return shutdown_response