Compare commits
31 Commits
| SHA1 |
|---|
| 5065d7df75 | |||
| c8b8c96b6e | |||
| 0961f52c57 | |||
| 1a6a0cbb1c | |||
| a5a74d8b80 | |||
| 470f385419 | |||
| 1d6f3c5429 | |||
| 648b211d6a | |||
| a78f112faa | |||
| 32e976f226 | |||
| 5bce6397aa | |||
| 0020b333f9 | |||
| c155eefe19 | |||
| 5b83e0df39 | |||
| 0d7b4505bd | |||
| ede8b17e11 | |||
| dfdca3f9e0 | |||
| a90a189ee3 | |||
| 8b837e1ea1 | |||
| c0e0a0a231 | |||
| a5f7a031c0 | |||
| 7a6daa51bd | |||
| 74a0e7ab8f | |||
| 490beb7b16 | |||
| 3d2a61d00f | |||
| d05bc56a21 | |||
| da987b4acc | |||
| 7d9665e9ec | |||
| 7031a23c41 | |||
| a59eb8ccb1 | |||
| 34adad8e80 |
@@ -0,0 +1 @@
Add automatic purge after all users have forgotten a room. Also restore purges/shutdowns of rooms after a Synapse restart.
@@ -0,0 +1 @@
Implements a task scheduler for resumable, potentially long-running tasks.
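As a rough illustration of how the new scheduler is meant to be used (based on the `register_action` and `schedule_task` calls that appear later in this diff), a task action is a coroutine that receives the persisted `ScheduledTask` and returns a final status, an optional result, and an optional error. The `_purge_example` handler and its parameters below are made up for illustration and are not part of this change:

```python
# Minimal sketch of the TaskScheduler usage pattern shown in this PR.
# The handler name and params here are hypothetical.
from typing import Optional, Tuple

from synapse.types import JsonMapping, ScheduledTask, TaskStatus


async def _purge_example(
    task: ScheduledTask, first_launch: bool
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
    # A registered action gets the persisted task back (with resource_id and
    # params) so it can resume after a restart, and reports how it ended.
    if task.resource_id is None:
        return TaskStatus.FAILED, None, "no room id passed to the task"
    # ... do the potentially long-running, resumable work here ...
    return TaskStatus.COMPLETE, None, None


# At startup, the handler is bound to an action name:
#     task_scheduler.register_action(_purge_example, "purge_example")
# and a run can then be scheduled (optionally delayed via `timestamp`):
#     delete_id = await task_scheduler.schedule_task(
#         "purge_example", resource_id=room_id, params={"force": False}
#     )
```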
@@ -923,7 +923,7 @@ allowed_avatar_mimetypes: ["image/png", "image/jpeg", "image/gif"]
How long to keep redacted events in unredacted form in the database. After
this period redacted events get replaced with their redacted form in the DB.

Synapse will check whether the rentention period has concluded for redacted
Synapse will check whether the retention period has concluded for redacted
events every 5 minutes. Thus, even if this option is set to `0`, Synapse may
still take up to 5 minutes to purge redacted events from the database.

@@ -934,6 +934,23 @@ Example configuration:
```yaml
redaction_retention_period: 28d
```
---
---
### `purge_retention_period`

How long to keep a locally forgotten room in the DB. After this period the room
will be fully purged from the DB.

Synapse will check whether the retention period has concluded for room
purges every hour. Thus, even if this option is set to `0`, Synapse may
still take up to one hour to purge forgotten rooms from the database.

Defaults to `7d`. Set to `null` to disable.

Example configuration:
```yaml
purge_retention_period: 28d
```
---
### `user_ips_max_age`

How long to track users' last seen time and IPs in the database.

@@ -91,6 +91,7 @@ from synapse.storage.databases.main.state import StateGroupWorkerStore
|
||||
from synapse.storage.databases.main.stats import StatsStore
|
||||
from synapse.storage.databases.main.stream import StreamWorkerStore
|
||||
from synapse.storage.databases.main.tags import TagsWorkerStore
|
||||
from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
|
||||
from synapse.storage.databases.main.transactions import TransactionWorkerStore
|
||||
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
|
||||
from synapse.storage.databases.main.user_directory import UserDirectoryStore
|
||||
@@ -144,6 +145,7 @@ class GenericWorkerStore(
|
||||
TransactionWorkerStore,
|
||||
LockStore,
|
||||
SessionStore,
|
||||
TaskSchedulerWorkerStore,
|
||||
):
|
||||
# Properties that multiple storage classes define. Tell mypy what the
|
||||
# expected type is.
|
||||
|
||||
@@ -486,6 +486,15 @@ class ServerConfig(Config):
|
||||
else:
|
||||
self.redaction_retention_period = None
|
||||
|
||||
# How long to keep locally forgotten rooms before purging them.
|
||||
purge_retention_period = config.get("purge_retention_period", "7d")
|
||||
if purge_retention_period is not None:
|
||||
self.purge_retention_period: Optional[int] = self.parse_duration(
|
||||
purge_retention_period
|
||||
)
|
||||
else:
|
||||
self.purge_retention_period = None
|
||||
|
||||
# How long to keep entries in the `users_ips` table.
|
||||
user_ips_max_age = config.get("user_ips_max_age", "28d")
|
||||
if user_ips_max_age is not None:
|
||||
|
||||
+143
-294
@@ -13,9 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Set
|
||||
|
||||
import attr
|
||||
from typing import TYPE_CHECKING, List, Optional, Set, Tuple
|
||||
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
@@ -23,12 +21,19 @@ from synapse.api.constants import Direction, EventTypes, Membership
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.api.filtering import Filter
|
||||
from synapse.events.utils import SerializeEventConfig
|
||||
from synapse.handlers.room import ShutdownRoomResponse
|
||||
from synapse.handlers.room import ShutdownRoomParams
|
||||
from synapse.logging.opentracing import trace
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.rest.admin._base import assert_user_is_admin
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
JsonMapping,
|
||||
Requester,
|
||||
ScheduledTask,
|
||||
StreamKeyType,
|
||||
TaskStatus,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util.async_helpers import ReadWriteLock
|
||||
from synapse.util.stringutils import random_string
|
||||
@@ -46,82 +51,6 @@ logger = logging.getLogger(__name__)
|
||||
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class PurgeStatus:
|
||||
"""Object tracking the status of a purge request
|
||||
|
||||
This class contains information on the progress of a purge request, for
|
||||
return by get_purge_status.
|
||||
"""
|
||||
|
||||
STATUS_ACTIVE = 0
|
||||
STATUS_COMPLETE = 1
|
||||
STATUS_FAILED = 2
|
||||
|
||||
STATUS_TEXT = {
|
||||
STATUS_ACTIVE: "active",
|
||||
STATUS_COMPLETE: "complete",
|
||||
STATUS_FAILED: "failed",
|
||||
}
|
||||
|
||||
# Save the error message if an error occurs
|
||||
error: str = ""
|
||||
|
||||
# Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
|
||||
status: int = STATUS_ACTIVE
|
||||
|
||||
def asdict(self) -> JsonDict:
|
||||
ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
|
||||
if self.error:
|
||||
ret["error"] = self.error
|
||||
return ret
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class DeleteStatus:
|
||||
"""Object tracking the status of a delete room request
|
||||
|
||||
This class contains information on the progress of a delete room request, for
|
||||
return by get_delete_status.
|
||||
"""
|
||||
|
||||
STATUS_PURGING = 0
|
||||
STATUS_COMPLETE = 1
|
||||
STATUS_FAILED = 2
|
||||
STATUS_SHUTTING_DOWN = 3
|
||||
|
||||
STATUS_TEXT = {
|
||||
STATUS_PURGING: "purging",
|
||||
STATUS_COMPLETE: "complete",
|
||||
STATUS_FAILED: "failed",
|
||||
STATUS_SHUTTING_DOWN: "shutting_down",
|
||||
}
|
||||
|
||||
# Tracks whether this request has completed.
|
||||
# One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
|
||||
status: int = STATUS_PURGING
|
||||
|
||||
# Save the error message if an error occurs
|
||||
error: str = ""
|
||||
|
||||
# Saves the result of an action to give it back to REST API
|
||||
shutdown_room: ShutdownRoomResponse = {
|
||||
"kicked_users": [],
|
||||
"failed_to_kick_users": [],
|
||||
"local_aliases": [],
|
||||
"new_room_id": None,
|
||||
}
|
||||
|
||||
def asdict(self) -> JsonDict:
|
||||
ret = {
|
||||
"status": DeleteStatus.STATUS_TEXT[self.status],
|
||||
"shutdown_room": self.shutdown_room,
|
||||
}
|
||||
if self.error:
|
||||
ret["error"] = self.error
|
||||
return ret
|
||||
|
||||
|
||||
class PaginationHandler:
|
||||
"""Handles pagination and purge history requests.
|
||||
|
||||
@@ -129,9 +58,6 @@ class PaginationHandler:
|
||||
paginating during a purge.
|
||||
"""
|
||||
|
||||
# when to remove a completed deletion/purge from the results map
|
||||
CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
@@ -142,17 +68,11 @@ class PaginationHandler:
|
||||
self._server_name = hs.hostname
|
||||
self._room_shutdown_handler = hs.get_room_shutdown_handler()
|
||||
self._relations_handler = hs.get_relations_handler()
|
||||
self._task_scheduler = hs.get_task_scheduler()
|
||||
|
||||
self.pagination_lock = ReadWriteLock()
|
||||
# IDs of rooms in which there currently an active purge *or delete* operation.
|
||||
self._purges_in_progress_by_room: Set[str] = set()
|
||||
# map from purge id to PurgeStatus
|
||||
self._purges_by_id: Dict[str, PurgeStatus] = {}
|
||||
# map from purge id to DeleteStatus
|
||||
self._delete_by_id: Dict[str, DeleteStatus] = {}
|
||||
# map from room id to delete ids
|
||||
# Dict[`room_id`, List[`delete_id`]]
|
||||
self._delete_by_room: Dict[str, List[str]] = {}
|
||||
self._event_serializer = hs.get_event_client_serializer()
|
||||
|
||||
self._retention_default_max_lifetime = (
|
||||
@@ -165,6 +85,7 @@ class PaginationHandler:
|
||||
self._retention_allowed_lifetime_max = (
|
||||
hs.config.retention.retention_allowed_lifetime_max
|
||||
)
|
||||
self._purge_retention_period = hs.config.server.purge_retention_period
|
||||
self._is_master = hs.config.worker.worker_app is None
|
||||
|
||||
if hs.config.retention.retention_enabled and self._is_master:
|
||||
@@ -181,6 +102,12 @@ class PaginationHandler:
|
||||
job.longest_max_lifetime,
|
||||
)
|
||||
|
||||
self._task_scheduler.register_action(self._purge_history, "purge_history")
|
||||
self._task_scheduler.register_action(self._purge_room, "purge_room")
|
||||
self._task_scheduler.register_action(
|
||||
self._shutdown_and_purge_room, "shutdown_and_purge_room"
|
||||
)
|
||||
|
||||
async def purge_history_for_rooms_in_range(
|
||||
self, min_ms: Optional[int], max_ms: Optional[int]
|
||||
) -> None:
|
||||
@@ -231,14 +158,6 @@ class PaginationHandler:
|
||||
for room_id, retention_policy in rooms.items():
|
||||
logger.info("[purge] Attempting to purge messages in room %s", room_id)
|
||||
|
||||
if room_id in self._purges_in_progress_by_room:
|
||||
logger.warning(
|
||||
"[purge] not purging room %s as there's an ongoing purge running"
|
||||
" for this room",
|
||||
room_id,
|
||||
)
|
||||
continue
|
||||
|
||||
# If max_lifetime is None, it means that the room has no retention policy.
|
||||
# Given we only retrieve such rooms when there's a default retention policy
|
||||
# defined in the server's configuration, we can safely assume that's the
|
||||
@@ -289,8 +208,6 @@ class PaginationHandler:
|
||||
|
||||
purge_id = random_string(16)
|
||||
|
||||
self._purges_by_id[purge_id] = PurgeStatus()
|
||||
|
||||
logger.info(
|
||||
"Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
|
||||
)
|
||||
@@ -305,9 +222,10 @@ class PaginationHandler:
|
||||
room_id,
|
||||
token,
|
||||
True,
|
||||
False,
|
||||
)
|
||||
|
||||
def start_purge_history(
|
||||
async def start_purge_history(
|
||||
self, room_id: str, token: str, delete_local_events: bool = False
|
||||
) -> str:
|
||||
"""Start off a history purge on a room.
|
||||
@@ -321,31 +239,49 @@ class PaginationHandler:
|
||||
Returns:
|
||||
unique ID for this purge transaction.
|
||||
"""
|
||||
if room_id in self._purges_in_progress_by_room:
|
||||
raise SynapseError(
|
||||
400, "History purge already in progress for %s" % (room_id,)
|
||||
)
|
||||
|
||||
purge_id = random_string(16)
|
||||
purge_id = await self._task_scheduler.schedule_task(
|
||||
"purge_history",
|
||||
resource_id=room_id,
|
||||
params={"token": token, "delete_local_events": delete_local_events},
|
||||
)
|
||||
|
||||
# we log the purge_id here so that it can be tied back to the
|
||||
# request id in the log lines.
|
||||
logger.info("[purge] starting purge_id %s", purge_id)
|
||||
|
||||
self._purges_by_id[purge_id] = PurgeStatus()
|
||||
run_as_background_process(
|
||||
"purge_history",
|
||||
self._purge_history,
|
||||
purge_id,
|
||||
room_id,
|
||||
token,
|
||||
delete_local_events,
|
||||
)
|
||||
return purge_id
|
||||
|
||||
async def _purge_history(
|
||||
self, purge_id: str, room_id: str, token: str, delete_local_events: bool
|
||||
) -> None:
|
||||
self,
|
||||
task: ScheduledTask,
|
||||
first_launch: bool,
|
||||
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
|
||||
if (
|
||||
task.resource_id is None
|
||||
or task.params is None
|
||||
or "token" not in task.params
|
||||
or "delete_local_events" not in task.params
|
||||
):
|
||||
return (
|
||||
TaskStatus.FAILED,
|
||||
None,
|
||||
"Not enough parameters passed to _purge_history",
|
||||
)
|
||||
err = await self.purge_history(
|
||||
task.resource_id,
|
||||
task.params["token"],
|
||||
task.params["delete_local_events"],
|
||||
)
|
||||
if err is not None:
|
||||
return TaskStatus.FAILED, None, err
|
||||
return TaskStatus.COMPLETE, None, None
|
||||
|
||||
async def purge_history(
|
||||
self,
|
||||
room_id: str,
|
||||
token: str,
|
||||
delete_local_events: bool,
|
||||
) -> Optional[str]:
|
||||
"""Carry out a history purge on a room.
|
||||
|
||||
Args:
|
||||
@@ -353,74 +289,86 @@ class PaginationHandler:
|
||||
room_id: The room to purge from
|
||||
token: topological token to delete events before
|
||||
delete_local_events: True to delete local events as well as remote ones
|
||||
update_rooms_to_delete_table: True if we don't want to update/persist this
|
||||
purge history action to the DB to be restorable. Used with the retention
|
||||
functionality since we don't need to explicitly restore those, they
|
||||
will be relaunched by the retention logic.
|
||||
"""
|
||||
self._purges_in_progress_by_room.add(room_id)
|
||||
try:
|
||||
async with self.pagination_lock.write(room_id):
|
||||
await self._storage_controllers.purge_events.purge_history(
|
||||
room_id, token, delete_local_events
|
||||
)
|
||||
logger.info("[purge] complete")
|
||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
|
||||
return None
|
||||
except Exception:
|
||||
f = Failure()
|
||||
logger.error(
|
||||
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
|
||||
)
|
||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
|
||||
self._purges_by_id[purge_id].error = f.getErrorMessage()
|
||||
finally:
|
||||
self._purges_in_progress_by_room.discard(room_id)
|
||||
return f.getErrorMessage()
|
||||
|
||||
# remove the purge from the list 24 hours after it completes
|
||||
def clear_purge() -> None:
|
||||
del self._purges_by_id[purge_id]
|
||||
|
||||
self.hs.get_reactor().callLater(
|
||||
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
|
||||
)
|
||||
|
||||
def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
|
||||
"""Get the current status of an active purge
|
||||
|
||||
Args:
|
||||
purge_id: purge_id returned by start_purge_history
|
||||
"""
|
||||
return self._purges_by_id.get(purge_id)
|
||||
|
||||
def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
|
||||
async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]:
|
||||
"""Get the current status of an active deleting
|
||||
|
||||
Args:
|
||||
delete_id: delete_id returned by start_shutdown_and_purge_room
|
||||
or start_purge_history.
|
||||
"""
|
||||
return self._delete_by_id.get(delete_id)
|
||||
return await self._task_scheduler.get_task(delete_id)
|
||||
|
||||
def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
|
||||
"""Get all active delete ids by room
|
||||
async def get_delete_tasks_by_room(self, room_id: str) -> List[ScheduledTask]:
|
||||
"""Get all active delete statuses by room
|
||||
|
||||
Args:
|
||||
room_id: room_id that is deleted
|
||||
"""
|
||||
return self._delete_by_room.get(room_id)
|
||||
return await self._task_scheduler.get_tasks(
|
||||
actions=["purge_room", "shutdown_and_purge_room"], resource_ids=[room_id]
|
||||
)
|
||||
|
||||
async def purge_room(self, room_id: str, force: bool = False) -> None:
|
||||
async def _purge_room(
|
||||
self,
|
||||
task: ScheduledTask,
|
||||
first_launch: bool,
|
||||
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
|
||||
if not task.resource_id:
|
||||
raise Exception("No room id passed to purge_room task")
|
||||
params = task.params if task.params else {}
|
||||
await self.purge_room(task.resource_id, params.get("force", False))
|
||||
return TaskStatus.COMPLETE, None, None
|
||||
|
||||
async def purge_room(
|
||||
self,
|
||||
room_id: str,
|
||||
force: bool,
|
||||
) -> None:
|
||||
"""Purge the given room from the database.
|
||||
This function is part the delete room v1 API.
|
||||
|
||||
Args:
|
||||
room_id: room to be purged
|
||||
delete_id: the delete ID for this purge
|
||||
force: set true to skip checking for joined users.
|
||||
shutdown_response: optional response coming from the shutdown phase
|
||||
"""
|
||||
logger.info("starting purge room_id=%s force=%s", room_id, force)
|
||||
|
||||
async with self.pagination_lock.write(room_id):
|
||||
# first check that we have no users in this room
|
||||
if not force:
|
||||
joined = await self.store.is_host_joined(room_id, self._server_name)
|
||||
if joined:
|
||||
joined = await self.store.is_host_joined(room_id, self._server_name)
|
||||
if joined:
|
||||
if force:
|
||||
logger.info(
|
||||
"force-purging room %s with some local users still joined",
|
||||
room_id,
|
||||
)
|
||||
else:
|
||||
raise SynapseError(400, "Users are still joined to this room")
|
||||
|
||||
await self._storage_controllers.purge_events.purge_room(room_id)
|
||||
|
||||
logger.info("purge complete for room_id %s", room_id)
|
||||
|
||||
@trace
|
||||
async def get_messages(
|
||||
self,
|
||||
@@ -698,16 +646,9 @@ class PaginationHandler:
|
||||
|
||||
async def _shutdown_and_purge_room(
|
||||
self,
|
||||
delete_id: str,
|
||||
room_id: str,
|
||||
requester_user_id: str,
|
||||
new_room_user_id: Optional[str] = None,
|
||||
new_room_name: Optional[str] = None,
|
||||
message: Optional[str] = None,
|
||||
block: bool = False,
|
||||
purge: bool = True,
|
||||
force_purge: bool = False,
|
||||
) -> None:
|
||||
task: ScheduledTask,
|
||||
first_launch: bool,
|
||||
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
|
||||
"""
|
||||
Shuts down and purges a room.
|
||||
|
||||
@@ -716,149 +657,72 @@ class PaginationHandler:
|
||||
Args:
|
||||
delete_id: The ID for this delete.
|
||||
room_id: The ID of the room to shut down.
|
||||
requester_user_id:
|
||||
User who requested the action. Will be recorded as putting the room on the
|
||||
blocking list.
|
||||
new_room_user_id:
|
||||
If set, a new room will be created with this user ID
|
||||
as the creator and admin, and all users in the old room will be
|
||||
moved into that room. If not set, no new room will be created
|
||||
and the users will just be removed from the old room.
|
||||
new_room_name:
|
||||
A string representing the name of the room that new users will
|
||||
be invited to. Defaults to `Content Violation Notification`
|
||||
message:
|
||||
A string containing the first message that will be sent as
|
||||
`new_room_user_id` in the new room. Ideally this will clearly
|
||||
convey why the original room was shut down.
|
||||
Defaults to `Sharing illegal content on this server is not
|
||||
permitted and rooms in violation will be blocked.`
|
||||
block:
|
||||
If set to `true`, this room will be added to a blocking list,
|
||||
preventing future attempts to join the room. Defaults to `false`.
|
||||
purge:
|
||||
If set to `true`, purge the given room from the database.
|
||||
force_purge:
|
||||
If set to `true`, the room will be purged from database
|
||||
also if it fails to remove some users from room.
|
||||
shutdown_params: parameters for the shutdown, cf `RoomShutdownHandler.ShutdownRoomParams`
|
||||
shutdown_response: current status of the shutdown, if it was interrupted
|
||||
|
||||
Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
|
||||
Keeps track of the `DeleteStatus` (and `ShutdownRoomResponse`) in `self._delete_by_id` and persisted in DB
|
||||
"""
|
||||
|
||||
self._purges_in_progress_by_room.add(room_id)
|
||||
try:
|
||||
async with self.pagination_lock.write(room_id):
|
||||
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
|
||||
self._delete_by_id[
|
||||
delete_id
|
||||
].shutdown_room = await self._room_shutdown_handler.shutdown_room(
|
||||
room_id=room_id,
|
||||
requester_user_id=requester_user_id,
|
||||
new_room_user_id=new_room_user_id,
|
||||
new_room_name=new_room_name,
|
||||
message=message,
|
||||
block=block,
|
||||
)
|
||||
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
|
||||
|
||||
if purge:
|
||||
logger.info("starting purge room_id %s", room_id)
|
||||
|
||||
# first check that we have no users in this room
|
||||
if not force_purge:
|
||||
joined = await self.store.is_host_joined(
|
||||
room_id, self._server_name
|
||||
)
|
||||
if joined:
|
||||
raise SynapseError(
|
||||
400, "Users are still joined to this room"
|
||||
)
|
||||
|
||||
await self._storage_controllers.purge_events.purge_room(room_id)
|
||||
|
||||
logger.info("purge complete for room_id %s", room_id)
|
||||
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
|
||||
except Exception:
|
||||
f = Failure()
|
||||
logger.error(
|
||||
"failed",
|
||||
exc_info=(f.type, f.value, f.getTracebackObject()),
|
||||
)
|
||||
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
|
||||
self._delete_by_id[delete_id].error = f.getErrorMessage()
|
||||
finally:
|
||||
self._purges_in_progress_by_room.discard(room_id)
|
||||
|
||||
# remove the delete from the list 24 hours after it completes
|
||||
def clear_delete() -> None:
|
||||
del self._delete_by_id[delete_id]
|
||||
self._delete_by_room[room_id].remove(delete_id)
|
||||
if not self._delete_by_room[room_id]:
|
||||
del self._delete_by_room[room_id]
|
||||
|
||||
self.hs.get_reactor().callLater(
|
||||
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
|
||||
if task.resource_id is None or task.params is None:
|
||||
raise Exception(
|
||||
"No room id and/or no parameters passed to shutdown_and_purge_room task"
|
||||
)
|
||||
|
||||
def start_shutdown_and_purge_room(
|
||||
room_id = task.resource_id
|
||||
|
||||
async def update_result(result: Optional[JsonMapping]) -> None:
|
||||
await self._task_scheduler.update_task(task.id, result=result)
|
||||
|
||||
shutdown_result = await self._room_shutdown_handler.shutdown_room(
|
||||
room_id, task.params, task.result, update_result
|
||||
)
|
||||
|
||||
if task.params.get("purge", False):
|
||||
await self.purge_room(
|
||||
room_id,
|
||||
task.params.get("force_purge", False),
|
||||
)
|
||||
|
||||
return (TaskStatus.COMPLETE, shutdown_result, None)
|
||||
|
||||
async def get_current_delete_tasks(self, room_id: str) -> List[ScheduledTask]:
|
||||
return await self._task_scheduler.get_tasks(
|
||||
actions=["purge_history", "purge_room", "shutdown_and_purge_room"],
|
||||
resource_ids=[room_id],
|
||||
statuses=[TaskStatus.ACTIVE, TaskStatus.SCHEDULED],
|
||||
)
|
||||
|
||||
async def start_shutdown_and_purge_room(
|
||||
self,
|
||||
room_id: str,
|
||||
requester_user_id: str,
|
||||
new_room_user_id: Optional[str] = None,
|
||||
new_room_name: Optional[str] = None,
|
||||
message: Optional[str] = None,
|
||||
block: bool = False,
|
||||
purge: bool = True,
|
||||
force_purge: bool = False,
|
||||
shutdown_params: ShutdownRoomParams,
|
||||
) -> str:
|
||||
"""Start off shut down and purge on a room.
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room to shut down.
|
||||
requester_user_id:
|
||||
User who requested the action and put the room on the
|
||||
blocking list.
|
||||
new_room_user_id:
|
||||
If set, a new room will be created with this user ID
|
||||
as the creator and admin, and all users in the old room will be
|
||||
moved into that room. If not set, no new room will be created
|
||||
and the users will just be removed from the old room.
|
||||
new_room_name:
|
||||
A string representing the name of the room that new users will
|
||||
be invited to. Defaults to `Content Violation Notification`
|
||||
message:
|
||||
A string containing the first message that will be sent as
|
||||
`new_room_user_id` in the new room. Ideally this will clearly
|
||||
convey why the original room was shut down.
|
||||
Defaults to `Sharing illegal content on this server is not
|
||||
permitted and rooms in violation will be blocked.`
|
||||
block:
|
||||
If set to `true`, this room will be added to a blocking list,
|
||||
preventing future attempts to join the room. Defaults to `false`.
|
||||
purge:
|
||||
If set to `true`, purge the given room from the database.
|
||||
force_purge:
|
||||
If set to `true`, the room will be purged from database
|
||||
also if it fails to remove some users from room.
|
||||
shutdown_params: parameters for the shutdown, cf `RoomShutdownHandler.ShutdownRoomParams`
|
||||
|
||||
Returns:
|
||||
unique ID for this delete transaction.
|
||||
"""
|
||||
if room_id in self._purges_in_progress_by_room:
|
||||
raise SynapseError(
|
||||
400, "History purge already in progress for %s" % (room_id,)
|
||||
)
|
||||
if len(await self.get_current_delete_tasks(room_id)) > 0:
|
||||
raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
|
||||
|
||||
# This check duplicates the one in `RoomShutdownHandler.shutdown_room`,
# but here the requester gets a direct response / error with the HTTP request
# and does not have to check the purge status.
|
||||
new_room_user_id = shutdown_params["new_room_user_id"]
|
||||
if new_room_user_id is not None:
|
||||
if not self.hs.is_mine_id(new_room_user_id):
|
||||
raise SynapseError(
|
||||
400, "User must be our own: %s" % (new_room_user_id,)
|
||||
)
|
||||
|
||||
delete_id = random_string(16)
|
||||
delete_id = await self._task_scheduler.schedule_task(
|
||||
"shutdown_and_purge_room",
|
||||
resource_id=room_id,
|
||||
params=shutdown_params,
|
||||
)
|
||||
|
||||
# we log the delete_id here so that it can be tied back to the
|
||||
# request id in the log lines.
|
||||
@@ -868,19 +732,4 @@ class PaginationHandler:
|
||||
delete_id,
|
||||
)
|
||||
|
||||
self._delete_by_id[delete_id] = DeleteStatus()
|
||||
self._delete_by_room.setdefault(room_id, []).append(delete_id)
|
||||
run_as_background_process(
|
||||
"shutdown_and_purge_room",
|
||||
self._shutdown_and_purge_room,
|
||||
delete_id,
|
||||
room_id,
|
||||
requester_user_id,
|
||||
new_room_user_id,
|
||||
new_room_name,
|
||||
message,
|
||||
block,
|
||||
purge,
|
||||
force_purge,
|
||||
)
|
||||
return delete_id
|
||||
|
||||
+155
-70
@@ -20,7 +20,7 @@ import random
|
||||
import string
|
||||
from collections import OrderedDict
|
||||
from http import HTTPStatus
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
import attr
|
||||
from typing_extensions import TypedDict
|
||||
@@ -54,11 +54,11 @@ from synapse.events import EventBase
|
||||
from synapse.events.snapshot import UnpersistedEventContext
|
||||
from synapse.events.utils import copy_and_fixup_power_levels_contents
|
||||
from synapse.handlers.relations import BundledAggregations
|
||||
from synapse.module_api import NOT_SPAM
|
||||
from synapse.rest.admin._base import assert_user_is_admin
|
||||
from synapse.streams import EventSource
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
JsonMapping,
|
||||
MutableStateMap,
|
||||
Requester,
|
||||
RoomAlias,
|
||||
@@ -454,7 +454,7 @@ class RoomCreationHandler:
|
||||
spam_check = await self._spam_checker_module_callbacks.user_may_create_room(
|
||||
user_id
|
||||
)
|
||||
if spam_check != NOT_SPAM:
|
||||
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
|
||||
raise SynapseError(
|
||||
403,
|
||||
"You are not permitted to create rooms",
|
||||
@@ -768,7 +768,7 @@ class RoomCreationHandler:
|
||||
spam_check = await self._spam_checker_module_callbacks.user_may_create_room(
|
||||
user_id
|
||||
)
|
||||
if spam_check != NOT_SPAM:
|
||||
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
|
||||
raise SynapseError(
|
||||
403,
|
||||
"You are not permitted to create rooms",
|
||||
@@ -1750,6 +1750,45 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
|
||||
return self.store.get_current_room_stream_token_for_room_id(room_id)
|
||||
|
||||
|
||||
class ShutdownRoomParams(TypedDict):
|
||||
"""
|
||||
Attributes:
|
||||
requester_user_id:
|
||||
User who requested the action. Will be recorded as putting the room on the
|
||||
blocking list.
|
||||
new_room_user_id:
|
||||
If set, a new room will be created with this user ID
|
||||
as the creator and admin, and all users in the old room will be
|
||||
moved into that room. If not set, no new room will be created
|
||||
and the users will just be removed from the old room.
|
||||
new_room_name:
|
||||
A string representing the name of the room that new users will
|
||||
be invited to. Defaults to `Content Violation Notification`
|
||||
message:
|
||||
A string containing the first message that will be sent as
|
||||
`new_room_user_id` in the new room. Ideally this will clearly
|
||||
convey why the original room was shut down.
|
||||
Defaults to `Sharing illegal content on this server is not
|
||||
permitted and rooms in violation will be blocked.`
|
||||
block:
|
||||
If set to `true`, this room will be added to a blocking list,
|
||||
preventing future attempts to join the room. Defaults to `false`.
|
||||
purge:
|
||||
If set to `true`, purge the given room from the database.
|
||||
force_purge:
|
||||
If set to `true`, the room will be purged from database
|
||||
even if there are still users joined to the room.
|
||||
"""
|
||||
|
||||
requester_user_id: str
|
||||
new_room_user_id: Optional[str]
|
||||
new_room_name: Optional[str]
|
||||
message: Optional[str]
|
||||
block: bool
|
||||
purge: bool
|
||||
force_purge: bool
|
||||
|
||||
|
||||
class ShutdownRoomResponse(TypedDict):
|
||||
"""
|
||||
Attributes:
|
||||
@@ -1768,6 +1807,63 @@ class ShutdownRoomResponse(TypedDict):
|
||||
new_room_id: Optional[str]
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class DeleteStatus:
|
||||
"""Object tracking the status of a delete room request
|
||||
|
||||
This class contains information on the progress of a delete room request, for
|
||||
return by get_delete_status.
|
||||
"""
|
||||
|
||||
ACTION_SHUTDOWN = "shutdown"
|
||||
ACTION_PURGE = "purge"
|
||||
ACTION_PURGE_HISTORY = "purge_history"
|
||||
|
||||
# Scheduled delete waiting to be launched at a specific time
|
||||
STATUS_SCHEDULED = "scheduled"
|
||||
STATUS_SHUTTING_DOWN = "shutting_down"
|
||||
STATUS_PURGING = "purging"
|
||||
STATUS_COMPLETE = "complete"
|
||||
STATUS_FAILED = "failed"
|
||||
|
||||
delete_id: str = ""
|
||||
|
||||
action: str = ACTION_PURGE
|
||||
|
||||
# Tracks whether this request has completed.
|
||||
# One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN,WAIT_PURGE}.
|
||||
status: str = STATUS_PURGING
|
||||
|
||||
# Save the error message if an error occurs
|
||||
error: str = ""
|
||||
|
||||
# Saves the result of an action to give it back to REST API
|
||||
shutdown_room: ShutdownRoomResponse = {
|
||||
"kicked_users": [],
|
||||
"failed_to_kick_users": [],
|
||||
"local_aliases": [],
|
||||
"new_room_id": None,
|
||||
}
|
||||
|
||||
def asdict(self, use_purge_history_format: bool = False) -> JsonDict:
|
||||
if not use_purge_history_format:
|
||||
ret = {
|
||||
"delete_id": self.delete_id,
|
||||
"status": self.status,
|
||||
"shutdown_room": self.shutdown_room,
|
||||
}
|
||||
else:
|
||||
ret = {
|
||||
"status": self.status
|
||||
if self.status == DeleteStatus.STATUS_COMPLETE
|
||||
or self.status == DeleteStatus.STATUS_FAILED
|
||||
else "active",
|
||||
}
|
||||
if self.error:
|
||||
ret["error"] = self.error
|
||||
return ret
|
||||
|
||||
|
||||
class RoomShutdownHandler:
|
||||
DEFAULT_MESSAGE = (
|
||||
"Sharing illegal content on this server is not permitted and rooms in"
|
||||
@@ -1787,12 +1883,12 @@ class RoomShutdownHandler:
|
||||
async def shutdown_room(
|
||||
self,
|
||||
room_id: str,
|
||||
requester_user_id: str,
|
||||
new_room_user_id: Optional[str] = None,
|
||||
new_room_name: Optional[str] = None,
|
||||
message: Optional[str] = None,
|
||||
block: bool = False,
|
||||
) -> ShutdownRoomResponse:
|
||||
params: JsonMapping,
|
||||
result: Optional[JsonMapping] = None,
|
||||
update_result_fct: Optional[
|
||||
Callable[[Optional[JsonMapping]], Awaitable[None]]
|
||||
] = None,
|
||||
) -> Optional[JsonMapping]:
|
||||
"""
|
||||
Shuts down a room. Moves all local users and room aliases automatically
|
||||
to a new room if `new_room_user_id` is set. Otherwise local users only
|
||||
@@ -1808,48 +1904,22 @@ class RoomShutdownHandler:
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room to shut down.
|
||||
requester_user_id:
|
||||
User who requested the action and put the room on the
|
||||
blocking list.
|
||||
new_room_user_id:
|
||||
If set, a new room will be created with this user ID
|
||||
as the creator and admin, and all users in the old room will be
|
||||
moved into that room. If not set, no new room will be created
|
||||
and the users will just be removed from the old room.
|
||||
new_room_name:
|
||||
A string representing the name of the room that new users will
|
||||
be invited to. Defaults to `Content Violation Notification`
|
||||
message:
|
||||
A string containing the first message that will be sent as
|
||||
`new_room_user_id` in the new room. Ideally this will clearly
|
||||
convey why the original room was shut down.
|
||||
Defaults to `Sharing illegal content on this server is not
|
||||
permitted and rooms in violation will be blocked.`
|
||||
block:
|
||||
If set to `True`, users will be prevented from joining the old
|
||||
room. This option can also be used to pre-emptively block a room,
|
||||
even if it's unknown to this homeserver. In this case, the room
|
||||
will be blocked, and no further action will be taken. If `False`,
|
||||
attempting to delete an unknown room is invalid.
|
||||
delete_id: The delete ID identifying this delete request
|
||||
shutdown_params: parameters for the shutdown, cf `ShutdownRoomParams`
|
||||
shutdown_response: current status of the shutdown, if it was interrupted
|
||||
|
||||
Defaults to `False`.
|
||||
|
||||
Returns: a dict containing the following keys:
|
||||
kicked_users: An array of users (`user_id`) that were kicked.
|
||||
failed_to_kick_users:
|
||||
An array of users (`user_id`) that were not kicked.
|
||||
local_aliases:
|
||||
An array of strings representing the local aliases that were
|
||||
migrated from the old room to the new.
|
||||
new_room_id:
|
||||
A string representing the room ID of the new room, or None if
|
||||
no such room was created.
|
||||
Returns: a dict matching `ShutdownRoomResponse`.
|
||||
"""
|
||||
requester_user_id = params["requester_user_id"]
|
||||
new_room_user_id = params["new_room_user_id"]
|
||||
block = params["block"]
|
||||
|
||||
if not new_room_name:
|
||||
new_room_name = self.DEFAULT_ROOM_NAME
|
||||
if not message:
|
||||
message = self.DEFAULT_MESSAGE
|
||||
new_room_name = (
|
||||
params["new_room_name"]
|
||||
if params["new_room_name"]
|
||||
else self.DEFAULT_ROOM_NAME
|
||||
)
|
||||
message = params["message"] if params["message"] else self.DEFAULT_MESSAGE
|
||||
|
||||
if not RoomID.is_valid(room_id):
|
||||
raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
|
||||
@@ -1861,6 +1931,17 @@ class RoomShutdownHandler:
|
||||
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
|
||||
)
|
||||
|
||||
result = (
|
||||
dict(result)
|
||||
if result
|
||||
else {
|
||||
"kicked_users": [],
|
||||
"failed_to_kick_users": [],
|
||||
"local_aliases": [],
|
||||
"new_room_id": None,
|
||||
}
|
||||
)
|
||||
|
||||
# Action the block first (even if the room doesn't exist yet)
|
||||
if block:
|
||||
# This will work even if the room is already blocked, but that is
|
||||
@@ -1869,14 +1950,10 @@ class RoomShutdownHandler:
|
||||
|
||||
if not await self.store.get_room(room_id):
|
||||
# if we don't know about the room, there is nothing left to do.
|
||||
return {
|
||||
"kicked_users": [],
|
||||
"failed_to_kick_users": [],
|
||||
"local_aliases": [],
|
||||
"new_room_id": None,
|
||||
}
|
||||
return result
|
||||
|
||||
if new_room_user_id is not None:
|
||||
new_room_id = result.get("new_room_id")
|
||||
if new_room_user_id is not None and new_room_id is None:
|
||||
if not self.hs.is_mine_id(new_room_user_id):
|
||||
raise SynapseError(
|
||||
400, "User must be our own: %s" % (new_room_user_id,)
|
||||
@@ -1896,6 +1973,10 @@ class RoomShutdownHandler:
|
||||
ratelimit=False,
|
||||
)
|
||||
|
||||
result["new_room_id"] = new_room_id
|
||||
if update_result_fct:
|
||||
await update_result_fct(result)
|
||||
|
||||
logger.info(
|
||||
"Shutting down room %r, joining to new room: %r", room_id, new_room_id
|
||||
)
|
||||
@@ -1909,12 +1990,9 @@ class RoomShutdownHandler:
|
||||
stream_id,
|
||||
)
|
||||
else:
|
||||
new_room_id = None
|
||||
logger.info("Shutting down room %r", room_id)
|
||||
|
||||
users = await self.store.get_users_in_room(room_id)
|
||||
kicked_users = []
|
||||
failed_to_kick_users = []
|
||||
for user_id in users:
|
||||
if not self.hs.is_mine_id(user_id):
|
||||
continue
|
||||
@@ -1943,7 +2021,9 @@ class RoomShutdownHandler:
|
||||
stream_id,
|
||||
)
|
||||
|
||||
await self.room_member_handler.forget(target_requester.user, room_id)
|
||||
await self.room_member_handler.forget(
|
||||
target_requester.user, room_id, do_not_schedule_purge=True
|
||||
)
|
||||
|
||||
# Join users to new room
|
||||
if new_room_user_id:
|
||||
@@ -1958,15 +2038,23 @@ class RoomShutdownHandler:
|
||||
require_consent=False,
|
||||
)
|
||||
|
||||
kicked_users.append(user_id)
|
||||
result["kicked_users"].append(user_id)
|
||||
if update_result_fct:
|
||||
await update_result_fct(result)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to leave old room and join new room for %r", user_id
|
||||
)
|
||||
failed_to_kick_users.append(user_id)
|
||||
result["failed_to_kick_users"].append(user_id)
|
||||
if update_result_fct:
|
||||
await update_result_fct(result)
|
||||
|
||||
# Send message in new room and move aliases
|
||||
if new_room_user_id:
|
||||
room_creator_requester = create_requester(
|
||||
new_room_user_id, authenticated_entity=requester_user_id
|
||||
)
|
||||
|
||||
await self.event_creation_handler.create_and_send_nonmember_event(
|
||||
room_creator_requester,
|
||||
{
|
||||
@@ -1978,18 +2066,15 @@ class RoomShutdownHandler:
|
||||
ratelimit=False,
|
||||
)
|
||||
|
||||
aliases_for_room = await self.store.get_aliases_for_room(room_id)
|
||||
result["local_aliases"] = list(
|
||||
await self.store.get_aliases_for_room(room_id)
|
||||
)
|
||||
|
||||
assert new_room_id is not None
|
||||
await self.store.update_aliases_for_room(
|
||||
room_id, new_room_id, requester_user_id
|
||||
)
|
||||
else:
|
||||
aliases_for_room = []
|
||||
result["local_aliases"] = []
|
||||
|
||||
return {
|
||||
"kicked_users": kicked_users,
|
||||
"failed_to_kick_users": failed_to_kick_users,
|
||||
"local_aliases": list(aliases_for_room),
|
||||
"new_room_id": new_room_id,
|
||||
}
|
||||
return result
|
||||
|
||||
@@ -94,6 +94,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
self.event_creation_handler = hs.get_event_creation_handler()
|
||||
self.account_data_handler = hs.get_account_data_handler()
|
||||
self.event_auth_handler = hs.get_event_auth_handler()
|
||||
self.task_scheduler = hs.get_task_scheduler()
|
||||
|
||||
self.member_linearizer: Linearizer = Linearizer(name="member")
|
||||
self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
|
||||
@@ -176,6 +177,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
|
||||
self._msc3970_enabled = hs.config.experimental.msc3970_enabled
|
||||
|
||||
self._purge_retention_period = hs.config.server.purge_retention_period
|
||||
|
||||
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
|
||||
"""Notify the rate limiter that a room join has occurred.
|
||||
|
||||
@@ -285,7 +288,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
async def forget(self, user: UserID, room_id: str) -> None:
|
||||
async def forget(
|
||||
self, user: UserID, room_id: str, do_not_schedule_purge: bool = False
|
||||
) -> None:
|
||||
user_id = user.to_string()
|
||||
|
||||
member = await self._storage_controllers.state.get_current_state_event(
|
||||
@@ -305,6 +310,19 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||
# the table `current_state_events` and `get_current_state_events` is `None`.
|
||||
await self.store.forget(user_id, room_id)
|
||||
|
||||
# If everyone locally has left the room, then there is no reason for us to keep the
# room around and we automatically purge the room after a little bit
|
||||
if (
|
||||
not do_not_schedule_purge
|
||||
and self._purge_retention_period
|
||||
and await self.store.is_locally_forgotten_room(room_id)
|
||||
):
|
||||
await self.task_scheduler.schedule_task(
|
||||
"purge_room",
|
||||
resource_id=room_id,
|
||||
timestamp=self.clock.time_msec() + self._purge_retention_period,
|
||||
)
|
||||
|
||||
async def ratelimit_multiple_invites(
|
||||
self,
|
||||
requester: Optional[Requester],
|
||||
|
||||
@@ -93,7 +93,7 @@ from synapse.rest.admin.users import (
|
||||
UserTokenRestServlet,
|
||||
WhoisRestServlet,
|
||||
)
|
||||
from synapse.types import JsonDict, RoomStreamToken
|
||||
from synapse.types import JsonDict, RoomStreamToken, TaskStatus
|
||||
from synapse.util import SYNAPSE_VERSION
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -196,7 +196,7 @@ class PurgeHistoryRestServlet(RestServlet):
|
||||
errcode=Codes.BAD_JSON,
|
||||
)
|
||||
|
||||
purge_id = self.pagination_handler.start_purge_history(
|
||||
purge_id = await self.pagination_handler.start_purge_history(
|
||||
room_id, token, delete_local_events=delete_local_events
|
||||
)
|
||||
|
||||
@@ -215,11 +215,21 @@ class PurgeHistoryStatusRestServlet(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
purge_status = self.pagination_handler.get_purge_status(purge_id)
|
||||
if purge_status is None:
|
||||
purge_task = await self.pagination_handler.get_delete_task(purge_id)
|
||||
if purge_task is None or purge_task.action != "purge_history":
|
||||
raise NotFoundError("purge id '%s' not found" % purge_id)
|
||||
|
||||
return HTTPStatus.OK, purge_status.asdict()
|
||||
result = {
|
||||
"status": purge_task.status
|
||||
if purge_task.status == TaskStatus.COMPLETE
|
||||
or purge_task.status == TaskStatus.FAILED
|
||||
else "active",
|
||||
}
|
||||
if purge_task.error:
|
||||
result["error"] = purge_task.error
|
||||
|
||||
# TODO active vs purging etc
|
||||
return HTTPStatus.OK, result
|
||||
|
||||
|
||||
########################################################################################
|
||||
|
||||
+52
-30
@@ -36,7 +36,14 @@ from synapse.rest.admin._base import (
|
||||
)
|
||||
from synapse.storage.databases.main.room import RoomSortOrder
|
||||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import JsonDict, RoomID, UserID, create_requester
|
||||
from synapse.types import (
|
||||
JsonDict,
|
||||
RoomID,
|
||||
ScheduledTask,
|
||||
TaskStatus,
|
||||
UserID,
|
||||
create_requester,
|
||||
)
|
||||
from synapse.types.state import StateFilter
|
||||
from synapse.util import json_decoder
|
||||
|
||||
@@ -117,20 +124,30 @@ class RoomRestV2Servlet(RestServlet):
|
||||
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
|
||||
)
|
||||
|
||||
delete_id = self._pagination_handler.start_shutdown_and_purge_room(
|
||||
delete_id = await self._pagination_handler.start_shutdown_and_purge_room(
|
||||
room_id=room_id,
|
||||
new_room_user_id=content.get("new_room_user_id"),
|
||||
new_room_name=content.get("room_name"),
|
||||
message=content.get("message"),
|
||||
requester_user_id=requester.user.to_string(),
|
||||
block=block,
|
||||
purge=purge,
|
||||
force_purge=force_purge,
|
||||
shutdown_params={
|
||||
"new_room_user_id": content.get("new_room_user_id"),
|
||||
"new_room_name": content.get("room_name"),
|
||||
"message": content.get("message"),
|
||||
"requester_user_id": requester.user.to_string(),
|
||||
"block": block,
|
||||
"purge": purge,
|
||||
"force_purge": force_purge,
|
||||
},
|
||||
)
|
||||
|
||||
return HTTPStatus.OK, {"delete_id": delete_id}
|
||||
|
||||
|
||||
def _convert_delete_task_to_response(task: ScheduledTask) -> JsonDict:
|
||||
return {
|
||||
"delete_id": task.id,
|
||||
"status": task.status,
|
||||
"shutdown_room": task.result,
|
||||
}
|
||||
|
||||
|
||||
class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
|
||||
"""Get the status of the delete room background task."""
|
||||
|
||||
@@ -150,21 +167,19 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
|
||||
HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,)
|
||||
)
|
||||
|
||||
delete_ids = self._pagination_handler.get_delete_ids_by_room(room_id)
|
||||
if delete_ids is None:
|
||||
raise NotFoundError("No delete task for room_id '%s' found" % room_id)
|
||||
delete_tasks = await self._pagination_handler.get_delete_tasks_by_room(room_id)
|
||||
|
||||
response = []
|
||||
for delete_id in delete_ids:
|
||||
delete = self._pagination_handler.get_delete_status(delete_id)
|
||||
if delete:
|
||||
response += [
|
||||
{
|
||||
"delete_id": delete_id,
|
||||
**delete.asdict(),
|
||||
}
|
||||
]
|
||||
return HTTPStatus.OK, {"results": cast(JsonDict, response)}
|
||||
for delete_task in delete_tasks:
|
||||
# We ignore scheduled deletes because currently they are only used
# for automatically purging forgotten rooms after X time.
|
||||
if delete_task.status != TaskStatus.SCHEDULED:
|
||||
response += [_convert_delete_task_to_response(delete_task)]
|
||||
|
||||
if response:
|
||||
return HTTPStatus.OK, {"results": cast(JsonDict, response)}
|
||||
else:
|
||||
raise NotFoundError("No delete task for room_id '%s' found" % room_id)
|
||||
|
||||
|
||||
class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
|
||||
@@ -181,11 +196,14 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
|
||||
) -> Tuple[int, JsonDict]:
|
||||
await assert_requester_is_admin(self._auth, request)
|
||||
|
||||
delete_status = self._pagination_handler.get_delete_status(delete_id)
|
||||
if delete_status is None:
|
||||
delete_task = await self._pagination_handler.get_delete_task(delete_id)
|
||||
if delete_task is None or (
|
||||
delete_task.action != "purge_room"
|
||||
and delete_task.action != "shutdown_and_purge_room"
|
||||
):
|
||||
raise NotFoundError("delete id '%s' not found" % delete_id)
|
||||
|
||||
return HTTPStatus.OK, cast(JsonDict, delete_status.asdict())
|
||||
return HTTPStatus.OK, _convert_delete_task_to_response(delete_task)
|
||||
|
||||
|
||||
class ListRoomRestServlet(RestServlet):
|
||||
@@ -349,11 +367,15 @@ class RoomRestServlet(RestServlet):
|
||||
|
||||
ret = await room_shutdown_handler.shutdown_room(
|
||||
room_id=room_id,
|
||||
new_room_user_id=content.get("new_room_user_id"),
|
||||
new_room_name=content.get("room_name"),
|
||||
message=content.get("message"),
|
||||
requester_user_id=requester.user.to_string(),
|
||||
block=block,
|
||||
params={
|
||||
"new_room_user_id": content.get("new_room_user_id"),
|
||||
"new_room_name": content.get("room_name"),
|
||||
"message": content.get("message"),
|
||||
"requester_user_id": requester.user.to_string(),
|
||||
"block": block,
|
||||
"purge": purge,
|
||||
"force_purge": force_purge,
|
||||
},
|
||||
)
|
||||
|
||||
# Purge room
|
||||
|
||||
@@ -141,6 +141,7 @@ from synapse.util.distributor import Distributor
|
||||
from synapse.util.macaroons import MacaroonGenerator
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.util.task_scheduler import TaskScheduler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -359,6 +360,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
"""
|
||||
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
|
||||
getattr(self, "get_" + i + "_handler")()
|
||||
self.get_task_scheduler()
|
||||
|
||||
def get_reactor(self) -> ISynapseReactor:
|
||||
"""
|
||||
@@ -912,3 +914,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
|
||||
"""Usage metrics shared between phone home stats and the prometheus exporter."""
|
||||
return CommonUsageMetricsManager(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_task_scheduler(self) -> TaskScheduler:
|
||||
return TaskScheduler(self)
|
||||
|
||||
@@ -70,6 +70,7 @@ from .state import StateStore
|
||||
from .stats import StatsStore
|
||||
from .stream import StreamWorkerStore
|
||||
from .tags import TagsStore
|
||||
from .task_scheduler import TaskSchedulerWorkerStore
|
||||
from .transactions import TransactionWorkerStore
|
||||
from .ui_auth import UIAuthStore
|
||||
from .user_directory import UserDirectoryStore
|
||||
@@ -127,6 +128,7 @@ class DataStore(
|
||||
CacheInvalidationWorkerStore,
|
||||
LockStore,
|
||||
SessionStore,
|
||||
TaskSchedulerWorkerStore,
|
||||
):
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
@@ -0,0 +1,195 @@
|
||||
# Copyright 2023 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import (
|
||||
DatabasePool,
|
||||
LoggingDatabaseConnection,
|
||||
LoggingTransaction,
|
||||
make_in_list_sql_clause,
|
||||
)
|
||||
from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
|
||||
|
||||
class TaskSchedulerWorkerStore(SQLBaseStore):
|
||||
def __init__(
|
||||
self,
|
||||
database: DatabasePool,
|
||||
db_conn: LoggingDatabaseConnection,
|
||||
hs: "HomeServer",
|
||||
):
|
||||
super().__init__(database, db_conn, hs)
|
||||
|
||||
@staticmethod
|
||||
def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask:
|
||||
row["status"] = TaskStatus(row["status"])
|
||||
if row["params"] is not None:
|
||||
row["params"] = json.loads(row["params"])
|
||||
if row["result"] is not None:
|
||||
row["result"] = json.loads(row["result"])
|
||||
return ScheduledTask(**row)
|
||||
|
||||
async def get_scheduled_tasks(
|
||||
self,
|
||||
actions: Optional[List[str]] = None,
|
||||
resource_ids: Optional[List[str]] = None,
|
||||
statuses: Optional[List[TaskStatus]] = None,
|
||||
) -> List[ScheduledTask]:
|
||||
"""Get a list of scheduled tasks from the DB.
|
||||
|
||||
If an arg is `None` all tasks matching the other args will be selected.
|
||||
If an arg is an empty list, the value needs to be NULL in DB to be selected.
|
||||
|
||||
Args:
|
||||
actions: Limit the returned tasks to those specific action names
|
||||
resource_ids: Limit the returned tasks to those specific resource ids
statuses: Limit the returned tasks to those specific statuses
|
||||
|
||||
Returns: a list of `ScheduledTask`
|
||||
"""
|
||||
|
||||
def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
|
||||
clauses = []
|
||||
args = []
|
||||
if actions is not None:
|
||||
clause, temp_args = make_in_list_sql_clause(
|
||||
txn.database_engine, "action", actions
|
||||
)
|
||||
clauses.append(clause)
|
||||
args.extend(temp_args)
|
||||
if resource_ids is not None:
|
||||
clause, temp_args = make_in_list_sql_clause(
|
||||
txn.database_engine, "resource_id", resource_ids
|
||||
)
|
||||
clauses.append(clause)
|
||||
args.extend(temp_args)
|
||||
if statuses is not None:
|
||||
clause, temp_args = make_in_list_sql_clause(
|
||||
txn.database_engine, "status", statuses
|
||||
)
|
||||
clauses.append(clause)
|
||||
args.extend(temp_args)
|
||||
|
||||
sql = "SELECT * FROM scheduled_tasks"
|
||||
if clauses:
|
||||
sql = sql + " WHERE " + " AND ".join(clauses)
|
||||
|
||||
txn.execute(sql, args)
|
||||
return self.db_pool.cursor_to_dict(txn)
|
||||
|
||||
rows = await self.db_pool.runInteraction(
|
||||
"get_scheduled_tasks", get_scheduled_tasks_txn
|
||||
)
|
||||
return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]
|
||||
|
||||
async def upsert_scheduled_task(self, task: ScheduledTask) -> None:
|
||||
"""Upsert a specified `ScheduledTask` in the DB.
|
||||
|
||||
Args:
|
||||
task: the `ScheduledTask` to upsert
|
||||
"""
|
||||
await self.db_pool.simple_upsert(
|
||||
"scheduled_tasks",
|
||||
{"id": task.id},
|
||||
{
|
||||
"action": task.action,
|
||||
"status": task.status,
|
||||
"timestamp": task.timestamp,
|
||||
"resource_id": task.resource_id,
|
||||
"params": None if task.params is None else json.dumps(task.params),
|
||||
"result": None if task.result is None else json.dumps(task.result),
|
||||
"error": task.error,
|
||||
},
|
||||
desc="upsert_scheduled_task",
|
||||
)
|
||||
|
||||
async def update_scheduled_task(
|
||||
self,
|
||||
id: str,
|
||||
*,
|
||||
timestamp: Optional[int] = None,
|
||||
status: Optional[TaskStatus] = None,
|
||||
result: Optional[JsonMapping] = None,
|
||||
error: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Update a scheduled task in the DB with some new value(s).
|
||||
|
||||
Args:
|
||||
id: id of the `ScheduledTask` to update
|
||||
timestamp: new timestamp of the task
|
||||
status: new status of the task
|
||||
result: new result of the task
|
||||
error: new error of the task
|
||||
"""
|
||||
updatevalues: JsonDict = {}
|
||||
if timestamp is not None:
|
||||
updatevalues["timestamp"] = timestamp
|
||||
if status is not None:
|
||||
updatevalues["status"] = status
|
||||
if result is not None:
|
||||
updatevalues["result"] = json.dumps(result)
|
||||
if error is not None:
|
||||
updatevalues["error"] = error
|
||||
nb_rows = await self.db_pool.simple_update(
|
||||
"scheduled_tasks",
|
||||
{"id": id},
|
||||
updatevalues,
|
||||
desc="update_scheduled_task",
|
||||
)
|
||||
return nb_rows > 0
|
||||
|
||||
async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
|
||||
"""Get a specific `ScheduledTask` from its id.
|
||||
|
||||
Args:
|
||||
id: the id of the task to retrieve
|
||||
|
||||
Returns: the task if available, `None` otherwise
|
||||
"""
|
||||
row = await self.db_pool.simple_select_one(
|
||||
table="scheduled_tasks",
|
||||
keyvalues={"id": id},
|
||||
retcols=(
|
||||
"id",
|
||||
"action",
|
||||
"status",
|
||||
"timestamp",
|
||||
"resource_id",
|
||||
"params",
|
||||
"result",
|
||||
"error",
|
||||
),
|
||||
allow_none=True,
|
||||
desc="get_scheduled_task",
|
||||
)
|
||||
|
||||
return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None
|
||||
|
||||
async def delete_scheduled_task(self, id: str) -> None:
|
||||
"""Delete a specific task from its id.
|
||||
|
||||
Args:
|
||||
id: the id of the task to delete
|
||||
"""
|
||||
await self.db_pool.simple_delete(
|
||||
"scheduled_tasks",
|
||||
keyvalues={"id": id},
|
||||
desc="delete_scheduled_task",
|
||||
)
|
||||
@@ -0,0 +1,26 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- cf ScheduledTask docstring for the meaning of the fields.
CREATE TABLE IF NOT EXISTS scheduled_tasks(
    id text PRIMARY KEY,
    action text NOT NULL,
    status text NOT NULL,
    timestamp bigint NOT NULL,
    resource_id text,
    params text,
    result text,
    error text
);
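Note that `params` and `result` are plain text columns holding JSON-encoded data (the store `json.dumps` them on write). A rough sketch of how a row from this table maps back to a `ScheduledTask` follows; the real conversion lives in `_convert_row_to_task` in the store, so this is only an approximation of that logic, not the diff's code.

```python
import json
from typing import Any, Dict

from synapse.types import ScheduledTask, TaskStatus


def row_to_task(row: Dict[str, Any]) -> ScheduledTask:
    return ScheduledTask(
        id=row["id"],
        action=row["action"],
        status=TaskStatus(row["status"]),
        timestamp=row["timestamp"],
        resource_id=row["resource_id"],
        # JSON-encoded text columns, or NULL when unset
        params=json.loads(row["params"]) if row["params"] else None,
        result=json.loads(row["result"]) if row["result"] else None,
        error=row["error"],
    )
```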
@@ -15,6 +15,7 @@
import abc
import re
import string
from enum import Enum
from typing import (
    TYPE_CHECKING,
    AbstractSet,
@@ -967,3 +968,40 @@ class UserProfile(TypedDict):
class RetentionPolicy:
    min_lifetime: Optional[int] = None
    max_lifetime: Optional[int] = None


class TaskStatus(str, Enum):
    """Status of a scheduled task"""

    # Task is scheduled but not active
    SCHEDULED = "scheduled"
    # Task is active and probably running; if not,
    # it will be run on the next scheduler loop iteration
    ACTIVE = "active"
    # Task has completed successfully
    COMPLETE = "complete"
    # Task is over and either returned a failed status, or had an exception
    FAILED = "failed"


@attr.s(auto_attribs=True, frozen=True, slots=True)
class ScheduledTask:
    """Description of a scheduled task"""

    # id used to identify the task
    id: str
    # name of the action to be run by this task
    action: str
    # current status of this task
    status: TaskStatus
    # if the status is SCHEDULED then this represents when it should be launched,
    # otherwise it represents the last time this task got a change of state
    timestamp: int
    # Optionally bind a task to some resource id for easy retrieval
    resource_id: Optional[str]
    # Optional parameters that will be passed to the function run by the task
    params: Optional[JsonMapping]
    # Optional result that can be updated by the running task
    result: Optional[JsonMapping]
    # Optional error message, set when the status is FAILED
    error: Optional[str]
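These two types define the contract that a task function has to honour: it receives the `ScheduledTask` plus a `first_launch` flag and returns a new status, result and error. A purely illustrative sketch of such a coroutine, with a hypothetical body:

```python
from typing import Optional, Tuple

from synapse.types import JsonMapping, ScheduledTask, TaskStatus


async def example_action(
    task: ScheduledTask, first_launch: bool
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
    try:
        # ... do the actual work, typically using task.params / task.resource_id ...
        return TaskStatus.COMPLETE, {"done": True}, None
    except Exception as e:
        # A FAILED status should come with an error message.
        return TaskStatus.FAILED, None, str(e)
```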
@@ -0,0 +1,266 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple

from twisted.python.failure import Failure

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class TaskScheduler:
    # Precision of the scheduler: evaluation of the tasks to run only happens
    # every `SCHEDULE_INTERVAL_MS` ms.
    SCHEDULE_INTERVAL_MS = 5 * 60 * 1000  # 5 minutes
    CLEAN_INTERVAL_MS = 60 * 60 * 1000  # 1 hour
    # Time before a complete or failed task is deleted from the DB
    KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000  # 1 week

    def __init__(self, hs: "HomeServer"):
        self.store = hs.get_datastores().main
        self.clock = hs.get_clock()
        self.running_tasks: Set[str] = set()
        self.actions: Dict[
            str,
            Callable[
                [ScheduledTask, bool],
                Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
            ],
        ] = {}
        self.run_background_tasks = hs.config.worker.run_background_tasks

        if self.run_background_tasks:
            self.clock.looping_call(
                run_as_background_process,
                TaskScheduler.SCHEDULE_INTERVAL_MS,
                "run_scheduled_tasks",
                self._run_scheduled_tasks,
            )
            self.clock.looping_call(
                run_as_background_process,
                TaskScheduler.CLEAN_INTERVAL_MS,
                "clean_scheduled_tasks",
                self._clean_scheduled_tasks,
            )

    def register_action(
        self,
        function: Callable[
            [ScheduledTask, bool],
            Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
        ],
        action_name: str,
    ) -> None:
        """Register a function to be executed when an action is scheduled with
        the specified action name.

        Actions need to be registered as early as possible so that a resumed task
        can find its matching function. It is usually better not to register an
        action right before calling `schedule_task`, but rather in an `__init__`
        method.

        Args:
            function: The function to be executed for this action. The parameters
                passed to the function when launched are the `ScheduledTask` being run,
                and a `first_launch` boolean signalling whether this is a resumed task
                or its first launch. The function should return a tuple of new `status`,
                `result` and `error` as specified in `ScheduledTask`.
            action_name: The name of the action to be associated with the function
        """
        self.actions[action_name] = function

    async def schedule_task(
        self,
        action: str,
        *,
        resource_id: Optional[str] = None,
        timestamp: Optional[int] = None,
        params: Optional[JsonMapping] = None,
    ) -> str:
        """Schedule a new potentially resumable task. A function matching the specified
        `action` should have been previously registered with `register_action`.

        Args:
            action: the name of a previously registered action
            resource_id: a task can be associated with a resource id to facilitate
                getting all tasks associated with a specific resource
            timestamp: if `None`, the task will be launched immediately, otherwise it
                will be launched after the `timestamp` value. Note that this scheduler
                is not meant to be precise, and the scheduling could be delayed if
                too many tasks are already running
            params: a set of parameters that can be easily accessed from inside the
                executed function

        Returns: the id of the scheduled task
        """
        if action not in self.actions:
            raise Exception(
                f"No function associated with the action {action} of the scheduled task"
            )

        launch_now = False
        if timestamp is None or timestamp < self.clock.time_msec():
            timestamp = self.clock.time_msec()
            launch_now = True

        task = ScheduledTask(
            random_string(16),
            action,
            TaskStatus.SCHEDULED,
            timestamp,
            resource_id,
            params,
            None,
            None,
        )
        await self.store.upsert_scheduled_task(task)

        if launch_now and self.run_background_tasks:
            await self._launch_task(task, True)

        return task.id

    async def update_task(
        self,
        id: str,
        *,
        timestamp: Optional[int] = None,
        status: Optional[TaskStatus] = None,
        result: Optional[JsonMapping] = None,
        error: Optional[str] = None,
    ) -> bool:
        """Update some values associated with a task.

        This is used internally, and also exposed publicly so it can be used inside
        task functions. This allows the progress of a task to be stored in the DB, so
        that it can be properly resumed after a restart of Synapse.

        Args:
            id: the id of the task to update
            timestamp: the new timestamp of the task
            status: the new `TaskStatus` of the task
            result: the new result of the task
            error: the new error of the task

        Returns: `True` if the update was successful, `False` otherwise
        """
        if timestamp is None:
            timestamp = self.clock.time_msec()
        return await self.store.update_scheduled_task(
            id,
            timestamp=timestamp,
            status=status,
            result=result,
            error=error,
        )

    async def get_task(self, id: str) -> Optional[ScheduledTask]:
        """Get a specific task description by id.

        Args:
            id: the id of the task to retrieve

        Returns: the task description, or `None` if it doesn't exist
            or has already been cleaned up
        """
        return await self.store.get_scheduled_task(id)

    async def get_tasks(
        self,
        actions: Optional[List[str]] = None,
        resource_ids: Optional[List[str]] = None,
        statuses: Optional[List[TaskStatus]] = None,
    ) -> List[ScheduledTask]:
        """Get a list of tasks associated with some action name(s), resource id(s)
        and/or status(es).

        Args:
            actions: the action names of the tasks to retrieve
            resource_ids: if `None`, returns all matching tasks regardless of
                their resource id
            statuses: the statuses of the tasks to retrieve

        Returns: a list of `ScheduledTask`
        """
        return await self.store.get_scheduled_tasks(actions, resource_ids, statuses)

    async def _run_scheduled_tasks(self) -> None:
        """Main loop taking care of launching the scheduled tasks when needed."""
        for task in await self.store.get_scheduled_tasks(
            statuses=[TaskStatus.SCHEDULED, TaskStatus.ACTIVE]
        ):
            if task.id not in self.running_tasks:
                if (
                    task.status == TaskStatus.SCHEDULED
                    and task.timestamp < self.clock.time_msec()
                ):
                    await self._launch_task(task, True)
                elif task.status == TaskStatus.ACTIVE:
                    await self._launch_task(task, False)

    async def _clean_scheduled_tasks(self) -> None:
        """Clean loop taking care of removing old complete or failed tasks to avoid
        cluttering the DB."""
        for task in await self.store.get_scheduled_tasks(
            statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE]
        ):
            if task.id not in self.running_tasks:
                if (
                    task.status == TaskStatus.COMPLETE
                    or task.status == TaskStatus.FAILED
                ) and self.clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS:
                    await self.store.delete_scheduled_task(task.id)

    async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None:
        """Launch a scheduled task now.

        Args:
            task: the task to launch
            first_launch: `True` if it's the first time the task is launched,
                `False` otherwise
        """
        if task.action not in self.actions:
            raise Exception(
                f"No function associated with the action {task.action} of the scheduled task"
            )

        function = self.actions[task.action]

        async def wrapper() -> None:
            try:
                (status, result, error) = await function(task, first_launch)
            except Exception:
                f = Failure()
                logger.error(
                    f"scheduled task {task.id} failed",
                    exc_info=(f.type, f.value, f.getTracebackObject()),
                )
                status = TaskStatus.FAILED
                result = None
                error = f.getErrorMessage()

            await self.update_task(
                task.id,
                status=status,
                result=result,
                error=error,
            )
            self.running_tasks.remove(task.id)

        await self.update_task(task.id, status=TaskStatus.ACTIVE)
        self.running_tasks.add(task.id)
        description = task.action
        if task.resource_id:
            description += f"-{task.resource_id}"
        run_as_background_process(description, wrapper)
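A minimal usage sketch for the scheduler above, assuming `hs` is a `HomeServer` exposing `get_task_scheduler()` (as used by the tests later in this changeset). The `example_action` coroutine is hypothetical; see the sketch after the `ScheduledTask` definition for its expected signature.

```python
async def schedule_example(hs) -> str:
    scheduler = hs.get_task_scheduler()

    # Register the action early (e.g. in a handler's __init__) so that a task
    # interrupted by a restart can find its function again and be resumed.
    scheduler.register_action(example_action, "example_action")

    # Run roughly one minute from now; omitting the timestamp would launch it
    # immediately on the worker running background tasks.
    return await scheduler.schedule_task(
        "example_action",
        resource_id="!room:example.com",  # illustrative resource id
        timestamp=hs.get_clock().time_msec() + 60 * 1000,
        params={"val": 1},
    )
```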
+129 -53
@@ -24,17 +24,20 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes, Membership, RoomTypes
from synapse.api.errors import Codes
from synapse.handlers.pagination import PaginationHandler, PurgeStatus
from synapse.rest.client import directory, events, login, room
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from synapse.util.stringutils import random_string
from synapse.util.task_scheduler import TaskScheduler

from tests import unittest

"""Tests admin REST events for /rooms paths."""

ONE_HOUR_IN_S = 3600


class DeleteRoomTestCase(unittest.HomeserverTestCase):
    servlets = [
        synapse.rest.admin.register_servlets,
@@ -46,6 +49,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.event_creation_handler = hs.get_event_creation_handler()
        self.task_scheduler = hs.get_task_scheduler()
        hs.config.consent.user_consent_version = "1"

        consent_uri_builder = Mock()
@@ -476,6 +480,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.event_creation_handler = hs.get_event_creation_handler()
        self.task_scheduler = hs.get_task_scheduler()
        hs.config.consent.user_consent_version = "1"

        consent_uri_builder = Mock()
@@ -502,6 +507,9 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
        )
        self.url_status_by_delete_id = "/_synapse/admin/v2/rooms/delete_status/"

        self.room_member_handler = hs.get_room_member_handler()
        self.pagination_handler = hs.get_pagination_handler()

    @parameterized.expand(
        [
            ("DELETE", "/_synapse/admin/v2/rooms/%s"),
@@ -661,7 +669,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
        delete_id1 = channel.json_body["delete_id"]

        # go ahead
        self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
        self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)

        # second task
        channel = self.make_request(
@@ -686,12 +694,14 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
        self.assertEqual(2, len(channel.json_body["results"]))
        self.assertEqual("complete", channel.json_body["results"][0]["status"])
        self.assertEqual("complete", channel.json_body["results"][1]["status"])
        self.assertEqual(delete_id1, channel.json_body["results"][0]["delete_id"])
        self.assertEqual(delete_id2, channel.json_body["results"][1]["delete_id"])
        delete_ids = {delete_id1, delete_id2}
        self.assertTrue(channel.json_body["results"][0]["delete_id"] in delete_ids)
        delete_ids.remove(channel.json_body["results"][0]["delete_id"])
        self.assertTrue(channel.json_body["results"][1]["delete_id"] in delete_ids)

        # get status after more than the clearing time for the first task;
        # the second task is not cleared
        self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
        self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)

        channel = self.make_request(
            "GET",
@@ -705,7 +715,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
        self.assertEqual(delete_id2, channel.json_body["results"][0]["delete_id"])

        # get status after more than the clearing time for all tasks
        self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
        self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)

        channel = self.make_request(
            "GET",
@@ -716,48 +726,6 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
        self.assertEqual(404, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])

    def test_delete_same_room_twice(self) -> None:
        """Test that a second call to delete the same room returns an error."""

        body = {"new_room_user_id": self.admin_user}

        # first call to delete the room,
        # without waiting for the task to finish
        first_channel = self.make_request(
            "DELETE",
            self.url.encode("ascii"),
            content=body,
            access_token=self.admin_user_tok,
            await_result=False,
        )

        # second call to delete the room
        second_channel = self.make_request(
            "DELETE",
            self.url.encode("ascii"),
            content=body,
            access_token=self.admin_user_tok,
        )

        self.assertEqual(400, second_channel.code, msg=second_channel.json_body)
        self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"])
        self.assertEqual(
            f"History purge already in progress for {self.room_id}",
            second_channel.json_body["error"],
        )

        # get the result of the first call
        first_channel.await_result()
        self.assertEqual(200, first_channel.code, msg=first_channel.json_body)
        self.assertIn("delete_id", first_channel.json_body)

        # check the status after the task finishes
        self._test_result(
            first_channel.json_body["delete_id"],
            self.other_user,
            expect_new_room=True,
        )

    def test_purge_room_and_block(self) -> None:
        """Test to purge a room and block it.
        Members will not be moved to a new room and will not receive a message.
@@ -972,6 +940,117 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
        # Assert we can no longer peek into the room
        self._assert_peek(self.room_id, expect_code=403)

    @unittest.override_config({"purge_retention_period": "1d"})
    def test_purge_forgotten_room(self) -> None:
        # Create a test room
        room_id = self.helper.create_room_as(
            self.admin_user,
            tok=self.admin_user_tok,
        )

        self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
        self.get_success(
            self.room_member_handler.forget(
                UserID.from_string(self.admin_user), room_id
            )
        )

        # Test that the room is not yet purged
        with self.assertRaises(AssertionError):
            self._is_purged(room_id)

        # Advance 24 hours into the future, past the `purge_retention_period`
        self.reactor.advance(24 * ONE_HOUR_IN_S)

        self._is_purged(room_id)

    def test_scheduled_purge_room(self) -> None:
        # Create a test room
        room_id = self.helper.create_room_as(
            self.admin_user,
            tok=self.admin_user_tok,
        )
        self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)

        # Schedule a purge 10 seconds in the future
        self.get_success(
            self.task_scheduler.schedule_task(
                "purge_room",
                resource_id=room_id,
                timestamp=self.clock.time_msec() + 10 * 1000,
            )
        )

        # Test that the room is not yet purged
        with self.assertRaises(AssertionError):
            self._is_purged(room_id)

        # Advance one hour into the future, past `TaskScheduler.SCHEDULE_INTERVAL_MS`,
        # so that the scheduler loop runs and launches the purge
        self.reactor.advance(ONE_HOUR_IN_S)

        self._is_purged(room_id)

    def test_schedule_shutdown_room(self) -> None:
        # Create a test room
        room_id = self.helper.create_room_as(
            self.other_user,
            tok=self.other_user_tok,
        )

        # Schedule a shutdown 10 seconds in the future
        delete_id = self.get_success(
            self.task_scheduler.schedule_task(
                "shutdown_and_purge_room",
                resource_id=room_id,
                params={
                    "requester_user_id": self.admin_user,
                    "new_room_user_id": self.admin_user,
                    "new_room_name": None,
                    "message": None,
                    "block": False,
                    "purge": True,
                    "force_purge": True,
                },
                timestamp=self.clock.time_msec() + 10 * 1000,
            )
        )

        # Test that the room is not yet shut down
        self._is_member(room_id, self.other_user)

        # Test that the room is not yet purged
        with self.assertRaises(AssertionError):
            self._is_purged(room_id)

        # Advance one hour into the future, past `TaskScheduler.SCHEDULE_INTERVAL_MS`,
        # so that the scheduler loop runs and performs the shutdown and purge
        self.reactor.advance(ONE_HOUR_IN_S)

        # Test that all users have been kicked (room is shut down)
        self._has_no_members(room_id)

        self._is_purged(room_id)

        # Retrieve delete results
        result = self.make_request(
            "GET",
            self.url_status_by_delete_id + delete_id,
            access_token=self.admin_user_tok,
        )
        self.assertEqual(200, result.code, msg=result.json_body)

        # Check that the user is in kicked_users
        self.assertIn(
            self.other_user, result.json_body["shutdown_room"]["kicked_users"]
        )

        new_room_id = result.json_body["shutdown_room"]["new_room_id"]
        self.assertTrue(new_room_id)

        # Check that the user is actually in the new room
        self._is_member(new_room_id, self.other_user)

    def _is_blocked(self, room_id: str, expect: bool = True) -> None:
        """Assert that the room is blocked or not"""
        d = self.store.is_room_blocked(room_id)
@@ -1957,11 +2036,8 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
        self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])

        # Purge every event before the second event.
        purge_id = random_string(16)
        pagination_handler._purges_by_id[purge_id] = PurgeStatus()
        self.get_success(
            pagination_handler._purge_history(
                purge_id=purge_id,
            pagination_handler.purge_history(
                room_id=self.room_id,
                token=second_token_str,
                delete_local_events=True,
@@ -22,6 +22,7 @@ from synapse.server import HomeServer
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict
from synapse.util import Clock
from synapse.util.stringutils import random_string

from tests import unittest
from tests.unittest import override_config
@@ -413,11 +414,24 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
        self.assertEqual(messages[0]["content"]["body"], "test msg one")
        self.assertEqual(messages[0]["sender"], "@notices:test")

        random_string(16)

        # shut down and purge room
        self.get_success(
            self.room_shutdown_handler.shutdown_room(first_room_id, self.admin_user)
            self.room_shutdown_handler.shutdown_room(
                first_room_id,
                {
                    "requester_user_id": self.admin_user,
                    "new_room_user_id": None,
                    "new_room_name": None,
                    "message": None,
                    "block": False,
                    "purge": True,
                    "force_purge": False,
                },
            )
        )
        self.get_success(self.pagination_handler.purge_room(first_room_id))
        self.get_success(self.pagination_handler.purge_room(first_room_id, force=False))

        # user is not a member anymore
        self._check_invite_and_join_status(self.other_user, 0, 0)
@@ -41,7 +41,6 @@ from synapse.api.errors import Codes, HttpResponseException
from synapse.appservice import ApplicationService
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.pagination import PurgeStatus
from synapse.rest import admin
from synapse.rest.client import account, directory, login, profile, register, room, sync
from synapse.server import HomeServer
@@ -2089,11 +2088,8 @@ class RoomMessageListTestCase(RoomBase):
        self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])

        # Purge every event before the second event.
        purge_id = random_string(16)
        pagination_handler._purges_by_id[purge_id] = PurgeStatus()
        self.get_success(
            pagination_handler._purge_history(
                purge_id=purge_id,
            pagination_handler.purge_history(
                room_id=self.room_id,
                token=second_token_str,
                delete_local_events=True,
@@ -0,0 +1,132 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, Tuple

from twisted.internet.task import deferLater
from twisted.test.proto_helpers import MemoryReactor

from synapse.server import HomeServer
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util import Clock
from synapse.util.task_scheduler import TaskScheduler

from tests import unittest


class TestTaskScheduler(unittest.HomeserverTestCase):
    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.task_scheduler = hs.get_task_scheduler()
        self.task_scheduler.register_action(self._test_task, "_test_task")
        self.task_scheduler.register_action(self._raising_task, "_raising_task")
        self.task_scheduler.register_action(self._resumable_task, "_resumable_task")

    async def _test_task(
        self, task: ScheduledTask, first_launch: bool
    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
        # This test task will copy the parameters to the result
        result = None
        if task.params:
            result = task.params
        return (TaskStatus.COMPLETE, result, None)

    def test_schedule_task(self) -> None:
        """Schedule a task in the future with some parameters to be copied into the
        result, and check that it executes correctly.
        Also check that it gets removed after `KEEP_TASKS_FOR_MS`."""
        timestamp = self.clock.time_msec() + 2 * 60 * 1000
        task_id = self.get_success(
            self.task_scheduler.schedule_task(
                "_test_task",
                timestamp=timestamp,
                params={"val": 1},
            )
        )

        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        self.assertEqual(task.status, TaskStatus.SCHEDULED)
        self.assertIsNone(task.result)

        # The timestamp is two minutes from now, so the task should be executed
        # after the first scheduling loop run
        self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + 1)

        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        self.assertEqual(task.status, TaskStatus.COMPLETE)
        assert task.result is not None
        # The passed parameter should have been copied to the result
        self.assertTrue(task.result.get("val") == 1)

        # Wait for the completed task to be deleted and hence unavailable
        self.reactor.advance((TaskScheduler.KEEP_TASKS_FOR_MS / 1000) + 1)

        task = self.get_success(self.task_scheduler.get_task(task_id))
        self.assertIsNone(task)

    def test_schedule_task_now(self) -> None:
        """Schedule a task now and check that it runs to completion."""
        task_id = self.get_success(
            self.task_scheduler.schedule_task("_test_task", params={"val": 1})
        )

        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        self.assertEqual(task.status, TaskStatus.COMPLETE)
        assert task.result is not None
        self.assertTrue(task.result.get("val") == 1)

    async def _raising_task(
        self, task: ScheduledTask, first_launch: bool
    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
        raise Exception("raising")

    def test_schedule_raising_task_now(self) -> None:
        """Schedule a task that raises an exception and check that it ends up failed
        and reports the exception message."""
        task_id = self.get_success(self.task_scheduler.schedule_task("_raising_task"))

        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        self.assertEqual(task.status, TaskStatus.FAILED)
        self.assertEqual(task.error, "raising")

    async def _resumable_task(
        self, task: ScheduledTask, first_launch: bool
    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
        if task.result and "in_progress" in task.result:
            return TaskStatus.COMPLETE, {"success": True}, None
        else:
            await self.task_scheduler.update_task(task.id, result={"in_progress": True})
            # Await forever to simulate a task aborted by a restart
            await deferLater(self.reactor, 2**16, None)
            # This should never be called
            return TaskStatus.ACTIVE, None, None

    def test_schedule_resumable_task_now(self) -> None:
        """Schedule a resumable task and check that it gets properly resumed and
        completed after simulating a Synapse restart."""
        task_id = self.get_success(self.task_scheduler.schedule_task("_resumable_task"))

        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        self.assertEqual(task.status, TaskStatus.ACTIVE)

        # Simulate a Synapse restart by emptying the set of running tasks
        self.task_scheduler.running_tasks = set()
        self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + 1)

        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        self.assertEqual(task.status, TaskStatus.COMPLETE)
        assert task.result is not None
        self.assertTrue(task.result.get("success"))