Add an Admin API endpoint for listing quarantined media (#19268)

Co-authored-by: turt2live <1190097+turt2live@users.noreply.github.com>
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
This commit is contained in:
Travis Ralston
2025-12-12 06:30:21 -07:00
committed by GitHub
parent 1f7f16477d
commit 3f636386a6
8 changed files with 266 additions and 10 deletions

View File

@@ -0,0 +1 @@
Add an admin API for retrieving a paginated list of quarantined media.

View File

@@ -73,6 +73,33 @@ Response:
} }
``` ```
## Listing all quarantined media
This API returns a list of all quarantined media on the server. It is paginated, and can be scoped to either local or
remote media. Note that the pagination values are also scoped to the request parameters - changing them but keeping the
same pagination values will result in unexpected results.
Request:
```http
GET /_synapse/admin/v1/media/quarantined?from=0&limit=100&kind=local
```
`from` and `limit` are optional parameters, and default to `0` and `100` respectively. They are the row index and number
of rows to return - they are not timestamps.
`kind` *MUST* either be `local` or `remote`.
The API returns a JSON body containing MXC URIs for the quarantined media, like the following:
```json
{
"media": [
"mxc://localhost/xwvutsrqponmlkjihgfedcba",
"mxc://localhost/abcdefghijklmnopqrstuvwx"
]
}
```
# Quarantine media # Quarantine media
Quarantining media means that it is marked as inaccessible by users. It applies Quarantining media means that it is marked as inaccessible by users. It applies

View File

@@ -914,6 +914,7 @@ class MediaRepository:
filesystem_id=file_id, filesystem_id=file_id,
last_access_ts=time_now_ms, last_access_ts=time_now_ms,
quarantined_by=None, quarantined_by=None,
quarantined_ts=None,
authenticated=authenticated, authenticated=authenticated,
sha256=sha256writer.hexdigest(), sha256=sha256writer.hexdigest(),
) )
@@ -1047,6 +1048,7 @@ class MediaRepository:
filesystem_id=file_id, filesystem_id=file_id,
last_access_ts=time_now_ms, last_access_ts=time_now_ms,
quarantined_by=None, quarantined_by=None,
quarantined_ts=None,
authenticated=authenticated, authenticated=authenticated,
sha256=sha256writer.hexdigest(), sha256=sha256writer.hexdigest(),
) )

View File

@@ -293,6 +293,38 @@ class ListMediaInRoom(RestServlet):
return HTTPStatus.OK, {"local": local_mxcs, "remote": remote_mxcs} return HTTPStatus.OK, {"local": local_mxcs, "remote": remote_mxcs}
class ListQuarantinedMedia(RestServlet):
"""Lists all quarantined media on the server."""
PATTERNS = admin_patterns("/media/quarantined$")
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.auth = hs.get_auth()
async def on_GET(
self,
request: SynapseRequest,
) -> tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
start = parse_integer(request, "from", default=0)
limit = parse_integer(request, "limit", default=100)
local_or_remote = parse_string(request, "kind", required=True)
if local_or_remote not in ["local", "remote"]:
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Query parameter `kind` must be either 'local' or 'remote'.",
)
mxcs = await self.store.get_quarantined_media_mxcs(
start, limit, local_or_remote == "local"
)
return HTTPStatus.OK, {"media": mxcs}
class PurgeMediaCacheRestServlet(RestServlet): class PurgeMediaCacheRestServlet(RestServlet):
PATTERNS = admin_patterns("/purge_media_cache$") PATTERNS = admin_patterns("/purge_media_cache$")
@@ -532,6 +564,7 @@ def register_servlets_for_media_repo(hs: "HomeServer", http_server: HttpServer)
ProtectMediaByID(hs).register(http_server) ProtectMediaByID(hs).register(http_server)
UnprotectMediaByID(hs).register(http_server) UnprotectMediaByID(hs).register(http_server)
ListMediaInRoom(hs).register(http_server) ListMediaInRoom(hs).register(http_server)
ListQuarantinedMedia(hs).register(http_server)
# XXX DeleteMediaByDateSize must be registered before DeleteMediaByID as # XXX DeleteMediaByDateSize must be registered before DeleteMediaByID as
# their URL routes overlap. # their URL routes overlap.
DeleteMediaByDateSize(hs).register(http_server) DeleteMediaByDateSize(hs).register(http_server)

View File

@@ -61,6 +61,7 @@ class LocalMedia:
url_cache: str | None url_cache: str | None
last_access_ts: int last_access_ts: int
quarantined_by: str | None quarantined_by: str | None
quarantined_ts: int | None
safe_from_quarantine: bool safe_from_quarantine: bool
user_id: str | None user_id: str | None
authenticated: bool | None authenticated: bool | None
@@ -78,6 +79,7 @@ class RemoteMedia:
created_ts: int created_ts: int
last_access_ts: int last_access_ts: int
quarantined_by: str | None quarantined_by: str | None
quarantined_ts: int | None
authenticated: bool | None authenticated: bool | None
sha256: str | None sha256: str | None
@@ -243,6 +245,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"user_id", "user_id",
"authenticated", "authenticated",
"sha256", "sha256",
"quarantined_ts",
), ),
allow_none=True, allow_none=True,
desc="get_local_media", desc="get_local_media",
@@ -262,6 +265,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
user_id=row[8], user_id=row[8],
authenticated=row[9], authenticated=row[9],
sha256=row[10], sha256=row[10],
quarantined_ts=row[11],
) )
async def get_local_media_by_user_paginate( async def get_local_media_by_user_paginate(
@@ -319,7 +323,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
safe_from_quarantine, safe_from_quarantine,
user_id, user_id,
authenticated, authenticated,
sha256 sha256,
quarantined_ts
FROM local_media_repository FROM local_media_repository
WHERE user_id = ? WHERE user_id = ?
ORDER BY {order_by_column} {order}, media_id ASC ORDER BY {order_by_column} {order}, media_id ASC
@@ -345,6 +350,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
user_id=row[9], user_id=row[9],
authenticated=row[10], authenticated=row[10],
sha256=row[11], sha256=row[11],
quarantined_ts=row[12],
) )
for row in txn for row in txn
] ]
@@ -695,6 +701,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"quarantined_by", "quarantined_by",
"authenticated", "authenticated",
"sha256", "sha256",
"quarantined_ts",
), ),
allow_none=True, allow_none=True,
desc="get_cached_remote_media", desc="get_cached_remote_media",
@@ -713,6 +720,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
quarantined_by=row[6], quarantined_by=row[6],
authenticated=row[7], authenticated=row[7],
sha256=row[8], sha256=row[8],
quarantined_ts=row[9],
) )
async def store_cached_remote_media( async def store_cached_remote_media(

View File

@@ -945,6 +945,50 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
max_lifetime=max_lifetime, max_lifetime=max_lifetime,
) )
async def get_quarantined_media_mxcs(
self, index_start: int, index_limit: int, local: bool
) -> list[str]:
"""Retrieves all the quarantined media MXC URIs starting from the given position,
ordered from oldest quarantined timestamp, then alphabetically by media ID
(including origin).
Note that on established servers the "quarantined timestamp" may be zero due to
being introduced after the quarantine timestamp field was introduced.
Args:
index_start: The position to start from.
index_limit: The maximum number of results to return.
local: When true, only local media will be returned. When false, only remote media will be returned.
Returns:
The quarantined media as a list of media IDs.
"""
def _get_quarantined_media_mxcs_txn(
txn: LoggingTransaction,
) -> list[str]:
# We order by quarantined timestamp *and* media ID (including origin, when
# known) to ensure the ordering is stable for established servers.
if local:
sql = "SELECT '' as media_origin, media_id FROM local_media_repository WHERE quarantined_by IS NOT NULL ORDER BY quarantined_ts, media_id ASC LIMIT ? OFFSET ?"
else:
sql = "SELECT media_origin, media_id FROM remote_media_cache WHERE quarantined_by IS NOT NULL ORDER BY quarantined_ts, media_origin, media_id ASC LIMIT ? OFFSET ?"
txn.execute(sql, (index_limit, index_start))
mxcs = []
for media_origin, media_id in txn:
if local:
media_origin = self.hs.hostname
mxcs.append(f"mxc://{media_origin}/{media_id}")
return mxcs
return await self.db_pool.runInteraction(
"get_quarantined_media_mxcs",
_get_quarantined_media_mxcs_txn,
)
async def get_media_mxcs_in_room(self, room_id: str) -> tuple[list[str], list[str]]: async def get_media_mxcs_in_room(self, room_id: str) -> tuple[list[str], list[str]]:
"""Retrieves all the local and remote media MXC URIs in a given room """Retrieves all the local and remote media MXC URIs in a given room
@@ -952,7 +996,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
room_id room_id
Returns: Returns:
The local and remote media as a lists of the media IDs. The local and remote media as lists of the media IDs.
""" """
def _get_media_mxcs_in_room_txn( def _get_media_mxcs_in_room_txn(
@@ -1147,6 +1191,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
The total number of media items quarantined The total number of media items quarantined
""" """
total_media_quarantined = 0 total_media_quarantined = 0
now_ts: int | None = self.clock.time_msec()
if quarantined_by is None:
now_ts = None
# Effectively a legacy path, update any media that was explicitly named. # Effectively a legacy path, update any media that was explicitly named.
if media_ids: if media_ids:
@@ -1155,13 +1203,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
) )
sql = f""" sql = f"""
UPDATE local_media_repository UPDATE local_media_repository
SET quarantined_by = ? SET quarantined_by = ?, quarantined_ts = ?
WHERE {sql_many_clause_sql}""" WHERE {sql_many_clause_sql}"""
if quarantined_by is not None: if quarantined_by is not None:
sql += " AND safe_from_quarantine = FALSE" sql += " AND safe_from_quarantine = FALSE"
txn.execute(sql, [quarantined_by] + sql_many_clause_args) txn.execute(sql, [quarantined_by, now_ts] + sql_many_clause_args)
# Note that a rowcount of -1 can be used to indicate no rows were affected. # Note that a rowcount of -1 can be used to indicate no rows were affected.
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0 total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
@@ -1172,13 +1220,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
) )
sql = f""" sql = f"""
UPDATE local_media_repository UPDATE local_media_repository
SET quarantined_by = ? SET quarantined_by = ?, quarantined_ts = ?
WHERE {sql_many_clause_sql}""" WHERE {sql_many_clause_sql}"""
if quarantined_by is not None: if quarantined_by is not None:
sql += " AND safe_from_quarantine = FALSE" sql += " AND safe_from_quarantine = FALSE"
txn.execute(sql, [quarantined_by] + sql_many_clause_args) txn.execute(sql, [quarantined_by, now_ts] + sql_many_clause_args)
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0 total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
return total_media_quarantined return total_media_quarantined
@@ -1202,6 +1250,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
The total number of media items quarantined The total number of media items quarantined
""" """
total_media_quarantined = 0 total_media_quarantined = 0
now_ts: int | None = self.clock.time_msec()
if quarantined_by is None:
now_ts = None
if media: if media:
sql_in_list_clause, sql_args = make_tuple_in_list_sql_clause( sql_in_list_clause, sql_args = make_tuple_in_list_sql_clause(
@@ -1211,10 +1263,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
) )
sql = f""" sql = f"""
UPDATE remote_media_cache UPDATE remote_media_cache
SET quarantined_by = ? SET quarantined_by = ?, quarantined_ts = ?
WHERE {sql_in_list_clause}""" WHERE {sql_in_list_clause}"""
txn.execute(sql, [quarantined_by] + sql_args) txn.execute(sql, [quarantined_by, now_ts] + sql_args)
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0 total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
total_media_quarantined = 0 total_media_quarantined = 0
@@ -1224,9 +1276,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
) )
sql = f""" sql = f"""
UPDATE remote_media_cache UPDATE remote_media_cache
SET quarantined_by = ? SET quarantined_by = ?, quarantined_ts = ?
WHERE {sql_many_clause_sql}""" WHERE {sql_many_clause_sql}"""
txn.execute(sql, [quarantined_by] + sql_many_clause_args) txn.execute(sql, [quarantined_by, now_ts] + sql_many_clause_args)
total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0 total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
return total_media_quarantined return total_media_quarantined

View File

@@ -0,0 +1,27 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 Element Creations, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
-- Add a timestamp for when the sliding sync connection position was last used,
-- only updated with a small granularity.
--
-- This should be NOT NULL, but we need to consider existing rows. In future we
-- may want to either backfill this or delete all rows with a NULL value (and
-- then make it NOT NULL).
ALTER TABLE local_media_repository ADD COLUMN quarantined_ts BIGINT;
ALTER TABLE remote_media_cache ADD COLUMN quarantined_ts BIGINT;
UPDATE local_media_repository SET quarantined_ts = 0 WHERE quarantined_by IS NOT NULL;
UPDATE remote_media_cache SET quarantined_ts = 0 WHERE quarantined_by IS NOT NULL;
-- Note: We *probably* should have an index on quarantined_ts, but we're going
-- to try to defer that to a future migration after seeing the performance impact.

View File

@@ -756,6 +756,112 @@ class DeleteMediaByDateSizeTestCase(_AdminMediaTests):
self.assertFalse(os.path.exists(local_path)) self.assertFalse(os.path.exists(local_path))
class ListQuarantinedMediaTestCase(_AdminMediaTests):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.server_name = hs.hostname
@parameterized.expand(["local", "remote"])
def test_no_auth(self, kind: str) -> None:
"""
Try to list quarantined media without authentication.
"""
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=%s" % (kind,),
)
self.assertEqual(401, channel.code, msg=channel.json_body)
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
@parameterized.expand(["local", "remote"])
def test_requester_is_not_admin(self, kind: str) -> None:
"""
If the user is not a server admin, an error is returned.
"""
self.other_user = self.register_user("user", "pass")
self.other_user_token = self.login("user", "pass")
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=%s" % (kind,),
access_token=self.other_user_token,
)
self.assertEqual(403, channel.code, msg=channel.json_body)
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_list_quarantined_media(self) -> None:
"""
Ensure we actually get results for each page. We can't really test that
remote media is quarantined, but we can test that local media is.
"""
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
def _upload() -> str:
return self.helper.upload_media(
SMALL_PNG, tok=self.admin_user_tok, expect_code=200
)["content_uri"][6:].split("/")[1] # Cut off 'mxc://' and domain
self.media_id_1 = _upload()
self.media_id_2 = _upload()
self.media_id_3 = _upload()
def _quarantine(media_id: str) -> None:
channel = self.make_request(
"POST",
"/_synapse/admin/v1/media/quarantine/%s/%s"
% (
self.server_name,
media_id,
),
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
_quarantine(self.media_id_1)
_quarantine(self.media_id_2)
_quarantine(self.media_id_3)
# Page 1
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=local&from=0&limit=1",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(1, len(channel.json_body["media"]))
# Page 2
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=local&from=1&limit=1",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(1, len(channel.json_body["media"]))
# Page 3
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=local&from=2&limit=1",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(1, len(channel.json_body["media"]))
# Page 4 (no media)
channel = self.make_request(
"GET",
"/_synapse/admin/v1/media/quarantined?kind=local&from=3&limit=1",
access_token=self.admin_user_tok,
)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(0, len(channel.json_body["media"]))
class QuarantineMediaByIDTestCase(_AdminMediaTests): class QuarantineMediaByIDTestCase(_AdminMediaTests):
def upload_media_and_return_media_id(self, data: bytes) -> str: def upload_media_and_return_media_id(self, data: bytes) -> str:
# Upload some media into the room # Upload some media into the room