Use DB for all purge/shutdown actions, including purge history
This commit is contained in:
+126
-111
@@ -14,9 +14,7 @@
|
||||
# limitations under the License.
|
||||
import json
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Set
|
||||
|
||||
import attr
|
||||
from typing import TYPE_CHECKING, List, Optional, Set
|
||||
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
@@ -29,7 +27,7 @@ from synapse.logging.opentracing import trace
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.rest.admin._base import assert_user_is_admin
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType
|
||||
from synapse.types import JsonDict, Requester, StreamKeyType
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import ReadWriteLock
|
||||
from synapse.util.stringutils import random_string
|
||||
@@ -42,37 +40,6 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class PurgeStatus:
|
||||
"""Object tracking the status of a purge request
|
||||
|
||||
This class contains information on the progress of a purge request, for
|
||||
return by get_purge_status.
|
||||
"""
|
||||
|
||||
STATUS_ACTIVE = 0
|
||||
STATUS_COMPLETE = 1
|
||||
STATUS_FAILED = 2
|
||||
|
||||
STATUS_TEXT = {
|
||||
STATUS_ACTIVE: "active",
|
||||
STATUS_COMPLETE: "complete",
|
||||
STATUS_FAILED: "failed",
|
||||
}
|
||||
|
||||
# Save the error message if an error occurs
|
||||
error: str = ""
|
||||
|
||||
# Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
|
||||
status: int = STATUS_ACTIVE
|
||||
|
||||
def asdict(self) -> JsonDict:
|
||||
ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
|
||||
if self.error:
|
||||
ret["error"] = self.error
|
||||
return ret
|
||||
|
||||
|
||||
class PaginationHandler:
|
||||
"""Handles pagination and purge history requests.
|
||||
|
||||
@@ -100,13 +67,6 @@ class PaginationHandler:
|
||||
self.pagination_lock = ReadWriteLock()
|
||||
# IDs of rooms in which there currently an active purge *or delete* operation.
|
||||
self._purges_in_progress_by_room: Set[str] = set()
|
||||
# map from purge id to PurgeStatus
|
||||
self._purges_by_id: Dict[str, PurgeStatus] = {}
|
||||
# map from purge id to DeleteStatus
|
||||
self._delete_by_id: Dict[str, DeleteStatus] = {}
|
||||
# map from room id to delete ids
|
||||
# Dict[`room_id`, List[`delete_id`]]
|
||||
self._delete_by_room: Dict[str, List[str]] = {}
|
||||
self._event_serializer = hs.get_event_client_serializer()
|
||||
|
||||
self._retention_default_max_lifetime = (
|
||||
@@ -151,11 +111,12 @@ class PaginationHandler:
|
||||
|
||||
It should be run regularly.
|
||||
"""
|
||||
rooms_to_purge = await self.store.get_rooms_to_purge()
|
||||
for r in rooms_to_purge:
|
||||
rooms_to_delete = await self.store.get_rooms_to_delete()
|
||||
for r in rooms_to_delete:
|
||||
room_id = r["room_id"]
|
||||
delete_id = r["delete_id"]
|
||||
status = r["status"]
|
||||
action = r["action"]
|
||||
timestamp = r["timestamp"]
|
||||
|
||||
if (
|
||||
@@ -165,32 +126,25 @@ class PaginationHandler:
|
||||
# remove the delete from the list 24 hours after it completes or fails
|
||||
ms_since_completed = self.clock.time_msec() - timestamp
|
||||
if ms_since_completed >= PaginationHandler.CLEAR_PURGE_AFTER_MS:
|
||||
await self.store.delete_room_to_purge(room_id, delete_id)
|
||||
|
||||
del self._delete_by_id[delete_id]
|
||||
self._delete_by_room[room_id].remove(delete_id)
|
||||
if not self._delete_by_room[room_id]:
|
||||
del self._delete_by_room[room_id]
|
||||
await self.store.delete_room_to_delete(room_id, delete_id)
|
||||
|
||||
continue
|
||||
|
||||
delete_status = self._delete_by_id.get(delete_id)
|
||||
if delete_status is not None:
|
||||
if room_id in self._purges_in_progress_by_room:
|
||||
# a delete background task is already running (or has run)
|
||||
# for this delete id, let's ignore it
|
||||
# for this room id, let's ignore it for now
|
||||
continue
|
||||
|
||||
self._delete_by_id[delete_id] = DeleteStatus()
|
||||
self._delete_by_id[delete_id].status = status
|
||||
self._delete_by_room.setdefault(room_id, []).append(delete_id)
|
||||
|
||||
# If the database says we were last in the middle of shutting down the room,
|
||||
# let's continue the shutdown process.
|
||||
shutdown_response = None
|
||||
if status == DeleteStatus.STATUS_SHUTTING_DOWN:
|
||||
shutdown_params = json.loads(r["shutdown_params"])
|
||||
if r["shutdown_response"]:
|
||||
shutdown_response = json.loads(r["shutdown_response"])
|
||||
if (
|
||||
action == DeleteStatus.ACTION_SHUTDOWN
|
||||
and status == DeleteStatus.STATUS_SHUTTING_DOWN
|
||||
):
|
||||
shutdown_params = json.loads(r["params"])
|
||||
if r["response"]:
|
||||
shutdown_response = json.loads(r["response"])
|
||||
await self._shutdown_and_purge_room(
|
||||
room_id,
|
||||
delete_id,
|
||||
@@ -204,16 +158,33 @@ class PaginationHandler:
|
||||
if status == DeleteStatus.STATUS_PURGING:
|
||||
purge_now = True
|
||||
# Or if we're at or past the scheduled purge time, let's start that one as well
|
||||
elif status == DeleteStatus.STATUS_SCHEDULED_PURGE and (
|
||||
elif status == DeleteStatus.STATUS_SCHEDULED and (
|
||||
timestamp is None or self.clock.time_msec() >= timestamp
|
||||
):
|
||||
purge_now = True
|
||||
|
||||
# TODO 2 stages purge, keep memberships for a while so we don't "break" sync
|
||||
if purge_now:
|
||||
await self.purge_room(
|
||||
room_id, delete_id, True, shutdown_response=shutdown_response
|
||||
)
|
||||
params = {}
|
||||
if r["params"]:
|
||||
params = json.loads(r["params"])
|
||||
|
||||
if action == DeleteStatus.ACTION_PURGE_HISTORY:
|
||||
if "token" in params:
|
||||
await self._purge_history(
|
||||
delete_id,
|
||||
room_id,
|
||||
params["token"],
|
||||
params.get("delete_local_events", False),
|
||||
True,
|
||||
)
|
||||
elif action == DeleteStatus.ACTION_PURGE:
|
||||
await self.purge_room(
|
||||
room_id,
|
||||
delete_id,
|
||||
params.get("force", False),
|
||||
shutdown_response=shutdown_response,
|
||||
)
|
||||
|
||||
async def purge_history_for_rooms_in_range(
|
||||
self, min_ms: Optional[int], max_ms: Optional[int]
|
||||
@@ -323,8 +294,6 @@ class PaginationHandler:
|
||||
|
||||
purge_id = random_string(16)
|
||||
|
||||
self._purges_by_id[purge_id] = PurgeStatus()
|
||||
|
||||
logger.info(
|
||||
"Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
|
||||
)
|
||||
@@ -339,9 +308,10 @@ class PaginationHandler:
|
||||
room_id,
|
||||
token,
|
||||
True,
|
||||
False,
|
||||
)
|
||||
|
||||
def start_purge_history(
|
||||
async def start_purge_history(
|
||||
self, room_id: str, token: str, delete_local_events: bool = False
|
||||
) -> str:
|
||||
"""Start off a history purge on a room.
|
||||
@@ -366,7 +336,16 @@ class PaginationHandler:
|
||||
# request id in the log lines.
|
||||
logger.info("[purge] starting purge_id %s", purge_id)
|
||||
|
||||
self._purges_by_id[purge_id] = PurgeStatus()
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
purge_id,
|
||||
DeleteStatus.ACTION_PURGE_HISTORY,
|
||||
DeleteStatus.STATUS_PURGING,
|
||||
params=json.dumps(
|
||||
{"token": token, "delete_local_events": delete_local_events}
|
||||
),
|
||||
)
|
||||
|
||||
run_as_background_process(
|
||||
"purge_history",
|
||||
self._purge_history,
|
||||
@@ -374,11 +353,17 @@ class PaginationHandler:
|
||||
room_id,
|
||||
token,
|
||||
delete_local_events,
|
||||
True,
|
||||
)
|
||||
return purge_id
|
||||
|
||||
async def _purge_history(
|
||||
self, purge_id: str, room_id: str, token: str, delete_local_events: bool
|
||||
self,
|
||||
purge_id: str,
|
||||
room_id: str,
|
||||
token: str,
|
||||
delete_local_events: bool,
|
||||
update_rooms_to_delete_table: bool,
|
||||
) -> None:
|
||||
"""Carry out a history purge on a room.
|
||||
|
||||
@@ -387,6 +372,10 @@ class PaginationHandler:
|
||||
room_id: The room to purge from
|
||||
token: topological token to delete events before
|
||||
delete_local_events: True to delete local events as well as remote ones
|
||||
update_rooms_to_delete_table: True if we don't want to update/persist this
|
||||
purge history action to the DB to be restorable. Used with the retention
|
||||
functionality since we don't need to explicitly restore those, they
|
||||
will be relaunch by the retention logic.
|
||||
"""
|
||||
self._purges_in_progress_by_room.add(room_id)
|
||||
try:
|
||||
@@ -395,48 +384,75 @@ class PaginationHandler:
|
||||
room_id, token, delete_local_events
|
||||
)
|
||||
logger.info("[purge] complete")
|
||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
|
||||
if update_rooms_to_delete_table:
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
purge_id,
|
||||
DeleteStatus.ACTION_PURGE_HISTORY,
|
||||
DeleteStatus.STATUS_COMPLETE,
|
||||
timestamp=self.clock.time_msec(),
|
||||
)
|
||||
except Exception:
|
||||
f = Failure()
|
||||
logger.error(
|
||||
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
|
||||
)
|
||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
|
||||
self._purges_by_id[purge_id].error = f.getErrorMessage()
|
||||
if update_rooms_to_delete_table:
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
purge_id,
|
||||
DeleteStatus.ACTION_PURGE_HISTORY,
|
||||
DeleteStatus.STATUS_FAILED,
|
||||
error=f.getErrorMessage(),
|
||||
timestamp=self.clock.time_msec(),
|
||||
)
|
||||
finally:
|
||||
self._purges_in_progress_by_room.discard(room_id)
|
||||
|
||||
# remove the purge from the list 24 hours after it completes
|
||||
def clear_purge() -> None:
|
||||
del self._purges_by_id[purge_id]
|
||||
if update_rooms_to_delete_table:
|
||||
# remove the purge from the list 24 hours after it completes
|
||||
async def clear_purge() -> None:
|
||||
await self.store.delete_room_to_delete(room_id, purge_id)
|
||||
|
||||
self.hs.get_reactor().callLater(
|
||||
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
|
||||
)
|
||||
self.hs.get_reactor().callLater(
|
||||
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
|
||||
)
|
||||
|
||||
def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
|
||||
"""Get the current status of an active purge
|
||||
|
||||
Args:
|
||||
purge_id: purge_id returned by start_purge_history
|
||||
"""
|
||||
return self._purges_by_id.get(purge_id)
|
||||
|
||||
def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
|
||||
async def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
|
||||
"""Get the current status of an active deleting
|
||||
|
||||
Args:
|
||||
delete_id: delete_id returned by start_shutdown_and_purge_room
|
||||
or start_purge_history.
|
||||
"""
|
||||
return self._delete_by_id.get(delete_id)
|
||||
res = await self.store.get_room_to_delete(delete_id)
|
||||
if res:
|
||||
status = DeleteStatus()
|
||||
status.delete_id = res["delete_id"]
|
||||
status.action = res["action"]
|
||||
status.status = res["status"]
|
||||
if status.action == DeleteStatus.ACTION_SHUTDOWN and res["response"]:
|
||||
status.shutdown_room = json.loads(res["response"])
|
||||
return status
|
||||
return None
|
||||
|
||||
def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
|
||||
"""Get all active delete ids by room
|
||||
async def get_delete_statuses_by_room(self, room_id: str) -> List[DeleteStatus]:
|
||||
"""Get all active delete statuses by room
|
||||
|
||||
Args:
|
||||
room_id: room_id that is deleted
|
||||
"""
|
||||
return self._delete_by_room.get(room_id)
|
||||
res = await self.store.get_rooms_to_delete(room_id)
|
||||
statuses = []
|
||||
for r in res:
|
||||
status = DeleteStatus()
|
||||
status.delete_id = r["delete_id"]
|
||||
status.action = r["action"]
|
||||
status.status = r["status"]
|
||||
if status.action == DeleteStatus.ACTION_SHUTDOWN and r["response"]:
|
||||
status.shutdown_room = json.loads(r["response"])
|
||||
statuses.append(status)
|
||||
return statuses
|
||||
|
||||
async def purge_room(
|
||||
self,
|
||||
@@ -455,6 +471,10 @@ class PaginationHandler:
|
||||
"""
|
||||
logger.info("starting purge room_id=%s force=%s", room_id, force)
|
||||
|
||||
action = DeleteStatus.ACTION_PURGE
|
||||
if shutdown_response:
|
||||
action = DeleteStatus.ACTION_SHUTDOWN
|
||||
|
||||
async with self.pagination_lock.write(room_id):
|
||||
# first check that we have no users in this room
|
||||
if not force:
|
||||
@@ -462,21 +482,23 @@ class PaginationHandler:
|
||||
if joined:
|
||||
raise SynapseError(400, "Users are still joined to this room")
|
||||
|
||||
await self.store.upsert_room_to_purge(
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
delete_id,
|
||||
action,
|
||||
DeleteStatus.STATUS_PURGING,
|
||||
shutdown_response=json.dumps(shutdown_response),
|
||||
response=json.dumps(shutdown_response),
|
||||
)
|
||||
|
||||
await self._storage_controllers.purge_events.purge_room(room_id)
|
||||
|
||||
await self.store.upsert_room_to_purge(
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
delete_id,
|
||||
action,
|
||||
DeleteStatus.STATUS_COMPLETE,
|
||||
timestamp=self.clock.time_msec(),
|
||||
shutdown_response=json.dumps(shutdown_response),
|
||||
response=json.dumps(shutdown_response),
|
||||
)
|
||||
|
||||
logger.info("purge complete for room_id %s", room_id)
|
||||
@@ -691,44 +713,41 @@ class PaginationHandler:
|
||||
|
||||
self._purges_in_progress_by_room.add(room_id)
|
||||
try:
|
||||
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
|
||||
shutdown_response = await self._room_shutdown_handler.shutdown_room(
|
||||
room_id=room_id,
|
||||
delete_id=delete_id,
|
||||
shutdown_params=shutdown_params,
|
||||
shutdown_response=shutdown_response,
|
||||
)
|
||||
self._delete_by_id[delete_id].shutdown_room = shutdown_response
|
||||
|
||||
if shutdown_params["purge"]:
|
||||
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
|
||||
await self.purge_room(
|
||||
room_id,
|
||||
delete_id,
|
||||
shutdown_params["force_purge"],
|
||||
shutdown_response=self._delete_by_id[delete_id].shutdown_room,
|
||||
shutdown_response=shutdown_response,
|
||||
)
|
||||
|
||||
await self.store.upsert_room_to_purge(
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
delete_id,
|
||||
DeleteStatus.ACTION_SHUTDOWN,
|
||||
DeleteStatus.STATUS_COMPLETE,
|
||||
timestamp=self.clock.time_msec(),
|
||||
shutdown_response=json.dumps(shutdown_response),
|
||||
response=json.dumps(shutdown_response),
|
||||
)
|
||||
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
|
||||
except Exception:
|
||||
f = Failure()
|
||||
logger.error(
|
||||
"failed",
|
||||
exc_info=(f.type, f.value, f.getTracebackObject()),
|
||||
)
|
||||
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
|
||||
self._delete_by_id[delete_id].error = f.getErrorMessage()
|
||||
await self.store.upsert_room_to_purge(
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
delete_id,
|
||||
DeleteStatus.ACTION_SHUTDOWN,
|
||||
DeleteStatus.STATUS_FAILED,
|
||||
timestamp=self.clock.time_msec(),
|
||||
error=f.getErrorMessage(),
|
||||
)
|
||||
finally:
|
||||
@@ -749,9 +768,7 @@ class PaginationHandler:
|
||||
unique ID for this delete transaction.
|
||||
"""
|
||||
if room_id in self._purges_in_progress_by_room:
|
||||
raise SynapseError(
|
||||
400, "History purge already in progress for %s" % (room_id,)
|
||||
)
|
||||
raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
|
||||
|
||||
# This check is double to `RoomShutdownHandler.shutdown_room`
|
||||
# But here the requester get a direct response / error with HTTP request
|
||||
@@ -773,8 +790,6 @@ class PaginationHandler:
|
||||
delete_id,
|
||||
)
|
||||
|
||||
self._delete_by_id[delete_id] = DeleteStatus()
|
||||
self._delete_by_room.setdefault(room_id, []).append(delete_id)
|
||||
run_as_background_process(
|
||||
"shutdown_and_purge_room",
|
||||
self._shutdown_and_purge_room,
|
||||
|
||||
+30
-15
@@ -1815,13 +1815,21 @@ class DeleteStatus:
|
||||
return by get_delete_status.
|
||||
"""
|
||||
|
||||
ACTION_SHUTDOWN = "shutdown"
|
||||
ACTION_PURGE = "purge"
|
||||
ACTION_PURGE_HISTORY = "purge_history"
|
||||
|
||||
# Scheduled delete waiting to be launch at a specific time
|
||||
STATUS_SCHEDULED = "scheduled"
|
||||
STATUS_SHUTTING_DOWN = "shutting_down"
|
||||
# Scheduled purge waiting to be launch at a specific time
|
||||
STATUS_SCHEDULED_PURGE = "scheduled_purge"
|
||||
STATUS_PURGING = "purging"
|
||||
STATUS_COMPLETE = "complete"
|
||||
STATUS_FAILED = "failed"
|
||||
|
||||
delete_id: str = ""
|
||||
|
||||
action: str = ACTION_PURGE
|
||||
|
||||
# Tracks whether this request has completed.
|
||||
# One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN,WAIT_PURGE}.
|
||||
status: str = STATUS_PURGING
|
||||
@@ -1839,6 +1847,7 @@ class DeleteStatus:
|
||||
|
||||
def asdict(self) -> JsonDict:
|
||||
ret = {
|
||||
"delete_id": self.delete_id,
|
||||
"status": self.status,
|
||||
"shutdown_room": self.shutdown_room,
|
||||
}
|
||||
@@ -1925,12 +1934,13 @@ class RoomShutdownHandler:
|
||||
"new_room_id": None,
|
||||
}
|
||||
|
||||
await self.store.upsert_room_to_purge(
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
delete_id,
|
||||
DeleteStatus.ACTION_SHUTDOWN,
|
||||
DeleteStatus.STATUS_SHUTTING_DOWN,
|
||||
shutdown_params=json.dumps(shutdown_params),
|
||||
shutdown_response=json.dumps(shutdown_response),
|
||||
params=json.dumps(shutdown_params),
|
||||
response=json.dumps(shutdown_response),
|
||||
)
|
||||
|
||||
# Action the block first (even if the room doesn't exist yet)
|
||||
@@ -1965,12 +1975,13 @@ class RoomShutdownHandler:
|
||||
)
|
||||
|
||||
shutdown_response["new_room_id"] = new_room_id
|
||||
await self.store.upsert_room_to_purge(
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
delete_id,
|
||||
DeleteStatus.ACTION_SHUTDOWN,
|
||||
DeleteStatus.STATUS_SHUTTING_DOWN,
|
||||
shutdown_params=json.dumps(shutdown_params),
|
||||
shutdown_response=json.dumps(shutdown_response),
|
||||
params=json.dumps(shutdown_params),
|
||||
response=json.dumps(shutdown_response),
|
||||
)
|
||||
|
||||
logger.info(
|
||||
@@ -2017,7 +2028,9 @@ class RoomShutdownHandler:
|
||||
stream_id,
|
||||
)
|
||||
|
||||
await self.room_member_handler.forget(target_requester.user, room_id)
|
||||
await self.room_member_handler.forget(
|
||||
target_requester.user, room_id, do_not_schedule_purge=True
|
||||
)
|
||||
|
||||
# Join users to new room
|
||||
if new_room_user_id:
|
||||
@@ -2033,24 +2046,26 @@ class RoomShutdownHandler:
|
||||
)
|
||||
|
||||
shutdown_response["kicked_users"].append(user_id)
|
||||
await self.store.upsert_room_to_purge(
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
delete_id,
|
||||
DeleteStatus.ACTION_SHUTDOWN,
|
||||
DeleteStatus.STATUS_SHUTTING_DOWN,
|
||||
shutdown_params=json.dumps(shutdown_params),
|
||||
shutdown_response=json.dumps(shutdown_response),
|
||||
params=json.dumps(shutdown_params),
|
||||
response=json.dumps(shutdown_response),
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to leave old room and join new room for %r", user_id
|
||||
)
|
||||
shutdown_response["failed_to_kick_users"].append(user_id)
|
||||
await self.store.upsert_room_to_purge(
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
delete_id,
|
||||
DeleteStatus.ACTION_SHUTDOWN,
|
||||
DeleteStatus.STATUS_SHUTTING_DOWN,
|
||||
shutdown_params=json.dumps(shutdown_params),
|
||||
shutdown_response=json.dumps(shutdown_response),
|
||||
params=json.dumps(shutdown_params),
|
||||
response=json.dumps(shutdown_response),
|
||||
)
|
||||
|
||||
# Send message in new room and move aliases
|
||||
|
||||
@@ -289,7 +289,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
async def forget(self, user: UserID, room_id: str) -> None:
|
||||
async def forget(
|
||||
self, user: UserID, room_id: str, do_not_schedule_purge: bool = False
|
||||
) -> None:
|
||||
user_id = user.to_string()
|
||||
|
||||
member = await self._storage_controllers.state.get_current_state_event(
|
||||
@@ -311,14 +313,17 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
|
||||
# If everyone locally has left the room, then there is no reason for us to keep the
|
||||
# room around and we automatically purge room after a little bit
|
||||
if self._purge_retention_period and await self.store.is_locally_forgotten_room(
|
||||
room_id
|
||||
if (
|
||||
not do_not_schedule_purge
|
||||
and self._purge_retention_period
|
||||
and await self.store.is_locally_forgotten_room(room_id)
|
||||
):
|
||||
delete_id = random_string(16)
|
||||
await self.store.upsert_room_to_purge(
|
||||
await self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
delete_id,
|
||||
DeleteStatus.STATUS_SCHEDULED_PURGE,
|
||||
DeleteStatus.ACTION_PURGE,
|
||||
DeleteStatus.STATUS_SCHEDULED,
|
||||
timestamp=self.clock.time_msec() + self._purge_retention_period,
|
||||
)
|
||||
|
||||
|
||||
@@ -196,7 +196,7 @@ class PurgeHistoryRestServlet(RestServlet):
|
||||
errcode=Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
purge_id = self.pagination_handler.start_purge_history(
|
||||
purge_id = await self.pagination_handler.start_purge_history(
|
||||
room_id, token, delete_local_events=delete_local_events
|
||||
)
|
||||
|
||||
@@ -215,10 +215,11 @@ class PurgeHistoryStatusRestServlet(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
purge_status = self.pagination_handler.get_purge_status(purge_id)
|
||||
purge_status = await self.pagination_handler.get_delete_status(purge_id)
|
||||
if purge_status is None:
|
||||
raise NotFoundError("purge id '%s' not found" % purge_id)
|
||||
|
||||
# TODO active vs purging etc
|
||||
return HTTPStatus.OK, purge_status.asdict()
|
||||
|
||||
|
||||
|
||||
@@ -154,20 +154,14 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
|
||||
HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,)
|
||||
)
|
||||
|
||||
delete_ids = self._pagination_handler.get_delete_ids_by_room(room_id)
|
||||
if delete_ids is None:
|
||||
delete_ids = []
|
||||
delete_statuses = await self._pagination_handler.get_delete_statuses_by_room(
|
||||
room_id
|
||||
)
|
||||
|
||||
response = []
|
||||
for delete_id in delete_ids:
|
||||
delete = self._pagination_handler.get_delete_status(delete_id)
|
||||
if delete and delete.status != DeleteStatus.STATUS_SCHEDULED_PURGE:
|
||||
response += [
|
||||
{
|
||||
"delete_id": delete_id,
|
||||
**delete.asdict(),
|
||||
}
|
||||
]
|
||||
for delete_status in delete_statuses:
|
||||
if delete_status.status != DeleteStatus.STATUS_SCHEDULED:
|
||||
response += [delete_status.asdict()]
|
||||
|
||||
if response:
|
||||
return HTTPStatus.OK, {"results": cast(JsonDict, response)}
|
||||
@@ -189,7 +183,7 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self._auth, request)
|
||||
|
||||
delete_status = self._pagination_handler.get_delete_status(delete_id)
|
||||
delete_status = await self._pagination_handler.get_delete_status(delete_id)
|
||||
if delete_status is None:
|
||||
raise NotFoundError("delete id '%s' not found" % delete_id)
|
||||
|
||||
|
||||
@@ -1284,47 +1284,48 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
# If any rows still exist it means someone has not forgotten this room yet
|
||||
return not rows[0][0]
|
||||
|
||||
async def upsert_room_to_purge(
|
||||
async def upsert_room_to_delete(
|
||||
self,
|
||||
room_id: str,
|
||||
delete_id: str,
|
||||
action: str,
|
||||
status: str,
|
||||
error: Optional[str] = None,
|
||||
timestamp: Optional[int] = None,
|
||||
shutdown_params: Optional[str] = None,
|
||||
shutdown_response: Optional[str] = None,
|
||||
params: Optional[str] = None,
|
||||
response: Optional[str] = None,
|
||||
error: Optional[str] = None,
|
||||
) -> None:
|
||||
"""Insert or update a room to shutdown/purge.
|
||||
|
||||
Args:
|
||||
room_id: The room ID to shutdown/purge
|
||||
delete_id: The delete ID identifying this action
|
||||
action: the type of job, mainly `shutdown` `purge` or `purge_history`
|
||||
status: Current status of the delete. Cf `DeleteStatus` for possible values
|
||||
error: Error message to return, if any
|
||||
timestamp: Time of the last update. If status is `wait_purge`,
|
||||
then it specifies when to do the purge, with an empty value specifying ASAP
|
||||
shutdown_params: JSON representation of shutdown parameters, cf `ShutdownRoomParams`
|
||||
shutdown_response: JSON representation of shutdown current status, cf `ShutdownRoomResponse`
|
||||
error: Error message to return, if any
|
||||
params: JSON representation of delete job parameters
|
||||
response: JSON representation of delete current status
|
||||
"""
|
||||
await self.db_pool.simple_upsert(
|
||||
"rooms_to_purge",
|
||||
"rooms_to_delete",
|
||||
{
|
||||
"room_id": room_id,
|
||||
"delete_id": delete_id,
|
||||
},
|
||||
{
|
||||
"room_id": room_id,
|
||||
"delete_id": delete_id,
|
||||
"action": action,
|
||||
"status": status,
|
||||
"error": error,
|
||||
"timestamp": timestamp,
|
||||
"shutdown_params": shutdown_params,
|
||||
"shutdown_response": shutdown_response,
|
||||
"params": params,
|
||||
"response": response,
|
||||
"error": error,
|
||||
},
|
||||
desc="upsert_room_to_purge",
|
||||
desc="upsert_room_to_delete",
|
||||
)
|
||||
|
||||
async def delete_room_to_purge(self, room_id: str, delete_id: str) -> None:
|
||||
async def delete_room_to_delete(self, room_id: str, delete_id: str) -> None:
|
||||
"""Remove a room from the list of rooms to purge.
|
||||
|
||||
Args:
|
||||
@@ -1333,32 +1334,61 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||
"""
|
||||
|
||||
await self.db_pool.simple_delete(
|
||||
"rooms_to_purge",
|
||||
"rooms_to_delete",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
"delete_id": delete_id,
|
||||
},
|
||||
desc="delete_room_to_purge",
|
||||
desc="delete_room_to_delete",
|
||||
)
|
||||
|
||||
async def get_rooms_to_purge(self) -> List[Dict[str, Any]]:
|
||||
"""Returns all rooms to shutdown/purge. This includes those that have
|
||||
been interrupted by a stop/restart of synapse, but also scheduled ones
|
||||
async def get_rooms_to_delete(
|
||||
self, room_id: Optional[str] = None
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Returns all delete jobs. This includes those that have been
|
||||
interrupted by a stop/restart of synapse, but also scheduled ones
|
||||
like locally forgotten rooms.
|
||||
|
||||
Args:
|
||||
room_id: if specified, will only return the delete jobs for a specific room
|
||||
|
||||
"""
|
||||
keyvalues = {}
|
||||
if room_id is not None:
|
||||
keyvalues["room_id"] = room_id
|
||||
|
||||
return await self.db_pool.simple_select_list(
|
||||
table="rooms_to_purge",
|
||||
keyvalues={},
|
||||
table="rooms_to_delete",
|
||||
keyvalues=keyvalues,
|
||||
retcols=(
|
||||
"room_id",
|
||||
"delete_id",
|
||||
"timestamp",
|
||||
"action",
|
||||
"status",
|
||||
"timestamp",
|
||||
"params",
|
||||
"response",
|
||||
"error",
|
||||
"shutdown_params",
|
||||
"shutdown_response",
|
||||
),
|
||||
desc="rooms_to_purge_fetch",
|
||||
desc="rooms_to_delete_fetch",
|
||||
)
|
||||
|
||||
async def get_room_to_delete(self, delete_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Return the delete job identified by delete_id."""
|
||||
return await self.db_pool.simple_select_one(
|
||||
table="rooms_to_delete",
|
||||
keyvalues={"delete_id": delete_id},
|
||||
retcols=(
|
||||
"room_id",
|
||||
"delete_id",
|
||||
"action",
|
||||
"status",
|
||||
"timestamp",
|
||||
"params",
|
||||
"response",
|
||||
"error",
|
||||
),
|
||||
desc="rooms_to_delete_fetch",
|
||||
)
|
||||
|
||||
async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]:
|
||||
|
||||
+6
-5
@@ -13,14 +13,15 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- cf upsert_room_to_purge docstring for the meaning of the fields.
|
||||
CREATE TABLE IF NOT EXISTS rooms_to_purge(
|
||||
-- cf upsert_room_to_delete docstring for the meaning of the fields.
|
||||
CREATE TABLE IF NOT EXISTS rooms_to_delete(
|
||||
room_id text NOT NULL,
|
||||
delete_id text NOT NULL,
|
||||
action text NOT NULL,
|
||||
status text NOT NULL,
|
||||
error text,
|
||||
timestamp bigint,
|
||||
shutdown_params text,
|
||||
shutdown_response text,
|
||||
params text,
|
||||
response text,
|
||||
error text,
|
||||
UNIQUE(room_id, delete_id)
|
||||
);
|
||||
@@ -24,7 +24,7 @@ from twisted.test.proto_helpers import MemoryReactor
|
||||
import synapse.rest.admin
|
||||
from synapse.api.constants import EventTypes, Membership, RoomTypes
|
||||
from synapse.api.errors import Codes
|
||||
from synapse.handlers.pagination import DeleteStatus, PaginationHandler, PurgeStatus
|
||||
from synapse.handlers.pagination import DeleteStatus, PaginationHandler
|
||||
from synapse.rest.client import directory, events, login, room
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import UserID
|
||||
@@ -693,8 +693,10 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
|
||||
self.assertEqual(2, len(channel.json_body["results"]))
|
||||
self.assertEqual("complete", channel.json_body["results"][0]["status"])
|
||||
self.assertEqual("complete", channel.json_body["results"][1]["status"])
|
||||
self.assertEqual(delete_id1, channel.json_body["results"][0]["delete_id"])
|
||||
self.assertEqual(delete_id2, channel.json_body["results"][1]["delete_id"])
|
||||
delete_ids = {delete_id1, delete_id2}
|
||||
self.assertTrue(channel.json_body["results"][0]["delete_id"] in delete_ids)
|
||||
delete_ids.remove(channel.json_body["results"][0]["delete_id"])
|
||||
self.assertTrue(channel.json_body["results"][1]["delete_id"] in delete_ids)
|
||||
|
||||
# get status after more than clearing time for first task
|
||||
# second task is not cleared
|
||||
@@ -749,7 +751,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
|
||||
self.assertEqual(400, second_channel.code, msg=second_channel.json_body)
|
||||
self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"])
|
||||
self.assertEqual(
|
||||
f"History purge already in progress for {self.room_id}",
|
||||
f"Purge already in progress for {self.room_id}",
|
||||
second_channel.json_body["error"],
|
||||
)
|
||||
|
||||
@@ -1012,9 +1014,10 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
|
||||
self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
|
||||
|
||||
self.get_success(
|
||||
self.store.upsert_room_to_purge(
|
||||
self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
random_string(16),
|
||||
DeleteStatus.ACTION_PURGE,
|
||||
DeleteStatus.STATUS_PURGING,
|
||||
)
|
||||
)
|
||||
@@ -1039,11 +1042,12 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
|
||||
delete_id = random_string(16)
|
||||
|
||||
self.get_success(
|
||||
self.store.upsert_room_to_purge(
|
||||
self.store.upsert_room_to_delete(
|
||||
room_id,
|
||||
delete_id,
|
||||
DeleteStatus.ACTION_SHUTDOWN,
|
||||
DeleteStatus.STATUS_SHUTTING_DOWN,
|
||||
shutdown_params=json.dumps(
|
||||
params=json.dumps(
|
||||
{
|
||||
"requester_user_id": self.admin_user,
|
||||
"new_room_user_id": self.admin_user,
|
||||
@@ -2078,13 +2082,13 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
|
||||
|
||||
# Purge every event before the second event.
|
||||
purge_id = random_string(16)
|
||||
pagination_handler._purges_by_id[purge_id] = PurgeStatus()
|
||||
self.get_success(
|
||||
pagination_handler._purge_history(
|
||||
purge_id=purge_id,
|
||||
room_id=self.room_id,
|
||||
token=second_token_str,
|
||||
delete_local_events=True,
|
||||
update_rooms_to_delete_table=True,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -41,7 +41,6 @@ from synapse.api.errors import Codes, HttpResponseException
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.handlers.pagination import PurgeStatus
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import account, directory, login, profile, register, room, sync
|
||||
from synapse.server import HomeServer
|
||||
@@ -2090,13 +2089,13 @@ class RoomMessageListTestCase(RoomBase):
|
||||
|
||||
# Purge every event before the second event.
|
||||
purge_id = random_string(16)
|
||||
pagination_handler._purges_by_id[purge_id] = PurgeStatus()
|
||||
self.get_success(
|
||||
pagination_handler._purge_history(
|
||||
purge_id=purge_id,
|
||||
room_id=self.room_id,
|
||||
token=second_token_str,
|
||||
delete_local_events=True,
|
||||
update_rooms_to_delete_table=True,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user