Compare commits
77 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c180116215 | |||
| 9a793f861c | |||
| 53969e1960 | |||
| 667c6546bd | |||
| 7e1c616452 | |||
| ba438a3ac1 | |||
| 61ab08a197 | |||
| 1e77ac66e3 | |||
| a502cfec00 | |||
| 5c9afd6f80 | |||
| 52423607bd | |||
| f116f32ace | |||
| 557b686eac | |||
| a61738b316 | |||
| 3681437c35 | |||
| 0fde1896cd | |||
| 2a4fde0a6f | |||
| 45768d1640 | |||
| 12285a1a76 | |||
| 96bad44f87 | |||
| 187a546bff | |||
| d6cc369205 | |||
| ed5a0780a4 | |||
| 1032393dfb | |||
| 0e505b1913 | |||
| ad9edd1d96 | |||
| e82db24a0e | |||
| 0834b49c6a | |||
| 36446ffedb | |||
| 13d211edc1 | |||
| 1152f495a0 | |||
| e2acf536d4 | |||
| 0160f6601f | |||
| f4caf3f83d | |||
| 0546715c18 | |||
| 57e3f923d2 | |||
| d3a8c9c55e | |||
| fef6c2cdcc | |||
| 752b7b32ed | |||
| ad459a106c | |||
| 592c162516 | |||
| 330432031b | |||
| bf54c1cf6c | |||
| 3e4bc4488c | |||
| 48e2b48888 | |||
| d8db6d9267 | |||
| 304bb22c1d | |||
| c88d50aa8f | |||
| 0c87eed294 | |||
| e316407b5d | |||
| e6cbf47773 | |||
| 607bd27c83 | |||
| d62162bbec | |||
| 617afee069 | |||
| 522bd3c8a3 | |||
| d6e3c2c79b | |||
| f7869f8f8b | |||
| 604cff1a06 | |||
| b50f18171d | |||
| f29b41fde9 | |||
| 28b0490dfd | |||
| 042eedfa2b | |||
| c5930d513a | |||
| 6a29e815fc | |||
| e44150a6de | |||
| f731e42baf | |||
| 09503126df | |||
| c1f4118bb6 | |||
| e73635191f | |||
| 219c2a322b | |||
| 2e4be8bfd9 | |||
| 08ea5fe635 | |||
| 77a23e2e05 | |||
| 9700d15611 | |||
| a21a41bad7 | |||
| b3bff53178 | |||
| 2c7866d664 |
@@ -43,6 +43,7 @@ media_store/
|
||||
|
||||
build/
|
||||
venv/
|
||||
venv*/
|
||||
|
||||
localhost-800*/
|
||||
static/client/register/register_config.js
|
||||
|
||||
@@ -1,3 +1,12 @@
|
||||
Changes in synapse v0.31.2 (2018-06-14)
|
||||
=======================================
|
||||
|
||||
SECURITY UPDATE: Prevent unauthorised users from setting state events in a room
|
||||
when there is no ``m.room.power_levels`` event in force in the room. (PR #3397)
|
||||
|
||||
Discussion around the Matrix Spec change proposal for this change can be
|
||||
followed at https://github.com/matrix-org/matrix-doc/issues/1304.
|
||||
|
||||
Changes in synapse v0.31.1 (2018-06-08)
|
||||
=======================================
|
||||
|
||||
|
||||
+9
-9
@@ -9,19 +9,19 @@ Set up database
|
||||
Assuming your PostgreSQL database user is called ``postgres``, create a user
|
||||
``synapse_user`` with::
|
||||
|
||||
su - postgres
|
||||
createuser --pwprompt synapse_user
|
||||
su - postgres
|
||||
createuser --pwprompt synapse_user
|
||||
|
||||
The PostgreSQL database used *must* have the correct encoding set, otherwise it
|
||||
would not be able to store UTF8 strings. To create a database with the correct
|
||||
encoding use, e.g.::
|
||||
|
||||
CREATE DATABASE synapse
|
||||
ENCODING 'UTF8'
|
||||
LC_COLLATE='C'
|
||||
LC_CTYPE='C'
|
||||
template=template0
|
||||
OWNER synapse_user;
|
||||
CREATE DATABASE synapse
|
||||
ENCODING 'UTF8'
|
||||
LC_COLLATE='C'
|
||||
LC_CTYPE='C'
|
||||
template=template0
|
||||
OWNER synapse_user;
|
||||
|
||||
This would create an appropriate database named ``synapse`` owned by the
|
||||
``synapse_user`` user (which must already exist).
|
||||
@@ -126,7 +126,7 @@ run::
|
||||
--postgres-config homeserver-postgres.yaml
|
||||
|
||||
Once that has completed, change the synapse config to point at the PostgreSQL
|
||||
database configuration file ``homeserver-postgres.yaml``:
|
||||
database configuration file ``homeserver-postgres.yaml``::
|
||||
|
||||
./synctl stop
|
||||
mv homeserver.yaml homeserver-old-sqlite.yaml
|
||||
|
||||
@@ -18,14 +18,22 @@
|
||||
from __future__ import print_function
|
||||
|
||||
import argparse
|
||||
from urlparse import urlparse, urlunparse
|
||||
|
||||
import nacl.signing
|
||||
import json
|
||||
import base64
|
||||
import requests
|
||||
import sys
|
||||
|
||||
from requests.adapters import HTTPAdapter
|
||||
import srvlookup
|
||||
import yaml
|
||||
|
||||
# uncomment the following to enable debug logging of http requests
|
||||
#from httplib import HTTPConnection
|
||||
#HTTPConnection.debuglevel = 1
|
||||
|
||||
def encode_base64(input_bytes):
|
||||
"""Encode bytes as a base64 string without any padding."""
|
||||
|
||||
@@ -113,17 +121,6 @@ def read_signing_keys(stream):
|
||||
return keys
|
||||
|
||||
|
||||
def lookup(destination, path):
|
||||
if ":" in destination:
|
||||
return "https://%s%s" % (destination, path)
|
||||
else:
|
||||
try:
|
||||
srv = srvlookup.lookup("matrix", "tcp", destination)[0]
|
||||
return "https://%s:%d%s" % (srv.host, srv.port, path)
|
||||
except:
|
||||
return "https://%s:%d%s" % (destination, 8448, path)
|
||||
|
||||
|
||||
def request_json(method, origin_name, origin_key, destination, path, content):
|
||||
if method is None:
|
||||
if content is None:
|
||||
@@ -152,13 +149,19 @@ def request_json(method, origin_name, origin_key, destination, path, content):
|
||||
authorization_headers.append(bytes(header))
|
||||
print ("Authorization: %s" % header, file=sys.stderr)
|
||||
|
||||
dest = lookup(destination, path)
|
||||
dest = "matrix://%s%s" % (destination, path)
|
||||
print ("Requesting %s" % dest, file=sys.stderr)
|
||||
|
||||
result = requests.request(
|
||||
s = requests.Session()
|
||||
s.mount("matrix://", MatrixConnectionAdapter())
|
||||
|
||||
result = s.request(
|
||||
method=method,
|
||||
url=dest,
|
||||
headers={"Authorization": authorization_headers[0]},
|
||||
headers={
|
||||
"Host": destination,
|
||||
"Authorization": authorization_headers[0]
|
||||
},
|
||||
verify=False,
|
||||
data=content,
|
||||
)
|
||||
@@ -242,5 +245,39 @@ def read_args_from_config(args):
|
||||
args.signing_key_path = config['signing_key_path']
|
||||
|
||||
|
||||
class MatrixConnectionAdapter(HTTPAdapter):
|
||||
@staticmethod
|
||||
def lookup(s):
|
||||
if s[-1] == ']':
|
||||
# ipv6 literal (with no port)
|
||||
return s, 8448
|
||||
|
||||
if ":" in s:
|
||||
out = s.rsplit(":",1)
|
||||
try:
|
||||
port = int(out[1])
|
||||
except ValueError:
|
||||
raise ValueError("Invalid host:port '%s'" % s)
|
||||
return out[0], port
|
||||
|
||||
try:
|
||||
srv = srvlookup.lookup("matrix", "tcp", s)[0]
|
||||
return srv.host, srv.port
|
||||
except:
|
||||
return s, 8448
|
||||
|
||||
def get_connection(self, url, proxies=None):
|
||||
parsed = urlparse(url)
|
||||
|
||||
(host, port) = self.lookup(parsed.netloc)
|
||||
netloc = "%s:%d" % (host, port)
|
||||
print("Connecting to %s" % (netloc,), file=sys.stderr)
|
||||
url = urlunparse((
|
||||
"https", netloc, parsed.path, parsed.params, parsed.query,
|
||||
parsed.fragment,
|
||||
))
|
||||
return super(MatrixConnectionAdapter, self).get_connection(url, proxies)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -17,4 +17,5 @@ ignore =
|
||||
[flake8]
|
||||
max-line-length = 90
|
||||
# W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it.
|
||||
ignore = W503
|
||||
# E203 is contrary to PEP8.
|
||||
ignore = W503,E203
|
||||
|
||||
+2
-1
@@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -16,4 +17,4 @@
|
||||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.31.1"
|
||||
__version__ = "0.31.2"
|
||||
|
||||
+1
-1
@@ -655,7 +655,7 @@ class Auth(object):
|
||||
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
|
||||
|
||||
send_level = event_auth.get_send_level(
|
||||
EventTypes.Aliases, "", auth_events
|
||||
EventTypes.Aliases, "", power_level_event,
|
||||
)
|
||||
user_level = event_auth.get_user_power_level(user_id, auth_events)
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ from synapse.config._base import ConfigError
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from synapse.config.logger import setup_logging
|
||||
from synapse.http.site import SynapseSite
|
||||
from synapse.metrics import RegistryProxy
|
||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||
@@ -62,7 +63,7 @@ class AppserviceServer(HomeServer):
|
||||
for res in listener_config["resources"]:
|
||||
for name in res["names"]:
|
||||
if name == "metrics":
|
||||
resources[METRICS_PREFIX] = MetricsResource(self)
|
||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||
|
||||
root_resource = create_resource_tree(resources, NoResource())
|
||||
|
||||
@@ -97,7 +98,7 @@ class AppserviceServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -122,7 +122,7 @@ class ClientReaderServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -138,7 +138,7 @@ class EventCreatorServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -111,7 +111,7 @@ class FederationReaderServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -125,7 +125,7 @@ class FederationSenderServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -176,7 +176,7 @@ class FrontendProxyServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -266,7 +266,7 @@ class SynapseHomeServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -118,7 +118,7 @@ class MediaRepositoryServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -128,7 +128,7 @@ class PusherServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -171,6 +171,10 @@ def main():
|
||||
if cache_factor:
|
||||
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
|
||||
|
||||
cache_factors = config.get("synctl_cache_factors", {})
|
||||
for cache_name, factor in cache_factors.iteritems():
|
||||
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
||||
|
||||
worker_configfiles = []
|
||||
if options.worker:
|
||||
start_stop_synapse = False
|
||||
|
||||
@@ -150,7 +150,7 @@ class UserDirectoryServer(HomeServer):
|
||||
elif listener["type"] == "metrics":
|
||||
if not self.get_config().enable_metrics:
|
||||
logger.warn(("Metrics listener configured, but "
|
||||
"collect_metrics is not enabled!"))
|
||||
"enable_metrics is not True!"))
|
||||
else:
|
||||
_base.listen_metrics(listener["bind_addresses"],
|
||||
listener["port"])
|
||||
|
||||
@@ -292,4 +292,8 @@ class ApplicationService(object):
|
||||
return self.rate_limited
|
||||
|
||||
def __str__(self):
|
||||
return "ApplicationService: %s" % (self.__dict__,)
|
||||
# copy dictionary and redact token fields so they don't get logged
|
||||
dict_copy = self.__dict__.copy()
|
||||
dict_copy["token"] = "<redacted>"
|
||||
dict_copy["hs_token"] = "<redacted>"
|
||||
return "ApplicationService: %s" % (dict_copy,)
|
||||
|
||||
@@ -24,8 +24,27 @@ from synapse.types import ThirdPartyInstanceID
|
||||
import logging
|
||||
import urllib
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
sent_transactions_counter = Counter(
|
||||
"synapse_appservice_api_sent_transactions",
|
||||
"Number of /transactions/ requests sent",
|
||||
["service"]
|
||||
)
|
||||
|
||||
failed_transactions_counter = Counter(
|
||||
"synapse_appservice_api_failed_transactions",
|
||||
"Number of /transactions/ requests that failed to send",
|
||||
["service"]
|
||||
)
|
||||
|
||||
sent_events_counter = Counter(
|
||||
"synapse_appservice_api_sent_events",
|
||||
"Number of events sent to the AS",
|
||||
["service"]
|
||||
)
|
||||
|
||||
HOUR_IN_MS = 60 * 60 * 1000
|
||||
|
||||
@@ -219,12 +238,15 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
args={
|
||||
"access_token": service.hs_token
|
||||
})
|
||||
sent_transactions_counter.labels(service.id).inc()
|
||||
sent_events_counter.labels(service.id).inc(len(events))
|
||||
defer.returnValue(True)
|
||||
return
|
||||
except CodeMessageException as e:
|
||||
logger.warning("push_bulk to %s received %s", uri, e.code)
|
||||
except Exception as ex:
|
||||
logger.warning("push_bulk to %s threw exception %s", uri, ex)
|
||||
failed_transactions_counter.labels(service.id).inc()
|
||||
defer.returnValue(False)
|
||||
|
||||
def _serialize(self, events):
|
||||
|
||||
+10
-1
@@ -223,6 +223,11 @@ class Config(object):
|
||||
action="store_true",
|
||||
help="Generate a config file for the server name"
|
||||
)
|
||||
config_parser.add_argument(
|
||||
"--check-config",
|
||||
action="store_true",
|
||||
help="Check configuration supplied is valid"
|
||||
)
|
||||
config_parser.add_argument(
|
||||
"--report-stats",
|
||||
action="store",
|
||||
@@ -250,6 +255,8 @@ class Config(object):
|
||||
config_files = find_config_files(search_paths=config_args.config_path)
|
||||
|
||||
generate_keys = config_args.generate_keys
|
||||
|
||||
check_config = config_args.check_config
|
||||
|
||||
obj = cls()
|
||||
|
||||
@@ -333,7 +340,9 @@ class Config(object):
|
||||
if generate_keys:
|
||||
return None
|
||||
|
||||
obj.invoke_all("read_arguments", args)
|
||||
obj.invoke_all("read_arguments", args)
|
||||
if check_config:
|
||||
return None
|
||||
|
||||
return obj
|
||||
|
||||
|
||||
@@ -18,6 +18,9 @@ from ._base import Config
|
||||
DEFAULT_CONFIG = """\
|
||||
# User Consent configuration
|
||||
#
|
||||
# for detailed instructions, see
|
||||
# https://github.com/matrix-org/synapse/blob/master/docs/consent_tracking.md
|
||||
#
|
||||
# Parts of this section are required if enabling the 'consent' resource under
|
||||
# 'listeners', in particular 'template_dir' and 'version'.
|
||||
#
|
||||
|
||||
@@ -27,10 +27,12 @@ from synapse.util.metrics import Measure
|
||||
from twisted.internet import defer
|
||||
|
||||
from signedjson.sign import (
|
||||
verify_signed_json, signature_ids, sign_json, encode_canonical_json
|
||||
verify_signed_json, signature_ids, sign_json, encode_canonical_json,
|
||||
SignatureVerifyException,
|
||||
)
|
||||
from signedjson.key import (
|
||||
is_signing_algorithm_supported, decode_verify_key_bytes
|
||||
is_signing_algorithm_supported, decode_verify_key_bytes,
|
||||
encode_verify_key_base64,
|
||||
)
|
||||
from unpaddedbase64 import decode_base64, encode_base64
|
||||
|
||||
@@ -56,7 +58,7 @@ Attributes:
|
||||
key_ids(set(str)): The set of key_ids to that could be used to verify the
|
||||
JSON object
|
||||
json_object(dict): The JSON object to verify.
|
||||
deferred(twisted.internet.defer.Deferred):
|
||||
deferred(Deferred[str, str, nacl.signing.VerifyKey]):
|
||||
A deferred (server_name, key_id, verify_key) tuple that resolves when
|
||||
a verify key has been fetched. The deferreds' callbacks are run with no
|
||||
logcontext.
|
||||
@@ -736,6 +738,17 @@ class Keyring(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_key_deferred(verify_request):
|
||||
"""Waits for the key to become available, and then performs a verification
|
||||
|
||||
Args:
|
||||
verify_request (VerifyKeyRequest):
|
||||
|
||||
Returns:
|
||||
Deferred[None]
|
||||
|
||||
Raises:
|
||||
SynapseError if there was a problem performing the verification
|
||||
"""
|
||||
server_name = verify_request.server_name
|
||||
try:
|
||||
with PreserveLoggingContext():
|
||||
@@ -768,11 +781,17 @@ def _handle_key_deferred(verify_request):
|
||||
))
|
||||
try:
|
||||
verify_signed_json(json_object, server_name, verify_key)
|
||||
except Exception:
|
||||
except SignatureVerifyException as e:
|
||||
logger.debug(
|
||||
"Error verifying signature for %s:%s:%s with key %s: %s",
|
||||
server_name, verify_key.alg, verify_key.version,
|
||||
encode_verify_key_base64(verify_key),
|
||||
str(e),
|
||||
)
|
||||
raise SynapseError(
|
||||
401,
|
||||
"Invalid signature for server %s with key %s:%s" % (
|
||||
server_name, verify_key.alg, verify_key.version
|
||||
"Invalid signature for server %s with key %s:%s: %s" % (
|
||||
server_name, verify_key.alg, verify_key.version, str(e),
|
||||
),
|
||||
Codes.UNAUTHORIZED,
|
||||
)
|
||||
|
||||
+66
-43
@@ -34,9 +34,11 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
|
||||
event: the event being checked.
|
||||
auth_events (dict: event-key -> event): the existing room state.
|
||||
|
||||
Raises:
|
||||
AuthError if the checks fail
|
||||
|
||||
Returns:
|
||||
True if the auth checks pass.
|
||||
if the auth checks pass.
|
||||
"""
|
||||
if do_size_check:
|
||||
_check_size_limits(event)
|
||||
@@ -71,7 +73,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
|
||||
# Oh, we don't know what the state of the room was, so we
|
||||
# are trusting that this is allowed (at least for now)
|
||||
logger.warn("Trusting event: %s", event.event_id)
|
||||
return True
|
||||
return
|
||||
|
||||
if event.type == EventTypes.Create:
|
||||
room_id_domain = get_domain_from_id(event.room_id)
|
||||
@@ -81,7 +83,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
|
||||
"Creation event's room_id domain does not match sender's"
|
||||
)
|
||||
# FIXME
|
||||
return True
|
||||
logger.debug("Allowing! %s", event)
|
||||
return
|
||||
|
||||
creation_event = auth_events.get((EventTypes.Create, ""), None)
|
||||
|
||||
@@ -118,7 +121,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
|
||||
403,
|
||||
"Alias event's state_key does not match sender's domain"
|
||||
)
|
||||
return True
|
||||
logger.debug("Allowing! %s", event)
|
||||
return
|
||||
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug(
|
||||
@@ -127,14 +131,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member:
|
||||
allowed = _is_membership_change_allowed(
|
||||
event, auth_events
|
||||
)
|
||||
if allowed:
|
||||
logger.debug("Allowing! %s", event)
|
||||
else:
|
||||
logger.debug("Denying! %s", event)
|
||||
return allowed
|
||||
_is_membership_change_allowed(event, auth_events)
|
||||
logger.debug("Allowing! %s", event)
|
||||
return
|
||||
|
||||
_check_event_sender_in_room(event, auth_events)
|
||||
|
||||
@@ -153,7 +152,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
|
||||
)
|
||||
)
|
||||
else:
|
||||
return True
|
||||
logger.debug("Allowing! %s", event)
|
||||
return
|
||||
|
||||
_can_send_event(event, auth_events)
|
||||
|
||||
@@ -200,7 +200,7 @@ def _is_membership_change_allowed(event, auth_events):
|
||||
create = auth_events.get(key)
|
||||
if create and event.prev_events[0][0] == create.event_id:
|
||||
if create.content["creator"] == event.state_key:
|
||||
return True
|
||||
return
|
||||
|
||||
target_user_id = event.state_key
|
||||
|
||||
@@ -265,13 +265,13 @@ def _is_membership_change_allowed(event, auth_events):
|
||||
raise AuthError(
|
||||
403, "%s is banned from the room" % (target_user_id,)
|
||||
)
|
||||
return True
|
||||
return
|
||||
|
||||
if Membership.JOIN != membership:
|
||||
if (caller_invited
|
||||
and Membership.LEAVE == membership
|
||||
and target_user_id == event.user_id):
|
||||
return True
|
||||
return
|
||||
|
||||
if not caller_in_room: # caller isn't joined
|
||||
raise AuthError(
|
||||
@@ -334,8 +334,6 @@ def _is_membership_change_allowed(event, auth_events):
|
||||
else:
|
||||
raise AuthError(500, "Unknown membership %s" % membership)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _check_event_sender_in_room(event, auth_events):
|
||||
key = (EventTypes.Member, event.user_id, )
|
||||
@@ -355,35 +353,46 @@ def _check_joined_room(member, user_id, room_id):
|
||||
))
|
||||
|
||||
|
||||
def get_send_level(etype, state_key, auth_events):
|
||||
key = (EventTypes.PowerLevels, "", )
|
||||
send_level_event = auth_events.get(key)
|
||||
send_level = None
|
||||
if send_level_event:
|
||||
send_level = send_level_event.content.get("events", {}).get(
|
||||
etype
|
||||
)
|
||||
if send_level is None:
|
||||
if state_key is not None:
|
||||
send_level = send_level_event.content.get(
|
||||
"state_default", 50
|
||||
)
|
||||
else:
|
||||
send_level = send_level_event.content.get(
|
||||
"events_default", 0
|
||||
)
|
||||
def get_send_level(etype, state_key, power_levels_event):
|
||||
"""Get the power level required to send an event of a given type
|
||||
|
||||
if send_level:
|
||||
send_level = int(send_level)
|
||||
The federation spec [1] refers to this as "Required Power Level".
|
||||
|
||||
https://matrix.org/docs/spec/server_server/unstable.html#definitions
|
||||
|
||||
Args:
|
||||
etype (str): type of event
|
||||
state_key (str|None): state_key of state event, or None if it is not
|
||||
a state event.
|
||||
power_levels_event (synapse.events.EventBase|None): power levels event
|
||||
in force at this point in the room
|
||||
Returns:
|
||||
int: power level required to send this event.
|
||||
"""
|
||||
|
||||
if power_levels_event:
|
||||
power_levels_content = power_levels_event.content
|
||||
else:
|
||||
send_level = 0
|
||||
power_levels_content = {}
|
||||
|
||||
return send_level
|
||||
# see if we have a custom level for this event type
|
||||
send_level = power_levels_content.get("events", {}).get(etype)
|
||||
|
||||
# otherwise, fall back to the state_default/events_default.
|
||||
if send_level is None:
|
||||
if state_key is not None:
|
||||
send_level = power_levels_content.get("state_default", 50)
|
||||
else:
|
||||
send_level = power_levels_content.get("events_default", 0)
|
||||
|
||||
return int(send_level)
|
||||
|
||||
|
||||
def _can_send_event(event, auth_events):
|
||||
power_levels_event = _get_power_level_event(auth_events)
|
||||
|
||||
send_level = get_send_level(
|
||||
event.type, event.get("state_key", None), auth_events
|
||||
event.type, event.get("state_key"), power_levels_event,
|
||||
)
|
||||
user_level = get_user_power_level(event.user_id, auth_events)
|
||||
|
||||
@@ -524,13 +533,22 @@ def _check_power_levels(event, auth_events):
|
||||
|
||||
|
||||
def _get_power_level_event(auth_events):
|
||||
key = (EventTypes.PowerLevels, "", )
|
||||
return auth_events.get(key)
|
||||
return auth_events.get((EventTypes.PowerLevels, ""))
|
||||
|
||||
|
||||
def get_user_power_level(user_id, auth_events):
|
||||
power_level_event = _get_power_level_event(auth_events)
|
||||
"""Get a user's power level
|
||||
|
||||
Args:
|
||||
user_id (str): user's id to look up in power_levels
|
||||
auth_events (dict[(str, str), synapse.events.EventBase]):
|
||||
state in force at this point in the room (or rather, a subset of
|
||||
it including at least the create event and power levels event.
|
||||
|
||||
Returns:
|
||||
int: the user's power level in this room.
|
||||
"""
|
||||
power_level_event = _get_power_level_event(auth_events)
|
||||
if power_level_event:
|
||||
level = power_level_event.content.get("users", {}).get(user_id)
|
||||
if not level:
|
||||
@@ -541,6 +559,11 @@ def get_user_power_level(user_id, auth_events):
|
||||
else:
|
||||
return int(level)
|
||||
else:
|
||||
# if there is no power levels event, the creator gets 100 and everyone
|
||||
# else gets 0.
|
||||
|
||||
# some things which call this don't pass the create event: hack around
|
||||
# that.
|
||||
key = (EventTypes.Create, "", )
|
||||
create_event = auth_events.get(key)
|
||||
if (create_event is not None and
|
||||
|
||||
@@ -35,7 +35,7 @@ from synapse.storage.presence import UserPresenceState
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.metrics import LaterGauge
|
||||
|
||||
from blist import sorteddict
|
||||
from sortedcontainers import SortedDict
|
||||
from collections import namedtuple
|
||||
|
||||
import logging
|
||||
@@ -55,19 +55,19 @@ class FederationRemoteSendQueue(object):
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
|
||||
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
|
||||
self.presence_changed = sorteddict() # Stream position -> user_id
|
||||
self.presence_changed = SortedDict() # Stream position -> user_id
|
||||
|
||||
self.keyed_edu = {} # (destination, key) -> EDU
|
||||
self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
|
||||
self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
|
||||
|
||||
self.edus = sorteddict() # stream position -> Edu
|
||||
self.edus = SortedDict() # stream position -> Edu
|
||||
|
||||
self.failures = sorteddict() # stream position -> (destination, Failure)
|
||||
self.failures = SortedDict() # stream position -> (destination, Failure)
|
||||
|
||||
self.device_messages = sorteddict() # stream position -> destination
|
||||
self.device_messages = SortedDict() # stream position -> destination
|
||||
|
||||
self.pos = 1
|
||||
self.pos_time = sorteddict()
|
||||
self.pos_time = SortedDict()
|
||||
|
||||
# EVERYTHING IS SAD. In particular, python only makes new scopes when
|
||||
# we make a new function, so we need to make a new function so the inner
|
||||
@@ -98,7 +98,7 @@ class FederationRemoteSendQueue(object):
|
||||
now = self.clock.time_msec()
|
||||
|
||||
keys = self.pos_time.keys()
|
||||
time = keys.bisect_left(now - FIVE_MINUTES_AGO)
|
||||
time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
|
||||
if not keys[:time]:
|
||||
return
|
||||
|
||||
@@ -113,7 +113,7 @@ class FederationRemoteSendQueue(object):
|
||||
with Measure(self.clock, "send_queue._clear"):
|
||||
# Delete things out of presence maps
|
||||
keys = self.presence_changed.keys()
|
||||
i = keys.bisect_left(position_to_delete)
|
||||
i = self.presence_changed.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.presence_changed[key]
|
||||
|
||||
@@ -131,7 +131,7 @@ class FederationRemoteSendQueue(object):
|
||||
|
||||
# Delete things out of keyed edus
|
||||
keys = self.keyed_edu_changed.keys()
|
||||
i = keys.bisect_left(position_to_delete)
|
||||
i = self.keyed_edu_changed.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.keyed_edu_changed[key]
|
||||
|
||||
@@ -145,19 +145,19 @@ class FederationRemoteSendQueue(object):
|
||||
|
||||
# Delete things out of edu map
|
||||
keys = self.edus.keys()
|
||||
i = keys.bisect_left(position_to_delete)
|
||||
i = self.edus.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.edus[key]
|
||||
|
||||
# Delete things out of failure map
|
||||
keys = self.failures.keys()
|
||||
i = keys.bisect_left(position_to_delete)
|
||||
i = self.failures.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.failures[key]
|
||||
|
||||
# Delete things out of device map
|
||||
keys = self.device_messages.keys()
|
||||
i = keys.bisect_left(position_to_delete)
|
||||
i = self.device_messages.bisect_left(position_to_delete)
|
||||
for key in keys[:i]:
|
||||
del self.device_messages[key]
|
||||
|
||||
@@ -250,13 +250,12 @@ class FederationRemoteSendQueue(object):
|
||||
self._clear_queue_before_pos(federation_ack)
|
||||
|
||||
# Fetch changed presence
|
||||
keys = self.presence_changed.keys()
|
||||
i = keys.bisect_right(from_token)
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
i = self.presence_changed.bisect_right(from_token)
|
||||
j = self.presence_changed.bisect_right(to_token) + 1
|
||||
dest_user_ids = [
|
||||
(pos, user_id)
|
||||
for pos in keys[i:j]
|
||||
for user_id in self.presence_changed[pos]
|
||||
for pos, user_id_list in self.presence_changed.items()[i:j]
|
||||
for user_id in user_id_list
|
||||
]
|
||||
|
||||
for (key, user_id) in dest_user_ids:
|
||||
@@ -265,13 +264,12 @@ class FederationRemoteSendQueue(object):
|
||||
)))
|
||||
|
||||
# Fetch changes keyed edus
|
||||
keys = self.keyed_edu_changed.keys()
|
||||
i = keys.bisect_right(from_token)
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
i = self.keyed_edu_changed.bisect_right(from_token)
|
||||
j = self.keyed_edu_changed.bisect_right(to_token) + 1
|
||||
# We purposefully clobber based on the key here, python dict comprehensions
|
||||
# always use the last value, so this will correctly point to the last
|
||||
# stream position.
|
||||
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
|
||||
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
|
||||
|
||||
for ((destination, edu_key), pos) in iteritems(keyed_edus):
|
||||
rows.append((pos, KeyedEduRow(
|
||||
@@ -280,19 +278,17 @@ class FederationRemoteSendQueue(object):
|
||||
)))
|
||||
|
||||
# Fetch changed edus
|
||||
keys = self.edus.keys()
|
||||
i = keys.bisect_right(from_token)
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
edus = ((k, self.edus[k]) for k in keys[i:j])
|
||||
i = self.edus.bisect_right(from_token)
|
||||
j = self.edus.bisect_right(to_token) + 1
|
||||
edus = self.edus.items()[i:j]
|
||||
|
||||
for (pos, edu) in edus:
|
||||
rows.append((pos, EduRow(edu)))
|
||||
|
||||
# Fetch changed failures
|
||||
keys = self.failures.keys()
|
||||
i = keys.bisect_right(from_token)
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
failures = ((k, self.failures[k]) for k in keys[i:j])
|
||||
i = self.failures.bisect_right(from_token)
|
||||
j = self.failures.bisect_right(to_token) + 1
|
||||
failures = self.failures.items()[i:j]
|
||||
|
||||
for (pos, (destination, failure)) in failures:
|
||||
rows.append((pos, FailureRow(
|
||||
@@ -301,10 +297,9 @@ class FederationRemoteSendQueue(object):
|
||||
)))
|
||||
|
||||
# Fetch changed device messages
|
||||
keys = self.device_messages.keys()
|
||||
i = keys.bisect_right(from_token)
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
|
||||
i = self.device_messages.bisect_right(from_token)
|
||||
j = self.device_messages.bisect_right(to_token) + 1
|
||||
device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
|
||||
|
||||
for (destination, pos) in iteritems(device_messages):
|
||||
rows.append((pos, DeviceRow(
|
||||
|
||||
@@ -21,7 +21,6 @@ from .units import Transaction, Edu
|
||||
|
||||
from synapse.api.errors import HttpResponseException, FederationDeniedError
|
||||
from synapse.util import logcontext, PreserveLoggingContext
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
||||
@@ -451,9 +450,6 @@ class TransactionQueue(object):
|
||||
# hence why we throw the result away.
|
||||
yield get_retry_limiter(destination, self.clock, self.store)
|
||||
|
||||
# XXX: what's this for?
|
||||
yield run_on_reactor()
|
||||
|
||||
pending_pdus = []
|
||||
while True:
|
||||
device_message_edus, device_stream_id, dev_list_id = (
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer, threads
|
||||
|
||||
from ._base import BaseHandler
|
||||
@@ -23,7 +24,6 @@ from synapse.api.errors import (
|
||||
)
|
||||
from synapse.module_api import ModuleApi
|
||||
from synapse.types import UserID
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
|
||||
@@ -423,15 +423,11 @@ class AuthHandler(BaseHandler):
|
||||
def _check_msisdn(self, authdict, _):
|
||||
return self._check_threepid('msisdn', authdict)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _check_dummy_auth(self, authdict, _):
|
||||
yield run_on_reactor()
|
||||
defer.returnValue(True)
|
||||
return defer.succeed(True)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _check_threepid(self, medium, authdict):
|
||||
yield run_on_reactor()
|
||||
|
||||
if 'threepid_creds' not in authdict:
|
||||
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
|
||||
|
||||
@@ -825,6 +821,15 @@ class AuthHandler(BaseHandler):
|
||||
if medium == 'email':
|
||||
address = address.lower()
|
||||
|
||||
identity_handler = self.hs.get_handlers().identity_handler
|
||||
yield identity_handler.unbind_threepid(
|
||||
user_id,
|
||||
{
|
||||
'medium': medium,
|
||||
'address': address,
|
||||
},
|
||||
)
|
||||
|
||||
ret = yield self.store.user_delete_threepid(
|
||||
user_id, medium, address,
|
||||
)
|
||||
|
||||
@@ -17,6 +17,7 @@ from twisted.internet import defer, reactor
|
||||
from ._base import BaseHandler
|
||||
from synapse.types import UserID, create_requester
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.api.errors import SynapseError
|
||||
|
||||
import logging
|
||||
|
||||
@@ -30,6 +31,7 @@ class DeactivateAccountHandler(BaseHandler):
|
||||
self._auth_handler = hs.get_auth_handler()
|
||||
self._device_handler = hs.get_device_handler()
|
||||
self._room_member_handler = hs.get_room_member_handler()
|
||||
self._identity_handler = hs.get_handlers().identity_handler
|
||||
self.user_directory_handler = hs.get_user_directory_handler()
|
||||
|
||||
# Flag that indicates whether the process to part users from rooms is running
|
||||
@@ -52,14 +54,35 @@ class DeactivateAccountHandler(BaseHandler):
|
||||
# FIXME: Theoretically there is a race here wherein user resets
|
||||
# password using threepid.
|
||||
|
||||
# first delete any devices belonging to the user, which will also
|
||||
# delete threepids first. We remove these from the IS so if this fails,
|
||||
# leave the user still active so they can try again.
|
||||
# Ideally we would prevent password resets and then do this in the
|
||||
# background thread.
|
||||
threepids = yield self.store.user_get_threepids(user_id)
|
||||
for threepid in threepids:
|
||||
try:
|
||||
yield self._identity_handler.unbind_threepid(
|
||||
user_id,
|
||||
{
|
||||
'medium': threepid['medium'],
|
||||
'address': threepid['address'],
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
# Do we want this to be a fatal error or should we carry on?
|
||||
logger.exception("Failed to remove threepid from ID server")
|
||||
raise SynapseError(400, "Failed to remove threepid from ID server")
|
||||
yield self.store.user_delete_threepid(
|
||||
user_id, threepid['medium'], threepid['address'],
|
||||
)
|
||||
|
||||
# delete any devices belonging to the user, which will also
|
||||
# delete corresponding access tokens.
|
||||
yield self._device_handler.delete_all_devices_for_user(user_id)
|
||||
# then delete any remaining access tokens which weren't associated with
|
||||
# a device.
|
||||
yield self._auth_handler.delete_access_tokens_for_user(user_id)
|
||||
|
||||
yield self.store.user_delete_threepids(user_id)
|
||||
yield self.store.user_set_password_hash(user_id, None)
|
||||
|
||||
# Add the user to a table of users pending deactivation (ie.
|
||||
|
||||
@@ -39,7 +39,7 @@ from synapse.events.validator import EventValidator
|
||||
from synapse.util import unwrapFirstError, logcontext
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.async import run_on_reactor, Linearizer
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.frozenutils import unfreeze
|
||||
from synapse.crypto.event_signing import (
|
||||
compute_event_signature, add_hashes_and_signatures,
|
||||
@@ -1381,8 +1381,6 @@ class FederationHandler(BaseHandler):
|
||||
def get_state_for_pdu(self, room_id, event_id):
|
||||
"""Returns the state at the event. i.e. not including said event.
|
||||
"""
|
||||
yield run_on_reactor()
|
||||
|
||||
state_groups = yield self.store.get_state_groups(
|
||||
room_id, [event_id]
|
||||
)
|
||||
@@ -1425,8 +1423,6 @@ class FederationHandler(BaseHandler):
|
||||
def get_state_ids_for_pdu(self, room_id, event_id):
|
||||
"""Returns the state at the event. i.e. not including said event.
|
||||
"""
|
||||
yield run_on_reactor()
|
||||
|
||||
state_groups = yield self.store.get_state_groups_ids(
|
||||
room_id, [event_id]
|
||||
)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -26,7 +27,6 @@ from synapse.api.errors import (
|
||||
MatrixCodeMessageException, CodeMessageException
|
||||
)
|
||||
from ._base import BaseHandler
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.api.errors import SynapseError, Codes
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -38,6 +38,7 @@ class IdentityHandler(BaseHandler):
|
||||
super(IdentityHandler, self).__init__(hs)
|
||||
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.federation_http_client = hs.get_http_client()
|
||||
|
||||
self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
|
||||
self.trust_any_id_server_just_for_testing_do_not_use = (
|
||||
@@ -60,8 +61,6 @@ class IdentityHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def threepid_from_creds(self, creds):
|
||||
yield run_on_reactor()
|
||||
|
||||
if 'id_server' in creds:
|
||||
id_server = creds['id_server']
|
||||
elif 'idServer' in creds:
|
||||
@@ -104,7 +103,6 @@ class IdentityHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def bind_threepid(self, creds, mxid):
|
||||
yield run_on_reactor()
|
||||
logger.debug("binding threepid %r to %s", creds, mxid)
|
||||
data = None
|
||||
|
||||
@@ -139,9 +137,53 @@ class IdentityHandler(BaseHandler):
|
||||
defer.returnValue(data)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
|
||||
yield run_on_reactor()
|
||||
def unbind_threepid(self, mxid, threepid):
|
||||
"""
|
||||
Removes a binding from an identity server
|
||||
Args:
|
||||
mxid (str): Matrix user ID of binding to be removed
|
||||
threepid (dict): Dict with medium & address of binding to be removed
|
||||
|
||||
Returns:
|
||||
Deferred[bool]: True on success, otherwise False
|
||||
"""
|
||||
logger.debug("unbinding threepid %r from %s", threepid, mxid)
|
||||
if not self.trusted_id_servers:
|
||||
logger.warn("Can't unbind threepid: no trusted ID servers set in config")
|
||||
defer.returnValue(False)
|
||||
|
||||
# We don't track what ID server we added 3pids on (perhaps we ought to)
|
||||
# but we assume that any of the servers in the trusted list are in the
|
||||
# same ID server federation, so we can pick any one of them to send the
|
||||
# deletion request to.
|
||||
id_server = next(iter(self.trusted_id_servers))
|
||||
|
||||
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
|
||||
content = {
|
||||
"mxid": mxid,
|
||||
"threepid": threepid,
|
||||
}
|
||||
headers = {}
|
||||
# we abuse the federation http client to sign the request, but we have to send it
|
||||
# using the normal http client since we don't want the SRV lookup and want normal
|
||||
# 'browser-like' HTTPS.
|
||||
self.federation_http_client.sign_request(
|
||||
destination=None,
|
||||
method='POST',
|
||||
url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
|
||||
headers_dict=headers,
|
||||
content=content,
|
||||
destination_is=id_server,
|
||||
)
|
||||
yield self.http_client.post_json_get_json(
|
||||
url,
|
||||
content,
|
||||
headers,
|
||||
)
|
||||
defer.returnValue(True)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
|
||||
if not self._should_trust_id_server(id_server):
|
||||
raise SynapseError(
|
||||
400, "Untrusted ID server '%s'" % id_server,
|
||||
@@ -176,8 +218,6 @@ class IdentityHandler(BaseHandler):
|
||||
self, id_server, country, phone_number,
|
||||
client_secret, send_attempt, **kwargs
|
||||
):
|
||||
yield run_on_reactor()
|
||||
|
||||
if not self._should_trust_id_server(id_server):
|
||||
raise SynapseError(
|
||||
400, "Untrusted ID server '%s'" % id_server,
|
||||
|
||||
@@ -36,7 +36,7 @@ from synapse.events.validator import EventValidator
|
||||
from synapse.types import (
|
||||
UserID, RoomAlias, RoomStreamToken,
|
||||
)
|
||||
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
|
||||
from synapse.util.async import ReadWriteLock, Limiter
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
@@ -959,9 +959,7 @@ class EventCreationHandler(object):
|
||||
event_stream_id, max_stream_id
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _notify():
|
||||
yield run_on_reactor()
|
||||
try:
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
|
||||
@@ -24,7 +24,7 @@ from synapse.api.errors import (
|
||||
from synapse.http.client import CaptchaServerHttpClient
|
||||
from synapse import types
|
||||
from synapse.types import UserID, create_requester, RoomID, RoomAlias
|
||||
from synapse.util.async import run_on_reactor, Linearizer
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.threepids import check_3pid_allowed
|
||||
from ._base import BaseHandler
|
||||
|
||||
@@ -139,7 +139,6 @@ class RegistrationHandler(BaseHandler):
|
||||
Raises:
|
||||
RegistrationError if there was a problem registering.
|
||||
"""
|
||||
yield run_on_reactor()
|
||||
password_hash = None
|
||||
if password:
|
||||
password_hash = yield self.auth_handler().hash(password)
|
||||
@@ -431,8 +430,6 @@ class RegistrationHandler(BaseHandler):
|
||||
Raises:
|
||||
RegistrationError if there was a problem registering.
|
||||
"""
|
||||
yield run_on_reactor()
|
||||
|
||||
if localpart is None:
|
||||
raise SynapseError(400, "Request must include user id")
|
||||
|
||||
|
||||
@@ -13,6 +13,8 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import re
|
||||
|
||||
from twisted.internet.defer import CancelledError
|
||||
from twisted.python import failure
|
||||
|
||||
@@ -34,3 +36,14 @@ def cancelled_to_request_timed_out_error(value, timeout):
|
||||
value.trap(CancelledError)
|
||||
raise RequestTimedOutError()
|
||||
return value
|
||||
|
||||
|
||||
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
|
||||
|
||||
|
||||
def redact_uri(uri):
|
||||
"""Strips access tokens from the uri replaces with <redacted>"""
|
||||
return ACCESS_TOKEN_RE.sub(
|
||||
br'\1<redacted>\3',
|
||||
uri
|
||||
)
|
||||
|
||||
@@ -19,7 +19,7 @@ from OpenSSL.SSL import VERIFY_NONE
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
|
||||
)
|
||||
from synapse.http import cancelled_to_request_timed_out_error
|
||||
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
|
||||
from synapse.util.async import add_timeout_to_deferred
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
@@ -90,7 +90,8 @@ class SimpleHttpClient(object):
|
||||
# counters to it
|
||||
outgoing_requests_counter.labels(method).inc()
|
||||
|
||||
logger.info("Sending request %s %s", method, uri)
|
||||
# log request but strip `access_token` (AS requests for example include this)
|
||||
logger.info("Sending request %s %s", method, redact_uri(uri))
|
||||
|
||||
try:
|
||||
request_deferred = self.agent.request(
|
||||
@@ -105,14 +106,14 @@ class SimpleHttpClient(object):
|
||||
incoming_responses_counter.labels(method, response.code).inc()
|
||||
logger.info(
|
||||
"Received response to %s %s: %s",
|
||||
method, uri, response.code
|
||||
method, redact_uri(uri), response.code
|
||||
)
|
||||
defer.returnValue(response)
|
||||
except Exception as e:
|
||||
incoming_responses_counter.labels(method, "ERR").inc()
|
||||
logger.info(
|
||||
"Error sending request to %s %s: %s %s",
|
||||
method, uri, type(e).__name__, e.message
|
||||
method, redact_uri(uri), type(e).__name__, e.message
|
||||
)
|
||||
raise e
|
||||
|
||||
|
||||
@@ -260,14 +260,35 @@ class MatrixFederationHttpClient(object):
|
||||
defer.returnValue(response)
|
||||
|
||||
def sign_request(self, destination, method, url_bytes, headers_dict,
|
||||
content=None):
|
||||
content=None, destination_is=None):
|
||||
"""
|
||||
Signs a request by adding an Authorization header to headers_dict
|
||||
Args:
|
||||
destination (bytes|None): The desination home server of the request.
|
||||
May be None if the destination is an identity server, in which case
|
||||
destination_is must be non-None.
|
||||
method (bytes): The HTTP method of the request
|
||||
url_bytes (bytes): The URI path of the request
|
||||
headers_dict (dict): Dictionary of request headers to append to
|
||||
content (bytes): The body of the request
|
||||
destination_is (bytes): As 'destination', but if the destination is an
|
||||
identity server
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
request = {
|
||||
"method": method,
|
||||
"uri": url_bytes,
|
||||
"origin": self.server_name,
|
||||
"destination": destination,
|
||||
}
|
||||
|
||||
if destination is not None:
|
||||
request["destination"] = destination
|
||||
|
||||
if destination_is is not None:
|
||||
request["destination_is"] = destination_is
|
||||
|
||||
if content is not None:
|
||||
request["content"] = content
|
||||
|
||||
|
||||
@@ -14,18 +14,16 @@
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
from twisted.web.server import Site, Request
|
||||
|
||||
from synapse.http import redact_uri
|
||||
from synapse.http.request_metrics import RequestMetrics
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
|
||||
|
||||
_next_request_seq = 0
|
||||
|
||||
|
||||
@@ -69,10 +67,7 @@ class SynapseRequest(Request):
|
||||
return "%s-%i" % (self.method, self.request_seq)
|
||||
|
||||
def get_redacted_uri(self):
|
||||
return ACCESS_TOKEN_RE.sub(
|
||||
br'\1<redacted>\3',
|
||||
self.uri
|
||||
)
|
||||
return redact_uri(self.uri)
|
||||
|
||||
def get_user_agent(self):
|
||||
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
|
||||
|
||||
@@ -190,6 +190,22 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
|
||||
# finished being processed.
|
||||
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
|
||||
|
||||
last_ticked = time.time()
|
||||
|
||||
|
||||
class ReactorLastSeenMetric(object):
|
||||
|
||||
def collect(self):
|
||||
cm = GaugeMetricFamily(
|
||||
"python_twisted_reactor_last_seen",
|
||||
"Seconds since the Twisted reactor was last seen",
|
||||
)
|
||||
cm.add_metric([], time.time() - last_ticked)
|
||||
yield cm
|
||||
|
||||
|
||||
REGISTRY.register(ReactorLastSeenMetric())
|
||||
|
||||
|
||||
def runUntilCurrentTimer(func):
|
||||
|
||||
@@ -222,6 +238,11 @@ def runUntilCurrentTimer(func):
|
||||
tick_time.observe(end - start)
|
||||
pending_calls_metric.observe(num_pending)
|
||||
|
||||
# Update the time we last ticked, for the metric to test whether
|
||||
# Synapse's reactor has frozen
|
||||
global last_ticked
|
||||
last_ticked = end
|
||||
|
||||
if running_on_pypy:
|
||||
return ret
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ import logging
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.push.pusher import PusherFactory
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -125,7 +124,6 @@ class PusherPool:
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_notifications(self, min_stream_id, max_stream_id):
|
||||
yield run_on_reactor()
|
||||
try:
|
||||
users_affected = yield self.store.get_push_action_users_in_range(
|
||||
min_stream_id, max_stream_id
|
||||
@@ -151,7 +149,6 @@ class PusherPool:
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
|
||||
yield run_on_reactor()
|
||||
try:
|
||||
# Need to subtract 1 from the minimum because the lower bound here
|
||||
# is not inclusive
|
||||
|
||||
@@ -50,14 +50,16 @@ REQUIREMENTS = {
|
||||
"bcrypt": ["bcrypt>=3.1.0"],
|
||||
"pillow": ["PIL"],
|
||||
"pydenticon": ["pydenticon"],
|
||||
"blist": ["blist"],
|
||||
"sortedcontainers": ["sortedcontainers"],
|
||||
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
|
||||
"pymacaroons-pynacl": ["pymacaroons"],
|
||||
"msgpack-python>=0.3.0": ["msgpack"],
|
||||
"phonenumbers>=8.2.0": ["phonenumbers"],
|
||||
"six": ["six"],
|
||||
"prometheus_client": ["prometheus_client"],
|
||||
"attr": ["attr"],
|
||||
}
|
||||
|
||||
CONDITIONAL_REQUIREMENTS = {
|
||||
"web_client": {
|
||||
"matrix_angular_sdk>=0.6.8": ["syweb>=0.6.8"],
|
||||
|
||||
@@ -169,16 +169,12 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
|
||||
yield self.store.find_first_stream_ordering_after_ts(ts)
|
||||
)
|
||||
|
||||
room_event_after_stream_ordering = (
|
||||
r = (
|
||||
yield self.store.get_room_event_after_stream_ordering(
|
||||
room_id, stream_ordering,
|
||||
)
|
||||
)
|
||||
if room_event_after_stream_ordering:
|
||||
token = yield self.store.get_topological_token_for_event(
|
||||
room_event_after_stream_ordering,
|
||||
)
|
||||
else:
|
||||
if not r:
|
||||
logger.warn(
|
||||
"[purge] purging events not possible: No event found "
|
||||
"(received_ts %i => stream_ordering %i)",
|
||||
@@ -189,8 +185,10 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
|
||||
"there is no event to be purged",
|
||||
errcode=Codes.NOT_FOUND,
|
||||
)
|
||||
(stream, topo, _event_id) = r
|
||||
token = "t%d-%d" % (topo, stream)
|
||||
logger.info(
|
||||
"[purge] purging up to token %d (received_ts %i => "
|
||||
"[purge] purging up to token %s (received_ts %i => "
|
||||
"stream_ordering %i)",
|
||||
token, ts, stream_ordering,
|
||||
)
|
||||
|
||||
@@ -24,8 +24,6 @@ import synapse.util.stringutils as stringutils
|
||||
from synapse.http.servlet import parse_json_object_from_request
|
||||
from synapse.types import create_requester
|
||||
|
||||
from synapse.util.async import run_on_reactor
|
||||
|
||||
from hashlib import sha1
|
||||
import hmac
|
||||
import logging
|
||||
@@ -272,7 +270,6 @@ class RegisterRestServlet(ClientV1RestServlet):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_password(self, request, register_json, session):
|
||||
yield run_on_reactor()
|
||||
if (self.hs.config.enable_registration_captcha and
|
||||
not session[LoginType.RECAPTCHA]):
|
||||
# captcha should've been done by this stage!
|
||||
@@ -333,8 +330,6 @@ class RegisterRestServlet(ClientV1RestServlet):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_shared_secret(self, request, register_json, session):
|
||||
yield run_on_reactor()
|
||||
|
||||
if not isinstance(register_json.get("mac", None), string_types):
|
||||
raise SynapseError(400, "Expected mac.")
|
||||
if not isinstance(register_json.get("user", None), string_types):
|
||||
@@ -423,8 +418,6 @@ class CreateUserRestServlet(ClientV1RestServlet):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_create(self, requester, user_json):
|
||||
yield run_on_reactor()
|
||||
|
||||
if "localpart" not in user_json:
|
||||
raise SynapseError(400, "Expected 'localpart' key.")
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ from synapse.http.servlet import (
|
||||
RestServlet, assert_params_in_request,
|
||||
parse_json_object_from_request,
|
||||
)
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.msisdn import phone_number_to_msisdn
|
||||
from synapse.util.threepids import check_3pid_allowed
|
||||
from ._base import client_v2_patterns, interactive_auth_handler
|
||||
@@ -300,8 +299,6 @@ class ThreepidRestServlet(RestServlet):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request):
|
||||
yield run_on_reactor()
|
||||
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
|
||||
threepids = yield self.datastore.user_get_threepids(
|
||||
@@ -312,8 +309,6 @@ class ThreepidRestServlet(RestServlet):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
yield run_on_reactor()
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
threePidCreds = body.get('threePidCreds')
|
||||
@@ -365,8 +360,6 @@ class ThreepidDeleteRestServlet(RestServlet):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
yield run_on_reactor()
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
required = ['medium', 'address']
|
||||
@@ -381,9 +374,16 @@ class ThreepidDeleteRestServlet(RestServlet):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
user_id = requester.user.to_string()
|
||||
|
||||
yield self.auth_handler.delete_threepid(
|
||||
user_id, body['medium'], body['address']
|
||||
)
|
||||
try:
|
||||
yield self.auth_handler.delete_threepid(
|
||||
user_id, body['medium'], body['address']
|
||||
)
|
||||
except Exception:
|
||||
# NB. This endpoint should succeed if there is nothing to
|
||||
# delete, so it should only throw if something is wrong
|
||||
# that we ought to care about.
|
||||
logger.exception("Failed to remove threepid")
|
||||
raise SynapseError(500, "Failed to remove threepid")
|
||||
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
@@ -32,7 +32,6 @@ from ._base import client_v2_patterns, interactive_auth_handler
|
||||
import logging
|
||||
import hmac
|
||||
from hashlib import sha1
|
||||
from synapse.util.async import run_on_reactor
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
|
||||
from six import string_types
|
||||
@@ -191,8 +190,6 @@ class RegisterRestServlet(RestServlet):
|
||||
@interactive_auth_handler
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
yield run_on_reactor()
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
kind = "user"
|
||||
|
||||
+2
-2
@@ -694,10 +694,10 @@ def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_ma
|
||||
return auth_events
|
||||
|
||||
|
||||
def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ids,
|
||||
def _resolve_with_state(unconflicted_state_ids, conflicted_state_ids, auth_event_ids,
|
||||
state_map):
|
||||
conflicted_state = {}
|
||||
for key, event_ids in iteritems(conflicted_state_ds):
|
||||
for key, event_ids in iteritems(conflicted_state_ids):
|
||||
events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map]
|
||||
if len(events) > 1:
|
||||
conflicted_state[key] = events
|
||||
|
||||
@@ -460,15 +460,6 @@ class RegistrationStore(RegistrationWorkerStore,
|
||||
defer.returnValue(ret['user_id'])
|
||||
defer.returnValue(None)
|
||||
|
||||
def user_delete_threepids(self, user_id):
|
||||
return self._simple_delete(
|
||||
"user_threepids",
|
||||
keyvalues={
|
||||
"user_id": user_id,
|
||||
},
|
||||
desc="user_delete_threepids",
|
||||
)
|
||||
|
||||
def user_delete_threepid(self, user_id, medium, address):
|
||||
return self._simple_delete(
|
||||
"user_threepids",
|
||||
|
||||
@@ -578,7 +578,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||
)
|
||||
txn.execute(sql, (user_id, room_id))
|
||||
|
||||
txn.call_after(self.was_forgotten_at.invalidate_all)
|
||||
txn.call_after(self.did_forget.invalidate, (user_id, room_id))
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.who_forgot_in_room, (room_id,)
|
||||
@@ -609,31 +608,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||
count = yield self.runInteraction("did_forget_membership", f)
|
||||
defer.returnValue(count == 0)
|
||||
|
||||
@cachedInlineCallbacks(num_args=3)
|
||||
def was_forgotten_at(self, user_id, room_id, event_id):
|
||||
"""Returns whether user_id has elected to discard history for room_id at
|
||||
event_id.
|
||||
|
||||
event_id must be a membership event."""
|
||||
def f(txn):
|
||||
sql = (
|
||||
"SELECT"
|
||||
" forgotten"
|
||||
" FROM"
|
||||
" room_memberships"
|
||||
" WHERE"
|
||||
" user_id = ?"
|
||||
" AND"
|
||||
" room_id = ?"
|
||||
" AND"
|
||||
" event_id = ?"
|
||||
)
|
||||
txn.execute(sql, (user_id, room_id, event_id))
|
||||
rows = txn.fetchall()
|
||||
return rows[0][0]
|
||||
forgot = yield self.runInteraction("did_forget_membership_at", f)
|
||||
defer.returnValue(forgot == 1)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_add_membership_profile(self, progress, batch_size):
|
||||
target_min_stream_id = progress.get(
|
||||
|
||||
@@ -23,7 +23,7 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
|
||||
from synapse.util.caches import intern_string, get_cache_factor_for
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.caches.dictionary_cache import DictionaryCache
|
||||
from synapse.util.stringutils import to_ascii
|
||||
@@ -57,7 +57,7 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
|
||||
|
||||
self._state_group_cache = DictionaryCache(
|
||||
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
|
||||
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
|
||||
)
|
||||
|
||||
@cached(max_entries=100000, iterable=True)
|
||||
@@ -272,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||
for typ in types:
|
||||
if typ[1] is None:
|
||||
where_clauses.append("(type = ?)")
|
||||
where_args.extend(typ[0])
|
||||
where_args.append(typ[0])
|
||||
wildcard_types = True
|
||||
else:
|
||||
where_clauses.append("(type = ? AND state_key = ?)")
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.internet.defer import CancelledError
|
||||
from twisted.python import failure
|
||||
@@ -41,13 +40,6 @@ def sleep(seconds):
|
||||
defer.returnValue(res)
|
||||
|
||||
|
||||
def run_on_reactor():
|
||||
""" This will cause the rest of the function to be invoked upon the next
|
||||
iteration of the main loop
|
||||
"""
|
||||
return sleep(0)
|
||||
|
||||
|
||||
class ObservableDeferred(object):
|
||||
"""Wraps a deferred object so that we can add observer deferreds. These
|
||||
observer deferreds do not affect the callback chain of the original
|
||||
@@ -227,7 +219,7 @@ class Linearizer(object):
|
||||
# the context manager, but it needs to happen while we hold the
|
||||
# lock, and the context manager's exit code must be synchronous,
|
||||
# so actually this is the only sensible place.
|
||||
yield run_on_reactor()
|
||||
yield sleep(0)
|
||||
|
||||
else:
|
||||
logger.info("Acquired uncontended linearizer lock %r for key %r",
|
||||
|
||||
@@ -22,6 +22,16 @@ import six
|
||||
|
||||
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
|
||||
|
||||
|
||||
def get_cache_factor_for(cache_name):
|
||||
env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
|
||||
factor = os.environ.get(env_var)
|
||||
if factor:
|
||||
return float(factor)
|
||||
|
||||
return CACHE_SIZE_FACTOR
|
||||
|
||||
|
||||
caches_by_name = {}
|
||||
collectors_by_name = {}
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ import logging
|
||||
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util import unwrapFirstError, logcontext
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.caches import get_cache_factor_for
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
|
||||
from synapse.util.stringutils import to_ascii
|
||||
@@ -313,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase):
|
||||
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
|
||||
cache_context=cache_context)
|
||||
|
||||
max_entries = int(max_entries * CACHE_SIZE_FACTOR)
|
||||
max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
|
||||
|
||||
self.max_entries = max_entries
|
||||
self.tree = tree
|
||||
|
||||
@@ -13,10 +13,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
|
||||
from synapse.util import caches
|
||||
|
||||
|
||||
from blist import sorteddict
|
||||
from sortedcontainers import SortedDict
|
||||
import logging
|
||||
|
||||
|
||||
@@ -32,16 +32,18 @@ class StreamChangeCache(object):
|
||||
entities that may have changed since that position. If position key is too
|
||||
old then the cache will simply return all given entities.
|
||||
"""
|
||||
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
|
||||
self._max_size = int(max_size * CACHE_SIZE_FACTOR)
|
||||
|
||||
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
|
||||
self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
|
||||
self._entity_to_key = {}
|
||||
self._cache = sorteddict()
|
||||
self._cache = SortedDict()
|
||||
self._earliest_known_stream_pos = current_stream_pos
|
||||
self.name = name
|
||||
self.metrics = register_cache("cache", self.name, self._cache)
|
||||
self.metrics = caches.register_cache("cache", self.name, self._cache)
|
||||
|
||||
for entity, stream_pos in prefilled_cache.items():
|
||||
self.entity_has_changed(entity, stream_pos)
|
||||
if prefilled_cache:
|
||||
for entity, stream_pos in prefilled_cache.items():
|
||||
self.entity_has_changed(entity, stream_pos)
|
||||
|
||||
def has_entity_changed(self, entity, stream_pos):
|
||||
"""Returns True if the entity may have been updated since stream_pos
|
||||
@@ -65,22 +67,25 @@ class StreamChangeCache(object):
|
||||
return False
|
||||
|
||||
def get_entities_changed(self, entities, stream_pos):
|
||||
"""Returns subset of entities that have had new things since the
|
||||
given position. If the position is too old it will just return the given list.
|
||||
"""
|
||||
Returns subset of entities that have had new things since the given
|
||||
position. Entities unknown to the cache will be returned. If the
|
||||
position is too old it will just return the given list.
|
||||
"""
|
||||
assert type(stream_pos) is int
|
||||
|
||||
if stream_pos >= self._earliest_known_stream_pos:
|
||||
keys = self._cache.keys()
|
||||
i = keys.bisect_right(stream_pos)
|
||||
not_known_entities = set(entities) - set(self._entity_to_key)
|
||||
|
||||
result = set(
|
||||
self._cache[k] for k in keys[i:]
|
||||
).intersection(entities)
|
||||
result = (
|
||||
set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
|
||||
.intersection(entities)
|
||||
.union(not_known_entities)
|
||||
)
|
||||
|
||||
self.metrics.inc_hits()
|
||||
else:
|
||||
result = entities
|
||||
result = set(entities)
|
||||
self.metrics.inc_misses()
|
||||
|
||||
return result
|
||||
@@ -90,12 +95,13 @@ class StreamChangeCache(object):
|
||||
"""
|
||||
assert type(stream_pos) is int
|
||||
|
||||
if not self._cache:
|
||||
# If we have no cache, nothing can have changed.
|
||||
return False
|
||||
|
||||
if stream_pos >= self._earliest_known_stream_pos:
|
||||
self.metrics.inc_hits()
|
||||
keys = self._cache.keys()
|
||||
i = keys.bisect_right(stream_pos)
|
||||
|
||||
return i < len(keys)
|
||||
return self._cache.bisect_right(stream_pos) < len(self._cache)
|
||||
else:
|
||||
self.metrics.inc_misses()
|
||||
return True
|
||||
@@ -107,10 +113,7 @@ class StreamChangeCache(object):
|
||||
assert type(stream_pos) is int
|
||||
|
||||
if stream_pos >= self._earliest_known_stream_pos:
|
||||
keys = self._cache.keys()
|
||||
i = keys.bisect_right(stream_pos)
|
||||
|
||||
return [self._cache[k] for k in keys[i:]]
|
||||
return self._cache.values()[self._cache.bisect_right(stream_pos) :]
|
||||
else:
|
||||
return None
|
||||
|
||||
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
|
||||
self._entity_to_key[entity] = stream_pos
|
||||
|
||||
while len(self._cache) > self._max_size:
|
||||
k, r = self._cache.popitem()
|
||||
self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
|
||||
k, r = self._cache.popitem(0)
|
||||
self._earliest_known_stream_pos = max(
|
||||
k, self._earliest_known_stream_pos,
|
||||
)
|
||||
self._entity_to_key.pop(r, None)
|
||||
|
||||
def get_max_pos_of_last_change(self, entity):
|
||||
|
||||
@@ -0,0 +1,72 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2016 OpenMarket Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os.path
|
||||
import shutil
|
||||
import tempfile
|
||||
import yaml
|
||||
from synapse.config.homeserver import HomeServerConfig
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class ConfigLoadingTestCase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.dir = tempfile.mkdtemp()
|
||||
print(self.dir)
|
||||
self.file = os.path.join(self.dir, "homeserver.yaml")
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.dir)
|
||||
|
||||
def test_load_fails_if_server_name_missing(self):
|
||||
self.generate_config()
|
||||
self.remove_lines_containing("server_name")
|
||||
with self.assertRaises(Exception):
|
||||
HomeServerConfig.load_config("", ["--check-config", "-c", self.file])
|
||||
with self.assertRaises(Exception):
|
||||
HomeServerConfig.load_or_generate_config("", ["--check-config", "-c", self.file])
|
||||
|
||||
def test_generated_config_passes_check(self):
|
||||
self.generate_config()
|
||||
|
||||
config = HomeServerConfig.load_config("", ["--check-config", "-c", self.file])
|
||||
config = HomeServerConfig.load_or_generate_config("", ["--check-config", "-c", self.file])
|
||||
|
||||
def test_invalid_key(self):
|
||||
self.generate_config()
|
||||
self.add_lines_to_config([
|
||||
"lemurs_key: 125123",
|
||||
])
|
||||
config = HomeServerConfig.load_config("", ["--check-config", "-c", self.file])
|
||||
|
||||
def generate_config(self):
|
||||
HomeServerConfig.load_or_generate_config("", [
|
||||
"--generate-config",
|
||||
"-c", self.file,
|
||||
"--report-stats=yes",
|
||||
"-H", "lemurs.win"
|
||||
])
|
||||
|
||||
def remove_lines_containing(self, needle):
|
||||
with open(self.file, "r") as f:
|
||||
contents = f.readlines()
|
||||
contents = [l for l in contents if needle not in l]
|
||||
with open(self.file, "w") as f:
|
||||
f.write("".join(contents))
|
||||
|
||||
def add_lines_to_config(self, lines):
|
||||
with open(self.file, "a") as f:
|
||||
for line in lines:
|
||||
f.write(line + "\n")
|
||||
@@ -19,7 +19,6 @@ from twisted.internet import defer
|
||||
from mock import Mock, patch
|
||||
|
||||
from synapse.util.distributor import Distributor
|
||||
from synapse.util.async import run_on_reactor
|
||||
|
||||
|
||||
class DistributorTestCase(unittest.TestCase):
|
||||
@@ -95,7 +94,6 @@ class DistributorTestCase(unittest.TestCase):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def observer():
|
||||
yield run_on_reactor()
|
||||
raise MyException("Oopsie")
|
||||
|
||||
self.dist.observe("whail", observer)
|
||||
|
||||
@@ -0,0 +1,151 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse import event_auth
|
||||
from synapse.api.errors import AuthError
|
||||
from synapse.events import FrozenEvent
|
||||
import unittest
|
||||
|
||||
|
||||
class EventAuthTestCase(unittest.TestCase):
|
||||
def test_random_users_cannot_send_state_before_first_pl(self):
|
||||
"""
|
||||
Check that, before the first PL lands, the creator is the only user
|
||||
that can send a state event.
|
||||
"""
|
||||
creator = "@creator:example.com"
|
||||
joiner = "@joiner:example.com"
|
||||
auth_events = {
|
||||
("m.room.create", ""): _create_event(creator),
|
||||
("m.room.member", creator): _join_event(creator),
|
||||
("m.room.member", joiner): _join_event(joiner),
|
||||
}
|
||||
|
||||
# creator should be able to send state
|
||||
event_auth.check(
|
||||
_random_state_event(creator), auth_events,
|
||||
do_sig_check=False,
|
||||
)
|
||||
|
||||
# joiner should not be able to send state
|
||||
self.assertRaises(
|
||||
AuthError,
|
||||
event_auth.check,
|
||||
_random_state_event(joiner),
|
||||
auth_events,
|
||||
do_sig_check=False,
|
||||
),
|
||||
|
||||
def test_state_default_level(self):
|
||||
"""
|
||||
Check that users above the state_default level can send state and
|
||||
those below cannot
|
||||
"""
|
||||
creator = "@creator:example.com"
|
||||
pleb = "@joiner:example.com"
|
||||
king = "@joiner2:example.com"
|
||||
|
||||
auth_events = {
|
||||
("m.room.create", ""): _create_event(creator),
|
||||
("m.room.member", creator): _join_event(creator),
|
||||
("m.room.power_levels", ""): _power_levels_event(creator, {
|
||||
"state_default": "30",
|
||||
"users": {
|
||||
pleb: "29",
|
||||
king: "30",
|
||||
},
|
||||
}),
|
||||
("m.room.member", pleb): _join_event(pleb),
|
||||
("m.room.member", king): _join_event(king),
|
||||
}
|
||||
|
||||
# pleb should not be able to send state
|
||||
self.assertRaises(
|
||||
AuthError,
|
||||
event_auth.check,
|
||||
_random_state_event(pleb),
|
||||
auth_events,
|
||||
do_sig_check=False,
|
||||
),
|
||||
|
||||
# king should be able to send state
|
||||
event_auth.check(
|
||||
_random_state_event(king), auth_events,
|
||||
do_sig_check=False,
|
||||
)
|
||||
|
||||
|
||||
# helpers for making events
|
||||
|
||||
TEST_ROOM_ID = "!test:room"
|
||||
|
||||
|
||||
def _create_event(user_id):
|
||||
return FrozenEvent({
|
||||
"room_id": TEST_ROOM_ID,
|
||||
"event_id": _get_event_id(),
|
||||
"type": "m.room.create",
|
||||
"sender": user_id,
|
||||
"content": {
|
||||
"creator": user_id,
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
def _join_event(user_id):
|
||||
return FrozenEvent({
|
||||
"room_id": TEST_ROOM_ID,
|
||||
"event_id": _get_event_id(),
|
||||
"type": "m.room.member",
|
||||
"sender": user_id,
|
||||
"state_key": user_id,
|
||||
"content": {
|
||||
"membership": "join",
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
def _power_levels_event(sender, content):
|
||||
return FrozenEvent({
|
||||
"room_id": TEST_ROOM_ID,
|
||||
"event_id": _get_event_id(),
|
||||
"type": "m.room.power_levels",
|
||||
"sender": sender,
|
||||
"state_key": "",
|
||||
"content": content,
|
||||
})
|
||||
|
||||
|
||||
def _random_state_event(sender):
|
||||
return FrozenEvent({
|
||||
"room_id": TEST_ROOM_ID,
|
||||
"event_id": _get_event_id(),
|
||||
"type": "test.state",
|
||||
"sender": sender,
|
||||
"state_key": "",
|
||||
"content": {
|
||||
"membership": "join",
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
event_count = 0
|
||||
|
||||
|
||||
def _get_event_id():
|
||||
global event_count
|
||||
c = event_count
|
||||
event_count += 1
|
||||
return "!%i:example.com" % (c, )
|
||||
+14
-2
@@ -606,6 +606,14 @@ class StateTestCase(unittest.TestCase):
|
||||
}
|
||||
)
|
||||
|
||||
power_levels = create_event(
|
||||
type=EventTypes.PowerLevels, state_key="",
|
||||
content={"users": {
|
||||
"@foo:bar": "100",
|
||||
"@user_id:example.com": "100",
|
||||
}}
|
||||
)
|
||||
|
||||
creation = create_event(
|
||||
type=EventTypes.Create, state_key="",
|
||||
content={"creator": "@foo:bar"}
|
||||
@@ -613,12 +621,14 @@ class StateTestCase(unittest.TestCase):
|
||||
|
||||
old_state_1 = [
|
||||
creation,
|
||||
power_levels,
|
||||
member_event,
|
||||
create_event(type="test1", state_key="1", depth=1),
|
||||
]
|
||||
|
||||
old_state_2 = [
|
||||
creation,
|
||||
power_levels,
|
||||
member_event,
|
||||
create_event(type="test1", state_key="1", depth=2),
|
||||
]
|
||||
@@ -633,7 +643,7 @@ class StateTestCase(unittest.TestCase):
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
old_state_2[2].event_id, context.current_state_ids[("test1", "1")]
|
||||
old_state_2[3].event_id, context.current_state_ids[("test1", "1")]
|
||||
)
|
||||
|
||||
# Reverse the depth to make sure we are actually using the depths
|
||||
@@ -641,12 +651,14 @@ class StateTestCase(unittest.TestCase):
|
||||
|
||||
old_state_1 = [
|
||||
creation,
|
||||
power_levels,
|
||||
member_event,
|
||||
create_event(type="test1", state_key="1", depth=2),
|
||||
]
|
||||
|
||||
old_state_2 = [
|
||||
creation,
|
||||
power_levels,
|
||||
member_event,
|
||||
create_event(type="test1", state_key="1", depth=1),
|
||||
]
|
||||
@@ -659,7 +671,7 @@ class StateTestCase(unittest.TestCase):
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
old_state_1[2].event_id, context.current_state_ids[("test1", "1")]
|
||||
old_state_1[3].event_id, context.current_state_ids[("test1", "1")]
|
||||
)
|
||||
|
||||
def _get_context(self, event, prev_event_id_1, old_state_1, prev_event_id_2,
|
||||
|
||||
@@ -18,7 +18,6 @@ import logging
|
||||
|
||||
import mock
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.util import async
|
||||
from synapse.util import logcontext
|
||||
from twisted.internet import defer
|
||||
from synapse.util.caches import descriptors
|
||||
@@ -195,7 +194,6 @@ class DescriptorTestCase(unittest.TestCase):
|
||||
def fn(self, arg1):
|
||||
@defer.inlineCallbacks
|
||||
def inner_fn():
|
||||
yield async.run_on_reactor()
|
||||
raise SynapseError(400, "blah")
|
||||
|
||||
return inner_fn()
|
||||
|
||||
@@ -0,0 +1,198 @@
|
||||
from tests import unittest
|
||||
from mock import patch
|
||||
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
|
||||
|
||||
class StreamChangeCacheTests(unittest.TestCase):
|
||||
"""
|
||||
Tests for StreamChangeCache.
|
||||
"""
|
||||
|
||||
def test_prefilled_cache(self):
|
||||
"""
|
||||
Providing a prefilled cache to StreamChangeCache will result in a cache
|
||||
with the prefilled-cache entered in.
|
||||
"""
|
||||
cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2})
|
||||
self.assertTrue(cache.has_entity_changed("user@foo.com", 1))
|
||||
|
||||
def test_has_entity_changed(self):
|
||||
"""
|
||||
StreamChangeCache.entity_has_changed will mark entities as changed, and
|
||||
has_entity_changed will observe the changed entities.
|
||||
"""
|
||||
cache = StreamChangeCache("#test", 3)
|
||||
|
||||
cache.entity_has_changed("user@foo.com", 6)
|
||||
cache.entity_has_changed("bar@baz.net", 7)
|
||||
|
||||
# If it's been changed after that stream position, return True
|
||||
self.assertTrue(cache.has_entity_changed("user@foo.com", 4))
|
||||
self.assertTrue(cache.has_entity_changed("bar@baz.net", 4))
|
||||
|
||||
# If it's been changed at that stream position, return False
|
||||
self.assertFalse(cache.has_entity_changed("user@foo.com", 6))
|
||||
|
||||
# If there's no changes after that stream position, return False
|
||||
self.assertFalse(cache.has_entity_changed("user@foo.com", 7))
|
||||
|
||||
# If the entity does not exist, return False.
|
||||
self.assertFalse(cache.has_entity_changed("not@here.website", 7))
|
||||
|
||||
# If we request before the stream cache's earliest known position,
|
||||
# return True, whether it's a known entity or not.
|
||||
self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
|
||||
self.assertTrue(cache.has_entity_changed("not@here.website", 0))
|
||||
|
||||
@patch("synapse.util.caches.CACHE_SIZE_FACTOR", 1.0)
|
||||
def test_has_entity_changed_pops_off_start(self):
|
||||
"""
|
||||
StreamChangeCache.entity_has_changed will respect the max size and
|
||||
purge the oldest items upon reaching that max size.
|
||||
"""
|
||||
cache = StreamChangeCache("#test", 1, max_size=2)
|
||||
|
||||
cache.entity_has_changed("user@foo.com", 2)
|
||||
cache.entity_has_changed("bar@baz.net", 3)
|
||||
cache.entity_has_changed("user@elsewhere.org", 4)
|
||||
|
||||
# The cache is at the max size, 2
|
||||
self.assertEqual(len(cache._cache), 2)
|
||||
|
||||
# The oldest item has been popped off
|
||||
self.assertTrue("user@foo.com" not in cache._entity_to_key)
|
||||
|
||||
# If we update an existing entity, it keeps the two existing entities
|
||||
cache.entity_has_changed("bar@baz.net", 5)
|
||||
self.assertEqual(
|
||||
set(["bar@baz.net", "user@elsewhere.org"]), set(cache._entity_to_key)
|
||||
)
|
||||
|
||||
def test_get_all_entities_changed(self):
|
||||
"""
|
||||
StreamChangeCache.get_all_entities_changed will return all changed
|
||||
entities since the given position. If the position is before the start
|
||||
of the known stream, it returns None instead.
|
||||
"""
|
||||
cache = StreamChangeCache("#test", 1)
|
||||
|
||||
cache.entity_has_changed("user@foo.com", 2)
|
||||
cache.entity_has_changed("bar@baz.net", 3)
|
||||
cache.entity_has_changed("user@elsewhere.org", 4)
|
||||
|
||||
self.assertEqual(
|
||||
cache.get_all_entities_changed(1),
|
||||
["user@foo.com", "bar@baz.net", "user@elsewhere.org"],
|
||||
)
|
||||
self.assertEqual(
|
||||
cache.get_all_entities_changed(2), ["bar@baz.net", "user@elsewhere.org"]
|
||||
)
|
||||
self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"])
|
||||
self.assertEqual(cache.get_all_entities_changed(0), None)
|
||||
|
||||
def test_has_any_entity_changed(self):
|
||||
"""
|
||||
StreamChangeCache.has_any_entity_changed will return True if any
|
||||
entities have been changed since the provided stream position, and
|
||||
False if they have not. If the cache has entries and the provided
|
||||
stream position is before it, it will return True, otherwise False if
|
||||
the cache has no entries.
|
||||
"""
|
||||
cache = StreamChangeCache("#test", 1)
|
||||
|
||||
# With no entities, it returns False for the past, present, and future.
|
||||
self.assertFalse(cache.has_any_entity_changed(0))
|
||||
self.assertFalse(cache.has_any_entity_changed(1))
|
||||
self.assertFalse(cache.has_any_entity_changed(2))
|
||||
|
||||
# We add an entity
|
||||
cache.entity_has_changed("user@foo.com", 2)
|
||||
|
||||
# With an entity, it returns True for the past, the stream start
|
||||
# position, and False for the stream position the entity was changed
|
||||
# on and ones after it.
|
||||
self.assertTrue(cache.has_any_entity_changed(0))
|
||||
self.assertTrue(cache.has_any_entity_changed(1))
|
||||
self.assertFalse(cache.has_any_entity_changed(2))
|
||||
self.assertFalse(cache.has_any_entity_changed(3))
|
||||
|
||||
def test_get_entities_changed(self):
|
||||
"""
|
||||
StreamChangeCache.get_entities_changed will return the entities in the
|
||||
given list that have changed since the provided stream ID. If the
|
||||
stream position is earlier than the earliest known position, it will
|
||||
return all of the entities queried for.
|
||||
"""
|
||||
cache = StreamChangeCache("#test", 1)
|
||||
|
||||
cache.entity_has_changed("user@foo.com", 2)
|
||||
cache.entity_has_changed("bar@baz.net", 3)
|
||||
cache.entity_has_changed("user@elsewhere.org", 4)
|
||||
|
||||
# Query all the entries, but mid-way through the stream. We should only
|
||||
# get the ones after that point.
|
||||
self.assertEqual(
|
||||
cache.get_entities_changed(
|
||||
["user@foo.com", "bar@baz.net", "user@elsewhere.org"], stream_pos=2
|
||||
),
|
||||
set(["bar@baz.net", "user@elsewhere.org"]),
|
||||
)
|
||||
|
||||
# Query all the entries mid-way through the stream, but include one
|
||||
# that doesn't exist in it. We should get back the one that doesn't
|
||||
# exist, too.
|
||||
self.assertEqual(
|
||||
cache.get_entities_changed(
|
||||
[
|
||||
"user@foo.com",
|
||||
"bar@baz.net",
|
||||
"user@elsewhere.org",
|
||||
"not@here.website",
|
||||
],
|
||||
stream_pos=2,
|
||||
),
|
||||
set(["bar@baz.net", "user@elsewhere.org", "not@here.website"]),
|
||||
)
|
||||
|
||||
# Query all the entries, but before the first known point. We will get
|
||||
# all the entries we queried for, including ones that don't exist.
|
||||
self.assertEqual(
|
||||
cache.get_entities_changed(
|
||||
[
|
||||
"user@foo.com",
|
||||
"bar@baz.net",
|
||||
"user@elsewhere.org",
|
||||
"not@here.website",
|
||||
],
|
||||
stream_pos=0,
|
||||
),
|
||||
set(
|
||||
[
|
||||
"user@foo.com",
|
||||
"bar@baz.net",
|
||||
"user@elsewhere.org",
|
||||
"not@here.website",
|
||||
]
|
||||
),
|
||||
)
|
||||
|
||||
def test_max_pos(self):
|
||||
"""
|
||||
StreamChangeCache.get_max_pos_of_last_change will return the most
|
||||
recent point where the entity could have changed. If the entity is not
|
||||
known, the stream start is provided instead.
|
||||
"""
|
||||
cache = StreamChangeCache("#test", 1)
|
||||
|
||||
cache.entity_has_changed("user@foo.com", 2)
|
||||
cache.entity_has_changed("bar@baz.net", 3)
|
||||
cache.entity_has_changed("user@elsewhere.org", 4)
|
||||
|
||||
# Known entities will return the point where they were changed.
|
||||
self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 2)
|
||||
self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 3)
|
||||
self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 4)
|
||||
|
||||
# Unknown entities will return the stream start position.
|
||||
self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), 1)
|
||||
@@ -52,33 +52,41 @@ commands =
|
||||
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
|
||||
coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
|
||||
"{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/config \
|
||||
tests/appservice/test_scheduler.py \
|
||||
tests/api/test_filtering.py \
|
||||
tests/api/test_ratelimiting.py \
|
||||
tests/appservice \
|
||||
tests/crypto \
|
||||
tests/events \
|
||||
tests/handlers/test_appservice.py \
|
||||
tests/handlers/test_auth.py \
|
||||
tests/handlers/test_device.py \
|
||||
tests/handlers/test_directory.py \
|
||||
tests/handlers/test_e2e_keys.py \
|
||||
tests/handlers/test_presence.py \
|
||||
tests/handlers/test_profile.py \
|
||||
tests/handlers/test_register.py \
|
||||
tests/replication/slave/storage/test_account_data.py \
|
||||
tests/replication/slave/storage/test_receipts.py \
|
||||
tests/storage/test_appservice.py \
|
||||
tests/storage/test_background_update.py \
|
||||
tests/storage/test_base.py \
|
||||
tests/storage/test__base.py \
|
||||
tests/storage/test_client_ips.py \
|
||||
tests/storage/test_devices.py \
|
||||
tests/storage/test_end_to_end_keys.py \
|
||||
tests/storage/test_event_push_actions.py \
|
||||
tests/storage/test_keys.py \
|
||||
tests/storage/test_presence.py \
|
||||
tests/storage/test_profile.py \
|
||||
tests/storage/test_registration.py \
|
||||
tests/storage/test_room.py \
|
||||
tests/storage/test_user_directory.py \
|
||||
tests/test_distributor.py \
|
||||
tests/test_dns.py \
|
||||
tests/test_preview.py \
|
||||
tests/test_test_utils.py \
|
||||
tests/test_types.py \
|
||||
tests/util/test_dict_cache.py \
|
||||
tests/util/test_expiring_cache.py \
|
||||
tests/util/test_file_consumer.py \
|
||||
tests/util/test_limiter.py \
|
||||
tests/util/test_linearizer.py \
|
||||
tests/util/test_logcontext.py \
|
||||
tests/util/test_logformatter.py \
|
||||
tests/util/test_rwlock.py \
|
||||
tests/util/test_snapshot_cache.py \
|
||||
tests/util/test_wheel_timer.py} \
|
||||
tests/util} \
|
||||
{env:TOXSUFFIX:}
|
||||
{env:DUMP_COVERAGE_COMMAND:coverage report -m}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user