1
0

Compare commits

...

31 Commits

Author SHA1 Message Date
Mathieu Velten 5065d7df75 Use task scheduler 2023-07-25 12:24:55 +02:00
Mathieu Velten c8b8c96b6e Merge branch 'mv/task-scheduler' into mv/purge-room-when-forgotten-wip 2023-07-24 21:54:40 +02:00
Mathieu Velten 0961f52c57 Add filters to task retrieval + clean less often 2023-07-24 21:53:13 +02:00
Mathieu Velten 1a6a0cbb1c Merge branch 'mv/task-scheduler' into mv/purge-room-when-forgotten 2023-07-12 00:11:30 +02:00
Mathieu Velten a5a74d8b80 Merge remote-tracking branch 'origin/develop' into mv/purge-room-when-forgotten 2023-07-12 00:11:18 +02:00
Mathieu Velten 470f385419 Implements a task scheduler for resumable potentially long running tasks 2023-07-11 16:26:01 +02:00
Mathieu Velten 1d6f3c5429 Add log 2023-07-03 10:40:31 +02:00
Mathieu Velten 648b211d6a Fix purge history response format 2023-07-02 22:54:34 +02:00
Mathieu Velten a78f112faa Fix error handling 2023-06-30 16:27:45 +02:00
Mathieu Velten 32e976f226 Comment 2023-06-30 16:03:58 +02:00
Mathieu Velten 5bce6397aa Use DB for all purge/shutdown actions, including purge history 2023-06-30 15:58:11 +02:00
Mathieu Velten 0020b333f9 comments 2023-06-09 15:26:08 +02:00
Mathieu Velten c155eefe19 rename var 2023-06-09 15:20:15 +02:00
Mathieu Velten 5b83e0df39 Address comments 2023-06-09 15:14:59 +02:00
Mathieu Velten 0d7b4505bd Less quotes 2023-06-09 15:05:50 +02:00
Mathieu Velten ede8b17e11 Merge branch 'develop' into mv/purge-room-when-forgotten 2023-06-09 14:59:36 +02:00
Mathieu Velten dfdca3f9e0 Apply suggestions from code review
Co-authored-by: Eric Eastwood <erice@element.io>
2023-06-09 14:59:14 +02:00
Mathieu Velten a90a189ee3 Update synapse/handlers/pagination.py
Co-authored-by: Eric Eastwood <erice@element.io>
2023-06-09 14:56:45 +02:00
Mathieu Velten 8b837e1ea1 Apply suggestions from code review
Co-authored-by: Eric Eastwood <erice@element.io>
2023-06-08 17:15:08 +02:00
Mathieu Velten c0e0a0a231 Add tests 2023-05-12 15:52:41 +02:00
Mathieu Velten a5f7a031c0 Fix test 2023-05-12 15:52:41 +02:00
Mathieu Velten 7a6daa51bd Add changelog 2023-05-12 15:52:41 +02:00
Mathieu Velten 74a0e7ab8f Mark locally forgotten rooms for purging after purge_retention_period 2023-05-12 15:52:41 +02:00
Mathieu Velten 490beb7b16 Don't return scheduled purges in the admin API 2023-05-12 15:52:41 +02:00
Mathieu Velten 3d2a61d00f Break circular dependency 2023-05-12 15:52:41 +02:00
Mathieu Velten d05bc56a21 Restore purge and shutdown from DB on startup
It will also launch scheduled purge (`wait_purge` status) hourly
2023-05-12 15:52:41 +02:00
Mathieu Velten da987b4acc Save shutdown and purge state in DB 2023-05-12 15:52:41 +02:00
Mathieu Velten 7d9665e9ec Use delete_id and ShutdownRoomParams everywhere 2023-05-12 15:52:41 +02:00
Mathieu Velten 7031a23c41 Move DeleteStatus and add ShutdownRoomParams
DeleteStatus is also changed to remove the integer representation which
seems to complicate the code with no benefits.
2023-05-12 15:52:41 +02:00
Mathieu Velten a59eb8ccb1 Add purge_retention_period config 2023-05-12 15:52:41 +02:00
Mathieu Velten 34adad8e80 Add rooms_to_purge table and related methods 2023-05-12 15:52:41 +02:00
20 changed files with 1226 additions and 461 deletions
+1
View File
@@ -0,0 +1 @@
Add automatic purge after all users have forgotten a room. Also restore purge/shutdown of rooms after a Synapse restart.
+1
View File
@@ -0,0 +1 @@
Implements a task scheduler for resumable, potentially long-running tasks.
@@ -923,7 +923,7 @@ allowed_avatar_mimetypes: ["image/png", "image/jpeg", "image/gif"]
How long to keep redacted events in unredacted form in the database. After
this period redacted events get replaced with their redacted form in the DB.
Synapse will check whether the rentention period has concluded for redacted
Synapse will check whether the retention period has concluded for redacted
events every 5 minutes. Thus, even if this option is set to `0`, Synapse may
still take up to 5 minutes to purge redacted events from the database.
@@ -934,6 +934,23 @@ Example configuration:
redaction_retention_period: 28d
```
---
---
### `purge_retention_period`
How long to keep locally forgotten rooms in the DB. After this period the room
will be fully purged from the DB.
Synapse will check whether the retention period has concluded for room
purges every hour. Thus, even if this option is set to `0`, Synapse may
still take up to one hour to purge forgotten rooms from the database.
Defaults to `7d`. Set to `null` to disable.
Example configuration:
```yaml
purge_retention_period: 28d
```
---
### `user_ips_max_age`
How long to track users' last seen time and IPs in the database.
+2
View File
@@ -91,6 +91,7 @@ from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
@@ -144,6 +145,7 @@ class GenericWorkerStore(
TransactionWorkerStore,
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.
+9
View File
@@ -486,6 +486,15 @@ class ServerConfig(Config):
else:
self.redaction_retention_period = None
# How long to keep locally forgotten rooms before purging them.
purge_retention_period = config.get("purge_retention_period", "7d")
if purge_retention_period is not None:
self.purge_retention_period: Optional[int] = self.parse_duration(
purge_retention_period
)
else:
self.purge_retention_period = None
# How long to keep entries in the `users_ips` table.
user_ips_max_age = config.get("user_ips_max_age", "28d")
if user_ips_max_age is not None:
+143 -294
View File
@@ -13,9 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Dict, List, Optional, Set
import attr
from typing import TYPE_CHECKING, List, Optional, Set, Tuple
from twisted.python.failure import Failure
@@ -23,12 +21,19 @@ from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import ShutdownRoomResponse
from synapse.handlers.room import ShutdownRoomParams
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType
from synapse.types import (
JsonDict,
JsonMapping,
Requester,
ScheduledTask,
StreamKeyType,
TaskStatus,
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
@@ -46,82 +51,6 @@ logger = logging.getLogger(__name__)
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
@attr.s(slots=True, auto_attribs=True)
class PurgeStatus:
"""Object tracking the status of a purge request
This class contains information on the progress of a purge request, for
return by get_purge_status.
"""
STATUS_ACTIVE = 0
STATUS_COMPLETE = 1
STATUS_FAILED = 2
STATUS_TEXT = {
STATUS_ACTIVE: "active",
STATUS_COMPLETE: "complete",
STATUS_FAILED: "failed",
}
# Save the error message if an error occurs
error: str = ""
# Tracks whether this request has completed. One of STATUS_{ACTIVE,COMPLETE,FAILED}.
status: int = STATUS_ACTIVE
def asdict(self) -> JsonDict:
ret = {"status": PurgeStatus.STATUS_TEXT[self.status]}
if self.error:
ret["error"] = self.error
return ret
@attr.s(slots=True, auto_attribs=True)
class DeleteStatus:
"""Object tracking the status of a delete room request
This class contains information on the progress of a delete room request, for
return by get_delete_status.
"""
STATUS_PURGING = 0
STATUS_COMPLETE = 1
STATUS_FAILED = 2
STATUS_SHUTTING_DOWN = 3
STATUS_TEXT = {
STATUS_PURGING: "purging",
STATUS_COMPLETE: "complete",
STATUS_FAILED: "failed",
STATUS_SHUTTING_DOWN: "shutting_down",
}
# Tracks whether this request has completed.
# One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN}.
status: int = STATUS_PURGING
# Save the error message if an error occurs
error: str = ""
# Saves the result of an action to give it back to REST API
shutdown_room: ShutdownRoomResponse = {
"kicked_users": [],
"failed_to_kick_users": [],
"local_aliases": [],
"new_room_id": None,
}
def asdict(self) -> JsonDict:
ret = {
"status": DeleteStatus.STATUS_TEXT[self.status],
"shutdown_room": self.shutdown_room,
}
if self.error:
ret["error"] = self.error
return ret
class PaginationHandler:
"""Handles pagination and purge history requests.
@@ -129,9 +58,6 @@ class PaginationHandler:
paginating during a purge.
"""
# when to remove a completed deletion/purge from the results map
CLEAR_PURGE_AFTER_MS = 1000 * 3600 * 24 # 24 hours
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
@@ -142,17 +68,11 @@ class PaginationHandler:
self._server_name = hs.hostname
self._room_shutdown_handler = hs.get_room_shutdown_handler()
self._relations_handler = hs.get_relations_handler()
self._task_scheduler = hs.get_task_scheduler()
self.pagination_lock = ReadWriteLock()
# IDs of rooms in which there currently an active purge *or delete* operation.
self._purges_in_progress_by_room: Set[str] = set()
# map from purge id to PurgeStatus
self._purges_by_id: Dict[str, PurgeStatus] = {}
# map from purge id to DeleteStatus
self._delete_by_id: Dict[str, DeleteStatus] = {}
# map from room id to delete ids
# Dict[`room_id`, List[`delete_id`]]
self._delete_by_room: Dict[str, List[str]] = {}
self._event_serializer = hs.get_event_client_serializer()
self._retention_default_max_lifetime = (
@@ -165,6 +85,7 @@ class PaginationHandler:
self._retention_allowed_lifetime_max = (
hs.config.retention.retention_allowed_lifetime_max
)
self._purge_retention_period = hs.config.server.purge_retention_period
self._is_master = hs.config.worker.worker_app is None
if hs.config.retention.retention_enabled and self._is_master:
@@ -181,6 +102,12 @@ class PaginationHandler:
job.longest_max_lifetime,
)
self._task_scheduler.register_action(self._purge_history, "purge_history")
self._task_scheduler.register_action(self._purge_room, "purge_room")
self._task_scheduler.register_action(
self._shutdown_and_purge_room, "shutdown_and_purge_room"
)
async def purge_history_for_rooms_in_range(
self, min_ms: Optional[int], max_ms: Optional[int]
) -> None:
@@ -231,14 +158,6 @@ class PaginationHandler:
for room_id, retention_policy in rooms.items():
logger.info("[purge] Attempting to purge messages in room %s", room_id)
if room_id in self._purges_in_progress_by_room:
logger.warning(
"[purge] not purging room %s as there's an ongoing purge running"
" for this room",
room_id,
)
continue
# If max_lifetime is None, it means that the room has no retention policy.
# Given we only retrieve such rooms when there's a default retention policy
# defined in the server's configuration, we can safely assume that's the
@@ -289,8 +208,6 @@ class PaginationHandler:
purge_id = random_string(16)
self._purges_by_id[purge_id] = PurgeStatus()
logger.info(
"Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
)
@@ -305,9 +222,10 @@ class PaginationHandler:
room_id,
token,
True,
False,
)
def start_purge_history(
async def start_purge_history(
self, room_id: str, token: str, delete_local_events: bool = False
) -> str:
"""Start off a history purge on a room.
@@ -321,31 +239,49 @@ class PaginationHandler:
Returns:
unique ID for this purge transaction.
"""
if room_id in self._purges_in_progress_by_room:
raise SynapseError(
400, "History purge already in progress for %s" % (room_id,)
)
purge_id = random_string(16)
purge_id = await self._task_scheduler.schedule_task(
"purge_history",
resource_id=room_id,
params={"token": token, "delete_local_events": delete_local_events},
)
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[purge] starting purge_id %s", purge_id)
self._purges_by_id[purge_id] = PurgeStatus()
run_as_background_process(
"purge_history",
self._purge_history,
purge_id,
room_id,
token,
delete_local_events,
)
return purge_id
async def _purge_history(
self, purge_id: str, room_id: str, token: str, delete_local_events: bool
) -> None:
self,
task: ScheduledTask,
first_launch: bool,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
if (
task.resource_id is None
or task.params is None
or "token" not in task.params
or "delete_local_events" not in task.params
):
return (
TaskStatus.FAILED,
None,
"Not enough parameters passed to _purge_history",
)
err = await self.purge_history(
task.resource_id,
task.params["token"],
task.params["delete_local_events"],
)
if err is not None:
return TaskStatus.FAILED, None, err
return TaskStatus.COMPLETE, None, None
async def purge_history(
self,
room_id: str,
token: str,
delete_local_events: bool,
) -> Optional[str]:
"""Carry out a history purge on a room.
Args:
@@ -353,74 +289,86 @@ class PaginationHandler:
room_id: The room to purge from
token: topological token to delete events before
delete_local_events: True to delete local events as well as remote ones
update_rooms_to_delete_table: True if we don't want to update/persist this
purge history action to the DB to be restorable. Used with the retention
functionality since we don't need to explicitly restore those, they
will be relaunch by the retention logic.
"""
self._purges_in_progress_by_room.add(room_id)
try:
async with self.pagination_lock.write(room_id):
await self._storage_controllers.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
return None
except Exception:
f = Failure()
logger.error(
"[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
)
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
self._purges_by_id[purge_id].error = f.getErrorMessage()
finally:
self._purges_in_progress_by_room.discard(room_id)
return f.getErrorMessage()
# remove the purge from the list 24 hours after it completes
def clear_purge() -> None:
del self._purges_by_id[purge_id]
self.hs.get_reactor().callLater(
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_purge
)
def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
"""Get the current status of an active purge
Args:
purge_id: purge_id returned by start_purge_history
"""
return self._purges_by_id.get(purge_id)
def get_delete_status(self, delete_id: str) -> Optional[DeleteStatus]:
async def get_delete_task(self, delete_id: str) -> Optional[ScheduledTask]:
"""Get the current status of an active deleting
Args:
delete_id: delete_id returned by start_shutdown_and_purge_room
or start_purge_history.
"""
return self._delete_by_id.get(delete_id)
return await self._task_scheduler.get_task(delete_id)
def get_delete_ids_by_room(self, room_id: str) -> Optional[StrCollection]:
"""Get all active delete ids by room
async def get_delete_tasks_by_room(self, room_id: str) -> List[ScheduledTask]:
"""Get all active delete statuses by room
Args:
room_id: room_id that is deleted
"""
return self._delete_by_room.get(room_id)
return await self._task_scheduler.get_tasks(
actions=["purge_room", "shutdown_and_purge_room"], resource_ids=[room_id]
)
async def purge_room(self, room_id: str, force: bool = False) -> None:
async def _purge_room(
self,
task: ScheduledTask,
first_launch: bool,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
if not task.resource_id:
raise Exception("No room id passed to purge_room task")
params = task.params if task.params else {}
await self.purge_room(task.resource_id, params.get("force", False))
return TaskStatus.COMPLETE, None, None
async def purge_room(
self,
room_id: str,
force: bool,
) -> None:
"""Purge the given room from the database.
This function is part the delete room v1 API.
Args:
room_id: room to be purged
delete_id: the delete ID for this purge
force: set true to skip checking for joined users.
shutdown_response: optional response coming from the shutdown phase
"""
logger.info("starting purge room_id=%s force=%s", room_id, force)
async with self.pagination_lock.write(room_id):
# first check that we have no users in this room
if not force:
joined = await self.store.is_host_joined(room_id, self._server_name)
if joined:
joined = await self.store.is_host_joined(room_id, self._server_name)
if joined:
if force:
logger.info(
"force-purging room %s with some local users still joined",
room_id,
)
else:
raise SynapseError(400, "Users are still joined to this room")
await self._storage_controllers.purge_events.purge_room(room_id)
logger.info("purge complete for room_id %s", room_id)
@trace
async def get_messages(
self,
@@ -698,16 +646,9 @@ class PaginationHandler:
async def _shutdown_and_purge_room(
self,
delete_id: str,
room_id: str,
requester_user_id: str,
new_room_user_id: Optional[str] = None,
new_room_name: Optional[str] = None,
message: Optional[str] = None,
block: bool = False,
purge: bool = True,
force_purge: bool = False,
) -> None:
task: ScheduledTask,
first_launch: bool,
) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
"""
Shuts down and purges a room.
@@ -716,149 +657,72 @@ class PaginationHandler:
Args:
delete_id: The ID for this delete.
room_id: The ID of the room to shut down.
requester_user_id:
User who requested the action. Will be recorded as putting the room on the
blocking list.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
moved into that room. If not set, no new room will be created
and the users will just be removed from the old room.
new_room_name:
A string representing the name of the room that new users will
be invited to. Defaults to `Content Violation Notification`
message:
A string containing the first message that will be sent as
`new_room_user_id` in the new room. Ideally this will clearly
convey why the original room was shut down.
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
If set to `true`, this room will be added to a blocking list,
preventing future attempts to join the room. Defaults to `false`.
purge:
If set to `true`, purge the given room from the database.
force_purge:
If set to `true`, the room will be purged from database
also if it fails to remove some users from room.
shutdown_params: parameters for the shutdown, cf `RoomShutdownHandler.ShutdownRoomParams`
shutdown_response: current status of the shutdown, if it was interrupted
Saves a `RoomShutdownHandler.ShutdownRoomResponse` in `DeleteStatus`:
Keeps track of the `DeleteStatus` (and `ShutdownRoomResponse`) in `self._delete_by_id` and persisted in DB
"""
self._purges_in_progress_by_room.add(room_id)
try:
async with self.pagination_lock.write(room_id):
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
self._delete_by_id[
delete_id
].shutdown_room = await self._room_shutdown_handler.shutdown_room(
room_id=room_id,
requester_user_id=requester_user_id,
new_room_user_id=new_room_user_id,
new_room_name=new_room_name,
message=message,
block=block,
)
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_PURGING
if purge:
logger.info("starting purge room_id %s", room_id)
# first check that we have no users in this room
if not force_purge:
joined = await self.store.is_host_joined(
room_id, self._server_name
)
if joined:
raise SynapseError(
400, "Users are still joined to this room"
)
await self._storage_controllers.purge_events.purge_room(room_id)
logger.info("purge complete for room_id %s", room_id)
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE
except Exception:
f = Failure()
logger.error(
"failed",
exc_info=(f.type, f.value, f.getTracebackObject()),
)
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
self._delete_by_id[delete_id].error = f.getErrorMessage()
finally:
self._purges_in_progress_by_room.discard(room_id)
# remove the delete from the list 24 hours after it completes
def clear_delete() -> None:
del self._delete_by_id[delete_id]
self._delete_by_room[room_id].remove(delete_id)
if not self._delete_by_room[room_id]:
del self._delete_by_room[room_id]
self.hs.get_reactor().callLater(
PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000, clear_delete
if task.resource_id is None or task.params is None:
raise Exception(
"No room id and/or no parameters passed to shutdown_and_purge_room task"
)
def start_shutdown_and_purge_room(
room_id = task.resource_id
async def update_result(result: Optional[JsonMapping]) -> None:
await self._task_scheduler.update_task(task.id, result=result)
shutdown_result = await self._room_shutdown_handler.shutdown_room(
room_id, task.params, task.result, update_result
)
if task.params.get("purge", False):
await self.purge_room(
room_id,
task.params.get("force_purge", False),
)
return (TaskStatus.COMPLETE, shutdown_result, None)
async def get_current_delete_tasks(self, room_id: str) -> List[ScheduledTask]:
return await self._task_scheduler.get_tasks(
actions=["purge_history", "purge_room", "shutdown_and_purge_room"],
resource_ids=[room_id],
statuses=[TaskStatus.ACTIVE, TaskStatus.SCHEDULED],
)
async def start_shutdown_and_purge_room(
self,
room_id: str,
requester_user_id: str,
new_room_user_id: Optional[str] = None,
new_room_name: Optional[str] = None,
message: Optional[str] = None,
block: bool = False,
purge: bool = True,
force_purge: bool = False,
shutdown_params: ShutdownRoomParams,
) -> str:
"""Start off shut down and purge on a room.
Args:
room_id: The ID of the room to shut down.
requester_user_id:
User who requested the action and put the room on the
blocking list.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
moved into that room. If not set, no new room will be created
and the users will just be removed from the old room.
new_room_name:
A string representing the name of the room that new users will
be invited to. Defaults to `Content Violation Notification`
message:
A string containing the first message that will be sent as
`new_room_user_id` in the new room. Ideally this will clearly
convey why the original room was shut down.
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
If set to `true`, this room will be added to a blocking list,
preventing future attempts to join the room. Defaults to `false`.
purge:
If set to `true`, purge the given room from the database.
force_purge:
If set to `true`, the room will be purged from database
also if it fails to remove some users from room.
shutdown_params: parameters for the shutdown, cf `RoomShutdownHandler.ShutdownRoomParams`
Returns:
unique ID for this delete transaction.
"""
if room_id in self._purges_in_progress_by_room:
raise SynapseError(
400, "History purge already in progress for %s" % (room_id,)
)
if len(await self.get_current_delete_tasks(room_id)) > 0:
raise SynapseError(400, "Purge already in progress for %s" % (room_id,))
# This check duplicates the one in `RoomShutdownHandler.shutdown_room`,
# but here the requester gets a direct HTTP response / error
# and does not have to poll the purge status
new_room_user_id = shutdown_params["new_room_user_id"]
if new_room_user_id is not None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
400, "User must be our own: %s" % (new_room_user_id,)
)
delete_id = random_string(16)
delete_id = await self._task_scheduler.schedule_task(
"shutdown_and_purge_room",
resource_id=room_id,
params=shutdown_params,
)
# we log the delete_id here so that it can be tied back to the
# request id in the log lines.
@@ -868,19 +732,4 @@ class PaginationHandler:
delete_id,
)
self._delete_by_id[delete_id] = DeleteStatus()
self._delete_by_room.setdefault(room_id, []).append(delete_id)
run_as_background_process(
"shutdown_and_purge_room",
self._shutdown_and_purge_room,
delete_id,
room_id,
requester_user_id,
new_room_user_id,
new_room_name,
message,
block,
purge,
force_purge,
)
return delete_id
+155 -70
View File
@@ -20,7 +20,7 @@ import random
import string
from collections import OrderedDict
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Tuple
import attr
from typing_extensions import TypedDict
@@ -54,11 +54,11 @@ from synapse.events import EventBase
from synapse.events.snapshot import UnpersistedEventContext
from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
JsonMapping,
MutableStateMap,
Requester,
RoomAlias,
@@ -454,7 +454,7 @@ class RoomCreationHandler:
spam_check = await self._spam_checker_module_callbacks.user_may_create_room(
user_id
)
if spam_check != NOT_SPAM:
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
raise SynapseError(
403,
"You are not permitted to create rooms",
@@ -768,7 +768,7 @@ class RoomCreationHandler:
spam_check = await self._spam_checker_module_callbacks.user_may_create_room(
user_id
)
if spam_check != NOT_SPAM:
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
raise SynapseError(
403,
"You are not permitted to create rooms",
@@ -1750,6 +1750,45 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
return self.store.get_current_room_stream_token_for_room_id(room_id)
class ShutdownRoomParams(TypedDict):
"""
Attributes:
requester_user_id:
User who requested the action. Will be recorded as putting the room on the
blocking list.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
moved into that room. If not set, no new room will be created
and the users will just be removed from the old room.
new_room_name:
A string representing the name of the room that new users will
be invited to. Defaults to `Content Violation Notification`
message:
A string containing the first message that will be sent as
`new_room_user_id` in the new room. Ideally this will clearly
convey why the original room was shut down.
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
If set to `true`, this room will be added to a blocking list,
preventing future attempts to join the room. Defaults to `false`.
purge:
If set to `true`, purge the given room from the database.
force_purge:
If set to `true`, the room will be purged from database
even if there are still users joined to the room.
"""
requester_user_id: str
new_room_user_id: Optional[str]
new_room_name: Optional[str]
message: Optional[str]
block: bool
purge: bool
force_purge: bool
class ShutdownRoomResponse(TypedDict):
"""
Attributes:
@@ -1768,6 +1807,63 @@ class ShutdownRoomResponse(TypedDict):
new_room_id: Optional[str]
@attr.s(slots=True, auto_attribs=True)
class DeleteStatus:
"""Object tracking the status of a delete room request
This class contains information on the progress of a delete room request, for
return by get_delete_status.
"""
ACTION_SHUTDOWN = "shutdown"
ACTION_PURGE = "purge"
ACTION_PURGE_HISTORY = "purge_history"
# Scheduled delete waiting to be launch at a specific time
STATUS_SCHEDULED = "scheduled"
STATUS_SHUTTING_DOWN = "shutting_down"
STATUS_PURGING = "purging"
STATUS_COMPLETE = "complete"
STATUS_FAILED = "failed"
delete_id: str = ""
action: str = ACTION_PURGE
# Tracks whether this request has completed.
# One of STATUS_{PURGING,COMPLETE,FAILED,SHUTTING_DOWN,WAIT_PURGE}.
status: str = STATUS_PURGING
# Save the error message if an error occurs
error: str = ""
# Saves the result of an action to give it back to REST API
shutdown_room: ShutdownRoomResponse = {
"kicked_users": [],
"failed_to_kick_users": [],
"local_aliases": [],
"new_room_id": None,
}
def asdict(self, use_purge_history_format: bool = False) -> JsonDict:
if not use_purge_history_format:
ret = {
"delete_id": self.delete_id,
"status": self.status,
"shutdown_room": self.shutdown_room,
}
else:
ret = {
"status": self.status
if self.status == DeleteStatus.STATUS_COMPLETE
or self.status == DeleteStatus.STATUS_FAILED
else "active",
}
if self.error:
ret["error"] = self.error
return ret
class RoomShutdownHandler:
DEFAULT_MESSAGE = (
"Sharing illegal content on this server is not permitted and rooms in"
@@ -1787,12 +1883,12 @@ class RoomShutdownHandler:
async def shutdown_room(
self,
room_id: str,
requester_user_id: str,
new_room_user_id: Optional[str] = None,
new_room_name: Optional[str] = None,
message: Optional[str] = None,
block: bool = False,
) -> ShutdownRoomResponse:
params: JsonMapping,
result: Optional[JsonMapping] = None,
update_result_fct: Optional[
Callable[[Optional[JsonMapping]], Awaitable[None]]
] = None,
) -> Optional[JsonMapping]:
"""
Shuts down a room. Moves all local users and room aliases automatically
to a new room if `new_room_user_id` is set. Otherwise local users only
@@ -1808,48 +1904,22 @@ class RoomShutdownHandler:
Args:
room_id: The ID of the room to shut down.
requester_user_id:
User who requested the action and put the room on the
blocking list.
new_room_user_id:
If set, a new room will be created with this user ID
as the creator and admin, and all users in the old room will be
moved into that room. If not set, no new room will be created
and the users will just be removed from the old room.
new_room_name:
A string representing the name of the room that new users will
be invited to. Defaults to `Content Violation Notification`
message:
A string containing the first message that will be sent as
`new_room_user_id` in the new room. Ideally this will clearly
convey why the original room was shut down.
Defaults to `Sharing illegal content on this server is not
permitted and rooms in violation will be blocked.`
block:
If set to `True`, users will be prevented from joining the old
room. This option can also be used to pre-emptively block a room,
even if it's unknown to this homeserver. In this case, the room
will be blocked, and no further action will be taken. If `False`,
attempting to delete an unknown room is invalid.
delete_id: The delete ID identifying this delete request
shutdown_params: parameters for the shutdown, cf `ShutdownRoomParams`
shutdown_response: current status of the shutdown, if it was interrupted
Defaults to `False`.
Returns: a dict containing the following keys:
kicked_users: An array of users (`user_id`) that were kicked.
failed_to_kick_users:
An array of users (`user_id`) that were not kicked.
local_aliases:
An array of strings representing the local aliases that were
migrated from the old room to the new.
new_room_id:
A string representing the room ID of the new room, or None if
no such room was created.
Returns: a dict matching `ShutdownRoomResponse`.
"""
requester_user_id = params["requester_user_id"]
new_room_user_id = params["new_room_user_id"]
block = params["block"]
if not new_room_name:
new_room_name = self.DEFAULT_ROOM_NAME
if not message:
message = self.DEFAULT_MESSAGE
new_room_name = (
params["new_room_name"]
if params["new_room_name"]
else self.DEFAULT_ROOM_NAME
)
message = params["message"] if params["message"] else self.DEFAULT_MESSAGE
if not RoomID.is_valid(room_id):
raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
@@ -1861,6 +1931,17 @@ class RoomShutdownHandler:
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
)
result = (
dict(result)
if result
else {
"kicked_users": [],
"failed_to_kick_users": [],
"local_aliases": [],
"new_room_id": None,
}
)
# Action the block first (even if the room doesn't exist yet)
if block:
# This will work even if the room is already blocked, but that is
@@ -1869,14 +1950,10 @@ class RoomShutdownHandler:
if not await self.store.get_room(room_id):
# if we don't know about the room, there is nothing left to do.
return {
"kicked_users": [],
"failed_to_kick_users": [],
"local_aliases": [],
"new_room_id": None,
}
return result
if new_room_user_id is not None:
new_room_id = result.get("new_room_id")
if new_room_user_id is not None and new_room_id is None:
if not self.hs.is_mine_id(new_room_user_id):
raise SynapseError(
400, "User must be our own: %s" % (new_room_user_id,)
@@ -1896,6 +1973,10 @@ class RoomShutdownHandler:
ratelimit=False,
)
result["new_room_id"] = new_room_id
if update_result_fct:
await update_result_fct(result)
logger.info(
"Shutting down room %r, joining to new room: %r", room_id, new_room_id
)
@@ -1909,12 +1990,9 @@ class RoomShutdownHandler:
stream_id,
)
else:
new_room_id = None
logger.info("Shutting down room %r", room_id)
users = await self.store.get_users_in_room(room_id)
kicked_users = []
failed_to_kick_users = []
for user_id in users:
if not self.hs.is_mine_id(user_id):
continue
@@ -1943,7 +2021,9 @@ class RoomShutdownHandler:
stream_id,
)
await self.room_member_handler.forget(target_requester.user, room_id)
await self.room_member_handler.forget(
target_requester.user, room_id, do_not_schedule_purge=True
)
# Join users to new room
if new_room_user_id:
@@ -1958,15 +2038,23 @@ class RoomShutdownHandler:
require_consent=False,
)
kicked_users.append(user_id)
result["kicked_users"].append(user_id)
if update_result_fct:
await update_result_fct(result)
except Exception:
logger.exception(
"Failed to leave old room and join new room for %r", user_id
)
failed_to_kick_users.append(user_id)
result["failed_to_kick_users"].append(user_id)
if update_result_fct:
await update_result_fct(result)
# Send message in new room and move aliases
if new_room_user_id:
room_creator_requester = create_requester(
new_room_user_id, authenticated_entity=requester_user_id
)
await self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
@@ -1978,18 +2066,15 @@ class RoomShutdownHandler:
ratelimit=False,
)
aliases_for_room = await self.store.get_aliases_for_room(room_id)
result["local_aliases"] = list(
await self.store.get_aliases_for_room(room_id)
)
assert new_room_id is not None
await self.store.update_aliases_for_room(
room_id, new_room_id, requester_user_id
)
else:
aliases_for_room = []
result["local_aliases"] = []
return {
"kicked_users": kicked_users,
"failed_to_kick_users": failed_to_kick_users,
"local_aliases": list(aliases_for_room),
"new_room_id": new_room_id,
}
return result
+19 -1
View File
@@ -94,6 +94,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()
self.event_auth_handler = hs.get_event_auth_handler()
self.task_scheduler = hs.get_task_scheduler()
self.member_linearizer: Linearizer = Linearizer(name="member")
self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
@@ -176,6 +177,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self._msc3970_enabled = hs.config.experimental.msc3970_enabled
self._purge_retention_period = hs.config.server.purge_retention_period
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
@@ -285,7 +288,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
"""
raise NotImplementedError()
async def forget(self, user: UserID, room_id: str) -> None:
async def forget(
self, user: UserID, room_id: str, do_not_schedule_purge: bool = False
) -> None:
user_id = user.to_string()
member = await self._storage_controllers.state.get_current_state_event(
@@ -305,6 +310,19 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# the table `current_state_events` and `get_current_state_events` is `None`.
await self.store.forget(user_id, room_id)
# If everyone locally has left the room, then there is no reason for us to keep the
# room around and we automatically purge room after a little bit
if (
not do_not_schedule_purge
and self._purge_retention_period
and await self.store.is_locally_forgotten_room(room_id)
):
await self.task_scheduler.schedule_task(
"purge_room",
resource_id=room_id,
timestamp=self.clock.time_msec() + self._purge_retention_period,
)
async def ratelimit_multiple_invites(
self,
requester: Optional[Requester],
+15 -5
View File
@@ -93,7 +93,7 @@ from synapse.rest.admin.users import (
UserTokenRestServlet,
WhoisRestServlet,
)
from synapse.types import JsonDict, RoomStreamToken
from synapse.types import JsonDict, RoomStreamToken, TaskStatus
from synapse.util import SYNAPSE_VERSION
if TYPE_CHECKING:
@@ -196,7 +196,7 @@ class PurgeHistoryRestServlet(RestServlet):
errcode=Codes.BAD_JSON,
)
purge_id = self.pagination_handler.start_purge_history(
purge_id = await self.pagination_handler.start_purge_history(
room_id, token, delete_local_events=delete_local_events
)
@@ -215,11 +215,21 @@ class PurgeHistoryStatusRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
purge_status = self.pagination_handler.get_purge_status(purge_id)
if purge_status is None:
purge_task = await self.pagination_handler.get_delete_task(purge_id)
if purge_task is None or purge_task.action != "purge_history":
raise NotFoundError("purge id '%s' not found" % purge_id)
return HTTPStatus.OK, purge_status.asdict()
result = {
"status": purge_task.status
if purge_task.status == TaskStatus.COMPLETE
or purge_task.status == TaskStatus.FAILED
else "active",
}
if purge_task.error:
result["error"] = purge_task.error
# TODO active vs purging etc
return HTTPStatus.OK, result
########################################################################################
+52 -30
View File
@@ -36,7 +36,14 @@ from synapse.rest.admin._base import (
)
from synapse.storage.databases.main.room import RoomSortOrder
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, RoomID, UserID, create_requester
from synapse.types import (
JsonDict,
RoomID,
ScheduledTask,
TaskStatus,
UserID,
create_requester,
)
from synapse.types.state import StateFilter
from synapse.util import json_decoder
@@ -117,20 +124,30 @@ class RoomRestV2Servlet(RestServlet):
403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
)
delete_id = self._pagination_handler.start_shutdown_and_purge_room(
delete_id = await self._pagination_handler.start_shutdown_and_purge_room(
room_id=room_id,
new_room_user_id=content.get("new_room_user_id"),
new_room_name=content.get("room_name"),
message=content.get("message"),
requester_user_id=requester.user.to_string(),
block=block,
purge=purge,
force_purge=force_purge,
shutdown_params={
"new_room_user_id": content.get("new_room_user_id"),
"new_room_name": content.get("room_name"),
"message": content.get("message"),
"requester_user_id": requester.user.to_string(),
"block": block,
"purge": purge,
"force_purge": force_purge,
},
)
return HTTPStatus.OK, {"delete_id": delete_id}
def _convert_delete_task_to_response(task: ScheduledTask) -> JsonDict:
    """Translate a room-deletion `ScheduledTask` into the admin-API JSON shape."""
    response: JsonDict = {
        "delete_id": task.id,
        "status": task.status,
        "shutdown_room": task.result,
    }
    return response
class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
"""Get the status of the delete room background task."""
@@ -150,21 +167,19 @@ class DeleteRoomStatusByRoomIdRestServlet(RestServlet):
HTTPStatus.BAD_REQUEST, "%s is not a legal room ID" % (room_id,)
)
delete_ids = self._pagination_handler.get_delete_ids_by_room(room_id)
if delete_ids is None:
raise NotFoundError("No delete task for room_id '%s' found" % room_id)
delete_tasks = await self._pagination_handler.get_delete_tasks_by_room(room_id)
response = []
for delete_id in delete_ids:
delete = self._pagination_handler.get_delete_status(delete_id)
if delete:
response += [
{
"delete_id": delete_id,
**delete.asdict(),
}
]
return HTTPStatus.OK, {"results": cast(JsonDict, response)}
for delete_task in delete_tasks:
# We ignore scheduled deletes because currently they are only used
# for automatically purging forgotten room after X time.
if delete_task.status != TaskStatus.SCHEDULED:
response += [_convert_delete_task_to_response(delete_task)]
if response:
return HTTPStatus.OK, {"results": cast(JsonDict, response)}
else:
raise NotFoundError("No delete task for room_id '%s' found" % room_id)
class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
@@ -181,11 +196,14 @@ class DeleteRoomStatusByDeleteIdRestServlet(RestServlet):
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)
delete_status = self._pagination_handler.get_delete_status(delete_id)
if delete_status is None:
delete_task = await self._pagination_handler.get_delete_task(delete_id)
if delete_task is None or (
delete_task.action != "purge_room"
and delete_task.action != "shutdown_and_purge_room"
):
raise NotFoundError("delete id '%s' not found" % delete_id)
return HTTPStatus.OK, cast(JsonDict, delete_status.asdict())
return HTTPStatus.OK, _convert_delete_task_to_response(delete_task)
class ListRoomRestServlet(RestServlet):
@@ -349,11 +367,15 @@ class RoomRestServlet(RestServlet):
ret = await room_shutdown_handler.shutdown_room(
room_id=room_id,
new_room_user_id=content.get("new_room_user_id"),
new_room_name=content.get("room_name"),
message=content.get("message"),
requester_user_id=requester.user.to_string(),
block=block,
params={
"new_room_user_id": content.get("new_room_user_id"),
"new_room_name": content.get("room_name"),
"message": content.get("message"),
"requester_user_id": requester.user.to_string(),
"block": block,
"purge": purge,
"force_purge": force_purge,
},
)
# Purge room
+6
View File
@@ -141,6 +141,7 @@ from synapse.util.distributor import Distributor
from synapse.util.macaroons import MacaroonGenerator
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import random_string
from synapse.util.task_scheduler import TaskScheduler
logger = logging.getLogger(__name__)
@@ -359,6 +360,7 @@ class HomeServer(metaclass=abc.ABCMeta):
"""
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
getattr(self, "get_" + i + "_handler")()
self.get_task_scheduler()
def get_reactor(self) -> ISynapseReactor:
"""
@@ -912,3 +914,7 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
"""Usage metrics shared between phone home stats and the prometheus exporter."""
return CommonUsageMetricsManager(self)
@cache_in_self
def get_task_scheduler(self) -> TaskScheduler:
return TaskScheduler(self)
@@ -70,6 +70,7 @@ from .state import StateStore
from .stats import StatsStore
from .stream import StreamWorkerStore
from .tags import TagsStore
from .task_scheduler import TaskSchedulerWorkerStore
from .transactions import TransactionWorkerStore
from .ui_auth import UIAuthStore
from .user_directory import UserDirectoryStore
@@ -127,6 +128,7 @@ class DataStore(
CacheInvalidationWorkerStore,
LockStore,
SessionStore,
TaskSchedulerWorkerStore,
):
def __init__(
self,
@@ -0,0 +1,195 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
if TYPE_CHECKING:
from synapse.server import HomeServer
class TaskSchedulerWorkerStore(SQLBaseStore):
    """Persistence layer for `TaskScheduler`: CRUD access to the
    `scheduled_tasks` table. `params` and `result` are stored as JSON text and
    (de)serialized at this boundary.
    """

    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        super().__init__(database, db_conn, hs)

    @staticmethod
    def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask:
        """Build a `ScheduledTask` from a DB row.

        NOTE: mutates `row` in place — `status` is parsed into a `TaskStatus`
        and `params`/`result` are JSON-decoded before constructing the attrs
        object.
        """
        row["status"] = TaskStatus(row["status"])
        if row["params"] is not None:
            row["params"] = json.loads(row["params"])
        if row["result"] is not None:
            row["result"] = json.loads(row["result"])
        return ScheduledTask(**row)

    async def get_scheduled_tasks(
        self,
        actions: Optional[List[str]] = None,
        resource_ids: Optional[List[str]] = None,
        statuses: Optional[List[TaskStatus]] = None,
    ) -> List[ScheduledTask]:
        """Get a list of scheduled tasks from the DB.

        If an arg is `None` all tasks matching the other args will be selected.
        If an arg is an empty list, the value needs to be NULL in DB to be selected.

        Args:
            actions: Limit the returned tasks to those specific action names
            resource_ids: Limit the returned tasks to those specific resource ids
            statuses: Limit the returned tasks to those specific statuses

        Returns: a list of `ScheduledTask`
        """

        def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
            # Build the WHERE clause incrementally; each provided filter adds
            # one IN (...) clause joined with AND.
            clauses: List[str] = []
            args: List[Any] = []
            if actions is not None:
                clause, temp_args = make_in_list_sql_clause(
                    txn.database_engine, "action", actions
                )
                clauses.append(clause)
                args.extend(temp_args)
            if resource_ids is not None:
                clause, temp_args = make_in_list_sql_clause(
                    txn.database_engine, "resource_id", resource_ids
                )
                clauses.append(clause)
                args.extend(temp_args)
            if statuses is not None:
                clause, temp_args = make_in_list_sql_clause(
                    txn.database_engine, "status", statuses
                )
                clauses.append(clause)
                args.extend(temp_args)

            sql = "SELECT * FROM scheduled_tasks"
            if clauses:
                sql = sql + " WHERE " + " AND ".join(clauses)

            txn.execute(sql, args)
            return self.db_pool.cursor_to_dict(txn)

        rows = await self.db_pool.runInteraction(
            "get_scheduled_tasks", get_scheduled_tasks_txn
        )
        return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]

    async def upsert_scheduled_task(self, task: ScheduledTask) -> None:
        """Upsert a specified `ScheduledTask` in the DB.

        `params` and `result` are JSON-encoded here; `None` stays NULL.

        Args:
            task: the `ScheduledTask` to upsert
        """
        await self.db_pool.simple_upsert(
            "scheduled_tasks",
            {"id": task.id},
            {
                "action": task.action,
                "status": task.status,
                "timestamp": task.timestamp,
                "resource_id": task.resource_id,
                "params": None if task.params is None else json.dumps(task.params),
                "result": None if task.result is None else json.dumps(task.result),
                "error": task.error,
            },
            desc="upsert_scheduled_task",
        )

    async def update_scheduled_task(
        self,
        id: str,
        *,
        timestamp: Optional[int] = None,
        status: Optional[TaskStatus] = None,
        result: Optional[JsonMapping] = None,
        error: Optional[str] = None,
    ) -> bool:
        """Update a scheduled task in the DB with some new value(s).

        Only the fields passed as non-`None` are updated; the others are left
        untouched in the DB.

        Args:
            id: id of the `ScheduledTask` to update
            timestamp: new timestamp of the task
            status: new status of the task
            result: new result of the task
            error: new error of the task

        Returns:
            `True` if a row was updated, `False` if no task with that id exists.
        """
        updatevalues: JsonDict = {}
        if timestamp is not None:
            updatevalues["timestamp"] = timestamp
        if status is not None:
            updatevalues["status"] = status
        if result is not None:
            updatevalues["result"] = json.dumps(result)
        if error is not None:
            updatevalues["error"] = error
        nb_rows = await self.db_pool.simple_update(
            "scheduled_tasks",
            {"id": id},
            updatevalues,
            desc="update_scheduled_task",
        )
        return nb_rows > 0

    async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
        """Get a specific `ScheduledTask` from its id.

        Args:
            id: the id of the task to retrieve

        Returns: the task if available, `None` otherwise
        """
        row = await self.db_pool.simple_select_one(
            table="scheduled_tasks",
            keyvalues={"id": id},
            retcols=(
                "id",
                "action",
                "status",
                "timestamp",
                "resource_id",
                "params",
                "result",
                "error",
            ),
            allow_none=True,
            desc="get_scheduled_task",
        )
        return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None

    async def delete_scheduled_task(self, id: str) -> None:
        """Delete a specific task from its id.

        Args:
            id: the id of the task to delete
        """
        await self.db_pool.simple_delete(
            "scheduled_tasks",
            keyvalues={"id": id},
            desc="delete_scheduled_task",
        )
@@ -0,0 +1,26 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- cf ScheduledTask docstring for the meaning of the fields.
CREATE TABLE IF NOT EXISTS scheduled_tasks(
    id text PRIMARY KEY,        -- random id assigned when the task is scheduled
    action text NOT NULL,       -- name of the registered action to run
    status text NOT NULL,       -- a TaskStatus value ("scheduled"/"active"/"complete"/"failed")
    timestamp bigint NOT NULL,  -- launch time while scheduled, last state change otherwise (ms)
    resource_id text,           -- optional resource the task is bound to (e.g. a room id)
    params text,                -- optional JSON-serialized parameters
    result text,                -- optional JSON-serialized result, updatable while running
    error text                  -- error message, set when status is "failed"
);
+38
View File
@@ -15,6 +15,7 @@
import abc
import re
import string
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
@@ -967,3 +968,40 @@ class UserProfile(TypedDict):
class RetentionPolicy:
min_lifetime: Optional[int] = None
max_lifetime: Optional[int] = None
class TaskStatus(str, Enum):
    """Status of a scheduled task.

    Subclasses `str` so values can be stored directly in the DB and
    serialized into JSON responses.
    """

    # Task is scheduled but not active
    SCHEDULED = "scheduled"
    # Task is active and probably running, and if not
    # will be run on next scheduler loop run
    ACTIVE = "active"
    # Task has completed successfully
    COMPLETE = "complete"
    # Task is over and either returned a failed status, or had an exception
    FAILED = "failed"
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ScheduledTask:
    """Description of a scheduled task, mirroring a row of `scheduled_tasks`."""

    # Id used to identify the task
    id: str
    # Name of the action to be run by this task (must match a name passed to
    # `TaskScheduler.register_action`)
    action: str
    # Current status of this task
    status: TaskStatus
    # If the status is SCHEDULED then this represents when it should be launched,
    # otherwise it represents the last time this task got a change of state
    timestamp: int
    # Optionally bind a task to some resource id for easy retrieval
    resource_id: Optional[str]
    # Optional parameters that will be passed to the function run by the task
    params: Optional[JsonMapping]
    # Optional result that can be updated by the running task
    result: Optional[JsonMapping]
    # Optional error that should be assigned a value when the status is FAILED
    error: Optional[str]
+266
View File
@@ -0,0 +1,266 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple
from twisted.python.failure import Failure
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class TaskScheduler:
    """Scheduler for resumable, potentially long-running tasks.

    Tasks are persisted in the `scheduled_tasks` table so they survive a
    Synapse restart: ACTIVE tasks found at the next scheduler loop run are
    re-launched with `first_launch=False` so the action function can resume.
    Task execution only happens on the worker configured to run background
    tasks.
    """

    # Precision of the scheduler, evaluation of tasks to run will only happen
    # every `SCHEDULE_INTERVAL_MS` ms
    SCHEDULE_INTERVAL_MS = 5 * 60 * 1000  # 5 mins
    # How often the cleanup of old complete/failed tasks runs
    CLEAN_INTERVAL_MS = 60 * 60 * 1000  # 1hr
    # Time before a complete or failed task is deleted from the DB
    KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000  # 1 week

    def __init__(self, hs: "HomeServer"):
        self.store = hs.get_datastores().main
        self.clock = hs.get_clock()
        # Ids of tasks currently executing in this process, used to avoid
        # launching the same task twice from the scheduler loop.
        self.running_tasks: Set[str] = set()
        # Registered action functions, keyed by action name.
        self.actions: Dict[
            str,
            Callable[
                [ScheduledTask, bool],
                Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
            ],
        ] = {}
        self.run_background_tasks = hs.config.worker.run_background_tasks

        # Only the background-task worker runs the scheduling/cleanup loops.
        if self.run_background_tasks:
            self.clock.looping_call(
                run_as_background_process,
                TaskScheduler.SCHEDULE_INTERVAL_MS,
                "run_scheduled_tasks",
                self._run_scheduled_tasks,
            )
            self.clock.looping_call(
                run_as_background_process,
                TaskScheduler.CLEAN_INTERVAL_MS,
                "clean_scheduled_tasks",
                self._clean_scheduled_tasks,
            )

    def register_action(
        self,
        function: Callable[
            [ScheduledTask, bool],
            Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
        ],
        action_name: str,
    ) -> None:
        """Register a function to be executed when an action is scheduled with
        the specified action name.

        Actions need to be registered as early as possible so that a resumed action
        can find its matching function. It's usually better to NOT do that right before
        calling `schedule_task` but rather in an `__init__` method.

        Args:
            function: The function to be executed for this action. The parameters
                passed to the function when launched are the `ScheduledTask` being run,
                and a `first_launch` boolean to signal if it's a resumed task or the first
                launch of it. The function should return a tuple of new `status`, `result`
                and `error` as specified in `ScheduledTask`.
            action_name: The name of the action to be associated with the function
        """
        self.actions[action_name] = function

    async def schedule_task(
        self,
        action: str,
        *,
        resource_id: Optional[str] = None,
        timestamp: Optional[int] = None,
        params: Optional[JsonMapping] = None,
    ) -> str:
        """Schedule a new potentially resumable task. A function matching the specified
        `action` should have been previously registered with `register_action`.

        Args:
            action: the name of a previously registered action
            resource_id: a task can be associated with a resource id to facilitate
                getting all tasks associated with a specific resource
            timestamp: if `None`, the task will be launched immediately, otherwise it
                will be launch after the `timestamp` value. Note that this scheduler
                is not meant to be precise, and the scheduling could be delayed if
                too many tasks are already running
            params: a set of parameters that can be easily accessed from inside the
                executed function

        Returns: the id of the scheduled task

        Raises:
            Exception: if no function is registered for `action`
        """
        if action not in self.actions:
            raise Exception(
                f"No function associated with the action {action} of the scheduled task"
            )

        launch_now = False
        if timestamp is None or timestamp < self.clock.time_msec():
            timestamp = self.clock.time_msec()
            launch_now = True

        task = ScheduledTask(
            random_string(16),
            action,
            TaskStatus.SCHEDULED,
            timestamp,
            resource_id,
            params,
            None,
            None,
        )
        await self.store.upsert_scheduled_task(task)

        # Immediate launch only happens on the background-task worker; other
        # workers rely on that worker's scheduler loop to pick the task up.
        if launch_now and self.run_background_tasks:
            await self._launch_task(task, True)

        return task.id

    async def update_task(
        self,
        id: str,
        *,
        timestamp: Optional[int] = None,
        status: Optional[TaskStatus] = None,
        result: Optional[JsonMapping] = None,
        error: Optional[str] = None,
    ) -> bool:
        """Update some task associated values.

        This is used internally, and also exposed publically so it can be used inside task functions.
        This allows to store in DB the progress of a task so it can be resumed properly after a restart of synapse.

        Args:
            id: the id of the task to update
            timestamp: the new timestamp of the task; defaults to the current
                time if not provided, recording the state-change time
            status: the new `TaskStatus` of the task
            result: the new result of the task
            error: the new error of the task

        Returns:
            `True` if the update was applied, `False` if no such task exists.
        """
        if timestamp is None:
            timestamp = self.clock.time_msec()
        return await self.store.update_scheduled_task(
            id,
            timestamp=timestamp,
            status=status,
            result=result,
            error=error,
        )

    async def get_task(self, id: str) -> Optional[ScheduledTask]:
        """Get a specific task description by id.

        Args:
            id: the id of the task to retrieve

        Returns: the task description or `None` if it doesn't exist
            or it has already been cleaned
        """
        return await self.store.get_scheduled_task(id)

    async def get_tasks(
        self,
        actions: Optional[List[str]] = None,
        resource_ids: Optional[List[str]] = None,
        statuses: Optional[List[TaskStatus]] = None,
    ) -> List[ScheduledTask]:
        """Get a list of tasks associated with some action name(s) and/or
        with some resource id(s).

        Args:
            actions: the action names of the tasks to retrieve; `None` matches
                all action names
            resource_ids: the resource ids of the tasks to retrieve; `None`
                matches all resource ids
            statuses: the statuses of the tasks to retrieve; `None` matches
                all statuses

        Returns: a list of `ScheduledTask`
        """
        return await self.store.get_scheduled_tasks(actions, resource_ids, statuses)

    async def _run_scheduled_tasks(self) -> None:
        """Main loop taking care of launching the scheduled tasks when needed."""
        for task in await self.store.get_scheduled_tasks(
            statuses=[TaskStatus.SCHEDULED, TaskStatus.ACTIVE]
        ):
            if task.id not in self.running_tasks:
                if (
                    task.status == TaskStatus.SCHEDULED
                    and task.timestamp < self.clock.time_msec()
                ):
                    # Due for its first launch.
                    await self._launch_task(task, True)
                elif task.status == TaskStatus.ACTIVE:
                    # ACTIVE but not in `running_tasks`: the task was
                    # interrupted (e.g. by a restart), so resume it.
                    await self._launch_task(task, False)

    async def _clean_scheduled_tasks(self) -> None:
        """Clean loop taking care of removing old complete or failed jobs to avoid cluttering the DB."""
        for task in await self.store.get_scheduled_tasks(
            statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE]
        ):
            # Skip tasks still tracked as running in this process.
            if task.id not in self.running_tasks:
                if (
                    task.status == TaskStatus.COMPLETE
                    or task.status == TaskStatus.FAILED
                ) and self.clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS:
                    await self.store.delete_scheduled_task(task.id)

    async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None:
        """Launch a scheduled task now.

        Args:
            task: the task to launch
            first_launch: `True` if it's the first time is launched, `False` otherwise

        Raises:
            Exception: if no function is registered for `task.action`
        """
        if task.action not in self.actions:
            raise Exception(
                f"No function associated with the action {task.action} of the scheduled task"
            )

        function = self.actions[task.action]

        async def wrapper() -> None:
            # Run the action; any exception marks the task FAILED with the
            # error message, and the final status is persisted before the
            # task is removed from the running set.
            try:
                (status, result, error) = await function(task, first_launch)
            except Exception:
                f = Failure()
                logger.error(
                    f"scheduled task {task.id} failed",
                    exc_info=(f.type, f.value, f.getTracebackObject()),
                )
                status = TaskStatus.FAILED
                result = None
                error = f.getErrorMessage()

            await self.update_task(
                task.id,
                status=status,
                result=result,
                error=error,
            )
            self.running_tasks.remove(task.id)

        # Mark ACTIVE before launching so a concurrent scheduler loop run
        # does not launch it again as SCHEDULED.
        await self.update_task(task.id, status=TaskStatus.ACTIVE)
        self.running_tasks.add(task.id)
        description = task.action
        if task.resource_id:
            description += f"-{task.resource_id}"
        run_as_background_process(description, wrapper)
+129 -53
View File
@@ -24,17 +24,20 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import EventTypes, Membership, RoomTypes
from synapse.api.errors import Codes
from synapse.handlers.pagination import PaginationHandler, PurgeStatus
from synapse.rest.client import directory, events, login, room
from synapse.server import HomeServer
from synapse.types import UserID
from synapse.util import Clock
from synapse.util.stringutils import random_string
from synapse.util.task_scheduler import TaskScheduler
from tests import unittest
"""Tests admin REST events for /rooms paths."""
ONE_HOUR_IN_S = 3600
class DeleteRoomTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
@@ -46,6 +49,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_creation_handler = hs.get_event_creation_handler()
self.task_scheduler = hs.get_task_scheduler()
hs.config.consent.user_consent_version = "1"
consent_uri_builder = Mock()
@@ -476,6 +480,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_creation_handler = hs.get_event_creation_handler()
self.task_scheduler = hs.get_task_scheduler()
hs.config.consent.user_consent_version = "1"
consent_uri_builder = Mock()
@@ -502,6 +507,9 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
)
self.url_status_by_delete_id = "/_synapse/admin/v2/rooms/delete_status/"
self.room_member_handler = hs.get_room_member_handler()
self.pagination_handler = hs.get_pagination_handler()
@parameterized.expand(
[
("DELETE", "/_synapse/admin/v2/rooms/%s"),
@@ -661,7 +669,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
delete_id1 = channel.json_body["delete_id"]
# go ahead
self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
# second task
channel = self.make_request(
@@ -686,12 +694,14 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(2, len(channel.json_body["results"]))
self.assertEqual("complete", channel.json_body["results"][0]["status"])
self.assertEqual("complete", channel.json_body["results"][1]["status"])
self.assertEqual(delete_id1, channel.json_body["results"][0]["delete_id"])
self.assertEqual(delete_id2, channel.json_body["results"][1]["delete_id"])
delete_ids = {delete_id1, delete_id2}
self.assertTrue(channel.json_body["results"][0]["delete_id"] in delete_ids)
delete_ids.remove(channel.json_body["results"][0]["delete_id"])
self.assertTrue(channel.json_body["results"][1]["delete_id"] in delete_ids)
# get status after more than clearing time for first task
# second task is not cleared
self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
channel = self.make_request(
"GET",
@@ -705,7 +715,7 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(delete_id2, channel.json_body["results"][0]["delete_id"])
# get status after more than clearing time for all tasks
self.reactor.advance(PaginationHandler.CLEAR_PURGE_AFTER_MS / 1000 / 2)
self.reactor.advance(TaskScheduler.KEEP_TASKS_FOR_MS / 1000 / 2)
channel = self.make_request(
"GET",
@@ -716,48 +726,6 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_delete_same_room_twice(self) -> None:
"""Test that the call for delete a room at second time gives an exception."""
body = {"new_room_user_id": self.admin_user}
# first call to delete room
# and do not wait for finish the task
first_channel = self.make_request(
"DELETE",
self.url.encode("ascii"),
content=body,
access_token=self.admin_user_tok,
await_result=False,
)
# second call to delete room
second_channel = self.make_request(
"DELETE",
self.url.encode("ascii"),
content=body,
access_token=self.admin_user_tok,
)
self.assertEqual(400, second_channel.code, msg=second_channel.json_body)
self.assertEqual(Codes.UNKNOWN, second_channel.json_body["errcode"])
self.assertEqual(
f"History purge already in progress for {self.room_id}",
second_channel.json_body["error"],
)
# get result of first call
first_channel.await_result()
self.assertEqual(200, first_channel.code, msg=first_channel.json_body)
self.assertIn("delete_id", first_channel.json_body)
# check status after finish the task
self._test_result(
first_channel.json_body["delete_id"],
self.other_user,
expect_new_room=True,
)
def test_purge_room_and_block(self) -> None:
"""Test to purge a room and block it.
Members will not be moved to a new room and will not receive a message.
@@ -972,6 +940,117 @@ class DeleteRoomV2TestCase(unittest.HomeserverTestCase):
# Assert we can no longer peek into the room
self._assert_peek(self.room_id, expect_code=403)
    @unittest.override_config({"purge_retention_period": "1d"})
    def test_purge_forgotten_room(self) -> None:
        """A room forgotten by all local users is purged automatically once
        `purge_retention_period` (here 1 day) has elapsed."""
        # Create a test room
        room_id = self.helper.create_room_as(
            self.admin_user,
            tok=self.admin_user_tok,
        )

        # Leave then forget the room, so it becomes locally forgotten and a
        # scheduled purge task is created.
        self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
        self.get_success(
            self.room_member_handler.forget(
                UserID.from_string(self.admin_user), room_id
            )
        )

        # Test that room is not yet purged
        with self.assertRaises(AssertionError):
            self._is_purged(room_id)

        # Advance 24 hours in the future, past the `purge_retention_period`
        self.reactor.advance(24 * ONE_HOUR_IN_S)

        self._is_purged(room_id)
def test_scheduled_purge_room(self) -> None:
# Create a test room
room_id = self.helper.create_room_as(
self.admin_user,
tok=self.admin_user_tok,
)
self.helper.leave(room_id, user=self.admin_user, tok=self.admin_user_tok)
# Schedule a purge 10 seconds in the future
self.get_success(
self.task_scheduler.schedule_task(
"purge_room",
resource_id=room_id,
timestamp=self.clock.time_msec() + 10 * 1000,
)
)
# Test that room is not yet purged
with self.assertRaises(AssertionError):
self._is_purged(room_id)
# Advance one hour in the future past `TaskScheduler.SCHEDULE_INTERVAL_MS` so that
# the automatic purging takes place and launch the purge
self.reactor.advance(ONE_HOUR_IN_S)
self._is_purged(room_id)
def test_schedule_shutdown_room(self) -> None:
# Create a test room
room_id = self.helper.create_room_as(
self.other_user,
tok=self.other_user_tok,
)
# Schedule a shutdown 10 seconds in the future
delete_id = self.get_success(
self.task_scheduler.schedule_task(
"shutdown_and_purge_room",
resource_id=room_id,
params={
"requester_user_id": self.admin_user,
"new_room_user_id": self.admin_user,
"new_room_name": None,
"message": None,
"block": False,
"purge": True,
"force_purge": True,
},
timestamp=self.clock.time_msec() + 10 * 1000,
)
)
# Test that room is not yet shutdown
self._is_member(room_id, self.other_user)
# Test that room is not yet purged
with self.assertRaises(AssertionError):
self._is_purged(room_id)
# Advance one hour in the future past `TaskScheduler.SCHEDULE_INTERVAL_MS` so that
# the automatic purging takes place and resumes the purge
self.reactor.advance(ONE_HOUR_IN_S)
# Test that all users has been kicked (room is shutdown)
self._has_no_members(room_id)
self._is_purged(room_id)
# Retrieve delete results
result = self.make_request(
"GET",
self.url_status_by_delete_id + delete_id,
access_token=self.admin_user_tok,
)
self.assertEqual(200, result.code, msg=result.json_body)
# Check that the user is in kicked_users
self.assertIn(
self.other_user, result.json_body["shutdown_room"]["kicked_users"]
)
new_room_id = result.json_body["shutdown_room"]["new_room_id"]
self.assertTrue(new_room_id)
# Check that the user is actually in the new room
self._is_member(new_room_id, self.other_user)
def _is_blocked(self, room_id: str, expect: bool = True) -> None:
"""Assert that the room is blocked or not"""
d = self.store.is_room_blocked(room_id)
@@ -1957,11 +2036,8 @@ class RoomMessagesTestCase(unittest.HomeserverTestCase):
self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
# Purge every event before the second event.
purge_id = random_string(16)
pagination_handler._purges_by_id[purge_id] = PurgeStatus()
self.get_success(
pagination_handler._purge_history(
purge_id=purge_id,
pagination_handler.purge_history(
room_id=self.room_id,
token=second_token_str,
delete_local_events=True,
+16 -2
View File
@@ -22,6 +22,7 @@ from synapse.server import HomeServer
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict
from synapse.util import Clock
from synapse.util.stringutils import random_string
from tests import unittest
from tests.unittest import override_config
@@ -413,11 +414,24 @@ class ServerNoticeTestCase(unittest.HomeserverTestCase):
self.assertEqual(messages[0]["content"]["body"], "test msg one")
self.assertEqual(messages[0]["sender"], "@notices:test")
random_string(16)
# shut down and purge room
self.get_success(
self.room_shutdown_handler.shutdown_room(first_room_id, self.admin_user)
self.room_shutdown_handler.shutdown_room(
first_room_id,
{
"requester_user_id": self.admin_user,
"new_room_user_id": None,
"new_room_name": None,
"message": None,
"block": False,
"purge": True,
"force_purge": False,
},
)
)
self.get_success(self.pagination_handler.purge_room(first_room_id))
self.get_success(self.pagination_handler.purge_room(first_room_id, force=False))
# user is not member anymore
self._check_invite_and_join_status(self.other_user, 0, 0)
+1 -5
View File
@@ -41,7 +41,6 @@ from synapse.api.errors import Codes, HttpResponseException
from synapse.appservice import ApplicationService
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.pagination import PurgeStatus
from synapse.rest import admin
from synapse.rest.client import account, directory, login, profile, register, room, sync
from synapse.server import HomeServer
@@ -2089,11 +2088,8 @@ class RoomMessageListTestCase(RoomBase):
self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
# Purge every event before the second event.
purge_id = random_string(16)
pagination_handler._purges_by_id[purge_id] = PurgeStatus()
self.get_success(
pagination_handler._purge_history(
purge_id=purge_id,
pagination_handler.purge_history(
room_id=self.room_id,
token=second_token_str,
delete_local_events=True,
+132
View File
@@ -0,0 +1,132 @@
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Tuple
from twisted.internet.task import deferLater
from twisted.test.proto_helpers import MemoryReactor
from synapse.server import HomeServer
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util import Clock
from synapse.util.task_scheduler import TaskScheduler
from tests import unittest
class TestTaskScheduler(unittest.HomeserverTestCase):
    """Tests for the `TaskScheduler`: scheduling in the future and now,
    failure reporting, and resumption of interrupted tasks."""

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        # Register the three test actions so the scheduler can dispatch to
        # them by name when a task fires.
        self.task_scheduler = hs.get_task_scheduler()
        self.task_scheduler.register_action(self._test_task, "_test_task")
        self.task_scheduler.register_action(self._raising_task, "_raising_task")
        self.task_scheduler.register_action(self._resumable_task, "_resumable_task")

    async def _test_task(
        self, task: ScheduledTask, first_launch: bool
    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
        """Trivial action: completes immediately, echoing the task's params
        (if any) as its result so tests can verify param round-tripping."""
        # This test task will copy the parameters to the result
        result = None
        if task.params:
            result = task.params
        return (TaskStatus.COMPLETE, result, None)

    def test_schedule_task(self) -> None:
        """Schedule a task in the future with some parameters to be copied as a
        result and check it executed correctly.
        Also check that it gets removed after `KEEP_TASKS_FOR_MS`."""
        timestamp = self.clock.time_msec() + 2 * 60 * 1000
        task_id = self.get_success(
            self.task_scheduler.schedule_task(
                "_test_task",
                timestamp=timestamp,
                params={"val": 1},
            )
        )
        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        # Before its timestamp the task is stored but not yet run.
        self.assertEqual(task.status, TaskStatus.SCHEDULED)
        self.assertIsNone(task.result)
        # The timestamp is 2 minutes from now, so the task should have been
        # executed after the first scheduling loop has run.
        self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + 1)
        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        self.assertEqual(task.status, TaskStatus.COMPLETE)
        assert task.result is not None
        # The passed parameter should have been copied to the result
        self.assertTrue(task.result.get("val") == 1)
        # Let's wait for the complete task to be deleted and hence unavailable
        self.reactor.advance((TaskScheduler.KEEP_TASKS_FOR_MS / 1000) + 1)
        task = self.get_success(self.task_scheduler.get_task(task_id))
        self.assertIsNone(task)

    def test_schedule_task_now(self) -> None:
        """Schedule a task now and check it runs fine to completion."""
        task_id = self.get_success(
            self.task_scheduler.schedule_task("_test_task", params={"val": 1})
        )
        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        # With no timestamp the task runs immediately and is already complete.
        self.assertEqual(task.status, TaskStatus.COMPLETE)
        assert task.result is not None
        self.assertTrue(task.result.get("val") == 1)

    async def _raising_task(
        self, task: ScheduledTask, first_launch: bool
    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
        """Action that always raises, to exercise the scheduler's failure path."""
        raise Exception("raising")

    def test_schedule_raising_task_now(self) -> None:
        """Schedule a task raising an exception and check it runs to failure
        and reports the exception content."""
        task_id = self.get_success(self.task_scheduler.schedule_task("_raising_task"))
        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        # The raised exception's message is stored as the task's error.
        self.assertEqual(task.status, TaskStatus.FAILED)
        self.assertEqual(task.error, "raising")

    async def _resumable_task(
        self, task: ScheduledTask, first_launch: bool
    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
        """Action that records progress, then blocks forever; on a second
        launch it observes its stored result and completes."""
        if task.result and "in_progress" in task.result:
            return TaskStatus.COMPLETE, {"success": True}, None
        else:
            await self.task_scheduler.update_task(task.id, result={"in_progress": True})
            # Await forever to simulate an aborted task because of a restart
            # NOTE(review): passes None as deferLater's callable — relies on
            # the Twisted version where `callable` is optional; confirm.
            await deferLater(self.reactor, 2**16, None)
            # This should never be reached
            return TaskStatus.ACTIVE, None, None

    def test_schedule_resumable_task_now(self) -> None:
        """Schedule a resumable task and check that it gets properly resumed
        and completes after simulating a synapse restart."""
        task_id = self.get_success(self.task_scheduler.schedule_task("_resumable_task"))
        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        # First launch: the action is blocked in deferLater, so still ACTIVE.
        self.assertEqual(task.status, TaskStatus.ACTIVE)
        # Simulate a synapse restart by emptying the list of running tasks
        self.task_scheduler.running_tasks = set()
        # The next scheduling loop relaunches the task, which now completes.
        self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + 1)
        task = self.get_success(self.task_scheduler.get_task(task_id))
        assert task is not None
        self.assertEqual(task.status, TaskStatus.COMPLETE)
        assert task.result is not None
        self.assertTrue(task.result.get("success"))