Restore purge and shutdown from DB on startup
It will also launch scheduled purge (`wait_purge` status) hourly
This commit is contained in:
@@ -24,7 +24,7 @@ from synapse.api.constants import Direction, EventTypes, Membership
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.api.filtering import Filter
|
||||
from synapse.events.utils import SerializeEventConfig
|
||||
from synapse.handlers.room import ShutdownRoomResponse
|
||||
from synapse.handlers.room import DeleteStatus, ShutdownRoomParams, ShutdownRoomResponse
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.rest.admin._base import assert_user_is_admin
|
||||
@@ -83,6 +83,9 @@ class PaginationHandler:
|
||||
# when to remove a completed deletion/purge from the results map
|
||||
CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
|
||||
|
||||
# how often to run the purge rooms loop
|
||||
PURGE_ROOMS_PERIOD = 1000 * 3600 # 1 hour
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
@@ -116,6 +119,7 @@ class PaginationHandler:
|
||||
self._retention_allowed_lifetime_max = (
|
||||
hs.config.retention.retention_allowed_lifetime_max
|
||||
)
|
||||
self._purge_retention_period = hs.config.server.purge_retention_period
|
||||
self._is_master = hs.config.worker.worker_app is None
|
||||
|
||||
if hs.config.retention.retention_enabled and self._is_master:
|
||||
@@ -132,6 +136,82 @@ class PaginationHandler:
|
||||
job.longest_max_lifetime,
|
||||
)
|
||||
|
||||
if self._is_master:
|
||||
self.clock.looping_call(
|
||||
run_as_background_process,
|
||||
PaginationHandler.PURGE_ROOMS_PERIOD,
|
||||
"purge_rooms",
|
||||
self.purge_rooms,
|
||||
)
|
||||
|
||||
async def purge_rooms(self) -> None:
|
||||
"""This takes care of restoring unfinished purge/shutdown rooms from the DB.
|
||||
It also takes care to launch scheduled ones, like rooms that has been fully
|
||||
forgotten.
|
||||
|
||||
It should be run regularly.
|
||||
"""
|
||||
rooms_to_purge = await self.store.get_rooms_to_purge()
|
||||
for r in rooms_to_purge:
|
||||
room_id = r["room_id"]
|
||||
delete_id = r["delete_id"]
|
||||
status = r["status"]
|
||||
timestamp = r["timestamp"]
|
||||
|
||||
if (
|
||||
status == DeleteStatus.STATUS_COMPLETE
|
||||
or status == DeleteStatus.STATUS_FAILED
|
||||
):
|
||||
# remove the delete from the list 24 hours after it completes or fails
|
||||
time_since_completed = self.clock.time_msec() - timestamp
|
||||
if time_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS:
|
||||
await self.store.delete_room_to_purge(room_id, delete_id)
|
||||
|
||||
del self._delete_by_id[delete_id]
|
||||
self._delete_by_room[room_id].remove(delete_id)
|
||||
if not self._delete_by_room[room_id]:
|
||||
del self._delete_by_room[room_id]
|
||||
|
||||
continue
|
||||
|
||||
delete_status = self._delete_by_id.get(delete_id)
|
||||
if delete_status is not None:
|
||||
# a delete background task is already running (or has run)
|
||||
# for this delete id, let's ignore it
|
||||
continue
|
||||
|
||||
self._delete_by_id[delete_id] = DeleteStatus()
|
||||
self._delete_by_id[delete_id].status = status
|
||||
self._delete_by_room.setdefault(room_id, []).append(delete_id)
|
||||
|
||||
# restore a shutdown from the DB
|
||||
# also take care of purging if needed
|
||||
shutdown_response = None
|
||||
if status == DeleteStatus.STATUS_SHUTTING_DOWN:
|
||||
shutdown_params = json.loads(r["shutdown_params"])
|
||||
if r["shutdown_response"]:
|
||||
shutdown_response = json.loads(r["shutdown_response"])
|
||||
await self._shutdown_and_purge_room(
|
||||
room_id,
|
||||
delete_id,
|
||||
shutdown_params=shutdown_params,
|
||||
shutdown_response=shutdown_response,
|
||||
)
|
||||
continue
|
||||
|
||||
# launch a purge from the DB
|
||||
# it may be an interrupted purge or a scheduled one
|
||||
purge_now = True if status == DeleteStatus.STATUS_PURGING else False
|
||||
if status == DeleteStatus.STATUS_WAIT_PURGE:
|
||||
if timestamp is None or self.clock.time_msec() >= timestamp:
|
||||
purge_now = True
|
||||
|
||||
# TODO 2 stages purge, keep memberships for a while so we don't "break" sync
|
||||
if purge_now:
|
||||
await self.purge_room(
|
||||
room_id, delete_id, True, shutdown_response=shutdown_response
|
||||
)
|
||||
|
||||
async def purge_history_for_rooms_in_range(
|
||||
self, min_ms: Optional[int], max_ms: Optional[int]
|
||||
) -> None:
|
||||
@@ -651,17 +731,6 @@ class PaginationHandler:
|
||||
finally:
|
||||
self._purges_in_progress_by_room.discard(room_id)
|
||||
|
||||
# remove the delete from the list 24 hours after it completes
|
||||
def clear_delete() -> None:
|
||||
del self._delete_by_id[delete_id]
|
||||
self._delete_by_room[room_id].remove(delete_id)
|
||||
if not self._delete_by_room[room_id]:
|
||||
del self._delete_by_room[room_id]
|
||||
|
||||
self.hs.get_reactor().callLater(
|
||||
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
|
||||
)
|
||||
|
||||
def start_shutdown_and_purge_room(
|
||||
self,
|
||||
room_id: str,
|
||||
|
||||
Reference in New Issue
Block a user