Compare commits
44 Commits
v0.28.1
...
erik-hacke
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f56db48f67 | ||
|
|
dd7f1b10ce | ||
|
|
661a4d4603 | ||
|
|
a401750ef4 | ||
|
|
72c673407f | ||
|
|
488a2696b6 | ||
|
|
052df50e49 | ||
|
|
3633ee8c63 | ||
|
|
912c456d60 | ||
|
|
d05575a574 | ||
|
|
2fb6d46042 | ||
|
|
c7fa6672d9 | ||
|
|
36c43eef17 | ||
|
|
411c437fc6 | ||
|
|
6783bc0ded | ||
|
|
295973e3a9 | ||
|
|
3b78452c3a | ||
|
|
1f732e05d0 | ||
|
|
9ed48cecbc | ||
|
|
cc9d4a4d3e | ||
|
|
889803a78e | ||
|
|
22245fb8a7 | ||
|
|
78c5eca141 | ||
|
|
55bef59cc7 | ||
|
|
ad683cd203 | ||
|
|
d66afef01e | ||
|
|
b07a33f024 | ||
|
|
74539aa25b | ||
|
|
8f25cd6627 | ||
|
|
dc1299a4b0 | ||
|
|
2c72d66cda | ||
|
|
cde90a89ed | ||
|
|
5aec53ad95 | ||
|
|
d10d19f0ad | ||
|
|
daec1d77be | ||
|
|
61885f7849 | ||
|
|
ba30d489d9 | ||
|
|
5e2d0650df | ||
|
|
841bcbcafa | ||
|
|
08a6b88e3d | ||
|
|
aece8e73b1 | ||
|
|
328bd35e00 | ||
|
|
7de9a28b8e | ||
|
|
2a9c3aea89 |
91
CHANGES.rst
91
CHANGES.rst
@@ -1,101 +1,15 @@
|
||||
Changes in synapse v0.28.1 (2018-05-01)
|
||||
=======================================
|
||||
|
||||
SECURITY UPDATE
|
||||
|
||||
* Clamp the allowed values of event depth received over federation to be
|
||||
[0, 2**63 - 1]. This mitigates an attack where malicious events
|
||||
injected with depth = 2**63 - 1 render rooms unusable. Depth is used to
|
||||
determine the cosmetic ordering of events within a room, and so the ordering
|
||||
of events in such a room will default to using stream_ordering rather than depth
|
||||
(topological_ordering).
|
||||
|
||||
This is a temporary solution to mitigate abuse in the wild, whilst a long solution
|
||||
is being implemented to improve how the depth parameter is used.
|
||||
|
||||
Full details at
|
||||
https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI/edit#
|
||||
|
||||
* Pin Twisted to <18.4 until we stop using the private _OpenSSLECCurve API.
|
||||
|
||||
|
||||
Changes in synapse v0.28.0 (2018-04-26)
|
||||
=======================================
|
||||
|
||||
Bug Fixes:
|
||||
|
||||
* Fix quarantine media admin API and search reindex (PR #3130)
|
||||
* Fix media admin APIs (PR #3134)
|
||||
|
||||
|
||||
Changes in synapse v0.28.0-rc1 (2018-04-24)
|
||||
===========================================
|
||||
|
||||
Minor performance improvement to federation sending and bug fixes.
|
||||
|
||||
(Note: This release does not include state resolutions discussed in matrix live)
|
||||
|
||||
Features:
|
||||
|
||||
* Add metrics for event processing lag (PR #3090)
|
||||
* Add metrics for ResponseCache (PR #3092)
|
||||
|
||||
Changes:
|
||||
|
||||
* Synapse on PyPy (PR #2760) Thanks to @Valodim!
|
||||
* move handling of auto_join_rooms to RegisterHandler (PR #2996) Thanks to @krombel!
|
||||
* Improve handling of SRV records for federation connections (PR #3016) Thanks to @silkeh!
|
||||
* Document the behaviour of ResponseCache (PR #3059)
|
||||
* Preparation for py3 (PR #3061, #3073, #3074, #3075, #3103, #3104, #3106, #3107, #3109, #3110) Thanks to @NotAFile!
|
||||
* update prometheus dashboard to use new metric names (PR #3069) Thanks to @krombel!
|
||||
* use python3-compatible prints (PR #3074) Thanks to @NotAFile!
|
||||
* Send federation events concurrently (PR #3078)
|
||||
* Limit concurrent event sends for a room (PR #3079)
|
||||
* Improve R30 stat definition (PR #3086)
|
||||
* Send events to ASes concurrently (PR #3088)
|
||||
* Refactor ResponseCache usage (PR #3093)
|
||||
* Clarify that SRV may not point to a CNAME (PR #3100) Thanks to @silkeh!
|
||||
* Use str(e) instead of e.message (PR #3103) Thanks to @NotAFile!
|
||||
* Use six.itervalues in some places (PR #3106) Thanks to @NotAFile!
|
||||
* Refactor store.have_events (PR #3117)
|
||||
|
||||
Bug Fixes:
|
||||
|
||||
* Return 401 for invalid access_token on logout (PR #2938) Thanks to @dklug!
|
||||
* Return a 404 rather than a 500 on rejoining empty rooms (PR #3080)
|
||||
* fix federation_domain_whitelist (PR #3099)
|
||||
* Avoid creating events with huge numbers of prev_events (PR #3113)
|
||||
* Reject events which have lots of prev_events (PR #3118)
|
||||
|
||||
|
||||
Changes in synapse v0.27.4 (2018-04-13)
|
||||
======================================
|
||||
|
||||
Changes:
|
||||
|
||||
* Update canonicaljson dependency (#3095)
|
||||
|
||||
|
||||
Changes in synapse v0.27.3 (2018-04-11)
|
||||
======================================
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* URL quote path segments over federation (#3082)
|
||||
|
||||
Changes in synapse v0.27.3-rc2 (2018-04-09)
|
||||
==========================================
|
||||
|
||||
v0.27.3-rc1 used a stale version of the develop branch so the changelog overstates
|
||||
the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored.
|
||||
|
||||
Changes in synapse v0.27.3-rc1 (2018-04-09)
|
||||
=======================================
|
||||
|
||||
Notable changes include API support for joinability of groups. Also new metrics
|
||||
Notable changes include API support for joinability of groups. Also new metrics
|
||||
and phone home stats. Phone home stats include better visibility of system usage
|
||||
so we can tweak synpase to work better for all users rather than our own experience
|
||||
with matrix.org. Also, recording 'r30' stat which is the measure we use to track
|
||||
with matrix.org. Also, recording 'r30' stat which is the measure we use to track
|
||||
overal growth of the Matrix ecosystem. It is defined as:-
|
||||
|
||||
Counts the number of native 30 day retained users, defined as:-
|
||||
@@ -131,6 +45,7 @@ Bug fixes:
|
||||
|
||||
* Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
|
||||
* Fix replication after switch to simplejson (PR #3015)
|
||||
* Fix replication after switch to simplejson (PR #3015)
|
||||
* 404 correctly on missing paths via NoResource (PR #3022)
|
||||
* Fix error when claiming e2e keys from offline servers (PR #3034)
|
||||
* fix tests/storage/test_user_directory.py (PR #3042)
|
||||
|
||||
@@ -157,8 +157,8 @@ if you prefer.
|
||||
|
||||
In case of problems, please see the _`Troubleshooting` section below.
|
||||
|
||||
Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate the
|
||||
above in Docker at https://hub.docker.com/r/avhost/docker-matrix/tags/
|
||||
Alternatively, Silvio Fricke has contributed a Dockerfile to automate the
|
||||
above in Docker at https://registry.hub.docker.com/u/silviof/docker-matrix/.
|
||||
|
||||
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
|
||||
tested with VirtualBox/AWS/DigitalOcean - see https://github.com/EMnify/matrix-synapse-auto-deploy
|
||||
@@ -614,9 +614,6 @@ should have the format ``_matrix._tcp.<yourdomain.com> <ttl> IN SRV 10 0 <port>
|
||||
$ dig -t srv _matrix._tcp.example.com
|
||||
_matrix._tcp.example.com. 3600 IN SRV 10 0 8448 synapse.example.com.
|
||||
|
||||
Note that the server hostname cannot be an alias (CNAME record): it has to point
|
||||
directly to the server hosting the synapse instance.
|
||||
|
||||
You can then configure your homeserver to use ``<yourdomain.com>`` as the domain in
|
||||
its user-ids, by setting ``server_name``::
|
||||
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
Community Contributions
|
||||
=======================
|
||||
|
||||
Everything in this directory are projects submitted by the community that may be useful
|
||||
to others. As such, the project maintainers cannot guarantee support, stability
|
||||
or backwards compatibility of these projects.
|
||||
|
||||
Files in this directory should *not* be relied on directly, as they may not
|
||||
continue to work or exist in future. If you wish to use any of these files then
|
||||
they should be copied to avoid them breaking from underneath you.
|
||||
@@ -2,9 +2,6 @@
|
||||
# (e.g. https://www.archlinux.org/packages/community/any/matrix-synapse/ for ArchLinux)
|
||||
# rather than in a user home directory or similar under virtualenv.
|
||||
|
||||
# **NOTE:** This is an example service file that may change in the future. If you
|
||||
# wish to use this please copy rather than symlink it.
|
||||
|
||||
[Unit]
|
||||
Description=Synapse Matrix homeserver
|
||||
|
||||
@@ -15,7 +12,6 @@ Group=synapse
|
||||
WorkingDirectory=/var/lib/synapse
|
||||
ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml
|
||||
ExecStop=/usr/bin/synctl stop /etc/synapse/homeserver.yaml
|
||||
# EnvironmentFile=-/etc/sysconfig/synapse # Can be used to e.g. set SYNAPSE_CACHE_FACTOR
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
||||
@@ -16,4 +16,4 @@
|
||||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.28.1"
|
||||
__version__ = "0.27.3-rc2"
|
||||
|
||||
@@ -16,9 +16,6 @@
|
||||
|
||||
"""Contains constants from the specification."""
|
||||
|
||||
# the "depth" field on events is limited to 2**63 - 1
|
||||
MAX_DEPTH = 2**63 - 1
|
||||
|
||||
|
||||
class Membership(object):
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
import logging
|
||||
|
||||
import simplejson as json
|
||||
from six import iteritems
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -298,7 +297,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
|
||||
A dict representing the error response JSON.
|
||||
"""
|
||||
err = {"error": msg, "errcode": code}
|
||||
for key, value in iteritems(kwargs):
|
||||
for key, value in kwargs.iteritems():
|
||||
err[key] = value
|
||||
return err
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
|
||||
from synapse.rest.client.v2_alpha._base import client_v2_patterns
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
@@ -49,6 +50,35 @@ from twisted.web.resource import NoResource
|
||||
logger = logging.getLogger("synapse.app.frontend_proxy")
|
||||
|
||||
|
||||
class PresenceStatusStubServlet(ClientV1RestServlet):
|
||||
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(PresenceStatusStubServlet, self).__init__(hs)
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.auth = hs.get_auth()
|
||||
self.main_uri = hs.config.worker_main_http_uri
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, user_id):
|
||||
# Pass through the auth headers, if any, in case the access token
|
||||
# is there.
|
||||
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
|
||||
headers = {
|
||||
"Authorization": auth_headers,
|
||||
}
|
||||
result = yield self.http_client.get_json(
|
||||
self.main_uri + request.uri,
|
||||
headers=headers,
|
||||
)
|
||||
defer.returnValue((200, result))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, request, user_id):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
class KeyUploadServlet(RestServlet):
|
||||
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
|
||||
|
||||
@@ -135,6 +165,7 @@ class FrontendProxyServer(HomeServer):
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
KeyUploadServlet(self).register(resource)
|
||||
PresenceStatusStubServlet(self).register(resource)
|
||||
resources.update({
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
|
||||
@@ -58,8 +58,6 @@ from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
from six import iteritems
|
||||
|
||||
logger = logging.getLogger("synapse.app.synchrotron")
|
||||
|
||||
|
||||
@@ -115,6 +113,7 @@ class SynchrotronPresence(object):
|
||||
logger.info("Presence process_id is %r", self.process_id)
|
||||
|
||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
||||
return
|
||||
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
|
||||
|
||||
def mark_as_coming_online(self, user_id):
|
||||
@@ -212,8 +211,10 @@ class SynchrotronPresence(object):
|
||||
yield self.notify_from_replication(states, stream_id)
|
||||
|
||||
def get_currently_syncing_users(self):
|
||||
# presence is disabled on matrix.org, so we return the empty set
|
||||
return set()
|
||||
return [
|
||||
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
|
||||
user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
|
||||
if count > 0
|
||||
]
|
||||
|
||||
|
||||
@@ -108,7 +108,7 @@ def stop(pidfile, app):
|
||||
|
||||
|
||||
Worker = collections.namedtuple("Worker", [
|
||||
"app", "configfile", "pidfile", "cache_factor"
|
||||
"app", "configfile", "pidfile", "cache_factor", "cache_factors",
|
||||
])
|
||||
|
||||
|
||||
@@ -171,6 +171,10 @@ def main():
|
||||
if cache_factor:
|
||||
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
|
||||
|
||||
cache_factors = config.get("synctl_cache_factors", {})
|
||||
for cache_name, factor in cache_factors.iteritems():
|
||||
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
||||
|
||||
worker_configfiles = []
|
||||
if options.worker:
|
||||
start_stop_synapse = False
|
||||
@@ -211,6 +215,10 @@ def main():
|
||||
or pidfile
|
||||
)
|
||||
worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
|
||||
worker_cache_factors = (
|
||||
worker_config.get("synctl_cache_factors")
|
||||
or cache_factors
|
||||
)
|
||||
daemonize = worker_config.get("daemonize") or config.get("daemonize")
|
||||
assert daemonize, "Main process must have daemonize set to true"
|
||||
|
||||
@@ -226,8 +234,10 @@ def main():
|
||||
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
|
||||
worker_configfile, "worker_daemonize")
|
||||
worker_cache_factor = worker_config.get("synctl_cache_factor")
|
||||
worker_cache_factors = worker_config.get("synctl_cache_factors", {})
|
||||
workers.append(Worker(
|
||||
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
|
||||
worker_cache_factors,
|
||||
))
|
||||
|
||||
action = options.action
|
||||
@@ -262,15 +272,19 @@ def main():
|
||||
start(configfile)
|
||||
|
||||
for worker in workers:
|
||||
env = os.environ.copy()
|
||||
|
||||
if worker.cache_factor:
|
||||
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
|
||||
|
||||
for cache_name, factor in worker.cache_factors.iteritems():
|
||||
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
||||
|
||||
start_worker(worker.app, configfile, worker.configfile)
|
||||
|
||||
if cache_factor:
|
||||
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
|
||||
else:
|
||||
os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
|
||||
# Reset env back to the original
|
||||
os.environ.clear()
|
||||
os.environ.update(env)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -18,6 +18,7 @@ from synapse.api.constants import ThirdPartyEntityKind
|
||||
from synapse.api.errors import CodeMessageException
|
||||
from synapse.http.client import SimpleHttpClient
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.types import ThirdPartyInstanceID
|
||||
|
||||
@@ -193,7 +194,12 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
defer.returnValue(None)
|
||||
|
||||
key = (service.id, protocol)
|
||||
return self.protocol_meta_cache.wrap(key, _get)
|
||||
result = self.protocol_meta_cache.get(key)
|
||||
if not result:
|
||||
result = self.protocol_meta_cache.set(
|
||||
key, preserve_fn(_get)()
|
||||
)
|
||||
return make_deferred_yieldable(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def push_bulk(self, service, events, txn_id=None):
|
||||
|
||||
@@ -352,7 +352,7 @@ class Keyring(object):
|
||||
logger.exception(
|
||||
"Unable to get key from %r: %s %s",
|
||||
perspective_name,
|
||||
type(e).__name__, str(e),
|
||||
type(e).__name__, str(e.message),
|
||||
)
|
||||
defer.returnValue({})
|
||||
|
||||
@@ -384,7 +384,7 @@ class Keyring(object):
|
||||
logger.info(
|
||||
"Unable to get key %r for %r directly: %s %s",
|
||||
key_ids, server_name,
|
||||
type(e).__name__, str(e),
|
||||
type(e).__name__, str(e.message),
|
||||
)
|
||||
|
||||
if not keys:
|
||||
@@ -734,7 +734,7 @@ def _handle_key_deferred(verify_request):
|
||||
except IOError as e:
|
||||
logger.warn(
|
||||
"Got IOError when downloading keys for %s: %s %s",
|
||||
server_name, type(e).__name__, str(e),
|
||||
server_name, type(e).__name__, str(e.message),
|
||||
)
|
||||
raise SynapseError(
|
||||
502,
|
||||
@@ -744,7 +744,7 @@ def _handle_key_deferred(verify_request):
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"Got Exception when downloading keys for %s: %s %s",
|
||||
server_name, type(e).__name__, str(e),
|
||||
server_name, type(e).__name__, str(e.message),
|
||||
)
|
||||
raise SynapseError(
|
||||
401,
|
||||
|
||||
@@ -14,10 +14,7 @@
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
import six
|
||||
|
||||
from synapse.api.constants import MAX_DEPTH
|
||||
from synapse.api.errors import SynapseError, Codes
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.crypto.event_signing import check_event_content_hash
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.events.utils import prune_event
|
||||
@@ -193,23 +190,11 @@ def event_from_pdu_json(pdu_json, outlier=False):
|
||||
FrozenEvent
|
||||
|
||||
Raises:
|
||||
SynapseError: if the pdu is missing required fields or is otherwise
|
||||
not a valid matrix event
|
||||
SynapseError: if the pdu is missing required fields
|
||||
"""
|
||||
# we could probably enforce a bunch of other fields here (room_id, sender,
|
||||
# origin, etc etc)
|
||||
assert_params_in_request(pdu_json, ('event_id', 'type', 'depth'))
|
||||
|
||||
depth = pdu_json['depth']
|
||||
if not isinstance(depth, six.integer_types):
|
||||
raise SynapseError(400, "Depth %r not an intger" % (depth, ),
|
||||
Codes.BAD_JSON)
|
||||
|
||||
if depth < 0:
|
||||
raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
|
||||
elif depth > MAX_DEPTH:
|
||||
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
|
||||
|
||||
assert_params_in_request(pdu_json, ('event_id', 'type'))
|
||||
event = FrozenEvent(
|
||||
pdu_json
|
||||
)
|
||||
|
||||
@@ -394,7 +394,7 @@ class FederationClient(FederationBase):
|
||||
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
|
||||
signed_events = seen_events.values()
|
||||
else:
|
||||
seen_events = yield self.store.have_seen_events(event_ids)
|
||||
seen_events = yield self.store.have_events(event_ids)
|
||||
signed_events = []
|
||||
|
||||
failed_to_fetch = set()
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -31,10 +30,9 @@ import synapse.metrics
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util import async
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
||||
from synapse.util.logutils import log_function
|
||||
|
||||
from six import iteritems
|
||||
|
||||
# when processing incoming transactions, we try to handle multiple rooms in
|
||||
# parallel, up to this limit.
|
||||
TRANSACTION_CONCURRENCY_LIMIT = 10
|
||||
@@ -214,17 +212,16 @@ class FederationServer(FederationBase):
|
||||
if not in_room:
|
||||
raise AuthError(403, "Host not in room.")
|
||||
|
||||
# we grab the linearizer to protect ourselves from servers which hammer
|
||||
# us. In theory we might already have the response to this query
|
||||
# in the cache so we could return it without waiting for the linearizer
|
||||
# - but that's non-trivial to get right, and anyway somewhat defeats
|
||||
# the point of the linearizer.
|
||||
with (yield self._server_linearizer.queue((origin, room_id))):
|
||||
resp = yield self._state_resp_cache.wrap(
|
||||
(room_id, event_id),
|
||||
self._on_context_state_request_compute,
|
||||
room_id, event_id,
|
||||
)
|
||||
result = self._state_resp_cache.get((room_id, event_id))
|
||||
if not result:
|
||||
with (yield self._server_linearizer.queue((origin, room_id))):
|
||||
d = self._state_resp_cache.set(
|
||||
(room_id, event_id),
|
||||
preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
|
||||
)
|
||||
resp = yield make_deferred_yieldable(d)
|
||||
else:
|
||||
resp = yield make_deferred_yieldable(result)
|
||||
|
||||
defer.returnValue((200, resp))
|
||||
|
||||
@@ -428,9 +425,9 @@ class FederationServer(FederationBase):
|
||||
"Claimed one-time-keys: %s",
|
||||
",".join((
|
||||
"%s for %s:%s" % (key_id, user_id, device_id)
|
||||
for user_id, user_keys in iteritems(json_result)
|
||||
for device_id, device_keys in iteritems(user_keys)
|
||||
for key_id, _ in iteritems(device_keys)
|
||||
for user_id, user_keys in json_result.iteritems()
|
||||
for device_id, device_keys in user_keys.iteritems()
|
||||
for key_id, _ in device_keys.iteritems()
|
||||
)),
|
||||
)
|
||||
|
||||
@@ -497,33 +494,13 @@ class FederationServer(FederationBase):
|
||||
def _handle_received_pdu(self, origin, pdu):
|
||||
""" Process a PDU received in a federation /send/ transaction.
|
||||
|
||||
If the event is invalid, then this method throws a FederationError.
|
||||
(The error will then be logged and sent back to the sender (which
|
||||
probably won't do anything with it), and other events in the
|
||||
transaction will be processed as normal).
|
||||
|
||||
It is likely that we'll then receive other events which refer to
|
||||
this rejected_event in their prev_events, etc. When that happens,
|
||||
we'll attempt to fetch the rejected event again, which will presumably
|
||||
fail, so those second-generation events will also get rejected.
|
||||
|
||||
Eventually, we get to the point where there are more than 10 events
|
||||
between any new events and the original rejected event. Since we
|
||||
only try to backfill 10 events deep on received pdu, we then accept the
|
||||
new event, possibly introducing a discontinuity in the DAG, with new
|
||||
forward extremities, so normal service is approximately returned,
|
||||
until we try to backfill across the discontinuity.
|
||||
|
||||
Args:
|
||||
origin (str): server which sent the pdu
|
||||
pdu (FrozenEvent): received pdu
|
||||
|
||||
Returns (Deferred): completes with None
|
||||
|
||||
Raises: FederationError if the signatures / hash do not match, or
|
||||
if the event was unacceptable for any other reason (eg, too large,
|
||||
too many prev_events, couldn't find the prev_events)
|
||||
"""
|
||||
Raises: FederationError if the signatures / hash do not match
|
||||
"""
|
||||
# check that it's actually being sent from a valid destination to
|
||||
# workaround bug #1753 in 0.18.5 and 0.18.6
|
||||
if origin != get_domain_from_id(pdu.event_id):
|
||||
|
||||
@@ -40,8 +40,6 @@ from collections import namedtuple
|
||||
|
||||
import logging
|
||||
|
||||
from six import itervalues, iteritems
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -124,7 +122,7 @@ class FederationRemoteSendQueue(object):
|
||||
|
||||
user_ids = set(
|
||||
user_id
|
||||
for uids in itervalues(self.presence_changed)
|
||||
for uids in self.presence_changed.itervalues()
|
||||
for user_id in uids
|
||||
)
|
||||
|
||||
@@ -278,7 +276,7 @@ class FederationRemoteSendQueue(object):
|
||||
# stream position.
|
||||
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
|
||||
|
||||
for ((destination, edu_key), pos) in iteritems(keyed_edus):
|
||||
for ((destination, edu_key), pos) in keyed_edus.iteritems():
|
||||
rows.append((pos, KeyedEduRow(
|
||||
key=edu_key,
|
||||
edu=self.keyed_edu[(destination, edu_key)],
|
||||
@@ -311,7 +309,7 @@ class FederationRemoteSendQueue(object):
|
||||
j = keys.bisect_right(to_token) + 1
|
||||
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
|
||||
|
||||
for (destination, pos) in iteritems(device_messages):
|
||||
for (destination, pos) in device_messages.iteritems():
|
||||
rows.append((pos, DeviceRow(
|
||||
destination=destination,
|
||||
)))
|
||||
@@ -530,19 +528,19 @@ def process_rows_for_federation(transaction_queue, rows):
|
||||
if buff.presence:
|
||||
transaction_queue.send_presence(buff.presence)
|
||||
|
||||
for destination, edu_map in iteritems(buff.keyed_edus):
|
||||
for destination, edu_map in buff.keyed_edus.iteritems():
|
||||
for key, edu in edu_map.items():
|
||||
transaction_queue.send_edu(
|
||||
edu.destination, edu.edu_type, edu.content, key=key,
|
||||
)
|
||||
|
||||
for destination, edu_list in iteritems(buff.edus):
|
||||
for destination, edu_list in buff.edus.iteritems():
|
||||
for edu in edu_list:
|
||||
transaction_queue.send_edu(
|
||||
edu.destination, edu.edu_type, edu.content, key=None,
|
||||
)
|
||||
|
||||
for destination, failure_list in iteritems(buff.failures):
|
||||
for destination, failure_list in buff.failures.iteritems():
|
||||
for failure in failure_list:
|
||||
transaction_queue.send_failure(destination, failure)
|
||||
|
||||
|
||||
@@ -295,6 +295,7 @@ class TransactionQueue(object):
|
||||
Args:
|
||||
states (list(UserPresenceState))
|
||||
"""
|
||||
return
|
||||
|
||||
# First we queue up the new presence by user ID, so multiple presence
|
||||
# updates in quick successtion are correctly handled
|
||||
|
||||
@@ -94,6 +94,12 @@ class Authenticator(object):
|
||||
"signatures": {},
|
||||
}
|
||||
|
||||
if (
|
||||
self.federation_domain_whitelist is not None and
|
||||
self.server_name not in self.federation_domain_whitelist
|
||||
):
|
||||
raise FederationDeniedError(self.server_name)
|
||||
|
||||
if content is not None:
|
||||
json_request["content"] = content
|
||||
|
||||
@@ -132,12 +138,6 @@ class Authenticator(object):
|
||||
json_request["origin"] = origin
|
||||
json_request["signatures"].setdefault(origin, {})[key] = sig
|
||||
|
||||
if (
|
||||
self.federation_domain_whitelist is not None and
|
||||
origin not in self.federation_domain_whitelist
|
||||
):
|
||||
raise FederationDeniedError(origin)
|
||||
|
||||
if not json_request["signatures"]:
|
||||
raise NoAuthenticationError(
|
||||
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
|
||||
|
||||
@@ -15,14 +15,8 @@
|
||||
# limitations under the License.
|
||||
|
||||
"""Contains handlers for federation events."""
|
||||
|
||||
import httplib
|
||||
import itertools
|
||||
import logging
|
||||
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
from signedjson.sign import verify_signed_json
|
||||
from twisted.internet import defer
|
||||
from unpaddedbase64 import decode_base64
|
||||
|
||||
from ._base import BaseHandler
|
||||
@@ -49,6 +43,10 @@ from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
from synapse.util.distributor import user_joined_room
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -117,19 +115,6 @@ class FederationHandler(BaseHandler):
|
||||
logger.debug("Already seen pdu %s", pdu.event_id)
|
||||
return
|
||||
|
||||
# do some initial sanity-checking of the event. In particular, make
|
||||
# sure it doesn't have hundreds of prev_events or auth_events, which
|
||||
# could cause a huge state resolution or cascade of event fetches.
|
||||
try:
|
||||
self._sanity_check_event(pdu)
|
||||
except SynapseError as err:
|
||||
raise FederationError(
|
||||
"ERROR",
|
||||
err.code,
|
||||
err.msg,
|
||||
affected=pdu.event_id,
|
||||
)
|
||||
|
||||
# If we are currently in the process of joining this room, then we
|
||||
# queue up events for later processing.
|
||||
if pdu.room_id in self.room_queues:
|
||||
@@ -164,6 +149,10 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
auth_chain = []
|
||||
|
||||
have_seen = yield self.store.have_events(
|
||||
[ev for ev, _ in pdu.prev_events]
|
||||
)
|
||||
|
||||
fetch_state = False
|
||||
|
||||
# Get missing pdus if necessary.
|
||||
@@ -179,7 +168,7 @@ class FederationHandler(BaseHandler):
|
||||
)
|
||||
|
||||
prevs = {e_id for e_id, _ in pdu.prev_events}
|
||||
seen = yield self.store.have_seen_events(prevs)
|
||||
seen = set(have_seen.keys())
|
||||
|
||||
if min_depth and pdu.depth < min_depth:
|
||||
# This is so that we don't notify the user about this
|
||||
@@ -207,7 +196,8 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
# Update the set of things we've seen after trying to
|
||||
# fetch the missing stuff
|
||||
seen = yield self.store.have_seen_events(prevs)
|
||||
have_seen = yield self.store.have_events(prevs)
|
||||
seen = set(have_seen.iterkeys())
|
||||
|
||||
if not prevs - seen:
|
||||
logger.info(
|
||||
@@ -258,7 +248,8 @@ class FederationHandler(BaseHandler):
|
||||
min_depth (int): Minimum depth of events to return.
|
||||
"""
|
||||
# We recalculate seen, since it may have changed.
|
||||
seen = yield self.store.have_seen_events(prevs)
|
||||
have_seen = yield self.store.have_events(prevs)
|
||||
seen = set(have_seen.keys())
|
||||
|
||||
if not prevs - seen:
|
||||
return
|
||||
@@ -370,7 +361,9 @@ class FederationHandler(BaseHandler):
|
||||
if auth_chain:
|
||||
event_ids |= {e.event_id for e in auth_chain}
|
||||
|
||||
seen_ids = yield self.store.have_seen_events(event_ids)
|
||||
seen_ids = set(
|
||||
(yield self.store.have_events(event_ids)).keys()
|
||||
)
|
||||
|
||||
if state and auth_chain is not None:
|
||||
# If we have any state or auth_chain given to us by the replication
|
||||
@@ -534,16 +527,9 @@ class FederationHandler(BaseHandler):
|
||||
def backfill(self, dest, room_id, limit, extremities):
|
||||
""" Trigger a backfill request to `dest` for the given `room_id`
|
||||
|
||||
This will attempt to get more events from the remote. If the other side
|
||||
has no new events to offer, this will return an empty list.
|
||||
|
||||
As the events are received, we check their signatures, and also do some
|
||||
sanity-checking on them. If any of the backfilled events are invalid,
|
||||
this method throws a SynapseError.
|
||||
|
||||
TODO: make this more useful to distinguish failures of the remote
|
||||
server from invalid events (there is probably no point in trying to
|
||||
re-fetch invalid events from every other HS in the room.)
|
||||
This will attempt to get more events from the remote. This may return
|
||||
be successfull and still return no events if the other side has no new
|
||||
events to offer.
|
||||
"""
|
||||
if dest == self.server_name:
|
||||
raise SynapseError(400, "Can't backfill from self.")
|
||||
@@ -555,16 +541,6 @@ class FederationHandler(BaseHandler):
|
||||
extremities=extremities,
|
||||
)
|
||||
|
||||
# ideally we'd sanity check the events here for excess prev_events etc,
|
||||
# but it's hard to reject events at this point without completely
|
||||
# breaking backfill in the same way that it is currently broken by
|
||||
# events whose signature we cannot verify (#3121).
|
||||
#
|
||||
# So for now we accept the events anyway. #3124 tracks this.
|
||||
#
|
||||
# for ev in events:
|
||||
# self._sanity_check_event(ev)
|
||||
|
||||
# Don't bother processing events we already have.
|
||||
seen_events = yield self.store.have_events_in_timeline(
|
||||
set(e.event_id for e in events)
|
||||
@@ -657,7 +633,7 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
failed_to_fetch = missing_auth - set(auth_events)
|
||||
|
||||
seen_events = yield self.store.have_seen_events(
|
||||
seen_events = yield self.store.have_events(
|
||||
set(auth_events.keys()) | set(state_events.keys())
|
||||
)
|
||||
|
||||
@@ -867,38 +843,6 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
defer.returnValue(False)
|
||||
|
||||
def _sanity_check_event(self, ev):
|
||||
"""
|
||||
Do some early sanity checks of a received event
|
||||
|
||||
In particular, checks it doesn't have an excessive number of
|
||||
prev_events or auth_events, which could cause a huge state resolution
|
||||
or cascade of event fetches.
|
||||
|
||||
Args:
|
||||
ev (synapse.events.EventBase): event to be checked
|
||||
|
||||
Returns: None
|
||||
|
||||
Raises:
|
||||
SynapseError if the event does not pass muster
|
||||
"""
|
||||
if len(ev.prev_events) > 20:
|
||||
logger.warn("Rejecting event %s which has %i prev_events",
|
||||
ev.event_id, len(ev.prev_events))
|
||||
raise SynapseError(
|
||||
httplib.BAD_REQUEST,
|
||||
"Too many prev_events",
|
||||
)
|
||||
|
||||
if len(ev.auth_events) > 10:
|
||||
logger.warn("Rejecting event %s which has %i auth_events",
|
||||
ev.event_id, len(ev.auth_events))
|
||||
raise SynapseError(
|
||||
httplib.BAD_REQUEST,
|
||||
"Too many auth_events",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send_invite(self, target_host, event):
|
||||
""" Sends the invite to the remote server for signing.
|
||||
@@ -1792,8 +1736,7 @@ class FederationHandler(BaseHandler):
|
||||
event_key = None
|
||||
|
||||
if event_auth_events - current_state:
|
||||
# TODO: can we use store.have_seen_events here instead?
|
||||
have_events = yield self.store.get_seen_events_with_rejections(
|
||||
have_events = yield self.store.have_events(
|
||||
event_auth_events - current_state
|
||||
)
|
||||
else:
|
||||
@@ -1816,12 +1759,12 @@ class FederationHandler(BaseHandler):
|
||||
origin, event.room_id, event.event_id
|
||||
)
|
||||
|
||||
seen_remotes = yield self.store.have_seen_events(
|
||||
seen_remotes = yield self.store.have_events(
|
||||
[e.event_id for e in remote_auth_chain]
|
||||
)
|
||||
|
||||
for e in remote_auth_chain:
|
||||
if e.event_id in seen_remotes:
|
||||
if e.event_id in seen_remotes.keys():
|
||||
continue
|
||||
|
||||
if e.event_id == event.event_id:
|
||||
@@ -1848,7 +1791,7 @@ class FederationHandler(BaseHandler):
|
||||
except AuthError:
|
||||
pass
|
||||
|
||||
have_events = yield self.store.get_seen_events_with_rejections(
|
||||
have_events = yield self.store.have_events(
|
||||
[e_id for e_id, _ in event.auth_events]
|
||||
)
|
||||
seen_events = set(have_events.keys())
|
||||
@@ -1933,13 +1876,13 @@ class FederationHandler(BaseHandler):
|
||||
local_auth_chain,
|
||||
)
|
||||
|
||||
seen_remotes = yield self.store.have_seen_events(
|
||||
seen_remotes = yield self.store.have_events(
|
||||
[e.event_id for e in result["auth_chain"]]
|
||||
)
|
||||
|
||||
# 3. Process any remote auth chain events we haven't seen.
|
||||
for ev in result["auth_chain"]:
|
||||
if ev.event_id in seen_remotes:
|
||||
if ev.event_id in seen_remotes.keys():
|
||||
continue
|
||||
|
||||
if ev.event_id == event.event_id:
|
||||
|
||||
@@ -372,6 +372,7 @@ class InitialSyncHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_presence():
|
||||
defer.returnValue([])
|
||||
states = yield presence_handler.get_states(
|
||||
[m.user_id for m in room_members],
|
||||
as_event=True,
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError, Codes, SynapseError
|
||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||
from synapse.events.utils import serialize_event
|
||||
@@ -37,6 +37,7 @@ from ._base import BaseHandler
|
||||
from canonicaljson import encode_canonical_json
|
||||
|
||||
import logging
|
||||
import random
|
||||
import simplejson
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -383,7 +384,7 @@ class MessageHandler(BaseHandler):
|
||||
# If this is an AS, double check that they are allowed to see the members.
|
||||
# This can either be because the AS user is in the room or becuase there
|
||||
# is a user in the room that the AS is "interested in"
|
||||
if requester.app_service and user_id not in users_with_profile:
|
||||
if False and requester.app_service and user_id not in users_with_profile:
|
||||
for uid in users_with_profile:
|
||||
if requester.app_service.is_interested_in_user(uid):
|
||||
break
|
||||
@@ -432,7 +433,7 @@ class EventCreationHandler(object):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
|
||||
prev_events_and_hashes=None):
|
||||
prev_event_ids=None):
|
||||
"""
|
||||
Given a dict from a client, create a new event.
|
||||
|
||||
@@ -446,13 +447,7 @@ class EventCreationHandler(object):
|
||||
event_dict (dict): An entire event
|
||||
token_id (str)
|
||||
txn_id (str)
|
||||
|
||||
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
|
||||
the forward extremities to use as the prev_events for the
|
||||
new event. For each event, a tuple of (event_id, hashes, depth)
|
||||
where *hashes* is a map from algorithm to hash.
|
||||
|
||||
If None, they will be requested from the database.
|
||||
prev_event_ids (list): The prev event ids to use when creating the event
|
||||
|
||||
Returns:
|
||||
Tuple of created event (FrozenEvent), Context
|
||||
@@ -490,7 +485,7 @@ class EventCreationHandler(object):
|
||||
event, context = yield self.create_new_client_event(
|
||||
builder=builder,
|
||||
requester=requester,
|
||||
prev_events_and_hashes=prev_events_and_hashes,
|
||||
prev_event_ids=prev_event_ids,
|
||||
)
|
||||
|
||||
defer.returnValue((event, context))
|
||||
@@ -593,48 +588,39 @@ class EventCreationHandler(object):
|
||||
|
||||
@measure_func("create_new_client_event")
|
||||
@defer.inlineCallbacks
|
||||
def create_new_client_event(self, builder, requester=None,
|
||||
prev_events_and_hashes=None):
|
||||
"""Create a new event for a local client
|
||||
|
||||
Args:
|
||||
builder (EventBuilder):
|
||||
|
||||
requester (synapse.types.Requester|None):
|
||||
|
||||
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
|
||||
the forward extremities to use as the prev_events for the
|
||||
new event. For each event, a tuple of (event_id, hashes, depth)
|
||||
where *hashes* is a map from algorithm to hash.
|
||||
|
||||
If None, they will be requested from the database.
|
||||
|
||||
Returns:
|
||||
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
|
||||
"""
|
||||
|
||||
if prev_events_and_hashes is not None:
|
||||
assert len(prev_events_and_hashes) <= 10, \
|
||||
"Attempting to create an event with %i prev_events" % (
|
||||
len(prev_events_and_hashes),
|
||||
def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
|
||||
if prev_event_ids:
|
||||
prev_events = yield self.store.add_event_hashes(prev_event_ids)
|
||||
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
|
||||
depth = prev_max_depth + 1
|
||||
else:
|
||||
latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
|
||||
builder.room_id,
|
||||
)
|
||||
else:
|
||||
prev_events_and_hashes = \
|
||||
yield self.store.get_prev_events_for_room(builder.room_id)
|
||||
|
||||
if prev_events_and_hashes:
|
||||
depth = max([d for _, _, d in prev_events_and_hashes]) + 1
|
||||
# we cap depth of generated events, to ensure that they are not
|
||||
# rejected by other servers (and so that they can be persisted in
|
||||
# the db)
|
||||
depth = min(depth, MAX_DEPTH)
|
||||
else:
|
||||
depth = 1
|
||||
# We want to limit the max number of prev events we point to in our
|
||||
# new event
|
||||
if len(latest_ret) > 10:
|
||||
# Sort by reverse depth, so we point to the most recent.
|
||||
latest_ret.sort(key=lambda a: -a[2])
|
||||
new_latest_ret = latest_ret[:5]
|
||||
|
||||
prev_events = [
|
||||
(event_id, prev_hashes)
|
||||
for event_id, prev_hashes, _ in prev_events_and_hashes
|
||||
]
|
||||
# We also randomly point to some of the older events, to make
|
||||
# sure that we don't completely ignore the older events.
|
||||
if latest_ret[5:]:
|
||||
sample_size = min(5, len(latest_ret[5:]))
|
||||
new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
|
||||
latest_ret = new_latest_ret
|
||||
|
||||
if latest_ret:
|
||||
depth = max([d for _, _, d in latest_ret]) + 1
|
||||
else:
|
||||
depth = 1
|
||||
|
||||
prev_events = [
|
||||
(event_id, prev_hashes)
|
||||
for event_id, prev_hashes, _ in latest_ret
|
||||
]
|
||||
|
||||
builder.prev_events = prev_events
|
||||
builder.depth = depth
|
||||
|
||||
@@ -373,6 +373,7 @@ class PresenceHandler(object):
|
||||
"""We've seen the user do something that indicates they're interacting
|
||||
with the app.
|
||||
"""
|
||||
return
|
||||
user_id = user.to_string()
|
||||
|
||||
bump_active_time_counter.inc()
|
||||
@@ -402,6 +403,7 @@ class PresenceHandler(object):
|
||||
Useful for streams that are not associated with an actual
|
||||
client that is being used by a user.
|
||||
"""
|
||||
affect_presence = False
|
||||
if affect_presence:
|
||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
@@ -444,6 +446,8 @@ class PresenceHandler(object):
|
||||
Returns:
|
||||
set(str): A set of user_id strings.
|
||||
"""
|
||||
# presence is disabled on matrix.org, so we return the empty set
|
||||
return set()
|
||||
syncing_user_ids = {
|
||||
user_id for user_id, count in self.user_to_num_current_syncs.items()
|
||||
if count
|
||||
@@ -463,6 +467,7 @@ class PresenceHandler(object):
|
||||
syncing_user_ids(set(str)): The set of user_ids that are
|
||||
currently syncing on that server.
|
||||
"""
|
||||
return
|
||||
|
||||
# Grab the previous list of user_ids that were syncing on that process
|
||||
prev_syncing_user_ids = (
|
||||
|
||||
@@ -20,6 +20,7 @@ from ._base import BaseHandler
|
||||
from synapse.api.constants import (
|
||||
EventTypes, JoinRules,
|
||||
)
|
||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
||||
from synapse.util.async import concurrently_execute
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
@@ -43,7 +44,7 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
|
||||
class RoomListHandler(BaseHandler):
|
||||
def __init__(self, hs):
|
||||
super(RoomListHandler, self).__init__(hs)
|
||||
self.response_cache = ResponseCache(hs, "room_list")
|
||||
self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
|
||||
self.remote_response_cache = ResponseCache(hs, "remote_room_list",
|
||||
timeout_ms=30 * 1000)
|
||||
|
||||
@@ -77,11 +78,18 @@ class RoomListHandler(BaseHandler):
|
||||
)
|
||||
|
||||
key = (limit, since_token, network_tuple)
|
||||
return self.response_cache.wrap(
|
||||
key,
|
||||
self._get_public_room_list,
|
||||
limit, since_token, network_tuple=network_tuple,
|
||||
)
|
||||
result = self.response_cache.get(key)
|
||||
if not result:
|
||||
logger.info("No cached result, calculating one.")
|
||||
result = self.response_cache.set(
|
||||
key,
|
||||
preserve_fn(self._get_public_room_list)(
|
||||
limit, since_token, network_tuple=network_tuple
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.info("Using cached deferred result.")
|
||||
return make_deferred_yieldable(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_public_room_list(self, limit=None, since_token=None,
|
||||
@@ -415,14 +423,18 @@ class RoomListHandler(BaseHandler):
|
||||
server_name, limit, since_token, include_all_networks,
|
||||
third_party_instance_id,
|
||||
)
|
||||
return self.remote_response_cache.wrap(
|
||||
key,
|
||||
repl_layer.get_public_rooms,
|
||||
server_name, limit=limit, since_token=since_token,
|
||||
search_filter=search_filter,
|
||||
include_all_networks=include_all_networks,
|
||||
third_party_instance_id=third_party_instance_id,
|
||||
)
|
||||
result = self.remote_response_cache.get(key)
|
||||
if not result:
|
||||
result = self.remote_response_cache.set(
|
||||
key,
|
||||
repl_layer.get_public_rooms(
|
||||
server_name, limit=limit, since_token=since_token,
|
||||
search_filter=search_filter,
|
||||
include_all_networks=include_all_networks,
|
||||
third_party_instance_id=third_party_instance_id,
|
||||
)
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
class RoomListNextBatch(namedtuple("RoomListNextBatch", (
|
||||
|
||||
@@ -28,7 +28,7 @@ from synapse.api.constants import (
|
||||
)
|
||||
from synapse.api.errors import AuthError, SynapseError, Codes
|
||||
from synapse.types import UserID, RoomID
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async import Linearizer, Limiter
|
||||
from synapse.util.distributor import user_left_room, user_joined_room
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ class RoomMemberHandler(object):
|
||||
self.event_creation_hander = hs.get_event_creation_handler()
|
||||
|
||||
self.member_linearizer = Linearizer(name="member")
|
||||
self.member_limiter = Limiter(20)
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.spam_checker = hs.get_spam_checker()
|
||||
@@ -149,7 +150,7 @@ class RoomMemberHandler(object):
|
||||
@defer.inlineCallbacks
|
||||
def _local_membership_update(
|
||||
self, requester, target, room_id, membership,
|
||||
prev_events_and_hashes,
|
||||
prev_event_ids,
|
||||
txn_id=None,
|
||||
ratelimit=True,
|
||||
content=None,
|
||||
@@ -175,7 +176,7 @@ class RoomMemberHandler(object):
|
||||
},
|
||||
token_id=requester.access_token_id,
|
||||
txn_id=txn_id,
|
||||
prev_events_and_hashes=prev_events_and_hashes,
|
||||
prev_event_ids=prev_event_ids,
|
||||
)
|
||||
|
||||
# Check if this event matches the previous membership event for the user.
|
||||
@@ -232,18 +233,23 @@ class RoomMemberHandler(object):
|
||||
):
|
||||
key = (room_id,)
|
||||
|
||||
with (yield self.member_linearizer.queue(key)):
|
||||
result = yield self._update_membership(
|
||||
requester,
|
||||
target,
|
||||
room_id,
|
||||
action,
|
||||
txn_id=txn_id,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
third_party_signed=third_party_signed,
|
||||
ratelimit=ratelimit,
|
||||
content=content,
|
||||
)
|
||||
as_id = object()
|
||||
if requester.app_service:
|
||||
as_id = requester.app_service.id
|
||||
|
||||
with (yield self.member_limiter.queue(as_id)):
|
||||
with (yield self.member_linearizer.queue(key)):
|
||||
result = yield self._update_membership(
|
||||
requester,
|
||||
target,
|
||||
room_id,
|
||||
action,
|
||||
txn_id=txn_id,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
third_party_signed=third_party_signed,
|
||||
ratelimit=ratelimit,
|
||||
content=content,
|
||||
)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
@@ -314,12 +320,7 @@ class RoomMemberHandler(object):
|
||||
403, "Invites have been disabled on this server",
|
||||
)
|
||||
|
||||
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
|
||||
room_id,
|
||||
)
|
||||
latest_event_ids = (
|
||||
event_id for (event_id, _, _) in prev_events_and_hashes
|
||||
)
|
||||
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
|
||||
current_state_ids = yield self.state_handler.get_current_state_ids(
|
||||
room_id, latest_event_ids=latest_event_ids,
|
||||
)
|
||||
@@ -408,7 +409,7 @@ class RoomMemberHandler(object):
|
||||
membership=effective_membership_state,
|
||||
txn_id=txn_id,
|
||||
ratelimit=ratelimit,
|
||||
prev_events_and_hashes=prev_events_and_hashes,
|
||||
prev_event_ids=latest_event_ids,
|
||||
content=content,
|
||||
)
|
||||
defer.returnValue(res)
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
|
||||
from synapse.api.constants import Membership, EventTypes
|
||||
from synapse.util.async import concurrently_execute
|
||||
from synapse.util.logcontext import LoggingContext
|
||||
from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
|
||||
from synapse.util.metrics import Measure, measure_func
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.push.clientformat import format_push_rules_for_user
|
||||
@@ -30,6 +30,7 @@ import itertools
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
|
||||
|
||||
SyncConfig = collections.namedtuple("SyncConfig", [
|
||||
"user",
|
||||
@@ -52,7 +53,6 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
|
||||
to tell if room needs to be part of the sync result.
|
||||
"""
|
||||
return bool(self.events)
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
|
||||
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
|
||||
@@ -77,7 +77,6 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
|
||||
# nb the notification count does not, er, count: if there's nothing
|
||||
# else in the result, we don't need to send it.
|
||||
)
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
|
||||
class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
|
||||
@@ -97,7 +96,6 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
|
||||
or self.state
|
||||
or self.account_data
|
||||
)
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
|
||||
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
|
||||
@@ -109,7 +107,6 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
|
||||
def __nonzero__(self):
|
||||
"""Invited rooms should always be reported to the client"""
|
||||
return True
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
|
||||
class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
|
||||
@@ -121,7 +118,6 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
|
||||
|
||||
def __nonzero__(self):
|
||||
return bool(self.join or self.invite or self.leave)
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
|
||||
class DeviceLists(collections.namedtuple("DeviceLists", [
|
||||
@@ -132,7 +128,6 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
|
||||
|
||||
def __nonzero__(self):
|
||||
return bool(self.changed or self.left)
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
|
||||
class SyncResult(collections.namedtuple("SyncResult", [
|
||||
@@ -165,7 +160,6 @@ class SyncResult(collections.namedtuple("SyncResult", [
|
||||
self.device_lists or
|
||||
self.groups
|
||||
)
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
|
||||
class SyncHandler(object):
|
||||
@@ -176,7 +170,10 @@ class SyncHandler(object):
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.event_sources = hs.get_event_sources()
|
||||
self.clock = hs.get_clock()
|
||||
self.response_cache = ResponseCache(hs, "sync")
|
||||
self.response_cache = ResponseCache(
|
||||
hs, "sync",
|
||||
timeout_ms=SYNC_RESPONSE_CACHE_MS,
|
||||
)
|
||||
self.state = hs.get_state_handler()
|
||||
|
||||
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
|
||||
@@ -187,11 +184,15 @@ class SyncHandler(object):
|
||||
Returns:
|
||||
A Deferred SyncResult.
|
||||
"""
|
||||
return self.response_cache.wrap(
|
||||
sync_config.request_key,
|
||||
self._wait_for_sync_for_user,
|
||||
sync_config, since_token, timeout, full_state,
|
||||
)
|
||||
result = self.response_cache.get(sync_config.request_key)
|
||||
if not result:
|
||||
result = self.response_cache.set(
|
||||
sync_config.request_key,
|
||||
preserve_fn(self._wait_for_sync_for_user)(
|
||||
sync_config, since_token, timeout, full_state
|
||||
)
|
||||
)
|
||||
return make_deferred_yieldable(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
|
||||
@@ -602,7 +603,7 @@ class SyncHandler(object):
|
||||
since_token is None and
|
||||
sync_config.filter_collection.blocks_all_presence()
|
||||
)
|
||||
if not block_all_presence_data:
|
||||
if False and not block_all_presence_data:
|
||||
yield self._generate_sync_entry_for_presence(
|
||||
sync_result_builder, newly_joined_rooms, newly_joined_users
|
||||
)
|
||||
|
||||
@@ -144,7 +144,6 @@ class _NotifierUserStream(object):
|
||||
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
|
||||
def __nonzero__(self):
|
||||
return bool(self.events)
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
|
||||
class Notifier(object):
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright 2017 Vector Creations Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@@ -19,31 +18,15 @@ from distutils.version import LooseVersion
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# this dict maps from python package name to a list of modules we expect it to
|
||||
# provide.
|
||||
#
|
||||
# the key is a "requirement specifier", as used as a parameter to `pip
|
||||
# install`[1], or an `install_requires` argument to `setuptools.setup` [2].
|
||||
#
|
||||
# the value is a sequence of strings; each entry should be the name of the
|
||||
# python module, optionally followed by a version assertion which can be either
|
||||
# ">=<ver>" or "==<ver>".
|
||||
#
|
||||
# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers.
|
||||
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
|
||||
REQUIREMENTS = {
|
||||
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
|
||||
"frozendict>=0.4": ["frozendict"],
|
||||
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
|
||||
"canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
|
||||
"canonicaljson>=1.0.0": ["canonicaljson>=1.0.0"],
|
||||
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
|
||||
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
|
||||
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
|
||||
|
||||
# we break under Twisted 18.4
|
||||
# (https://github.com/matrix-org/synapse/issues/3135)
|
||||
"Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"],
|
||||
|
||||
"Twisted>=16.0.0": ["twisted>=16.0.0"],
|
||||
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
|
||||
"pyyaml": ["yaml"],
|
||||
"pyasn1": ["pyasn1"],
|
||||
@@ -56,7 +39,6 @@ REQUIREMENTS = {
|
||||
"pymacaroons-pynacl": ["pymacaroons"],
|
||||
"msgpack-python>=0.3.0": ["msgpack"],
|
||||
"phonenumbers>=8.2.0": ["phonenumbers"],
|
||||
"six": ["six"],
|
||||
}
|
||||
CONDITIONAL_REQUIREMENTS = {
|
||||
"web_client": {
|
||||
|
||||
@@ -23,6 +23,7 @@ from synapse.events.snapshot import EventContext
|
||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||
from synapse.util.async import sleep
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.types import Requester, UserID
|
||||
|
||||
@@ -117,12 +118,17 @@ class ReplicationSendEventRestServlet(RestServlet):
|
||||
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
|
||||
|
||||
def on_PUT(self, request, event_id):
|
||||
return self.response_cache.wrap(
|
||||
event_id,
|
||||
self._handle_request,
|
||||
request
|
||||
)
|
||||
result = self.response_cache.get(event_id)
|
||||
if not result:
|
||||
result = self.response_cache.set(
|
||||
event_id,
|
||||
self._handle_request(request)
|
||||
)
|
||||
else:
|
||||
logger.warn("Returning cached response")
|
||||
return make_deferred_yieldable(result)
|
||||
|
||||
@preserve_fn
|
||||
@defer.inlineCallbacks
|
||||
def _handle_request(self, request):
|
||||
with Measure(self.clock, "repl_send_event_parse"):
|
||||
|
||||
@@ -42,6 +42,8 @@ class SlavedClientIpStore(BaseSlavedStore):
|
||||
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
|
||||
return
|
||||
|
||||
self.client_ip_last_seen.prefill(key, now)
|
||||
|
||||
self.hs.get_tcp_replication().send_user_ip(
|
||||
user_id, access_token, ip, user_agent, device_id, now
|
||||
)
|
||||
|
||||
@@ -33,7 +33,7 @@ import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
MAX_EVENTS_BEHIND = 10000
|
||||
MAX_EVENTS_BEHIND = 500000
|
||||
|
||||
|
||||
EventStreamRow = namedtuple("EventStreamRow", (
|
||||
|
||||
@@ -296,17 +296,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
|
||||
)
|
||||
new_room_id = info["room_id"]
|
||||
|
||||
yield self.event_creation_handler.create_and_send_nonmember_event(
|
||||
room_creator_requester,
|
||||
{
|
||||
"type": "m.room.message",
|
||||
"content": {"body": message, "msgtype": "m.text"},
|
||||
"room_id": new_room_id,
|
||||
"sender": new_room_user_id,
|
||||
},
|
||||
ratelimit=False,
|
||||
)
|
||||
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
logger.info("Shutting down room %r", room_id)
|
||||
@@ -344,6 +333,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
|
||||
|
||||
kicked_users.append(user_id)
|
||||
|
||||
yield self.event_creation_handler.create_and_send_nonmember_event(
|
||||
room_creator_requester,
|
||||
{
|
||||
"type": "m.room.message",
|
||||
"content": {"body": message, "msgtype": "m.text"},
|
||||
"room_id": new_room_id,
|
||||
"sender": new_room_user_id,
|
||||
},
|
||||
ratelimit=False,
|
||||
)
|
||||
|
||||
aliases_for_room = yield self.store.get_aliases_for_room(room_id)
|
||||
|
||||
yield self.store.update_aliases_for_room(
|
||||
|
||||
@@ -81,7 +81,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
|
||||
except Exception:
|
||||
raise SynapseError(400, "Unable to parse state")
|
||||
|
||||
yield self.presence_handler.set_state(user, state)
|
||||
# yield self.presence_handler.set_state(user, state)
|
||||
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
|
||||
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
|
||||
# times give more inserts into the database even for readonly API hits
|
||||
# 120 seconds == 2 minutes
|
||||
LAST_SEEN_GRANULARITY = 120 * 1000
|
||||
LAST_SEEN_GRANULARITY = 10 * 60 * 1000
|
||||
|
||||
|
||||
class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import random
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
@@ -25,9 +24,7 @@ from synapse.util.caches.descriptors import cached
|
||||
from unpaddedbase64 import encode_base64
|
||||
|
||||
import logging
|
||||
from six.moves.queue import PriorityQueue, Empty
|
||||
|
||||
from six.moves import range
|
||||
from Queue import PriorityQueue, Empty
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -81,7 +78,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
|
||||
front_list = list(front)
|
||||
chunks = [
|
||||
front_list[x:x + 100]
|
||||
for x in range(0, len(front), 100)
|
||||
for x in xrange(0, len(front), 100)
|
||||
]
|
||||
for chunk in chunks:
|
||||
txn.execute(
|
||||
@@ -136,47 +133,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
|
||||
retcol="event_id",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_prev_events_for_room(self, room_id):
|
||||
"""
|
||||
Gets a subset of the current forward extremities in the given room.
|
||||
|
||||
Limits the result to 10 extremities, so that we can avoid creating
|
||||
events which refer to hundreds of prev_events.
|
||||
|
||||
Args:
|
||||
room_id (str): room_id
|
||||
|
||||
Returns:
|
||||
Deferred[list[(str, dict[str, str], int)]]
|
||||
for each event, a tuple of (event_id, hashes, depth)
|
||||
where *hashes* is a map from algorithm to hash.
|
||||
"""
|
||||
res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
|
||||
if len(res) > 10:
|
||||
# Sort by reverse depth, so we point to the most recent.
|
||||
res.sort(key=lambda a: -a[2])
|
||||
|
||||
# we use half of the limit for the actual most recent events, and
|
||||
# the other half to randomly point to some of the older events, to
|
||||
# make sure that we don't completely ignore the older events.
|
||||
res = res[0:5] + random.sample(res[5:], 5)
|
||||
|
||||
defer.returnValue(res)
|
||||
|
||||
def get_latest_event_ids_and_hashes_in_room(self, room_id):
|
||||
"""
|
||||
Gets the current forward extremities in the given room
|
||||
|
||||
Args:
|
||||
room_id (str): room_id
|
||||
|
||||
Returns:
|
||||
Deferred[list[(str, dict[str, str], int)]]
|
||||
for each event, a tuple of (event_id, hashes, depth)
|
||||
where *hashes* is a map from algorithm to hash.
|
||||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"get_latest_event_ids_and_hashes_in_room",
|
||||
self._get_latest_event_ids_and_hashes_in_room,
|
||||
@@ -225,6 +182,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
|
||||
room_id,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_max_depth_of_events(self, event_ids):
|
||||
sql = (
|
||||
"SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
|
||||
) % (",".join(["?"] * len(event_ids)),)
|
||||
|
||||
rows = yield self._execute(
|
||||
"get_max_depth_of_events", None,
|
||||
sql, *event_ids
|
||||
)
|
||||
|
||||
if rows:
|
||||
defer.returnValue(rows[0][0])
|
||||
else:
|
||||
defer.returnValue(1)
|
||||
|
||||
def _get_min_depth_interaction(self, txn, room_id):
|
||||
min_depth = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
|
||||
@@ -613,6 +613,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||
self._rotate_notifs, 30 * 60 * 1000
|
||||
)
|
||||
|
||||
self._rotate_delay = 3
|
||||
self._rotate_count = 10000
|
||||
|
||||
def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
|
||||
all_events_and_contexts):
|
||||
"""Handles moving push actions from staging table to main
|
||||
@@ -809,7 +812,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||
)
|
||||
if caught_up:
|
||||
break
|
||||
yield sleep(5)
|
||||
yield sleep(self._rotate_delay)
|
||||
finally:
|
||||
self._doing_notif_rotation = False
|
||||
|
||||
@@ -830,8 +833,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||
txn.execute("""
|
||||
SELECT stream_ordering FROM event_push_actions
|
||||
WHERE stream_ordering > ?
|
||||
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
|
||||
""", (old_rotate_stream_ordering,))
|
||||
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
|
||||
""", (old_rotate_stream_ordering, self._rotate_count))
|
||||
stream_row = txn.fetchone()
|
||||
if stream_row:
|
||||
offset_stream_ordering, = stream_row
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
from collections import OrderedDict, deque, namedtuple
|
||||
from functools import wraps
|
||||
import itertools
|
||||
import logging
|
||||
|
||||
import simplejson as json
|
||||
@@ -1014,7 +1013,6 @@ class EventsStore(EventsWorkerStore):
|
||||
"event_edge_hashes",
|
||||
"event_edges",
|
||||
"event_forward_extremities",
|
||||
"event_push_actions",
|
||||
"event_reference_hashes",
|
||||
"event_search",
|
||||
"event_signatures",
|
||||
@@ -1034,6 +1032,14 @@ class EventsStore(EventsWorkerStore):
|
||||
[(ev.event_id,) for ev, _ in events_and_contexts]
|
||||
)
|
||||
|
||||
for table in (
|
||||
"event_push_actions",
|
||||
):
|
||||
txn.executemany(
|
||||
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
|
||||
[(ev.event_id,) for ev, _ in events_and_contexts]
|
||||
)
|
||||
|
||||
def _store_event_txn(self, txn, events_and_contexts):
|
||||
"""Insert new events into the event and event_json tables
|
||||
|
||||
@@ -1321,49 +1327,13 @@ class EventsStore(EventsWorkerStore):
|
||||
|
||||
defer.returnValue(set(r["event_id"] for r in rows))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def have_seen_events(self, event_ids):
|
||||
def have_events(self, event_ids):
|
||||
"""Given a list of event ids, check if we have already processed them.
|
||||
|
||||
Args:
|
||||
event_ids (iterable[str]):
|
||||
|
||||
Returns:
|
||||
Deferred[set[str]]: The events we have already seen.
|
||||
"""
|
||||
results = set()
|
||||
|
||||
def have_seen_events_txn(txn, chunk):
|
||||
sql = (
|
||||
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
|
||||
% (",".join("?" * len(chunk)), )
|
||||
)
|
||||
txn.execute(sql, chunk)
|
||||
for (event_id, ) in txn:
|
||||
results.add(event_id)
|
||||
|
||||
# break the input up into chunks of 100
|
||||
input_iterator = iter(event_ids)
|
||||
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
|
||||
[]):
|
||||
yield self.runInteraction(
|
||||
"have_seen_events",
|
||||
have_seen_events_txn,
|
||||
chunk,
|
||||
)
|
||||
defer.returnValue(results)
|
||||
|
||||
def get_seen_events_with_rejections(self, event_ids):
|
||||
"""Given a list of event ids, check if we rejected them.
|
||||
|
||||
Args:
|
||||
event_ids (list[str])
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, str|None):
|
||||
Has an entry for each event id we already have seen. Maps to
|
||||
the rejected reason string if we rejected the event, else maps
|
||||
to None.
|
||||
dict: Has an entry for each event id we already have seen. Maps to
|
||||
the rejected reason string if we rejected the event, else maps to
|
||||
None.
|
||||
"""
|
||||
if not event_ids:
|
||||
return defer.succeed({})
|
||||
@@ -1385,7 +1355,9 @@ class EventsStore(EventsWorkerStore):
|
||||
|
||||
return res
|
||||
|
||||
return self.runInteraction("get_rejection_reasons", f)
|
||||
return self.runInteraction(
|
||||
"have_events", f,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def count_daily_messages(self):
|
||||
|
||||
@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||
|
||||
# Convert the IDs to MXC URIs
|
||||
for media_id in local_mxcs:
|
||||
local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
|
||||
local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
|
||||
for hostname, media_id in remote_mxcs:
|
||||
remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
|
||||
|
||||
@@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||
while next_token:
|
||||
sql = """
|
||||
SELECT stream_ordering, json FROM events
|
||||
JOIN event_json USING (room_id, event_id)
|
||||
JOIN event_json USING (event_id)
|
||||
WHERE room_id = ?
|
||||
AND stream_ordering < ?
|
||||
AND contains_url = ? AND outlier = ?
|
||||
@@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||
if matches:
|
||||
hostname = matches.group(1)
|
||||
media_id = matches.group(2)
|
||||
if hostname == self.hs.hostname:
|
||||
if hostname == self.hostname:
|
||||
local_media_mxcs.append(media_id)
|
||||
else:
|
||||
remote_media_mxcs.append((hostname, media_id))
|
||||
|
||||
@@ -65,7 +65,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
|
||||
defer.returnValue(hosts)
|
||||
|
||||
@cached(max_entries=100000, iterable=True)
|
||||
@cachedInlineCallbacks(max_entries=100000, iterable=True)
|
||||
def get_users_in_room(self, room_id):
|
||||
def f(txn):
|
||||
sql = (
|
||||
@@ -79,7 +79,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
|
||||
txn.execute(sql, (room_id, Membership.JOIN,))
|
||||
return [to_ascii(r[0]) for r in txn]
|
||||
return self.runInteraction("get_users_in_room", f)
|
||||
start_time = self._clock.time_msec()
|
||||
result = yield self.runInteraction("get_users_in_room", f)
|
||||
end_time = self._clock.time_msec()
|
||||
logger.info(
|
||||
"Fetched room membership for %s (%i users) in %i ms",
|
||||
room_id, len(result), end_time - start_time,
|
||||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
@cached()
|
||||
def get_invited_rooms_for_user(self, user_id):
|
||||
@@ -453,7 +460,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
|
||||
defer.returnValue(joined_hosts)
|
||||
|
||||
@cached(max_entries=10000, iterable=True)
|
||||
@cached(max_entries=10000)
|
||||
def _get_joined_hosts_cache(self, room_id):
|
||||
return _JoinedHostsCache(self, room_id)
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
|
||||
sql = (
|
||||
"SELECT stream_ordering, event_id, room_id, type, json, "
|
||||
" origin_server_ts FROM events"
|
||||
" JOIN event_json USING (room_id, event_id)"
|
||||
" JOIN event_json USING (event_id)"
|
||||
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
||||
" AND (%s)"
|
||||
" ORDER BY stream_ordering DESC"
|
||||
@@ -721,7 +721,7 @@ def _parse_query(database_engine, search_term):
|
||||
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
|
||||
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
return " & ".join(result + ":*" for result in results)
|
||||
return " & ".join(result for result in results)
|
||||
elif isinstance(database_engine, Sqlite3Engine):
|
||||
return " & ".join(result + "*" for result in results)
|
||||
else:
|
||||
|
||||
@@ -20,7 +20,7 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
|
||||
from synapse.util.caches import intern_string, get_cache_factor_for
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.caches.dictionary_cache import DictionaryCache
|
||||
from synapse.util.stringutils import to_ascii
|
||||
@@ -54,7 +54,7 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
|
||||
|
||||
self._state_group_cache = DictionaryCache(
|
||||
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
|
||||
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
|
||||
)
|
||||
|
||||
@cached(max_entries=100000, iterable=True)
|
||||
@@ -566,8 +566,7 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||
state_dict = results[group]
|
||||
|
||||
state_dict.update(
|
||||
((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
|
||||
for k, v in group_state_dict.iteritems()
|
||||
group_state_dict.iteritems()
|
||||
)
|
||||
|
||||
self._state_group_cache.update(
|
||||
|
||||
@@ -18,6 +18,16 @@ import os
|
||||
|
||||
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
|
||||
|
||||
|
||||
def get_cache_factor_for(cache_name):
|
||||
env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
|
||||
factor = os.environ.get(env_var)
|
||||
if factor:
|
||||
return float(factor)
|
||||
|
||||
return CACHE_SIZE_FACTOR
|
||||
|
||||
|
||||
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
|
||||
|
||||
caches_by_name = {}
|
||||
|
||||
@@ -17,7 +17,7 @@ import logging
|
||||
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util import unwrapFirstError, logcontext
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.caches import get_cache_factor_for
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
|
||||
from synapse.util.stringutils import to_ascii
|
||||
@@ -310,7 +310,7 @@ class CacheDescriptor(_CacheDescriptorBase):
|
||||
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
|
||||
cache_context=cache_context)
|
||||
|
||||
max_entries = int(max_entries * CACHE_SIZE_FACTOR)
|
||||
max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
|
||||
|
||||
self.max_entries = max_entries
|
||||
self.tree = tree
|
||||
|
||||
@@ -12,15 +12,9 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.caches import metrics as cache_metrics
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ResponseCache(object):
|
||||
@@ -37,7 +31,6 @@ class ResponseCache(object):
|
||||
self.clock = hs.get_clock()
|
||||
self.timeout_sec = timeout_ms / 1000.
|
||||
|
||||
self._name = name
|
||||
self._metrics = cache_metrics.register_cache(
|
||||
"response_cache",
|
||||
size_callback=lambda: self.size(),
|
||||
@@ -50,21 +43,15 @@ class ResponseCache(object):
|
||||
def get(self, key):
|
||||
"""Look up the given key.
|
||||
|
||||
Can return either a new Deferred (which also doesn't follow the synapse
|
||||
logcontext rules), or, if the request has completed, the actual
|
||||
result. You will probably want to make_deferred_yieldable the result.
|
||||
|
||||
If there is no entry for the key, returns None. It is worth noting that
|
||||
this means there is no way to distinguish a completed result of None
|
||||
from an absent cache entry.
|
||||
Returns a deferred which doesn't follow the synapse logcontext rules,
|
||||
so you'll probably want to make_deferred_yieldable it.
|
||||
|
||||
Args:
|
||||
key (hashable):
|
||||
key (str):
|
||||
|
||||
Returns:
|
||||
twisted.internet.defer.Deferred|None|E: None if there is no entry
|
||||
for this key; otherwise either a deferred result or the result
|
||||
itself.
|
||||
twisted.internet.defer.Deferred|None: None if there is no entry
|
||||
for this key; otherwise a deferred result.
|
||||
"""
|
||||
result = self.pending_result_cache.get(key)
|
||||
if result is not None:
|
||||
@@ -81,17 +68,19 @@ class ResponseCache(object):
|
||||
you should wrap normal synapse deferreds with
|
||||
logcontext.run_in_background).
|
||||
|
||||
Can return either a new Deferred (which also doesn't follow the synapse
|
||||
logcontext rules), or, if *deferred* was already complete, the actual
|
||||
result. You will probably want to make_deferred_yieldable the result.
|
||||
Returns a new Deferred which also doesn't follow the synapse logcontext
|
||||
rules, so you will want to make_deferred_yieldable it
|
||||
|
||||
(TODO: before using this more widely, it might make sense to refactor
|
||||
it and get() so that they do the necessary wrapping rather than having
|
||||
to do it everywhere ResponseCache is used.)
|
||||
|
||||
Args:
|
||||
key (hashable):
|
||||
deferred (twisted.internet.defer.Deferred[T):
|
||||
key (str):
|
||||
deferred (twisted.internet.defer.Deferred):
|
||||
|
||||
Returns:
|
||||
twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
|
||||
result.
|
||||
twisted.internet.defer.Deferred
|
||||
"""
|
||||
result = ObservableDeferred(deferred, consumeErrors=True)
|
||||
self.pending_result_cache[key] = result
|
||||
@@ -108,52 +97,3 @@ class ResponseCache(object):
|
||||
|
||||
result.addBoth(remove)
|
||||
return result.observe()
|
||||
|
||||
def wrap(self, key, callback, *args, **kwargs):
|
||||
"""Wrap together a *get* and *set* call, taking care of logcontexts
|
||||
|
||||
First looks up the key in the cache, and if it is present makes it
|
||||
follow the synapse logcontext rules and returns it.
|
||||
|
||||
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
|
||||
follow the synapse logcontext rules, and adds the result to the cache.
|
||||
|
||||
Example usage:
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_request(request):
|
||||
# etc
|
||||
defer.returnValue(result)
|
||||
|
||||
result = yield response_cache.wrap(
|
||||
key,
|
||||
handle_request,
|
||||
request,
|
||||
)
|
||||
|
||||
Args:
|
||||
key (hashable): key to get/set in the cache
|
||||
|
||||
callback (callable): function to call if the key is not found in
|
||||
the cache
|
||||
|
||||
*args: positional parameters to pass to the callback, if it is used
|
||||
|
||||
**kwargs: named paramters to pass to the callback, if it is used
|
||||
|
||||
Returns:
|
||||
twisted.internet.defer.Deferred: yieldable result
|
||||
"""
|
||||
result = self.get(key)
|
||||
if not result:
|
||||
logger.info("[%s]: no cached result for [%s], calculating new one",
|
||||
self._name, key)
|
||||
d = run_in_background(callback, *args, **kwargs)
|
||||
result = self.set(key, d)
|
||||
elif not isinstance(result, defer.Deferred) or result.called:
|
||||
logger.info("[%s]: using completed cached result for [%s]",
|
||||
self._name, key)
|
||||
else:
|
||||
logger.info("[%s]: using incomplete cached result for [%s]",
|
||||
self._name, key)
|
||||
return make_deferred_yieldable(result)
|
||||
|
||||
@@ -17,7 +17,7 @@ from twisted.internet import threads, reactor
|
||||
|
||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
||||
|
||||
from six.moves import queue
|
||||
import Queue
|
||||
|
||||
|
||||
class BackgroundFileConsumer(object):
|
||||
@@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
|
||||
|
||||
# Queue of slices of bytes to be written. When producer calls
|
||||
# unregister a final None is sent.
|
||||
self._bytes_queue = queue.Queue()
|
||||
self._bytes_queue = Queue.Queue()
|
||||
|
||||
# Deferred that is resolved when finished writing
|
||||
self._finished_deferred = None
|
||||
|
||||
@@ -92,7 +92,6 @@ class LoggingContext(object):
|
||||
|
||||
def __nonzero__(self):
|
||||
return False
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
sentinel = Sentinel()
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ class ConfigLoadingTestCase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.dir = tempfile.mkdtemp()
|
||||
print(self.dir)
|
||||
print self.dir
|
||||
self.file = os.path.join(self.dir, "homeserver.yaml")
|
||||
|
||||
def tearDown(self):
|
||||
|
||||
@@ -183,7 +183,7 @@ class KeyringTestCase(unittest.TestCase):
|
||||
res_deferreds_2 = kr.verify_json_objects_for_server(
|
||||
[("server10", json1)],
|
||||
)
|
||||
yield async.sleep(1)
|
||||
yield async.sleep(01)
|
||||
self.http_client.post_json.assert_not_called()
|
||||
res_deferreds_2[0].addBoth(self.check_context, None)
|
||||
|
||||
|
||||
@@ -984,11 +984,13 @@ class RoomInitialSyncTestCase(RestTestCase):
|
||||
|
||||
self.assertTrue("presence" in response)
|
||||
|
||||
presence_by_user = {
|
||||
e["content"]["user_id"]: e for e in response["presence"]
|
||||
}
|
||||
self.assertTrue(self.user_id in presence_by_user)
|
||||
self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
|
||||
# presence is turned off on hotfixes
|
||||
|
||||
# presence_by_user = {
|
||||
# e["content"]["user_id"]: e for e in response["presence"]
|
||||
# }
|
||||
# self.assertTrue(self.user_id in presence_by_user)
|
||||
# self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
|
||||
|
||||
|
||||
class RoomMessageListTestCase(RestTestCase):
|
||||
|
||||
@@ -480,9 +480,9 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
|
||||
ApplicationServiceStore(None, hs)
|
||||
|
||||
e = cm.exception
|
||||
self.assertIn(f1, str(e))
|
||||
self.assertIn(f2, str(e))
|
||||
self.assertIn("id", str(e))
|
||||
self.assertIn(f1, e.message)
|
||||
self.assertIn(f2, e.message)
|
||||
self.assertIn("id", e.message)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_duplicate_as_tokens(self):
|
||||
@@ -504,6 +504,6 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
|
||||
ApplicationServiceStore(None, hs)
|
||||
|
||||
e = cm.exception
|
||||
self.assertIn(f1, str(e))
|
||||
self.assertIn(f2, str(e))
|
||||
self.assertIn("as_token", str(e))
|
||||
self.assertIn(f1, e.message)
|
||||
self.assertIn(f2, e.message)
|
||||
self.assertIn("as_token", e.message)
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the 'License');
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an 'AS IS' BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
import tests.unittest
|
||||
import tests.utils
|
||||
|
||||
|
||||
class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
|
||||
@defer.inlineCallbacks
|
||||
def setUp(self):
|
||||
hs = yield tests.utils.setup_test_homeserver()
|
||||
self.store = hs.get_datastore()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_get_prev_events_for_room(self):
|
||||
room_id = '@ROOM:local'
|
||||
|
||||
# add a bunch of events and hashes to act as forward extremities
|
||||
def insert_event(txn, i):
|
||||
event_id = '$event_%i:local' % i
|
||||
|
||||
txn.execute((
|
||||
"INSERT INTO events ("
|
||||
" room_id, event_id, type, depth, topological_ordering,"
|
||||
" content, processed, outlier) "
|
||||
"VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?)"
|
||||
), (room_id, event_id, i, i, True, False))
|
||||
|
||||
txn.execute((
|
||||
'INSERT INTO event_forward_extremities (room_id, event_id) '
|
||||
'VALUES (?, ?)'
|
||||
), (room_id, event_id))
|
||||
|
||||
txn.execute((
|
||||
'INSERT INTO event_reference_hashes '
|
||||
'(event_id, algorithm, hash) '
|
||||
"VALUES (?, 'sha256', ?)"
|
||||
), (event_id, 'ffff'))
|
||||
|
||||
for i in range(0, 11):
|
||||
yield self.store.runInteraction("insert", insert_event, i)
|
||||
|
||||
# this should get the last five and five others
|
||||
r = yield self.store.get_prev_events_for_room(room_id)
|
||||
self.assertEqual(10, len(r))
|
||||
for i in range(0, 5):
|
||||
el = r[i]
|
||||
depth = el[2]
|
||||
self.assertEqual(10 - i, depth)
|
||||
|
||||
for i in range(5, 5):
|
||||
el = r[i]
|
||||
depth = el[2]
|
||||
self.assertLessEqual(5, depth)
|
||||
@@ -62,7 +62,7 @@ class DistributorTestCase(unittest.TestCase):
|
||||
def test_signal_catch(self):
|
||||
self.dist.declare("alarm")
|
||||
|
||||
observers = [Mock() for i in (1, 2)]
|
||||
observers = [Mock() for i in 1, 2]
|
||||
for o in observers:
|
||||
self.dist.observe("alarm", o)
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ from mock import NonCallableMock
|
||||
from synapse.util.file_consumer import BackgroundFileConsumer
|
||||
|
||||
from tests import unittest
|
||||
from six import StringIO
|
||||
from StringIO import StringIO
|
||||
|
||||
import threading
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ from tests import unittest
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util.async import Linearizer
|
||||
from six.moves import range
|
||||
|
||||
|
||||
class LinearizerTestCase(unittest.TestCase):
|
||||
@@ -59,7 +58,7 @@ class LinearizerTestCase(unittest.TestCase):
|
||||
logcontext.LoggingContext.current_context(), lc)
|
||||
|
||||
func(0, sleep=True)
|
||||
for i in range(1, 100):
|
||||
for i in xrange(1, 100):
|
||||
func(i)
|
||||
|
||||
return func(1000)
|
||||
|
||||
@@ -59,10 +59,6 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
|
||||
config.email_enable_notifs = False
|
||||
config.block_non_admin_invites = False
|
||||
config.federation_domain_whitelist = None
|
||||
config.federation_rc_reject_limit = 10
|
||||
config.federation_rc_sleep_limit = 10
|
||||
config.federation_rc_concurrent = 10
|
||||
config.filter_timeline_limit = 5000
|
||||
config.user_directory_search_all_users = False
|
||||
|
||||
# disable user directory updates, because they get done in the
|
||||
|
||||
Reference in New Issue
Block a user