Allow admins to bypass the quarantine check on media downloads (#19275)

Co-authored-by: turt2live <1190097+turt2live@users.noreply.github.com>
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
This commit is contained in:
Travis Ralston
2025-12-15 10:23:33 -07:00
committed by GitHub
parent 466994743a
commit 0f2b29511f
5 changed files with 197 additions and 20 deletions

View File

@@ -71,14 +71,43 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
return resources
def _ensure_quarantined(
self, admin_user_tok: str, server_and_media_id: str
self,
user_tok: str,
server_and_media_id: str,
include_bypass_param: bool = False,
) -> None:
"""Ensure a piece of media is quarantined when trying to access it."""
"""Ensure a piece of media is quarantined when trying to access it.
The include_bypass_param flag enables the presence of the
admin_unsafely_bypass_quarantine query parameter, but still expects that the
request will fail to download the media.
"""
if include_bypass_param:
query_string = "?admin_unsafely_bypass_quarantine=true"
channel = self.make_request(
"GET",
f"/_matrix/client/v1/media/download/{server_and_media_id}{query_string}",
shorthand=False,
access_token=user_tok,
)
# Non-admins can't bypass, so this should fail regardless of whether the
# media is actually quarantined.
self.assertEqual(
400,
channel.code,
msg=(
"Expected to receive a 400 when bypassing quarantined media: %s"
% server_and_media_id
),
)
# Repeat the request, this time without the bypass parameter.
channel = self.make_request(
"GET",
f"/_matrix/client/v1/media/download/{server_and_media_id}",
shorthand=False,
access_token=admin_user_tok,
access_token=user_tok,
)
# Should be quarantined
@@ -91,6 +120,62 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
),
)
def test_admin_can_bypass_quarantine(self) -> None:
self.register_user("admin", "pass", admin=True)
admin_user_tok = self.login("admin", "pass")
# Upload some media
response = self.helper.upload_media(SMALL_PNG, tok=admin_user_tok)
# Extract media ID from the response
server_name_and_media_id = response["content_uri"][6:] # Cut off 'mxc://'
server_name, media_id = server_name_and_media_id.split("/")
# Attempt to access the media
channel = self.make_request(
"GET",
f"/_matrix/client/v1/media/download/{server_name_and_media_id}",
shorthand=False,
access_token=admin_user_tok,
)
# Should be successful
self.assertEqual(200, channel.code)
# Quarantine the media
url = "/_synapse/admin/v1/media/quarantine/%s/%s" % (
urllib.parse.quote(server_name),
urllib.parse.quote(media_id),
)
channel = self.make_request(
"POST",
url,
access_token=admin_user_tok,
)
self.pump(1.0)
self.assertEqual(200, channel.code, msg=channel.json_body)
# Now access it *without* the bypass parameter - this should fail (as expected).
self._ensure_quarantined(
admin_user_tok, server_name_and_media_id, include_bypass_param=False
)
# Now access it *with* the bypass parameter - this should work
channel = self.make_request(
"GET",
f"/_matrix/client/v1/media/download/{server_name_and_media_id}?admin_unsafely_bypass_quarantine=true",
shorthand=False,
access_token=admin_user_tok,
)
self.assertEqual(
200,
channel.code,
msg=(
"Expected to receive a 200 on accessing (with bypass) quarantined media: %s"
% server_name_and_media_id
),
)
@parameterized.expand(
[
# Attempt quarantine media APIs as non-admin
@@ -154,8 +239,14 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
self.pump(1.0)
self.assertEqual(200, channel.code, msg=channel.json_body)
# Attempt to access the media
self._ensure_quarantined(admin_user_tok, server_name_and_media_id)
# Attempt to access the media (and ensure non-admins can't download it, even
# with a bypass parameter). Admins cannot download it without the bypass param.
self._ensure_quarantined(
non_admin_user_tok, server_name_and_media_id, include_bypass_param=True
)
self._ensure_quarantined(
admin_user_tok, server_name_and_media_id, include_bypass_param=False
)
@parameterized.expand(
[
@@ -214,9 +305,21 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
server_and_media_id_1 = mxc_1[6:]
server_and_media_id_2 = mxc_2[6:]
# Test that we cannot download any of the media anymore
self._ensure_quarantined(admin_user_tok, server_and_media_id_1)
self._ensure_quarantined(admin_user_tok, server_and_media_id_2)
# Test that we cannot download any of the media anymore, especially with the
# bypass parameter set. Admins cannot download the media without supplying the
# bypass parameter, so we check that too.
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_1, include_bypass_param=True
)
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_2, include_bypass_param=True
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_1, include_bypass_param=False
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_2, include_bypass_param=False
)
def test_quarantine_all_media_by_user(self) -> None:
self.register_user("user_admin", "pass", admin=True)
@@ -263,10 +366,27 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
channel.json_body, {"num_quarantined": 3}, "Expected 3 quarantined items"
)
# Attempt to access each piece of media
self._ensure_quarantined(admin_user_tok, server_and_media_id_1)
self._ensure_quarantined(admin_user_tok, server_and_media_id_2)
self._ensure_quarantined(admin_user_tok, server_and_media_id_3)
# Attempt to access each piece of media, ensuring that it can't be downloaded
# even with a bypass parameter. Admins should not be able to download the media
# either when not supplying the bypass parameter, so we check that too.
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_1, include_bypass_param=True
)
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_2, include_bypass_param=True
)
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_3, include_bypass_param=True
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_1, include_bypass_param=False
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_2, include_bypass_param=False
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_3, include_bypass_param=False
)
def test_cannot_quarantine_safe_media(self) -> None:
self.register_user("user_admin", "pass", admin=True)
@@ -307,8 +427,14 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase):
)
# Attempt to access each piece of media, the first should fail, the
# second should succeed.
self._ensure_quarantined(admin_user_tok, server_and_media_id_1)
# second should succeed. We check both the non-admin user with a bypass
# parameter, and the admin user without.
self._ensure_quarantined(
non_admin_user_tok, server_and_media_id_1, include_bypass_param=True
)
self._ensure_quarantined(
admin_user_tok, server_and_media_id_1, include_bypass_param=False
)
# Attempt to access each piece of media
channel = self.make_request(