# # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2018-2021 The Matrix.org Foundation C.I.C. # Copyright (C) 2023 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # See the GNU Affero General Public License for more details: # . # # Originally licensed under the Apache License, Version 2.0: # . # # [This file includes modifications made by New Vector Limited] # # import urllib.parse from typing import cast from parameterized import parameterized from twisted.internet.testing import MemoryReactor from twisted.web.resource import Resource import synapse.rest.admin from synapse.http.server import JsonResource from synapse.rest.admin import VersionServlet from synapse.rest.client import login, media, room from synapse.server import HomeServer from synapse.types import UserID from synapse.util.clock import Clock from tests import unittest from tests.test_utils import SMALL_PNG class VersionTestCase(unittest.HomeserverTestCase): url = "/_synapse/admin/v1/server_version" def create_test_resource(self) -> JsonResource: resource = JsonResource(self.hs) VersionServlet(self.hs).register(resource) return resource def test_version_string(self) -> None: channel = self.make_request("GET", self.url, shorthand=False) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual({"server_version"}, set(channel.json_body.keys())) class QuarantineMediaTestCase(unittest.HomeserverTestCase): """Test /quarantine_media admin API.""" servlets = [ synapse.rest.admin.register_servlets, synapse.rest.admin.register_servlets_for_media_repo, login.register_servlets, media.register_servlets, room.register_servlets, ] def create_resource_dict(self) -> dict[str, Resource]: resources = super().create_resource_dict() resources["/_matrix/media"] = self.hs.get_media_repository_resource() return resources def _ensure_quarantined( self, user_tok: str, server_and_media_id: str, include_bypass_param: bool = False, ) -> None: """Ensure a piece of media is quarantined when trying to access it. The include_bypass_param flag enables the presence of the admin_unsafely_bypass_quarantine query parameter, but still expects that the request will fail to download the media. """ if include_bypass_param: query_string = "?admin_unsafely_bypass_quarantine=true" channel = self.make_request( "GET", f"/_matrix/client/v1/media/download/{server_and_media_id}{query_string}", shorthand=False, access_token=user_tok, ) # Non-admins can't bypass, so this should fail regardless of whether the # media is actually quarantined. self.assertEqual( 400, channel.code, msg=( "Expected to receive a 400 when bypassing quarantined media: %s" % server_and_media_id ), ) # Repeat the request, this time without the bypass parameter. channel = self.make_request( "GET", f"/_matrix/client/v1/media/download/{server_and_media_id}", shorthand=False, access_token=user_tok, ) # Should be quarantined self.assertEqual( 404, channel.code, msg=( "Expected to receive a 404 on accessing quarantined media: %s" % server_and_media_id ), ) def test_admin_can_bypass_quarantine(self) -> None: self.register_user("admin", "pass", admin=True) admin_user_tok = self.login("admin", "pass") # Upload some media response = self.helper.upload_media(SMALL_PNG, tok=admin_user_tok) # Extract media ID from the response server_name_and_media_id = response["content_uri"][6:] # Cut off 'mxc://' server_name, media_id = server_name_and_media_id.split("/") # Attempt to access the media channel = self.make_request( "GET", f"/_matrix/client/v1/media/download/{server_name_and_media_id}", shorthand=False, access_token=admin_user_tok, ) # Should be successful self.assertEqual(200, channel.code) # Quarantine the media url = "/_synapse/admin/v1/media/quarantine/%s/%s" % ( urllib.parse.quote(server_name), urllib.parse.quote(media_id), ) channel = self.make_request( "POST", url, access_token=admin_user_tok, ) self.pump(1.0) self.assertEqual(200, channel.code, msg=channel.json_body) # Now access it *without* the bypass parameter - this should fail (as expected). self._ensure_quarantined( admin_user_tok, server_name_and_media_id, include_bypass_param=False ) # Now access it *with* the bypass parameter - this should work channel = self.make_request( "GET", f"/_matrix/client/v1/media/download/{server_name_and_media_id}?admin_unsafely_bypass_quarantine=true", shorthand=False, access_token=admin_user_tok, ) self.assertEqual( 200, channel.code, msg=( "Expected to receive a 200 on accessing (with bypass) quarantined media: %s" % server_name_and_media_id ), ) @parameterized.expand( [ # Attempt quarantine media APIs as non-admin "/_synapse/admin/v1/media/quarantine/example.org/abcde12345", # And the roomID/userID endpoint "/_synapse/admin/v1/room/!room%3Aexample.com/media/quarantine", ] ) def test_quarantine_media_requires_admin(self, url: str) -> None: self.register_user("nonadmin", "pass", admin=False) non_admin_user_tok = self.login("nonadmin", "pass") channel = self.make_request( "POST", url.encode("ascii"), access_token=non_admin_user_tok, ) # Expect a forbidden error self.assertEqual( 403, channel.code, msg="Expected forbidden on quarantining media as a non-admin", ) def test_quarantine_media_by_id(self) -> None: self.register_user("id_admin", "pass", admin=True) admin_user_tok = self.login("id_admin", "pass") self.register_user("id_nonadmin", "pass", admin=False) non_admin_user_tok = self.login("id_nonadmin", "pass") # Upload some media into the room response = self.helper.upload_media(SMALL_PNG, tok=admin_user_tok) # Extract media ID from the response server_name_and_media_id = response["content_uri"][6:] # Cut off 'mxc://' server_name, media_id = server_name_and_media_id.split("/") # Attempt to access the media channel = self.make_request( "GET", f"/_matrix/client/v1/media/download/{server_name_and_media_id}", shorthand=False, access_token=non_admin_user_tok, ) # Should be successful self.assertEqual(200, channel.code) # Quarantine the media url = "/_synapse/admin/v1/media/quarantine/%s/%s" % ( urllib.parse.quote(server_name), urllib.parse.quote(media_id), ) channel = self.make_request( "POST", url, access_token=admin_user_tok, ) self.pump(1.0) self.assertEqual(200, channel.code, msg=channel.json_body) # Attempt to access the media (and ensure non-admins can't download it, even # with a bypass parameter). Admins cannot download it without the bypass param. self._ensure_quarantined( non_admin_user_tok, server_name_and_media_id, include_bypass_param=True ) self._ensure_quarantined( admin_user_tok, server_name_and_media_id, include_bypass_param=False ) @parameterized.expand( [ # regular API path "/_synapse/admin/v1/room/%s/media/quarantine", # deprecated API path "/_synapse/admin/v1/quarantine_media/%s", ] ) def test_quarantine_all_media_in_room(self, url: str) -> None: self.register_user("room_admin", "pass", admin=True) admin_user_tok = self.login("room_admin", "pass") non_admin_user = self.register_user("room_nonadmin", "pass", admin=False) non_admin_user_tok = self.login("room_nonadmin", "pass") room_id = self.helper.create_room_as(non_admin_user, tok=admin_user_tok) self.helper.join(room_id, non_admin_user, tok=non_admin_user_tok) # Upload some media response_1 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) response_2 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) # Extract mxcs mxc_1 = response_1["content_uri"] mxc_2 = response_2["content_uri"] # Send it into the room self.helper.send_event( room_id, "m.room.message", content={"body": "image-1", "msgtype": "m.image", "url": mxc_1}, txn_id="111", tok=non_admin_user_tok, ) self.helper.send_event( room_id, "m.room.message", content={"body": "image-2", "msgtype": "m.image", "url": mxc_2}, txn_id="222", tok=non_admin_user_tok, ) channel = self.make_request( "POST", url % urllib.parse.quote(room_id), access_token=admin_user_tok, ) self.pump(1.0) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual( channel.json_body, {"num_quarantined": 2}, "Expected 2 quarantined items" ) # Convert mxc URLs to server/media_id strings server_and_media_id_1 = mxc_1[6:] server_and_media_id_2 = mxc_2[6:] # Test that we cannot download any of the media anymore, especially with the # bypass parameter set. Admins cannot download the media without supplying the # bypass parameter, so we check that too. self._ensure_quarantined( non_admin_user_tok, server_and_media_id_1, include_bypass_param=True ) self._ensure_quarantined( non_admin_user_tok, server_and_media_id_2, include_bypass_param=True ) self._ensure_quarantined( admin_user_tok, server_and_media_id_1, include_bypass_param=False ) self._ensure_quarantined( admin_user_tok, server_and_media_id_2, include_bypass_param=False ) def test_quarantine_all_media_by_user(self) -> None: self.register_user("user_admin", "pass", admin=True) admin_user_tok = self.login("user_admin", "pass") non_admin_user = self.register_user("user_nonadmin", "pass", admin=False) non_admin_user_tok = self.login("user_nonadmin", "pass") # Upload some media response_1 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) response_2 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) response_3 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) # Extract media IDs server_and_media_id_1 = response_1["content_uri"][6:] server_and_media_id_2 = response_2["content_uri"][6:] server_and_media_id_3 = response_3["content_uri"][6:] # Remove the hash from the media to simulate historic media. self.get_success( self.hs.get_datastores().main.update_local_media( media_id=server_and_media_id_3.split("/")[1], media_type="image/png", upload_name=None, media_length=123, user_id=UserID.from_string(non_admin_user), # Hack to force some media to have no hash. sha256=cast(str, None), ) ) # Quarantine all media by this user url = "/_synapse/admin/v1/user/%s/media/quarantine" % urllib.parse.quote( non_admin_user ) channel = self.make_request( "POST", url.encode("ascii"), access_token=admin_user_tok, ) self.pump(1.0) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual( channel.json_body, {"num_quarantined": 3}, "Expected 3 quarantined items" ) # Attempt to access each piece of media, ensuring that it can't be downloaded # even with a bypass parameter. Admins should not be able to download the media # either when not supplying the bypass parameter, so we check that too. self._ensure_quarantined( non_admin_user_tok, server_and_media_id_1, include_bypass_param=True ) self._ensure_quarantined( non_admin_user_tok, server_and_media_id_2, include_bypass_param=True ) self._ensure_quarantined( non_admin_user_tok, server_and_media_id_3, include_bypass_param=True ) self._ensure_quarantined( admin_user_tok, server_and_media_id_1, include_bypass_param=False ) self._ensure_quarantined( admin_user_tok, server_and_media_id_2, include_bypass_param=False ) self._ensure_quarantined( admin_user_tok, server_and_media_id_3, include_bypass_param=False ) def test_cannot_quarantine_safe_media(self) -> None: self.register_user("user_admin", "pass", admin=True) admin_user_tok = self.login("user_admin", "pass") non_admin_user = self.register_user("user_nonadmin", "pass", admin=False) non_admin_user_tok = self.login("user_nonadmin", "pass") # Upload some media response_1 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) response_2 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) # Extract media IDs server_and_media_id_1 = response_1["content_uri"][6:] server_and_media_id_2 = response_2["content_uri"][6:] # Mark the second item as safe from quarantine. _, media_id_2 = server_and_media_id_2.split("/") # Quarantine the media url = "/_synapse/admin/v1/media/protect/%s" % (urllib.parse.quote(media_id_2),) channel = self.make_request("POST", url, access_token=admin_user_tok) self.pump(1.0) self.assertEqual(200, channel.code, msg=channel.json_body) # Quarantine all media by this user url = "/_synapse/admin/v1/user/%s/media/quarantine" % urllib.parse.quote( non_admin_user ) channel = self.make_request( "POST", url.encode("ascii"), access_token=admin_user_tok, ) self.pump(1.0) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual( channel.json_body, {"num_quarantined": 1}, "Expected 1 quarantined item" ) # Attempt to access each piece of media, the first should fail, the # second should succeed. We check both the non-admin user with a bypass # parameter, and the admin user without. self._ensure_quarantined( non_admin_user_tok, server_and_media_id_1, include_bypass_param=True ) self._ensure_quarantined( admin_user_tok, server_and_media_id_1, include_bypass_param=False ) # Attempt to access each piece of media channel = self.make_request( "GET", f"/_matrix/client/v1/media/download/{server_and_media_id_2}", shorthand=False, access_token=non_admin_user_tok, ) # Shouldn't be quarantined self.assertEqual( 200, channel.code, msg=( "Expected to receive a 200 on accessing not-quarantined media: %s" % server_and_media_id_2 ), ) class PurgeHistoryTestCase(unittest.HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets, login.register_servlets, room.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") self.other_user = self.register_user("user", "pass") self.other_user_tok = self.login("user", "pass") self.room_id = self.helper.create_room_as( self.other_user, tok=self.other_user_tok ) self.url = f"/_synapse/admin/v1/purge_history/{self.room_id}" self.url_status = "/_synapse/admin/v1/purge_history_status/" def test_purge_history(self) -> None: """ Simple test of purge history API. Test only that is is possible to call, get status 200 and purge_id. """ channel = self.make_request( "POST", self.url, content={"delete_local_events": True, "purge_up_to_ts": 0}, access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertIn("purge_id", channel.json_body) purge_id = channel.json_body["purge_id"] # get status channel = self.make_request( "GET", self.url_status + purge_id, access_token=self.admin_user_tok, ) self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual("complete", channel.json_body["status"]) class ExperimentalFeaturesTestCase(unittest.HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets, login.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") self.other_user = self.register_user("user", "pass") self.other_user_tok = self.login("user", "pass") self.url = "/_synapse/admin/v1/experimental_features" def test_enable_and_disable(self) -> None: """ Test basic functionality of ExperimentalFeatures endpoint """ # test enabling features works url = f"{self.url}/{self.other_user}" channel = self.make_request( "PUT", url, content={ "features": {"msc3881": True}, }, access_token=self.admin_user_tok, ) self.assertEqual(channel.code, 200) # list which features are enabled and ensure the ones we enabled are listed self.assertEqual(channel.code, 200) url = f"{self.url}/{self.other_user}" channel = self.make_request( "GET", url, access_token=self.admin_user_tok, ) self.assertEqual(channel.code, 200) self.assertEqual( True, channel.json_body["features"]["msc3881"], ) # test disabling a feature works url = f"{self.url}/{self.other_user}" channel = self.make_request( "PUT", url, content={"features": {"msc3881": False}}, access_token=self.admin_user_tok, ) self.assertEqual(channel.code, 200) # list the features enabled/disabled and ensure they are still are correct self.assertEqual(channel.code, 200) url = f"{self.url}/{self.other_user}" channel = self.make_request( "GET", url, access_token=self.admin_user_tok, ) self.assertEqual(channel.code, 200) self.assertEqual( False, channel.json_body["features"]["msc3881"], ) # test nothing blows up if you try to disable a feature that isn't already enabled url = f"{self.url}/{self.other_user}" channel = self.make_request( "PUT", url, content={"features": {"msc3881": False}}, access_token=self.admin_user_tok, ) self.assertEqual(channel.code, 200) # test trying to enable a feature without an admin access token is denied url = f"{self.url}/f{self.other_user}" channel = self.make_request( "PUT", url, content={"features": {"msc3881": True}}, access_token=self.other_user_tok, ) self.assertEqual(channel.code, 403) self.assertEqual( channel.json_body, {"errcode": "M_FORBIDDEN", "error": "You are not a server admin"}, ) # test trying to enable a bogus msc is denied url = f"{self.url}/{self.other_user}" channel = self.make_request( "PUT", url, content={"features": {"msc6666": True}}, access_token=self.admin_user_tok, ) self.assertEqual(channel.code, 400) self.assertEqual( channel.json_body, { "errcode": "M_UNKNOWN", "error": "'msc6666' is not recognised as a valid experimental feature.", }, )