Merge tag 'v1.36.0' into babolivier/dinsic_1.41.0

Synapse 1.36.0 (2021-06-15)
===========================

No significant changes.


Synapse 1.36.0rc2 (2021-06-11)
==============================

Bugfixes
--------

- Fix a bug which caused presence updates to stop working some time after a restart, when using a presence writer worker. Broke in v1.33.0. ([\#10149](https://github.com/matrix-org/synapse/issues/10149))
- Fix a bug when using a federation sender worker where it would send out more presence updates than necessary, leading to high resource usage. Broke in v1.33.0. ([\#10163](https://github.com/matrix-org/synapse/issues/10163))
- Fix a bug where Synapse could send the same presence update to a remote twice. ([\#10165](https://github.com/matrix-org/synapse/issues/10165))


Synapse 1.36.0rc1 (2021-06-08)
==============================

Features
--------

- Add new endpoint `/_matrix/client/r0/rooms/{roomId}/aliases` from Client-Server API r0.6.1 (previously [MSC2432](https://github.com/matrix-org/matrix-doc/pull/2432)). ([\#9224](https://github.com/matrix-org/synapse/issues/9224))
- Improve performance of incoming federation transactions in large rooms. ([\#9953](https://github.com/matrix-org/synapse/issues/9953), [\#9973](https://github.com/matrix-org/synapse/issues/9973))
- Rewrite logic around verifying JSON objects and fetching server keys to be more performant and use less memory. ([\#10035](https://github.com/matrix-org/synapse/issues/10035))
- Add new admin APIs for unprotecting local media from quarantine. Contributed by @dklimpel. ([\#10040](https://github.com/matrix-org/synapse/issues/10040))
- Add new admin APIs to remove media by media ID from quarantine. Contributed by @dklimpel. ([\#10044](https://github.com/matrix-org/synapse/issues/10044))
- Make the `reason` and `score` parameters optional for reporting content (see the sketch after this changelog). Implements [MSC2414](https://github.com/matrix-org/matrix-doc/pull/2414). Contributed by Callum Brown. ([\#10077](https://github.com/matrix-org/synapse/issues/10077))
- Add support for routing more requests to workers. ([\#10084](https://github.com/matrix-org/synapse/issues/10084))
- Report OpenTracing spans for database activity. ([\#10113](https://github.com/matrix-org/synapse/issues/10113), [\#10136](https://github.com/matrix-org/synapse/issues/10136), [\#10141](https://github.com/matrix-org/synapse/issues/10141))
- Significantly reduce the memory usage of joining large remote rooms. ([\#10117](https://github.com/matrix-org/synapse/issues/10117))

Bugfixes
--------

- Fix a bug causing replication requests to fail when receiving a lot of events via federation. ([\#10082](https://github.com/matrix-org/synapse/issues/10082))
- Fix a bug in the `force_tracing_for_users` option introduced in Synapse v1.35 which meant that the OpenTracing spans produced were missing most tags. ([\#10092](https://github.com/matrix-org/synapse/issues/10092))
- Fix a bug that could cause Synapse to stop notifying application services. Contributed by Willem Mulder. ([\#10107](https://github.com/matrix-org/synapse/issues/10107))
- Fix a bug where the server would attempt to fetch the same history in a room from a remote server multiple times in parallel. ([\#10116](https://github.com/matrix-org/synapse/issues/10116))
- Fix a bug introduced in Synapse 1.33.0 which caused replication requests to fail when receiving a lot of very large events via federation. ([\#10118](https://github.com/matrix-org/synapse/issues/10118))
- Fix a bug when using workers where pagination requests failed if a remote server returned zero events from `/backfill`. Introduced in v1.35.0. ([\#10133](https://github.com/matrix-org/synapse/issues/10133))

Improved Documentation
----------------------

- Clarify the security note regarding hosting Synapse on the same domain as other web applications. ([\#9221](https://github.com/matrix-org/synapse/issues/9221))
- Update the CAPTCHA documentation to mention turning off the verify-origin feature. Contributed by @aaronraimist. ([\#10046](https://github.com/matrix-org/synapse/issues/10046))
- Tweak the wording of the database recommendation in `INSTALL.md`. Contributed by @aaronraimist. ([\#10057](https://github.com/matrix-org/synapse/issues/10057))
- Add initial infrastructure for rendering Synapse documentation with mdbook. ([\#10086](https://github.com/matrix-org/synapse/issues/10086))
- Convert the remaining admin API documentation files to markdown. ([\#10089](https://github.com/matrix-org/synapse/issues/10089))
- Make a link in the docs use HTTPS. Contributed by @RhnSharma. ([\#10130](https://github.com/matrix-org/synapse/issues/10130))
- Fix a broken link in the Docker docs. ([\#10132](https://github.com/matrix-org/synapse/issues/10132))

Deprecations and Removals
-------------------------

- Remove the experimental `spaces_enabled` flag. The spaces features are always available now. ([\#10063](https://github.com/matrix-org/synapse/issues/10063))

Internal Changes
----------------

- Tell CircleCI to build Docker images from the `main` branch. ([\#9906](https://github.com/matrix-org/synapse/issues/9906))
- Simplify the naming convention for release branches to only include the major and minor version numbers. ([\#10013](https://github.com/matrix-org/synapse/issues/10013))
- Add `parse_strings_from_args` for parsing an array from query parameters. ([\#10048](https://github.com/matrix-org/synapse/issues/10048), [\#10137](https://github.com/matrix-org/synapse/issues/10137))
- Remove some dead code regarding TLS certificate handling. ([\#10054](https://github.com/matrix-org/synapse/issues/10054))
- Remove the redundant, unmaintained `convert_server_keys` script. ([\#10055](https://github.com/matrix-org/synapse/issues/10055))
- Improve the error message printed by synctl when Synapse fails to start. ([\#10059](https://github.com/matrix-org/synapse/issues/10059))
- Fix the GitHub Actions lint for newsfragments. ([\#10069](https://github.com/matrix-org/synapse/issues/10069))
- Update opentracing to inject the right context into the carrier. ([\#10074](https://github.com/matrix-org/synapse/issues/10074))
- Fix up the `BatchingQueue` implementation. ([\#10078](https://github.com/matrix-org/synapse/issues/10078))
- Log the method and path when dropping a request due to the size limit. ([\#10091](https://github.com/matrix-org/synapse/issues/10091))
- In GitHub Actions workflows, summarize the Sytest results in an easy-to-read format. ([\#10094](https://github.com/matrix-org/synapse/issues/10094))
- Make `/sync` do fewer state resolutions. ([\#10102](https://github.com/matrix-org/synapse/issues/10102))
- Add missing type hints to the admin API servlets. ([\#10105](https://github.com/matrix-org/synapse/issues/10105))
- Improve OpenTracing annotations for `Notifier`. ([\#10111](https://github.com/matrix-org/synapse/issues/10111))
- Enable Prometheus metrics for the Jaeger client library. ([\#10112](https://github.com/matrix-org/synapse/issues/10112))
- Work to improve the responsiveness of `/sync` requests. ([\#10124](https://github.com/matrix-org/synapse/issues/10124))
- OpenTracing: use a consistent name for background processes. ([\#10135](https://github.com/matrix-org/synapse/issues/10135))
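
With [MSC2414](https://github.com/matrix-org/matrix-doc/pull/2414) implemented, `reason` and `score` may both be omitted when reporting content. A minimal sketch of the call; the homeserver URL, room ID, event ID, and access token below are hypothetical placeholders, not values from this release:

```python
# Report an event with an empty body; previously `reason` was required.
# All identifiers below are placeholders.
import requests

HOMESERVER = "https://matrix.example.org"  # hypothetical homeserver
ROOM_ID = "!abc123:example.org"            # hypothetical room ID
EVENT_ID = "$someevent"                    # hypothetical event ID
ACCESS_TOKEN = "MDAx..."                   # hypothetical access token

resp = requests.post(
    f"{HOMESERVER}/_matrix/client/r0/rooms/{ROOM_ID}/report/{EVENT_ID}",
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
    json={},  # both `reason` and `score` are now optional
)
resp.raise_for_status()  # expect 200 on success
```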
@@ -74,12 +74,11 @@ s4niecZKPBizL6aucT59CsunNmmb5Glq8rlAcU+1ZTZZzGYqVYhF6axB9Qg=
         config = {
             "tls_certificate_path": os.path.join(config_dir, "cert.pem"),
-            "tls_fingerprints": [],
         }

         t = TestConfig()
         t.read_config(config, config_dir_path="", data_dir_path="")
-        t.read_certificate_from_disk(require_cert_and_key=False)
+        t.read_tls_certificate()

         warnings = self.flushWarnings()
         self.assertEqual(len(warnings), 1)
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import time
+from typing import Dict, List
 from unittest.mock import Mock

 import attr
@@ -21,7 +22,6 @@ import signedjson.sign
 from nacl.signing import SigningKey
 from signedjson.key import encode_verify_key_base64, get_verify_key

-from twisted.internet import defer
 from twisted.internet.defer import Deferred, ensureDeferred

 from synapse.api.errors import SynapseError
@@ -92,23 +92,23 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         # deferred completes.
         first_lookup_deferred = Deferred()

-        async def first_lookup_fetch(keys_to_fetch):
-            self.assertEquals(current_context().request.id, "context_11")
-            self.assertEqual(keys_to_fetch, {"server10": {get_key_id(key1): 0}})
+        async def first_lookup_fetch(
+            server_name: str, key_ids: List[str], minimum_valid_until_ts: int
+        ) -> Dict[str, FetchKeyResult]:
+            # self.assertEquals(current_context().request.id, "context_11")
+            self.assertEqual(server_name, "server10")
+            self.assertEqual(key_ids, [get_key_id(key1)])
+            self.assertEqual(minimum_valid_until_ts, 0)

             await make_deferred_yieldable(first_lookup_deferred)
-            return {
-                "server10": {
-                    get_key_id(key1): FetchKeyResult(get_verify_key(key1), 100)
-                }
-            }
+            return {get_key_id(key1): FetchKeyResult(get_verify_key(key1), 100)}

         mock_fetcher.get_keys.side_effect = first_lookup_fetch

         async def first_lookup():
             with LoggingContext("context_11", request=FakeRequest("context_11")):
                 res_deferreds = kr.verify_json_objects_for_server(
-                    [("server10", json1, 0, "test10"), ("server11", {}, 0, "test11")]
+                    [("server10", json1, 0), ("server11", {}, 0)]
                 )

         # the unsigned json should be rejected pretty quickly
@@ -126,18 +126,18 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         d0 = ensureDeferred(first_lookup())

         self.pump()

         mock_fetcher.get_keys.assert_called_once()

         # a second request for a server with outstanding requests
         # should block rather than start a second call

-        async def second_lookup_fetch(keys_to_fetch):
-            self.assertEquals(current_context().request.id, "context_12")
-            return {
-                "server10": {
-                    get_key_id(key1): FetchKeyResult(get_verify_key(key1), 100)
-                }
-            }
+        async def second_lookup_fetch(
+            server_name: str, key_ids: List[str], minimum_valid_until_ts: int
+        ) -> Dict[str, FetchKeyResult]:
+            # self.assertEquals(current_context().request.id, "context_12")
+            return {get_key_id(key1): FetchKeyResult(get_verify_key(key1), 100)}

         mock_fetcher.get_keys.reset_mock()
         mock_fetcher.get_keys.side_effect = second_lookup_fetch
@@ -146,7 +146,13 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         async def second_lookup():
             with LoggingContext("context_12", request=FakeRequest("context_12")):
                 res_deferreds_2 = kr.verify_json_objects_for_server(
-                    [("server10", json1, 0, "test")]
+                    [
+                        (
+                            "server10",
+                            json1,
+                            0,
+                        )
+                    ]
                 )
                 res_deferreds_2[0].addBoth(self.check_context, None)
                 second_lookup_state[0] = 1
@@ -183,11 +189,11 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         signedjson.sign.sign_json(json1, "server9", key1)

         # should fail immediately on an unsigned object
-        d = _verify_json_for_server(kr, "server9", {}, 0, "test unsigned")
+        d = kr.verify_json_for_server("server9", {}, 0)
         self.get_failure(d, SynapseError)

         # should succeed on a signed object
-        d = _verify_json_for_server(kr, "server9", json1, 500, "test signed")
+        d = kr.verify_json_for_server("server9", json1, 500)
         # self.assertFalse(d.called)
         self.get_success(d)
@@ -214,24 +220,24 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         signedjson.sign.sign_json(json1, "server9", key1)

         # should fail immediately on an unsigned object
-        d = _verify_json_for_server(kr, "server9", {}, 0, "test unsigned")
+        d = kr.verify_json_for_server("server9", {}, 0)
         self.get_failure(d, SynapseError)

         # should fail on a signed object with a non-zero minimum_valid_until_ms,
         # as it tries to refetch the keys and fails.
-        d = _verify_json_for_server(
-            kr, "server9", json1, 500, "test signed non-zero min"
-        )
+        d = kr.verify_json_for_server("server9", json1, 500)
         self.get_failure(d, SynapseError)

         # We expect the keyring tried to refetch the key once.
         mock_fetcher.get_keys.assert_called_once_with(
-            {"server9": {get_key_id(key1): 500}}
+            "server9", [get_key_id(key1)], 500
         )

         # should succeed on a signed object with a 0 minimum_valid_until_ms
-        d = _verify_json_for_server(
-            kr, "server9", json1, 0, "test signed with zero min"
+        d = kr.verify_json_for_server(
+            "server9",
+            json1,
+            0,
         )
         self.get_success(d)
@@ -239,15 +245,15 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         """Two requests for the same key should be deduped."""
         key1 = signedjson.key.generate_signing_key(1)

-        async def get_keys(keys_to_fetch):
+        async def get_keys(
+            server_name: str, key_ids: List[str], minimum_valid_until_ts: int
+        ) -> Dict[str, FetchKeyResult]:
             # there should only be one request object (with the max validity)
-            self.assertEqual(keys_to_fetch, {"server1": {get_key_id(key1): 1500}})
+            self.assertEqual(server_name, "server1")
+            self.assertEqual(key_ids, [get_key_id(key1)])
+            self.assertEqual(minimum_valid_until_ts, 1500)

-            return {
-                "server1": {
-                    get_key_id(key1): FetchKeyResult(get_verify_key(key1), 1200)
-                }
-            }
+            return {get_key_id(key1): FetchKeyResult(get_verify_key(key1), 1200)}

         mock_fetcher = Mock()
         mock_fetcher.get_keys = Mock(side_effect=get_keys)
@@ -259,7 +265,14 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         # the first request should succeed; the second should fail because the key
         # has expired
         results = kr.verify_json_objects_for_server(
-            [("server1", json1, 500, "test1"), ("server1", json1, 1500, "test2")]
+            [
+                (
+                    "server1",
+                    json1,
+                    500,
+                ),
+                ("server1", json1, 1500),
+            ]
         )
         self.assertEqual(len(results), 2)
         self.get_success(results[0])
@@ -274,19 +287,21 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         """If the first fetcher cannot provide a recent enough key, we fall back"""
         key1 = signedjson.key.generate_signing_key(1)

-        async def get_keys1(keys_to_fetch):
-            self.assertEqual(keys_to_fetch, {"server1": {get_key_id(key1): 1500}})
-            return {
-                "server1": {get_key_id(key1): FetchKeyResult(get_verify_key(key1), 800)}
-            }
+        async def get_keys1(
+            server_name: str, key_ids: List[str], minimum_valid_until_ts: int
+        ) -> Dict[str, FetchKeyResult]:
+            self.assertEqual(server_name, "server1")
+            self.assertEqual(key_ids, [get_key_id(key1)])
+            self.assertEqual(minimum_valid_until_ts, 1500)
+            return {get_key_id(key1): FetchKeyResult(get_verify_key(key1), 800)}

-        async def get_keys2(keys_to_fetch):
-            self.assertEqual(keys_to_fetch, {"server1": {get_key_id(key1): 1500}})
-            return {
-                "server1": {
-                    get_key_id(key1): FetchKeyResult(get_verify_key(key1), 1200)
-                }
-            }
+        async def get_keys2(
+            server_name: str, key_ids: List[str], minimum_valid_until_ts: int
+        ) -> Dict[str, FetchKeyResult]:
+            self.assertEqual(server_name, "server1")
+            self.assertEqual(key_ids, [get_key_id(key1)])
+            self.assertEqual(minimum_valid_until_ts, 1500)
+            return {get_key_id(key1): FetchKeyResult(get_verify_key(key1), 1200)}

         mock_fetcher1 = Mock()
         mock_fetcher1.get_keys = Mock(side_effect=get_keys1)
@@ -298,7 +313,18 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         signedjson.sign.sign_json(json1, "server1", key1)

         results = kr.verify_json_objects_for_server(
-            [("server1", json1, 1200, "test1"), ("server1", json1, 1500, "test2")]
+            [
+                (
+                    "server1",
+                    json1,
+                    1200,
+                ),
+                (
+                    "server1",
+                    json1,
+                    1500,
+                ),
+            ]
         )
         self.assertEqual(len(results), 2)
         self.get_success(results[0])
@@ -349,9 +375,8 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase):
         self.http_client.get_json.side_effect = get_json

-        keys_to_fetch = {SERVER_NAME: {"key1": 0}}
-        keys = self.get_success(fetcher.get_keys(keys_to_fetch))
-        k = keys[SERVER_NAME][testverifykey_id]
+        keys = self.get_success(fetcher.get_keys(SERVER_NAME, ["key1"], 0))
+        k = keys[testverifykey_id]
         self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS)
         self.assertEqual(k.verify_key, testverifykey)
         self.assertEqual(k.verify_key.alg, "ed25519")
@@ -378,7 +403,7 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase):
         # change the server name: the result should be ignored
         response["server_name"] = "OTHER_SERVER"

-        keys = self.get_success(fetcher.get_keys(keys_to_fetch))
+        keys = self.get_success(fetcher.get_keys(SERVER_NAME, ["key1"], 0))
         self.assertEqual(keys, {})


@@ -465,10 +490,9 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
         self.expect_outgoing_key_query(SERVER_NAME, "key1", response)

-        keys_to_fetch = {SERVER_NAME: {"key1": 0}}
-        keys = self.get_success(fetcher.get_keys(keys_to_fetch))
-        self.assertIn(SERVER_NAME, keys)
-        k = keys[SERVER_NAME][testverifykey_id]
+        keys = self.get_success(fetcher.get_keys(SERVER_NAME, ["key1"], 0))
+        self.assertIn(testverifykey_id, keys)
+        k = keys[testverifykey_id]
         self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS)
         self.assertEqual(k.verify_key, testverifykey)
         self.assertEqual(k.verify_key.alg, "ed25519")
@@ -515,10 +539,9 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
         self.expect_outgoing_key_query(SERVER_NAME, "key1", response)

-        keys_to_fetch = {SERVER_NAME: {"key1": 0}}
-        keys = self.get_success(fetcher.get_keys(keys_to_fetch))
-        self.assertIn(SERVER_NAME, keys)
-        k = keys[SERVER_NAME][testverifykey_id]
+        keys = self.get_success(fetcher.get_keys(SERVER_NAME, ["key1"], 0))
+        self.assertIn(testverifykey_id, keys)
+        k = keys[testverifykey_id]
         self.assertEqual(k.valid_until_ts, VALID_UNTIL_TS)
         self.assertEqual(k.verify_key, testverifykey)
         self.assertEqual(k.verify_key.alg, "ed25519")
@@ -559,14 +582,13 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
         def get_key_from_perspectives(response):
             fetcher = PerspectivesKeyFetcher(self.hs)
-            keys_to_fetch = {SERVER_NAME: {"key1": 0}}
             self.expect_outgoing_key_query(SERVER_NAME, "key1", response)
-            return self.get_success(fetcher.get_keys(keys_to_fetch))
+            return self.get_success(fetcher.get_keys(SERVER_NAME, ["key1"], 0))

         # start with a valid response so we can check we are testing the right thing
         response = build_response()
         keys = get_key_from_perspectives(response)
-        k = keys[SERVER_NAME][testverifykey_id]
+        k = keys[testverifykey_id]
         self.assertEqual(k.verify_key, testverifykey)

         # remove the perspectives server's signature
@@ -585,23 +607,3 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
 def get_key_id(key):
     """Get the matrix ID tag for a given SigningKey or VerifyKey"""
     return "%s:%s" % (key.alg, key.version)
-
-
-@defer.inlineCallbacks
-def run_in_context(f, *args, **kwargs):
-    with LoggingContext("testctx"):
-        rv = yield f(*args, **kwargs)
-    return rv
-
-
-def _verify_json_for_server(kr, *args):
-    """thin wrapper around verify_json_for_server which makes sure it is wrapped
-    with the patched defer.inlineCallbacks.
-    """
-
-    @defer.inlineCallbacks
-    def v():
-        rv1 = yield kr.verify_json_for_server(*args)
-        return rv1
-
-    return run_in_context(v)
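
The keyring hunks above track the fetcher interface change from [\#10035](https://github.com/matrix-org/synapse/issues/10035): instead of one batched `get_keys(keys_to_fetch)` call taking a `{server: {key_id: min_valid_ts}}` dict, fetchers now take one server per call and return a flat map of key ID to `FetchKeyResult`. A rough sketch of the shape the updated tests assume; the class below is an illustrative stub, not Synapse's actual fetcher implementation:

```python
from typing import Dict, List

from synapse.storage.keys import FetchKeyResult  # the result class used by the tests


class ExampleKeyFetcher:
    """Illustrative stand-in for a key fetcher after #10035: one server per
    call, returning a map of key ID -> FetchKeyResult."""

    async def get_keys(
        self, server_name: str, key_ids: List[str], minimum_valid_until_ts: int
    ) -> Dict[str, FetchKeyResult]:
        # A real fetcher would fetch and verify keys for `server_name`;
        # this stub only documents the calling convention.
        return {}
```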
@@ -57,10 +57,10 @@ class AppServiceHandlerTestCase(unittest.TestCase):
             sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
         )
         self.mock_store.get_new_events_for_appservice.side_effect = [
-            make_awaitable((0, [event])),
             make_awaitable((0, [])),
+            make_awaitable((1, [event])),
         ]
-        self.handler.notify_interested_services(RoomStreamToken(None, 0))
+        self.handler.notify_interested_services(RoomStreamToken(None, 1))

         self.mock_scheduler.submit_event_for_as.assert_called_once_with(
             interested_service, event
@@ -77,7 +77,6 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         self.mock_as_api.query_user.return_value = make_awaitable(True)
         self.mock_store.get_new_events_for_appservice.side_effect = [
             make_awaitable((0, [event])),
-            make_awaitable((0, [])),
         ]

         self.handler.notify_interested_services(RoomStreamToken(None, 0))
@@ -95,7 +94,6 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         self.mock_as_api.query_user.return_value = make_awaitable(True)
         self.mock_store.get_new_events_for_appservice.side_effect = [
             make_awaitable((0, [event])),
-            make_awaitable((0, [])),
         ]

         self.handler.notify_interested_services(RoomStreamToken(None, 0))
@@ -64,7 +64,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
                 user_tok=self.admin_user_tok,
             )
         for _ in range(5):
-            self._create_event_and_report(
+            self._create_event_and_report_without_parameters(
                 room_id=self.room_id2,
                 user_tok=self.admin_user_tok,
             )
@@ -378,6 +378,19 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
         )
         self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])

+    def _create_event_and_report_without_parameters(self, room_id, user_tok):
+        """Create and report an event, but omit reason and score"""
+        resp = self.helper.send(room_id, tok=user_tok)
+        event_id = resp["event_id"]
+
+        channel = self.make_request(
+            "POST",
+            "rooms/%s/report/%s" % (room_id, event_id),
+            json.dumps({}),
+            access_token=user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
     def _check_fields(self, content):
         """Checks that all attributes are present in an event report"""
         for c in content:
@@ -16,6 +16,8 @@ import json
 import os
 from binascii import unhexlify

+from parameterized import parameterized
+
 import synapse.rest.admin
 from synapse.api.errors import Codes
 from synapse.rest.client.v1 import login, profile, room
@@ -562,3 +564,228 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
         )
         # Test that the file is deleted
         self.assertFalse(os.path.exists(local_path))
+
+
+class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase):
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        synapse.rest.admin.register_servlets_for_media_repo,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+        media_repo = hs.get_media_repository_resource()
+        self.store = hs.get_datastore()
+        self.server_name = hs.hostname
+
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+        # Create media
+        upload_resource = media_repo.children[b"upload"]
+        # file size is 67 Byte
+        image_data = unhexlify(
+            b"89504e470d0a1a0a0000000d4948445200000001000000010806"
+            b"0000001f15c4890000000a49444154789c63000100000500010d"
+            b"0a2db40000000049454e44ae426082"
+        )
+
+        # Upload some media into the room
+        response = self.helper.upload_media(
+            upload_resource, image_data, tok=self.admin_user_tok, expect_code=200
+        )
+        # Extract media ID from the response
+        server_and_media_id = response["content_uri"][6:]  # Cut off 'mxc://'
+        self.media_id = server_and_media_id.split("/")[1]
+
+        self.url = "/_synapse/admin/v1/media/%s/%s/%s"
+
+    @parameterized.expand(["quarantine", "unquarantine"])
+    def test_no_auth(self, action: str):
+        """
+        Try to quarantine media without authentication.
+        """
+
+        channel = self.make_request(
+            "POST",
+            self.url % (action, self.server_name, self.media_id),
+            b"{}",
+        )
+
+        self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
+
+    @parameterized.expand(["quarantine", "unquarantine"])
+    def test_requester_is_no_admin(self, action: str):
+        """
+        If the user is not a server admin, an error is returned.
+        """
+        self.other_user = self.register_user("user", "pass")
+        self.other_user_token = self.login("user", "pass")
+
+        channel = self.make_request(
+            "POST",
+            self.url % (action, self.server_name, self.media_id),
+            access_token=self.other_user_token,
+        )
+
+        self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+    def test_quarantine_media(self):
+        """
+        Tests that quarantining media and removing it from quarantine succeeds
+        """
+
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertFalse(media_info["quarantined_by"])
+
+        # quarantine
+        channel = self.make_request(
+            "POST",
+            self.url % ("quarantine", self.server_name, self.media_id),
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertFalse(channel.json_body)
+
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertTrue(media_info["quarantined_by"])
+
+        # remove from quarantine
+        channel = self.make_request(
+            "POST",
+            self.url % ("unquarantine", self.server_name, self.media_id),
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertFalse(channel.json_body)
+
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertFalse(media_info["quarantined_by"])
+
+    def test_quarantine_protected_media(self):
+        """
+        Tests that quarantining protected media fails
+        """
+
+        # protect
+        self.get_success(self.store.mark_local_media_as_safe(self.media_id, safe=True))
+
+        # verify protection
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertTrue(media_info["safe_from_quarantine"])
+
+        # attempt to quarantine
+        channel = self.make_request(
+            "POST",
+            self.url % ("quarantine", self.server_name, self.media_id),
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertFalse(channel.json_body)
+
+        # verify that it is not in quarantine
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertFalse(media_info["quarantined_by"])
+
+
+class ProtectMediaByIDTestCase(unittest.HomeserverTestCase):
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        synapse.rest.admin.register_servlets_for_media_repo,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+        media_repo = hs.get_media_repository_resource()
+        self.store = hs.get_datastore()
+
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+        # Create media
+        upload_resource = media_repo.children[b"upload"]
+        # file size is 67 Byte
+        image_data = unhexlify(
+            b"89504e470d0a1a0a0000000d4948445200000001000000010806"
+            b"0000001f15c4890000000a49444154789c63000100000500010d"
+            b"0a2db40000000049454e44ae426082"
+        )
+
+        # Upload some media into the room
+        response = self.helper.upload_media(
+            upload_resource, image_data, tok=self.admin_user_tok, expect_code=200
+        )
+        # Extract media ID from the response
+        server_and_media_id = response["content_uri"][6:]  # Cut off 'mxc://'
+        self.media_id = server_and_media_id.split("/")[1]
+
+        self.url = "/_synapse/admin/v1/media/%s/%s"
+
+    @parameterized.expand(["protect", "unprotect"])
+    def test_no_auth(self, action: str):
+        """
+        Try to protect media without authentication.
+        """
+
+        channel = self.make_request("POST", self.url % (action, self.media_id), b"{}")
+
+        self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
+
+    @parameterized.expand(["protect", "unprotect"])
+    def test_requester_is_no_admin(self, action: str):
+        """
+        If the user is not a server admin, an error is returned.
+        """
+        self.other_user = self.register_user("user", "pass")
+        self.other_user_token = self.login("user", "pass")
+
+        channel = self.make_request(
+            "POST",
+            self.url % (action, self.media_id),
+            access_token=self.other_user_token,
+        )
+
+        self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+    def test_protect_media(self):
+        """
+        Tests that protecting and unprotecting media succeeds
+        """
+
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertFalse(media_info["safe_from_quarantine"])
+
+        # protect
+        channel = self.make_request(
+            "POST",
+            self.url % ("protect", self.media_id),
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertFalse(channel.json_body)
+
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertTrue(media_info["safe_from_quarantine"])
+
+        # unprotect
+        channel = self.make_request(
+            "POST",
+            self.url % ("unprotect", self.media_id),
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertFalse(channel.json_body)
+
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertFalse(media_info["safe_from_quarantine"])
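
The two new test cases above exercise the admin APIs added by [\#10040](https://github.com/matrix-org/synapse/issues/10040) and [\#10044](https://github.com/matrix-org/synapse/issues/10044). A hedged sketch of the corresponding HTTP calls, grounded in the URL templates the tests use; the homeserver URL, admin token, server name, and media ID are placeholders:

```python
# Quarantine/unquarantine and protect/unprotect media by ID. All identifiers
# below are placeholders, not real values.
import requests

HOMESERVER = "https://matrix.example.org"  # hypothetical homeserver
ADMIN_TOKEN = "syt_admin_..."              # hypothetical admin access token
SERVER_NAME = "example.org"                # hypothetical server name
MEDIA_ID = "abcdefg12345"                  # hypothetical media ID

headers = {"Authorization": f"Bearer {ADMIN_TOKEN}"}

# Quarantine a single piece of local media by ID, then undo it:
requests.post(
    f"{HOMESERVER}/_synapse/admin/v1/media/quarantine/{SERVER_NAME}/{MEDIA_ID}",
    headers=headers,
)
requests.post(
    f"{HOMESERVER}/_synapse/admin/v1/media/unquarantine/{SERVER_NAME}/{MEDIA_ID}",
    headers=headers,
)

# Protect media from quarantine, then undo it; note there is no server name
# in this path, since only local media can be protected:
requests.post(f"{HOMESERVER}/_synapse/admin/v1/media/protect/{MEDIA_ID}", headers=headers)
requests.post(f"{HOMESERVER}/_synapse/admin/v1/media/unprotect/{MEDIA_ID}", headers=headers)
```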
@@ -1880,8 +1880,7 @@ class RoomAliasListTestCase(unittest.HomeserverTestCase):
         """Calls the endpoint under test. returns the json response object."""
         channel = self.make_request(
             "GET",
-            "/_matrix/client/unstable/org.matrix.msc2432/rooms/%s/aliases"
-            % (self.room_id,),
+            "/_matrix/client/r0/rooms/%s/aliases" % (self.room_id,),
             access_token=access_token,
         )
         self.assertEqual(channel.code, expected_code, channel.result)
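
The test above now targets the stable `r0` path for the aliases endpoint rather than the unstable MSC2432 prefix. A sketch of the call, with a hypothetical homeserver, room ID, and access token:

```python
# Fetch the aliases of a room via the now-stable endpoint. Placeholder values
# throughout.
import requests
from urllib.parse import quote

HOMESERVER = "https://matrix.example.org"  # hypothetical homeserver
ROOM_ID = "!abc123:example.org"            # hypothetical room ID
ACCESS_TOKEN = "MDAx..."                   # hypothetical access token

resp = requests.get(
    f"{HOMESERVER}/_matrix/client/r0/rooms/{quote(ROOM_ID, safe='')}/aliases",
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
)
print(resp.json())  # e.g. {"aliases": ["#my-room:example.org"]}
```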
new file: tests/rest/client/v2_alpha/test_report_event.py (83 lines)
@@ -0,0 +1,83 @@
+# Copyright 2021 Callum Brown
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+import synapse.rest.admin
+from synapse.rest.client.v1 import login, room
+from synapse.rest.client.v2_alpha import report_event
+
+from tests import unittest
+
+
+class ReportEventTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        report_event.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+        self.other_user = self.register_user("user", "pass")
+        self.other_user_tok = self.login("user", "pass")
+
+        self.room_id = self.helper.create_room_as(
+            self.other_user, tok=self.other_user_tok, is_public=True
+        )
+        self.helper.join(self.room_id, user=self.admin_user, tok=self.admin_user_tok)
+        resp = self.helper.send(self.room_id, tok=self.admin_user_tok)
+        self.event_id = resp["event_id"]
+        self.report_path = "rooms/{}/report/{}".format(self.room_id, self.event_id)
+
+    def test_reason_str_and_score_int(self):
+        data = {"reason": "this makes me sad", "score": -100}
+        self._assert_status(200, data)
+
+    def test_no_reason(self):
+        data = {"score": 0}
+        self._assert_status(200, data)
+
+    def test_no_score(self):
+        data = {"reason": "this makes me sad"}
+        self._assert_status(200, data)
+
+    def test_no_reason_and_no_score(self):
+        data = {}
+        self._assert_status(200, data)
+
+    def test_reason_int_and_score_str(self):
+        data = {"reason": 10, "score": "string"}
+        self._assert_status(400, data)
+
+    def test_reason_zero_and_score_blank(self):
+        data = {"reason": 0, "score": ""}
+        self._assert_status(400, data)
+
+    def test_reason_and_score_null(self):
+        data = {"reason": None, "score": None}
+        self._assert_status(400, data)
+
+    def _assert_status(self, response_status, data):
+        channel = self.make_request(
+            "POST",
+            self.report_path,
+            json.dumps(data),
+            access_token=self.other_user_tok,
+        )
+        self.assertEqual(
+            response_status, int(channel.result["code"]), msg=channel.result["body"]
+        )
@@ -208,10 +208,10 @@ class EndToEndPerspectivesTests(BaseRemoteKeyResourceTestCase):
         keyid = "ed25519:%s" % (testkey.version,)

         fetcher = PerspectivesKeyFetcher(self.hs2)
-        d = fetcher.get_keys({"targetserver": {keyid: 1000}})
+        d = fetcher.get_keys("targetserver", [keyid], 1000)
         res = self.get_success(d)
-        self.assertIn("targetserver", res)
-        keyres = res["targetserver"][keyid]
+        self.assertIn(keyid, res)
+        keyres = res[keyid]
         assert isinstance(keyres, FetchKeyResult)
         self.assertEqual(
             signedjson.key.encode_verify_key_base64(keyres.verify_key),
@@ -230,10 +230,10 @@ class EndToEndPerspectivesTests(BaseRemoteKeyResourceTestCase):
         keyid = "ed25519:%s" % (testkey.version,)

         fetcher = PerspectivesKeyFetcher(self.hs2)
-        d = fetcher.get_keys({self.hs.hostname: {keyid: 1000}})
+        d = fetcher.get_keys(self.hs.hostname, [keyid], 1000)
         res = self.get_success(d)
-        self.assertIn(self.hs.hostname, res)
-        keyres = res[self.hs.hostname][keyid]
+        self.assertIn(keyid, res)
+        keyres = res[keyid]
         assert isinstance(keyres, FetchKeyResult)
         self.assertEqual(
             signedjson.key.encode_verify_key_base64(keyres.verify_key),
@@ -247,10 +247,10 @@ class EndToEndPerspectivesTests(BaseRemoteKeyResourceTestCase):
         keyid = "ed25519:%s" % (self.hs_signing_key.version,)

         fetcher = PerspectivesKeyFetcher(self.hs2)
-        d = fetcher.get_keys({self.hs.hostname: {keyid: 1000}})
+        d = fetcher.get_keys(self.hs.hostname, [keyid], 1000)
         res = self.get_success(d)
-        self.assertIn(self.hs.hostname, res)
-        keyres = res[self.hs.hostname][keyid]
+        self.assertIn(keyid, res)
+        keyres = res[keyid]
         assert isinstance(keyres, FetchKeyResult)
         self.assertEqual(
             signedjson.key.encode_verify_key_base64(keyres.verify_key),
new file: tests/storage/databases/__init__.py (13 lines)
@@ -0,0 +1,13 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

new file: tests/storage/databases/main/__init__.py (13 lines)
@@ -0,0 +1,13 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
new file: tests/storage/databases/main/test_events_worker.py (96 lines)
@@ -0,0 +1,96 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+
+from synapse.logging.context import LoggingContext
+from synapse.storage.databases.main.events_worker import EventsWorkerStore
+
+from tests import unittest
+
+
+class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
+    def prepare(self, reactor, clock, hs):
+        self.store: EventsWorkerStore = hs.get_datastore()
+
+        # insert some test data
+        for rid in ("room1", "room2"):
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "rooms",
+                    {"room_id": rid, "room_version": 4},
+                )
+            )
+
+        for idx, (rid, eid) in enumerate(
+            (
+                ("room1", "event10"),
+                ("room1", "event11"),
+                ("room1", "event12"),
+                ("room2", "event20"),
+            )
+        ):
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "events",
+                    {
+                        "event_id": eid,
+                        "room_id": rid,
+                        "topological_ordering": idx,
+                        "stream_ordering": idx,
+                        "type": "test",
+                        "processed": True,
+                        "outlier": False,
+                    },
+                )
+            )
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "event_json",
+                    {
+                        "event_id": eid,
+                        "room_id": rid,
+                        "json": json.dumps({"type": "test", "room_id": rid}),
+                        "internal_metadata": "{}",
+                        "format_version": 3,
+                    },
+                )
+            )
+
+    def test_simple(self):
+        with LoggingContext(name="test") as ctx:
+            res = self.get_success(
+                self.store.have_seen_events("room1", ["event10", "event19"])
+            )
+            self.assertEquals(res, {"event10"})
+
+            # that should result in a single db query
+            self.assertEquals(ctx.get_resource_usage().db_txn_count, 1)
+
+        # a second lookup of the same events should cause no queries
+        with LoggingContext(name="test") as ctx:
+            res = self.get_success(
+                self.store.have_seen_events("room1", ["event10", "event19"])
+            )
+            self.assertEquals(res, {"event10"})
+            self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)
+
+    def test_query_via_event_cache(self):
+        # fetch an event into the event cache
+        self.get_success(self.store.get_event("event10"))
+
+        # looking it up should now cause no db hits
+        with LoggingContext(name="test") as ctx:
+            res = self.get_success(self.store.have_seen_events("room1", ["event10"]))
+            self.assertEquals(res, {"event10"})
+            self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)
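
The new test file above leans on a pattern worth noting: wrap a storage call in a `LoggingContext` and read its resource usage to count database transactions, which is how cache hits are asserted. A stripped-down sketch of that pattern; the `store` argument stands in for the `HomeserverTestCase` datastore used above:

```python
# Sketch only: count the database transactions one have_seen_events call costs.
from synapse.logging.context import LoggingContext


async def count_db_txns_for_lookup(store, room_id, event_ids):
    """Return (seen_event_ids, db_txn_count) for one have_seen_events call."""
    with LoggingContext(name="example") as ctx:
        res = await store.have_seen_events(room_id, event_ids)
        # A warm cache should leave db_txn_count at zero.
        return res, ctx.get_resource_usage().db_txn_count
```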
@@ -622,17 +622,17 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
         self.assertEquals(callcount2[0], 1)

         a.func2.invalidate(("foo",))
-        self.assertEquals(a.func2.cache.cache.pop.call_count, 1)
+        self.assertEquals(a.func2.cache.cache.del_multi.call_count, 1)

         yield a.func2("foo")
         a.func2.invalidate(("foo",))
-        self.assertEquals(a.func2.cache.cache.pop.call_count, 2)
+        self.assertEquals(a.func2.cache.cache.del_multi.call_count, 2)

         self.assertEquals(callcount[0], 1)
         self.assertEquals(callcount2[0], 2)

         a.func.invalidate(("foo",))
-        self.assertEquals(a.func2.cache.cache.pop.call_count, 3)
+        self.assertEquals(a.func2.cache.cache.del_multi.call_count, 3)
         yield a.func("foo")

         self.assertEquals(callcount[0], 2)
@@ -14,7 +14,12 @@
 from twisted.internet import defer

 from synapse.logging.context import make_deferred_yieldable
-from synapse.util.batching_queue import BatchingQueue
+from synapse.util.batching_queue import (
+    BatchingQueue,
+    number_in_flight,
+    number_of_keys,
+    number_queued,
+)

 from tests.server import get_clock
 from tests.unittest import TestCase
@@ -24,6 +29,14 @@ class BatchingQueueTestCase(TestCase):
     def setUp(self):
         self.clock, hs_clock = get_clock()

+        # We ensure that we remove any existing metrics for "test_queue".
+        try:
+            number_queued.remove("test_queue")
+            number_of_keys.remove("test_queue")
+            number_in_flight.remove("test_queue")
+        except KeyError:
+            pass
+
         self._pending_calls = []
         self.queue = BatchingQueue("test_queue", hs_clock, self._process_queue)

@@ -32,6 +45,36 @@ class BatchingQueueTestCase(TestCase):
         self._pending_calls.append((values, d))
         return await make_deferred_yieldable(d)

+    def _get_sample_with_name(self, metric, name) -> int:
+        """For a prometheus metric get the value of the sample that has a
+        matching "name" label.
+        """
+        for sample in metric.collect()[0].samples:
+            if sample.labels.get("name") == name:
+                return sample.value
+
+        self.fail("Found no matching sample")
+
+    def _assert_metrics(self, queued, keys, in_flight):
+        """Assert that the metrics are correct"""
+
+        sample = self._get_sample_with_name(number_queued, self.queue._name)
+        self.assertEqual(
+            sample,
+            queued,
+            "number_queued",
+        )
+
+        sample = self._get_sample_with_name(number_of_keys, self.queue._name)
+        self.assertEqual(sample, keys, "number_of_keys")
+
+        sample = self._get_sample_with_name(number_in_flight, self.queue._name)
+        self.assertEqual(
+            sample,
+            in_flight,
+            "number_in_flight",
+        )
+
     def test_simple(self):
         """Tests the basic case of calling `add_to_queue` once and having
         `_process_queue` return.
@@ -41,6 +84,8 @@ class BatchingQueueTestCase(TestCase):
         queue_d = defer.ensureDeferred(self.queue.add_to_queue("foo"))

+        self._assert_metrics(queued=1, keys=1, in_flight=1)
+
         # The queue should wait a reactor tick before calling the processing
         # function.
         self.assertFalse(self._pending_calls)
@@ -52,12 +97,15 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(len(self._pending_calls), 1)
         self.assertEqual(self._pending_calls[0][0], ["foo"])
         self.assertFalse(queue_d.called)
+        self._assert_metrics(queued=0, keys=0, in_flight=1)

         # Return value of the `_process_queue` should be propagated back.
         self._pending_calls.pop()[1].callback("bar")

         self.assertEqual(self.successResultOf(queue_d), "bar")

+        self._assert_metrics(queued=0, keys=0, in_flight=0)
+
     def test_batching(self):
         """Test that multiple calls at the same time get batched up into one
         call to `_process_queue`.
@@ -68,6 +116,8 @@ class BatchingQueueTestCase(TestCase):
         queue_d1 = defer.ensureDeferred(self.queue.add_to_queue("foo1"))
         queue_d2 = defer.ensureDeferred(self.queue.add_to_queue("foo2"))

+        self._assert_metrics(queued=2, keys=1, in_flight=2)
+
         self.clock.pump([0])

         # We should see only *one* call to `_process_queue`
@@ -75,12 +125,14 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(self._pending_calls[0][0], ["foo1", "foo2"])
         self.assertFalse(queue_d1.called)
         self.assertFalse(queue_d2.called)
+        self._assert_metrics(queued=0, keys=0, in_flight=2)

         # Return value of the `_process_queue` should be propagated back to both.
         self._pending_calls.pop()[1].callback("bar")

         self.assertEqual(self.successResultOf(queue_d1), "bar")
         self.assertEqual(self.successResultOf(queue_d2), "bar")
+        self._assert_metrics(queued=0, keys=0, in_flight=0)

     def test_queuing(self):
         """Test that we queue up requests while a `_process_queue` is being
@@ -92,13 +144,20 @@ class BatchingQueueTestCase(TestCase):
         queue_d1 = defer.ensureDeferred(self.queue.add_to_queue("foo1"))
         self.clock.pump([0])

+        self.assertEqual(len(self._pending_calls), 1)
+
+        # We queue up work after the process function has been called, testing
+        # that they get correctly queued up.
         queue_d2 = defer.ensureDeferred(self.queue.add_to_queue("foo2"))
+        queue_d3 = defer.ensureDeferred(self.queue.add_to_queue("foo3"))

         # We should see only *one* call to `_process_queue`
         self.assertEqual(len(self._pending_calls), 1)
         self.assertEqual(self._pending_calls[0][0], ["foo1"])
         self.assertFalse(queue_d1.called)
         self.assertFalse(queue_d2.called)
+        self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=2, keys=1, in_flight=3)

         # Return value of the `_process_queue` should be propagated back to the
         # first.
@@ -106,18 +165,24 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(self.successResultOf(queue_d1), "bar1")
         self.assertFalse(queue_d2.called)
+        self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=2, keys=1, in_flight=2)

         # We should now see a second call to `_process_queue`
         self.clock.pump([0])
         self.assertEqual(len(self._pending_calls), 1)
-        self.assertEqual(self._pending_calls[0][0], ["foo2"])
+        self.assertEqual(self._pending_calls[0][0], ["foo2", "foo3"])
         self.assertFalse(queue_d2.called)
+        self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=0, keys=0, in_flight=2)

         # Return value of the `_process_queue` should be propagated back to the
         # second.
         self._pending_calls.pop()[1].callback("bar2")

         self.assertEqual(self.successResultOf(queue_d2), "bar2")
+        self.assertEqual(self.successResultOf(queue_d3), "bar2")
+        self._assert_metrics(queued=0, keys=0, in_flight=0)

     def test_different_keys(self):
         """Test that calls to different keys get processed in parallel."""
@@ -140,6 +205,7 @@ class BatchingQueueTestCase(TestCase):
         self.assertFalse(queue_d1.called)
         self.assertFalse(queue_d2.called)
         self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=1, keys=1, in_flight=3)

         # Return value of the `_process_queue` should be propagated back to the
         # first.
@@ -148,6 +214,7 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(self.successResultOf(queue_d1), "bar1")
         self.assertFalse(queue_d2.called)
         self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=1, keys=1, in_flight=2)

         # Return value of the `_process_queue` should be propagated back to the
         # second.
@@ -161,9 +228,11 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(len(self._pending_calls), 1)
         self.assertEqual(self._pending_calls[0][0], ["foo3"])
         self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=0, keys=0, in_flight=1)

         # Return value of the `_process_queue` should be propagated back to the
         # third deferred.
         self._pending_calls.pop()[1].callback("bar4")

         self.assertEqual(self.successResultOf(queue_d3), "bar4")
+        self._assert_metrics(queued=0, keys=0, in_flight=0)
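
For reference, a rough sketch of driving a `BatchingQueue` the way the tests above do, reusing this repository's `get_clock` test helper; the queue name, values, and processing function are arbitrary examples:

```python
# Sketch only: values queued while a batch is in flight are collected and
# handed to the processing callback together, and every caller in a batch
# receives the same result.
from twisted.internet import defer

from synapse.util.batching_queue import BatchingQueue
from tests.server import get_clock

reactor_clock, hs_clock = get_clock()


async def process_batch(values):
    # Receives every value queued for a key since the last batch; the
    # return value is handed back to all waiting callers.
    return list(values)


queue = BatchingQueue("example_queue", hs_clock, process_batch)

d1 = defer.ensureDeferred(queue.add_to_queue("foo1"))
d2 = defer.ensureDeferred(queue.add_to_queue("foo2"))
reactor_clock.pump([0])  # let the queue's reactor tick run the batch
# After the pump, both deferreds should have fired with the same batch
# result, ["foo1", "foo2"].
```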