Compare commits

5 Commits

Author | SHA1 | Message | Date
Travis Ralston | ca8d94585f | Merge branch 'develop' into travis/fix-quarantine-list | 2025-12-17 21:50:46 -07:00
Travis Ralston | 516b74068b | linting & fix tests | 2025-12-15 23:41:20 -07:00
Travis Ralston | ad230c4530 | changelog | 2025-12-15 22:14:10 -07:00
Travis Ralston | a4036bf7a3 | Switch to a semi-stable pagination ordering | 2025-12-15 22:04:01 -07:00
Travis Ralston | 32ce7a3026 | Cleanup from previous PR | 2025-12-15 21:24:07 -07:00
  https://github.com/element-hq/synapse/pull/19268/changes#r2614217300 wasn't applied and the migration had copy/paste artifacts.
  PR: https://github.com/element-hq/synapse/pull/19268
7 changed files with 109 additions and 30 deletions

View File

```diff
@@ -1 +1 @@
-Add an admin API for retrieving a paginated list of quarantined media.
+Add a ["Listing all quarantined media" Admin API](https://element-hq.github.io/synapse/latest/admin_api/media_admin_api.html#listing-all-quarantined-media) for retrieving a paginated list of quarantined media.
```

View File

```diff
@@ -0,0 +1 @@
+Add a ["Listing all quarantined media" Admin API](https://element-hq.github.io/synapse/latest/admin_api/media_admin_api.html#listing-all-quarantined-media) for retrieving a paginated list of quarantined media.
```

View File

````diff
@@ -81,11 +81,17 @@ same pagination values will result in unexpected results.
 
 Request:
 
 ```http
-GET /_synapse/admin/v1/media/quarantined?from=0&limit=100&kind=local
+GET /_synapse/admin/v1/media/quarantined?from=1765860979860-0&limit=100&kind=local
 ```
 
-`from` and `limit` are optional parameters, and default to `0` and `100` respectively. They are the row index and number
-of rows to return - they are not timestamps.
+`from` and `limit` are optional parameters, and default to the first page and `100` respectively. `from` is the `next_batch`
+token returned by a previous request and `limit` is the number of rows to return. Note that `next_batch` is not intended
+to survive longer than about a minute and may produce inconsistent results if used after that time. Neither `from` nor
+`limit` is a timestamp, though `from` does encode a timestamp.
+
+If you require a long-lived `from` token, split `next_batch` on `-` and combine the first part with a `0`, separated by
+a `-` again. For example: `1234-5678` becomes `1234-0`. Your application will need to deduplicate `media` rows it has
+already seen if using this method.
 
 `kind` *MUST* either be `local` or `remote`.
````
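
The long-lived token recipe in the added docs is mechanical enough to express in code. A minimal client-side sketch (the helper name is illustrative, not part of Synapse):

```python
def long_lived_from_token(next_batch: str) -> str:
    """Turn a short-lived `next_batch` token into a long-lived `from` token.

    As the docs describe: split on `-`, keep the timestamp half, and reset
    the index half to 0. The caller must then deduplicate `media` rows it
    has already seen.
    """
    timestamp, _sep, _index = next_batch.partition("-")
    return f"{timestamp}-0"


# Example from the docs: `1234-5678` becomes `1234-0`.
assert long_lived_from_token("1234-5678") == "1234-0"
```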
````diff
@@ -96,10 +102,13 @@ The API returns a JSON body containing MXC URIs for the quarantined media, like
   "media": [
     "mxc://localhost/xwvutsrqponmlkjihgfedcba",
     "mxc://localhost/abcdefghijklmnopqrstuvwx"
-  ]
+  ],
+  "next_batch": "1765860979860-2"
 }
 ```
 
+`media` may be empty. `next_batch` may be omitted if there are no rows in `media`.
+
 # Quarantine media
 
 Quarantining media means that it is marked as inaccessible by users. It applies
````
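
Putting the request and response shapes together, a hedged sketch of a client draining this endpoint, assuming the `requests` library and placeholder homeserver/token values (neither comes from this PR):

```python
import requests

BASE_URL = "https://synapse.example.com"  # placeholder homeserver
HEADERS = {"Authorization": "Bearer <admin_access_token>"}  # placeholder token


def list_all_quarantined(kind: str = "local") -> set[str]:
    """Follow next_batch until `media` comes back empty, deduplicating as we go."""
    seen: set[str] = set()
    params = {"kind": kind, "limit": "100"}
    while True:
        resp = requests.get(
            f"{BASE_URL}/_synapse/admin/v1/media/quarantined",
            params=params,
            headers=HEADERS,
        )
        resp.raise_for_status()
        body = resp.json()
        seen.update(body["media"])  # dedupe, in case a long-lived token re-returns rows
        # `next_batch` may be omitted when `media` is empty; either way we are done.
        if not body["media"] or "next_batch" not in body:
            return seen
        params["from"] = body["next_batch"]
```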

View File

```diff
@@ -308,7 +308,7 @@ class ListQuarantinedMedia(RestServlet):
     ) -> tuple[int, JsonDict]:
         await assert_requester_is_admin(self.auth, request)
 
-        start = parse_integer(request, "from", default=0)
+        start = parse_string(request, "from", default="")
         limit = parse_integer(request, "limit", default=100)
         local_or_remote = parse_string(request, "kind", required=True)
 
```
```diff
@@ -318,11 +318,37 @@ class ListQuarantinedMedia(RestServlet):
                 "Query parameter `kind` must be either 'local' or 'remote'.",
             )
 
-        mxcs = await self.store.get_quarantined_media_mxcs(
-            start, limit, local_or_remote == "local"
+        start_ts = 0
+        start_index = 0
+        if "-" in start:  # indicates we have a next_batch token
+            # Batch tokens are structured as `timestamp-index`, where `index` is relative
+            # to the timestamp. This is done to support pages having many records with
+            # the same timestamp (like existing servers having a ton of `ts=0` records).
+            #
+            # Dev note: The structure of the `from` token is partially documented in the
+            # admin API. Do not change the behaviour without consulting the docs.
+            parts = start.split("-", maxsplit=1)
+            start_ts = int(parts[0])
+            start_index = int(parts[1])
+        elif len(start) > 0:
+            start_index = int(start)
+
+        mxcs_with_ts = await self.store.get_quarantined_media_mxcs_and_timestamps(
+            start_ts, start_index, limit, local_or_remote == "local"
         )
-        return HTTPStatus.OK, {"media": mxcs}
+
+        res: JsonDict = {
+            "media": [mxc for mxc, _ in mxcs_with_ts],
+        }
+        if len(mxcs_with_ts) > 0:
+            max_ts = mxcs_with_ts[-1][1] if mxcs_with_ts else 0
+            count_of_max_ts = (
+                sum(1 for _, ts in mxcs_with_ts if ts == max_ts) + start_index
+            )
+            res["next_batch"] = f"{max_ts}-{count_of_max_ts}"
+
+        return HTTPStatus.OK, res
 
 
 class PurgeMediaCacheRestServlet(RestServlet):
```
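
To see why the token carries an index as well as a timestamp, consider a page whose rows all share a quarantine timestamp (the `ts=0` case the comment mentions). A standalone sketch of the same `next_batch` arithmetic as the handler above, with made-up values:

```python
# A page of (mxc, ts) rows, e.g. pre-existing media that all carries
# quarantined_ts = 0.
page = [("mxc://a/1", 0), ("mxc://a/2", 0), ("mxc://a/3", 0)]
start_index = 0  # no prior token

# Mirrors the handler: the token is `<max ts on the page>-<rows to skip>`.
max_ts = page[-1][1]
count_of_max_ts = sum(1 for _, ts in page if ts == max_ts) + start_index
next_batch = f"{max_ts}-{count_of_max_ts}"
assert next_batch == "0-3"

# The next request parses this back into start_ts=0, start_index=3, so the
# query resumes at quarantined_ts >= 0 with an OFFSET of 3 instead of
# re-serving the same three ts=0 rows forever.
```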

View File

```diff
@@ -945,9 +945,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             max_lifetime=max_lifetime,
         )
 
-    async def get_quarantined_media_mxcs(
-        self, index_start: int, index_limit: int, local: bool
-    ) -> list[str]:
+    async def get_quarantined_media_mxcs_and_timestamps(
+        self, start_ts: int, index_start: int, index_limit: int, local: bool
+    ) -> list[tuple[str, int]]:
         """Retrieves all the quarantined media MXC URIs starting from the given position,
         ordered from oldest quarantined timestamp, then alphabetically by media ID
         (including origin).
```
```diff
@@ -956,37 +956,39 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         being introduced after the quarantine timestamp field was introduced.
 
         Args:
+            start_ts: The timestamp to start from (inclusive).
             index_start: The position to start from.
             index_limit: The maximum number of results to return.
             local: When true, only local media will be returned. When false, only remote media will be returned.
 
         Returns:
-            The quarantined media as a list of media IDs.
+            The quarantined media as a list of tuples containing an MXC URI and the
+            timestamp the media was quarantined.
         """
 
-        def _get_quarantined_media_mxcs_txn(
+        def _get_quarantined_media_mxcs_and_timestamps_txn(
             txn: LoggingTransaction,
-        ) -> list[str]:
+        ) -> list[tuple[str, int]]:
             # We order by quarantined timestamp *and* media ID (including origin, when
             # known) to ensure the ordering is stable for established servers.
             if local:
-                sql = "SELECT '' as media_origin, media_id FROM local_media_repository WHERE quarantined_by IS NOT NULL ORDER BY quarantined_ts, media_id ASC LIMIT ? OFFSET ?"
+                sql = "SELECT '' as media_origin, media_id, quarantined_ts FROM local_media_repository WHERE quarantined_by IS NOT NULL AND quarantined_ts >= ? ORDER BY quarantined_ts, media_id ASC LIMIT ? OFFSET ?"
             else:
-                sql = "SELECT media_origin, media_id FROM remote_media_cache WHERE quarantined_by IS NOT NULL ORDER BY quarantined_ts, media_origin, media_id ASC LIMIT ? OFFSET ?"
-            txn.execute(sql, (index_limit, index_start))
+                sql = "SELECT media_origin, media_id, quarantined_ts FROM remote_media_cache WHERE quarantined_by IS NOT NULL AND quarantined_ts >= ? ORDER BY quarantined_ts, media_origin, media_id ASC LIMIT ? OFFSET ?"
+            txn.execute(sql, (start_ts, index_limit, index_start))
 
-            mxcs = []
+            results = []
-            for media_origin, media_id in txn:
+            for media_origin, media_id, ts in txn:
                 if local:
                     media_origin = self.hs.hostname
-                mxcs.append(f"mxc://{media_origin}/{media_id}")
+                results.append((f"mxc://{media_origin}/{media_id}", ts))
-            return mxcs
+            return results
 
         return await self.db_pool.runInteraction(
-            "get_quarantined_media_mxcs",
-            _get_quarantined_media_mxcs_txn,
+            "_get_quarantined_media_mxcs_and_timestamps",
+            _get_quarantined_media_mxcs_and_timestamps_txn,
         )
 
     async def get_media_mxcs_in_room(self, room_id: str) -> tuple[list[str], list[str]]:
```
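
The queries above are offset pagination anchored at a timestamp rather than pure keyset pagination, which is why the ordering only needs to be "semi-stable". An in-memory model of the same semantics (illustrative only, not Synapse code):

```python
def page_quarantined(
    rows: list[tuple[str, str, int]],  # (media_origin, media_id, quarantined_ts)
    start_ts: int,
    index_start: int,
    index_limit: int,
) -> list[tuple[str, int]]:
    # Filter to rows at or after the anchor timestamp, sort by the same
    # stable (quarantined_ts, media_origin, media_id) key the SQL uses,
    # then apply OFFSET/LIMIT.
    eligible = sorted(
        (r for r in rows if r[2] >= start_ts),
        key=lambda r: (r[2], r[0], r[1]),
    )
    window = eligible[index_start : index_start + index_limit]
    return [(f"mxc://{origin}/{media_id}", ts) for origin, media_id, ts in window]
```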

View File

```diff
@@ -11,12 +11,6 @@
 -- See the GNU Affero General Public License for more details:
 -- <https://www.gnu.org/licenses/agpl-3.0.html>.
 
--- Add a timestamp for when the sliding sync connection position was last used,
--- only updated with a small granularity.
---
--- This should be NOT NULL, but we need to consider existing rows. In future we
--- may want to either backfill this or delete all rows with a NULL value (and
--- then make it NOT NULL).
 ALTER TABLE local_media_repository ADD COLUMN quarantined_ts BIGINT;
 ALTER TABLE remote_media_cache ADD COLUMN quarantined_ts BIGINT;
```

View File

```diff
@@ -861,6 +861,53 @@ class ListQuarantinedMediaTestCase(_AdminMediaTests):
         self.assertEqual(200, channel.code, msg=channel.json_body)
         self.assertEqual(0, len(channel.json_body["media"]))
 
+        # Now repeat the same pages, but this time use the next_batch token as intended.
+
+        # Page 1
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/media/quarantined?kind=local&limit=1",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertEqual(1, len(channel.json_body["media"]))
+        next_batch = channel.json_body["next_batch"]
+        self.assertNotEqual(None, next_batch)
+
+        # Page 2
+        channel = self.make_request(
+            "GET",
+            f"/_synapse/admin/v1/media/quarantined?kind=local&from={next_batch}&limit=1",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertEqual(1, len(channel.json_body["media"]))
+        next_batch2 = channel.json_body["next_batch"]
+        self.assertNotEqual(None, next_batch2)
+        self.assertNotEqual(next_batch, next_batch2)
+
+        # Page 3
+        channel = self.make_request(
+            "GET",
+            f"/_synapse/admin/v1/media/quarantined?kind=local&from={next_batch2}&limit=1",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertEqual(1, len(channel.json_body["media"]))
+        next_batch3 = channel.json_body["next_batch"]
+        self.assertNotEqual(None, next_batch3)
+        self.assertNotEqual(next_batch2, next_batch3)
+
+        # Page 4 (empty results)
+        channel = self.make_request(
+            "GET",
+            f"/_synapse/admin/v1/media/quarantined?kind=local&from={next_batch3}&limit=1",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertEqual(0, len(channel.json_body["media"]))
+        self.assertNotIn("next_batch", channel.json_body)
+
 
 class QuarantineMediaByIDTestCase(_AdminMediaTests):
     def upload_media_and_return_media_id(self, data: bytes) -> str:
```